86370f6c1e
- FastEmbedSparse(Qdrant/bm25) 기반 sparse 임베딩 추가 (fastembed 패키지) - IngestionService: HYBRID_SEARCH_ENABLED 시 dense + sparse 동시 저장 (RetrievalMode.HYBRID) - _ensure_collection_schema(): sparse vector 미설정 컬렉션 자동 삭제·재생성 - RetrieverService: hybrid 스토어 + dense 폴백 구조, Qdrant 내장 RRF로 결과 통합 - container.py: sparse_embeddings Singleton 프로바이더, ingestion/retriever 양쪽 주입 - .env.example: HYBRID_SEARCH_ENABLED, SPARSE_MODEL_ID 항목 추가 활성화: .env에 HYBRID_SEARCH_ENABLED=true 설정 후 기존 문서 재수집 필요 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
82 lines
3.2 KiB
Python
82 lines
3.2 KiB
Python
from langchain_community.document_loaders import PDFPlumberLoader, TextLoader
|
|
from langchain_experimental.text_splitter import SemanticChunker
|
|
from langchain_qdrant import QdrantVectorStore, RetrievalMode
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.models import Filter, FieldCondition, MatchValue, FilterSelector
|
|
|
|
|
|
class IngestionService:
    """Ingestion pipeline that splits documents into semantic chunks and stores them in Qdrant.

    When ``sparse_embeddings`` is supplied, chunks are written with both dense
    and sparse vectors (``RetrievalMode.HYBRID``); otherwise only the dense
    ``embeddings`` are passed to the vector store.
    """

    def __init__(
        self,
        embeddings,
        qdrant_url: str,
        collection_name: str,
        breakpoint_threshold_type: str = "percentile",
        buffer_size: int = 1,
        sparse_embeddings=None,
    ):
        """Store the configuration and build the splitter and Qdrant client.

        Args:
            embeddings: Dense embedding model; used both by the semantic
                splitter and when writing chunks to the vector store.
            qdrant_url: URL of the Qdrant server.
            collection_name: Name of the collection chunks are written to.
            breakpoint_threshold_type: Forwarded to ``SemanticChunker``.
            buffer_size: Forwarded to ``SemanticChunker``.
            sparse_embeddings: Optional sparse embedding model. A truthy value
                switches ingestion to hybrid (dense + sparse) storage.
        """
        self._embeddings = embeddings
        self._qdrant_url = qdrant_url
        self._collection_name = collection_name
        self._sparse_embeddings = sparse_embeddings
        self._splitter = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type=breakpoint_threshold_type,
            buffer_size=buffer_size,
        )
        self._client = QdrantClient(url=qdrant_url)

    @staticmethod
    def _is_pdf(path: str) -> bool:
        """Return True when *path* has a ``.pdf`` extension, ignoring case."""
        return path.lower().endswith(".pdf")

    def _ensure_collection_schema(self) -> None:
        """Drop the collection if hybrid mode is on but it has no sparse vector config.

        Deleting the collection lets the subsequent upload recreate it. This is
        a no-op when no sparse embeddings are configured or when the collection
        does not exist yet. Errors from the Qdrant client are propagated rather
        than silently ignored.
        """
        if not self._sparse_embeddings:
            return
        # Explicit existence check instead of a blanket try/except, so that
        # only the "collection not created yet" case is skipped.
        if not self._client.collection_exists(self._collection_name):
            return

        info = self._client.get_collection(self._collection_name)
        if not info.config.params.sparse_vectors:
            print(f"[Hybrid] '{self._collection_name}' 컬렉션에 sparse vector 설정이 없어 재생성합니다.")
            self._client.delete_collection(self._collection_name)

    def _delete_by_source(self, source_path: str) -> None:
        """Delete every stored point whose ``metadata.source`` equals *source_path*.

        Does nothing when the collection does not exist (first ingestion).
        Other Qdrant errors are propagated so a failed delete cannot silently
        leave stale chunks next to the newly uploaded ones.
        """
        if not self._client.collection_exists(self._collection_name):
            return

        # NOTE(review): assumes the loaders store the file path under
        # metadata.source exactly as it is passed to ingest() - confirm.
        source_filter = Filter(
            must=[
                FieldCondition(
                    key="metadata.source",
                    match=MatchValue(value=source_path),
                )
            ]
        )
        self._client.delete(
            collection_name=self._collection_name,
            points_selector=FilterSelector(filter=source_filter),
        )

    def ingest(self, file_paths: list[str]) -> int:
        """Load, chunk and store the given files, replacing their previous chunks.

        Files ending in ``.pdf`` (case-insensitive) are read with
        ``PDFPlumberLoader``; everything else is read as UTF-8 text.

        Args:
            file_paths: Paths of the files to ingest.

        Returns:
            The number of chunks produced by the splitter.
        """
        # Load and split first: these steps can fail (missing file, bad
        # encoding) and must not run after existing data has been deleted.
        docs = []
        for path in file_paths:
            if self._is_pdf(path):
                loader = PDFPlumberLoader(path)
            else:
                loader = TextLoader(path, encoding="utf-8")
            docs.extend(loader.load())
        chunks = self._splitter.split_documents(docs)

        # Destructive steps happen only once the new chunks are ready.
        self._ensure_collection_schema()
        for path in file_paths:
            self._delete_by_source(path)

        kwargs = dict(
            documents=chunks,
            embedding=self._embeddings,
            url=self._qdrant_url,
            collection_name=self._collection_name,
        )
        if self._sparse_embeddings:
            kwargs["sparse_embedding"] = self._sparse_embeddings
            kwargs["retrieval_mode"] = RetrievalMode.HYBRID
        QdrantVectorStore.from_documents(**kwargs)
        return len(chunks)
|