Files
youlbot/services/rag/ingestion_service.py
T
shinalok 86370f6c1e Implement Phase 18: Hybrid Search (BM25 + Vector)
- FastEmbedSparse(Qdrant/bm25) 기반 sparse 임베딩 추가 (fastembed 패키지)
- IngestionService: HYBRID_SEARCH_ENABLED 시 dense + sparse 동시 저장 (RetrievalMode.HYBRID)
  - _ensure_collection_schema(): sparse vector 미설정 컬렉션 자동 삭제·재생성
- RetrieverService: hybrid 스토어 + dense 폴백 구조, Qdrant 내장 RRF로 결과 통합
- container.py: sparse_embeddings Singleton 프로바이더, ingestion/retriever 양쪽 주입
- .env.example: HYBRID_SEARCH_ENABLED, SPARSE_MODEL_ID 항목 추가

활성화: .env에 HYBRID_SEARCH_ENABLED=true 설정 후 기존 문서 재수집 필요

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 17:47:17 +09:00

82 lines
3.2 KiB
Python

import logging

from langchain_community.document_loaders import PDFPlumberLoader, TextLoader
from langchain_experimental.text_splitter import SemanticChunker
from langchain_qdrant import QdrantVectorStore, RetrievalMode
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, FilterSelector
class IngestionService:
    """Ingestion pipeline that splits documents into semantic chunks and stores them in Qdrant.

    When ``sparse_embeddings`` is supplied, chunks are written with both dense
    and sparse vectors using ``RetrievalMode.HYBRID``; otherwise only the dense
    ``embeddings`` are passed to the vector store.
    """

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        embeddings,
        qdrant_url: str,
        collection_name: str,
        breakpoint_threshold_type: str = "percentile",
        buffer_size: int = 1,
        sparse_embeddings=None,
    ):
        """Store the configuration and build the splitter and Qdrant client.

        Args:
            embeddings: Dense embedding model; used both by the semantic
                splitter and when writing chunks to the vector store.
            qdrant_url: URL of the Qdrant server.
            collection_name: Name of the collection chunks are written to.
            breakpoint_threshold_type: Forwarded to ``SemanticChunker``.
            buffer_size: Forwarded to ``SemanticChunker``.
            sparse_embeddings: Optional sparse embedding model. A truthy value
                switches ingestion to hybrid (dense + sparse) storage.
        """
        self._embeddings = embeddings
        self._qdrant_url = qdrant_url
        self._collection_name = collection_name
        self._sparse_embeddings = sparse_embeddings
        self._splitter = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type=breakpoint_threshold_type,
            buffer_size=buffer_size,
        )
        self._client = QdrantClient(url=qdrant_url)

    @staticmethod
    def _is_pdf(path: str) -> bool:
        """Return True when *path* has a ``.pdf`` extension, ignoring case."""
        return path.lower().endswith(".pdf")

    def _load(self, path: str) -> list:
        """Load one file with the loader that matches its extension."""
        if self._is_pdf(path):
            loader = PDFPlumberLoader(path)
        else:
            loader = TextLoader(path, encoding="utf-8")
        return loader.load()

    def _ensure_collection_schema(self) -> None:
        """Drop the collection if hybrid mode is on but it has no sparse vector config.

        Deleting the collection lets the subsequent vector-store write recreate
        it. A missing collection is not an error; any other Qdrant failure is
        allowed to propagate instead of being silently ignored.
        """
        if not self._sparse_embeddings:
            return
        name = self._collection_name
        if not self._client.collection_exists(name):
            return  # first ingestion: nothing to migrate
        info = self._client.get_collection(name)
        if not info.config.params.sparse_vectors:
            self._log.warning(
                "[Hybrid] '%s' 컬렉션에 sparse vector 설정이 없어 재생성합니다.", name
            )
            self._client.delete_collection(name)

    def _delete_by_source(self, source_path: str) -> None:
        """Delete every stored chunk whose ``metadata.source`` equals *source_path*.

        Does nothing when the collection does not exist yet. Other Qdrant
        errors propagate so that a failed delete cannot silently lead to
        duplicated chunks on re-ingestion.
        """
        if not self._client.collection_exists(self._collection_name):
            return  # first ingestion: nothing to delete
        source_filter = Filter(
            must=[
                FieldCondition(
                    key="metadata.source",
                    match=MatchValue(value=source_path),
                )
            ]
        )
        self._client.delete(
            collection_name=self._collection_name,
            points_selector=FilterSelector(filter=source_filter),
        )

    def ingest(self, file_paths: list[str]) -> int:
        """Load, chunk and store the given files, replacing their previous chunks.

        Files ending in ``.pdf`` (case-insensitive) are read with
        ``PDFPlumberLoader``; everything else is read as UTF-8 text.

        Args:
            file_paths: Paths of the files to ingest.

        Returns:
            The number of chunks produced by the splitter and handed to the
            vector store. ``0`` for an empty *file_paths*, in which case Qdrant
            is not touched at all.
        """
        if not file_paths:
            return 0

        # Load and split before mutating Qdrant: if a file is missing or
        # unreadable we fail here, with the previously stored chunks intact.
        docs = []
        for path in file_paths:
            docs.extend(self._load(path))
        chunks = self._splitter.split_documents(docs)

        self._ensure_collection_schema()
        for path in file_paths:
            self._delete_by_source(path)

        store_kwargs = {
            "documents": chunks,
            "embedding": self._embeddings,
            "url": self._qdrant_url,
            "collection_name": self._collection_name,
        }
        if self._sparse_embeddings:
            store_kwargs["sparse_embedding"] = self._sparse_embeddings
            store_kwargs["retrieval_mode"] = RetrievalMode.HYBRID
        QdrantVectorStore.from_documents(**store_kwargs)
        return len(chunks)