import logging
import re

import numpy as np
from langchain_community.document_loaders import PDFPlumberLoader, TextLoader
from langchain_core.documents import Document
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import FieldCondition, Filter, FilterSelector, MatchValue

_logger = logging.getLogger(__name__)


def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of vectors *a* and *b*.

    A small epsilon is added to the denominator so that an all-zero vector
    yields a similarity of ~0 instead of a division-by-zero NaN.
    """
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-10))


class _SemanticSplitter:
    """Chunker driven by sentence-embedding similarity.

    The cosine similarity between each pair of adjacent sentences is
    computed, and the text is split wherever that similarity is low (i.e.
    where the meaning shifts). With ``breakpoint_percentile=95`` the cut-off
    is the 5th percentile of the similarities, so only the lowest ~5% of
    adjacent pairs become chunk boundaries.
    """

    # Split on whitespace that follows sentence-ending punctuation.
    _SENTENCE_RE = re.compile(r"(?<=[.!?。!?])\s+")

    def __init__(self, embeddings, breakpoint_percentile: int = 95):
        """Store the embedding model and the percentile used as the cut-off.

        Args:
            embeddings: Object exposing ``embed_documents(list[str])`` that
                returns one vector per input string.
            breakpoint_percentile: Splits happen where similarity falls
                below the ``100 - breakpoint_percentile`` percentile.
        """
        self._embeddings = embeddings
        self._percentile = breakpoint_percentile

    def split_documents(self, docs: list[Document]) -> list[Document]:
        """Split every document into chunk documents.

        Each chunk receives its own shallow copy of the parent's metadata so
        that later mutation of one chunk's metadata cannot leak into its
        siblings or into the source document.
        """
        return [
            Document(page_content=chunk_text, metadata=dict(doc.metadata))
            for doc in docs
            for chunk_text in self._split_text(doc.page_content)
        ]

    def _split_text(self, text: str) -> list[str]:
        """Split *text* into chunks at low-similarity sentence boundaries.

        Returns an empty list for blank input and a single chunk when the
        text contains at most one sentence.
        """
        stripped = text.strip()
        sentences = [s for s in self._SENTENCE_RE.split(stripped) if s.strip()]
        if len(sentences) <= 1:
            return [stripped] if stripped else []

        vecs = np.array(self._embeddings.embed_documents(sentences))
        # similarities[i] compares sentence i with sentence i + 1.
        similarities = [_cosine_similarity(a, b) for a, b in zip(vecs, vecs[1:])]
        threshold = float(np.percentile(similarities, 100 - self._percentile))
        # A boundary *before* sentence i + 1 when the pair is strictly below
        # the threshold; ties with the threshold do not split.
        breakpoints = [i + 1 for i, s in enumerate(similarities) if s < threshold]

        chunks: list[str] = []
        start = 0
        for bp in breakpoints:
            chunk = " ".join(sentences[start:bp]).strip()
            if chunk:
                chunks.append(chunk)
            start = bp
        tail = " ".join(sentences[start:]).strip()
        if tail:
            chunks.append(tail)
        return chunks


class IngestionService:
    """Ingestion pipeline: split documents into semantic chunks and store them in Qdrant."""

    def __init__(
        self,
        embeddings,
        qdrant_url: str,
        collection_name: str,
        breakpoint_threshold_type: str = "percentile",
        breakpoint_percentile: int = 95,
    ):
        """Create the splitter and the Qdrant client.

        Args:
            embeddings: Embedding model used both for splitting and storage.
            qdrant_url: URL of the Qdrant server.
            collection_name: Target collection for the chunks.
            breakpoint_threshold_type: Accepted for future extension; only
                the percentile strategy is implemented, so any other value
                is ignored (with a warning).
            breakpoint_percentile: Percentile passed to the splitter.
        """
        self._embeddings = embeddings
        self._qdrant_url = qdrant_url
        self._collection_name = collection_name
        if breakpoint_threshold_type != "percentile":
            # Don't fail: callers may already pass other values. Just make
            # the fallback visible instead of silently ignoring the argument.
            _logger.warning(
                "breakpoint_threshold_type=%r is not supported; using 'percentile'",
                breakpoint_threshold_type,
            )
        self._splitter = _SemanticSplitter(
            embeddings, breakpoint_percentile=breakpoint_percentile
        )
        self._client = QdrantClient(url=qdrant_url)

    def _delete_by_source(self, source_path: str) -> None:
        """Delete every stored chunk whose ``metadata.source`` equals *source_path*.

        This is best-effort and never raises: a missing collection (first
        ingestion) is skipped explicitly, and any other failure is logged
        rather than swallowed silently.
        """
        try:
            if not self._client.collection_exists(self._collection_name):
                return  # First ingestion: nothing to delete yet.
            self._client.delete(
                collection_name=self._collection_name,
                points_selector=FilterSelector(
                    filter=Filter(
                        must=[
                            FieldCondition(
                                key="metadata.source",
                                match=MatchValue(value=source_path),
                            )
                        ]
                    )
                ),
            )
        except Exception:
            # Stale chunks may remain next to the new ones; surface that in
            # the logs so it can be diagnosed.
            _logger.warning(
                "Failed to delete existing chunks for source %r from collection %r",
                source_path,
                self._collection_name,
                exc_info=True,
            )

    def ingest(self, file_paths: list[str]) -> int:
        """Load, split and store the given files; return the number of chunks stored.

        Files ending in ``.pdf`` (case-insensitive) are read with
        ``PDFPlumberLoader``; everything else is read as UTF-8 text.

        All files are loaded and split *before* any existing chunks are
        deleted, so a loader or embedding failure leaves the store untouched
        instead of leaving it with the old data removed and nothing written.
        """
        docs: list[Document] = []
        for path in file_paths:
            if path.lower().endswith(".pdf"):
                loader = PDFPlumberLoader(path)
            else:
                loader = TextLoader(path, encoding="utf-8")
            docs.extend(loader.load())

        chunks = self._splitter.split_documents(docs)

        # Only now, with the replacement chunks in hand, drop the old ones.
        for path in file_paths:
            self._delete_by_source(path)

        QdrantVectorStore.from_documents(
            documents=chunks,
            embedding=self._embeddings,
            url=self._qdrant_url,
            collection_name=self._collection_name,
        )
        return len(chunks)