From 0b50444e43ae67a24acffa4bcd4b897f51de28b0 Mon Sep 17 00:00:00 2001 From: sal Date: Thu, 4 Jun 2026 10:04:05 +0900 Subject: [PATCH] =?UTF-8?q?IDEA-2/1/5/7:=20=EC=8A=A4=EB=A7=88=ED=8A=B8=20?= =?UTF-8?q?=EC=95=8C=EB=A6=BC,=20=EB=8C=80=ED=99=94=20=EA=B8=B0=EB=B0=98?= =?UTF-8?q?=20RAG,=20CRAG,=20=ED=8C=8C=EB=9D=BC=EB=AF=B8=ED=84=B0=20?= =?UTF-8?q?=EC=9E=90=EB=8F=99=20=ED=8A=9C=EB=8B=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - IDEA-2 스마트 알림: td_reminders 테이블, set_reminder/list_reminders 도구, SchedulerService(asyncio 60초 루프, D-7/D-1/D-0 Telegram push), FastAPI lifespan 연동, GET /reminders/{user_id} 엔드포인트 - IDEA-1 대화 기반 RAG: IngestionService.store_text() 추가, AgentService._maybe_index_conversation() — 응답 후 LLM 판단 → Qdrant 저장 (CONV_RAG_ENABLED=true 활성화, background task로 응답 속도 무관) - IDEA-5 CRAG: AgentState에 crag_fallback_used 플래그 추가, crag_check LangGraph 노드 — search_documents 결과 없으면 web_search 자동 주입, route_after_crag으로 fallback 1회 루프 제어 (CRAG_ENABLED=true 활성화) - IDEA-7 RAG Auto-Eval: eval/auto_tune.py — API 서버 없이 파라미터 조합별 context_precision/recall 비교, 최적 설정 추천 Co-Authored-By: Claude Sonnet 4.6 --- api.py | 28 ++++- config.py | 13 ++ container.py | 14 +++ docs/ROADMAP.md | 154 ++++++++++++++++++++++- eval/auto_tune.py | 191 +++++++++++++++++++++++++++++ services/agent/agent_service.py | 128 +++++++++++++++++-- services/agent/tools.py | 31 ++++- services/db/mysql_service.py | 12 ++ services/db/reminder_repository.py | 57 +++++++++ services/rag/ingestion_service.py | 15 +++ services/scheduler_service.py | 83 +++++++++++++ 11 files changed, 715 insertions(+), 11 deletions(-) create mode 100644 eval/auto_tune.py create mode 100644 services/db/reminder_repository.py create mode 100644 services/scheduler_service.py diff --git a/api.py b/api.py index f308ea3..563af63 100644 --- a/api.py +++ b/api.py @@ -21,6 +21,7 @@ import json import os import tempfile +from contextlib import asynccontextmanager from dotenv import load_dotenv load_dotenv() @@ -32,12 +33,21 @@ from pydantic import BaseModel from container import Container from services.agent.agent_service import AgentService -app = FastAPI(title="율봇 API", version="1.0") - _container = Container() _container.db_service().connect() _container.db_service().init_schema() + +@asynccontextmanager +async def lifespan(app: FastAPI): + scheduler = _container.scheduler_service() + scheduler.start() + yield + scheduler.shutdown() + + +app = FastAPI(title="율봇 API", version="1.0", lifespan=lifespan) + _cfg = _container.config() _agent_cache: dict[str, AgentService] = {} @@ -58,6 +68,10 @@ def _get_agent(user_id: str) -> AgentService: query_rewrite_enabled=_cfg.query_rewrite_enabled, user_profile_repository=_container.user_profile_repository(), conversation_repository=_container.conversation_repository(), + reminder_repository=_container.reminder_repository(), + ingestion_service=_container.ingestion_service() if _cfg.conv_rag_enabled else None, + crag_enabled=_cfg.crag_enabled, + conv_rag_enabled=_cfg.conv_rag_enabled, user_id=user_id, ) if _vision_model: @@ -182,3 +196,13 @@ async def delete_document(source: str, _=Depends(_auth)): """source 경로에 해당하는 모든 청크 삭제.""" _container.retriever_service().delete_document(source) return {"deleted": source} + + +@app.get("/reminders/{user_id}") +async def list_reminders(user_id: str, days_ahead: int = 30, _=Depends(_auth)): + """user_id의 예정 알림 목록 반환 (기본 30일 이내).""" + items = _container.reminder_repository().get_upcoming(user_id, days_ahead=days_ahead) + return {"reminders": [ + {"id": r["id"], "remind_date": str(r["remind_date"]), "message": r["message"]} + for r in items + ]} diff --git a/config.py b/config.py index e01c06f..d1fed91 100644 --- a/config.py +++ b/config.py @@ -64,6 +64,19 @@ class Config(BaseSettings): vision_model_id: str = "mlx-community/Qwen2.5-VL-7B-Instruct-4bit" vision_max_tokens: int = 512 + # CRAG — 검색 결과 없을 때 web_search 자동 fallback (IDEA-5) + crag_enabled: bool = False + + # 대화 기반 자동 RAG 인덱싱 (IDEA-1) + conv_rag_enabled: bool = False + + # Scheduler / Telegram 알림 (IDEA-2) + # TELEGRAM_BOT_TOKEN: BotFather에서 발급받은 봇 토큰 + # TELEGRAM_USER_MAP: JSON 형식으로 user_id → Telegram chat_id 매핑 + # 예) TELEGRAM_USER_MAP={"아록": "123456789", "근혜": "987654321"} + telegram_bot_token: str = "" + telegram_user_map: str = "{}" + system_prompt: str = """모든 사고 과정(thinking)과 답변은 반드시 한국어로만 작성하세요. 영어 사용 절대 금지. 당신의 이름은 '율봇'입니다. 친절하고 따뜻한 한국어 상담 도우미입니다. diff --git a/container.py b/container.py index 503441e..eb6f52d 100644 --- a/container.py +++ b/container.py @@ -10,6 +10,8 @@ from services.db.mysql_service import DatabaseService from services.db.conversation_repository import ConversationRepository from services.db.user_profile_repository import UserProfileRepository from services.db.feedback_repository import FeedbackRepository +from services.db.reminder_repository import ReminderRepository +from services.scheduler_service import SchedulerService from services.ui.cli_service import CliUiService from services.events.event_bus import EventBus from services.events.handlers import StreamTokenHandler, StreamEndHandler @@ -69,6 +71,18 @@ class Container(containers.DeclarativeContainer): db=db_service, ) + reminder_repository = providers.Singleton( + ReminderRepository, + db=db_service, + ) + + scheduler_service = providers.Singleton( + SchedulerService, + reminder_repo=reminder_repository, + bot_token=providers.Callable(lambda c: c.telegram_bot_token, config), + user_map_json=providers.Callable(lambda c: c.telegram_user_map, config), + ) + history_service = providers.Factory( HistoryService, system_prompt=providers.Callable(lambda c: c.system_prompt, config), diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index eef0d6e..1cba601 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -6,8 +6,9 @@ |------|------| | LLM | Qwen3-8B-4bit (MLX, Apple Silicon) | | Agent | LangGraph ReAct + Tool Calling + Thinking 모드 | +| Scheduler | asyncio task 기반 알림 스케줄러 — D-7/D-1/D-0 Telegram push (`SchedulerService`) | | RAG | Qdrant + BAAI/bge-m3 임베딩 + Semantic Chunking (`SemanticChunker`) + Reranker (BAAI/bge-reranker-v2-m3) | -| Tools | `search_documents`, `web_search`, `get_current_date`, `remember_user_info`, `recall_user_info` (5개) | +| Tools | `search_documents`, `web_search`, `get_current_date`, `remember_user_info`, `recall_user_info`, `set_reminder`, `list_reminders` (7개) | | Feedback | Gradio 👍/👎 → `td_feedback` DB 저장 + LangSmith `create_feedback()` 연동 | | UI | CLI + Gradio Web UI + 음성 입력(STT)/출력(TTS) | | Memory | LangGraph MemorySaver (세션 내) + MySQL 대화 저장 + 장기 사용자 프로필 | @@ -544,3 +545,154 @@ Phase 20 RAGAS 평가 → Phase 15 (모델선택) → Phase 16 (Docke | Phase 15 모델 선택 | 🔲 미완 | 중간 | 중간 | 4순위 | | Phase 16 Docker | 🔲 미완 | 높음 | 중간 | 5순위 | | Phase 17 멀티모달 | ✅ 완료 | — | — | — | + +--- + +## 💡 IDEA — 신규 개선 아이디어 + +### 단기 — 빠르게 임팩트 큰 것 + +#### ✅ IDEA-1. 대화 기반 자동 RAG 업데이트 + +**배경**: 현재 문서 업로드만 RAG에 들어간다. 중요 대화 내용 자체를 자동으로 벡터DB에 추가하면 사용할수록 지식이 쌓이는 시스템이 된다. + +**구현 내용**: +- `IngestionService.store_text(text, metadata)` — 단일 텍스트 직접 저장 (semantic chunking 없이) +- `AgentService._maybe_index_conversation()` — 응답 완료 후 LLM이 유용한 정보 판단 → 요약 → Qdrant 저장 (asyncio background task) +- `source="conversation"`, `user_id`, `timestamp` 메타데이터로 문서 RAG와 구분 +- `.env` `CONV_RAG_ENABLED=true`로 활성화 (기본 비활성) + +**난이도**: 하 | **임팩트**: 높음 (지식 자동 축적) + +--- + +#### ✅ IDEA-2. 스마트 알림 & 일정 연동 + +**배경**: 예방접종, 약 먹을 시간, 병원 예약 등 날짜 기반 알림이 없다. + +**구현 내용**: +- `td_reminders` 테이블 (user_id, remind_date, message, sent_d0/d1/d7) +- `set_reminder(remind_date, message)` + `list_reminders()` 도구 — LangGraph 에이전트 자동 호출 +- `SchedulerService` — asyncio task 기반 60초 간격 체크 → D-7/D-1/D-0 Telegram push +- FastAPI `lifespan`으로 앱 시작/종료 시 스케줄러 자동 관리 +- `GET /reminders/{user_id}` API 엔드포인트 추가 +- `.env` 설정: `TELEGRAM_BOT_TOKEN`, `TELEGRAM_USER_MAP={"아록":"123456"}` + +**난이도**: 중간 | **임팩트**: 매우 높음 (육아 핵심 시나리오) + +--- + +#### IDEA-3. 대화 요약 일간 리포트 + +**배경**: 하루에 어떤 질문을 했고 어떤 결정을 내렸는지 돌아볼 방법이 없다. + +**구현 방향**: +- 매일 자정 cron → 당일 대화 요약 생성 (`CompactService` 재활용) +- Telegram으로 사용자별 요약 발송 +- `/chat` API + APScheduler로 구현 가능 (새 인프라 불필요) + +**난이도**: 하 | **임팩트**: 중간 + +--- + +#### IDEA-4. 텔레그램 그룹 채팅 지원 + +**배경**: 현재 1:1 채팅만 지원한다. 가족 그룹에서 `@율봇` 멘션으로 함께 사용하고 싶다. + +**구현 방향**: +- 그룹 메시지에서 `@율봇` 멘션 감지 → 발신자 Telegram ID로 `user_id` 매핑 +- 그룹 공용 컨텍스트(`user_id="family"`) 옵션 +- `python-telegram-bot` 기존 코드에 그룹 핸들러 추가 + +**난이도**: 하 | **임팩트**: 높음 (가족 공동 사용) + +--- + +### 중기 — RAG/에이전트 품질 향상 + +#### ✅ IDEA-5. Agentic RAG — 자기 교정 검색 (CRAG) + +**배경**: 현재 `query_rewrite → search_documents` 1회로 끝난다. 검색 결과가 부족하면 재시도나 웹 검색 fallback이 없다. + +**구현 내용**: +- `AgentState(TypedDict)` — `messages` + `crag_fallback_used` 커스텀 상태 +- `crag_check` LangGraph 노드 — `search_documents` 결과가 비었으면 동일 쿼리로 `web_search` AIMessage 자동 주입 +- `route_after_crag` — fallback AIMessage 있으면 tools 재실행, 없으면 agent로 복귀 +- 그래프: `tools → crag_check → route_after_crag → {tools, agent}` +- 무한 루프 방지: `crag_fallback_used` 플래그로 1회만 fallback +- `.env` `CRAG_ENABLED=true`로 활성화 (기본 비활성) + +**난이도**: 중간 | **임팩트**: 높음 (검색 실패 케이스 대폭 감소) + +--- + +#### IDEA-6. 영수증/가계부 OCR + +**배경**: `analyze_image` 도구가 이미 있다. 영수증 사진에서 지출을 자동 기록하면 가계 관리가 가능해진다. + +**구현 방향**: +- `analyze_image` → 금액·항목·날짜 추출 → MySQL `td_expenses` 저장 +- `get_monthly_expenses(month)` 도구 추가 → "이번 달 식비 얼마야?" 대응 +- 카테고리 자동 분류 (식비/의료비/교육비 등) + +**난이도**: 중간 | **임팩트**: 높음 (가계 관리 시나리오) + +--- + +#### ✅ IDEA-7. RAG 파라미터 자동 튜닝 (Auto-Eval Loop) + +**배경**: RAGAS 평가 인프라는 있는데, 파라미터 변경 효과를 수동으로 비교해야 한다. + +**구현 내용**: +- `eval/auto_tune.py` — API 서버 없이 `RetrieverService` 직접 사용, 파라미터 조합별 `context_precision` + `context_recall` 비교 +- 기본 조합 4개: `baseline(3/10)`, `top_k_5(5/15)`, `top_k_2(2/6)`, `fetch_k_20(3/20)` +- 평균 점수 기준 최적 조합 추천 + `.env` 설정값 안내 +- `eval/results/tune_YYYYMMDD.json` 저장 +- 실행: `python eval/auto_tune.py [--dataset eval/dataset.jsonl]` + +**난이도**: 중간 | **임팩트**: 중간 (장기 품질 자동 관리) + +--- + +### 장기 — 구조적 확장 + +#### IDEA-8. GraphRAG / 지식 그래프 + +**배경**: `td_user_profile`이 flat key-value라 엔티티 간 관계 추론이 불가능하다. + +**구현 방향**: +- `(도율) -[알레르기]→ (복숭아)`, `(아록) -[부모]→ (도율)` 형태 그래프 +- NetworkX 기반 로컬 그래프 + 그래프 쿼리 도구 +- 복잡한 추론 질문 ("도율이 먹으면 안 되는 음식은?") 대응 가능 + +**난이도**: 높음 | **임팩트**: 높음 (메모리 추론 능력 대폭 향상) + +--- + +#### IDEA-9. PWA / 모바일 Web UI + +**배경**: Gradio는 모바일 UX가 좋지 않다. 네이티브 앱처럼 설치하고 카메라 접근도 원활해야 한다. + +**구현 방향**: +- `youlbot-webui`를 Next.js + shadcn/ui PWA로 재작성 +- 홈 화면 설치, 오프라인 캐시, 네이티브 카메라 접근 +- 기존 REST API 그대로 재사용 (백엔드 변경 없음) +- STT는 Web Speech API로 대체 (브라우저 내장) + +**난이도**: 높음 | **임팩트**: 높음 (모바일 UX 대폭 개선) + +--- + +### IDEA 우선순위 매트릭스 + +| IDEA | 설명 | 난이도 | 임팩트 | 추천 순위 | +|------|------|--------|--------|-----------| +| IDEA-2 스마트 알림 | ✅ asyncio 스케줄러 + Telegram push | 중간 | 매우 높음 | — | +| IDEA-4 텔레그램 그룹 채팅 | 기존 Bot 코드 확장 | 하 | 높음 | 1순위 | +| IDEA-3 일간 리포트 | CompactService 재활용 + SchedulerService | 하 | 중간 | 2순위 | +| IDEA-1 대화 기반 RAG | ✅ asyncio background + Qdrant 저장 | 하 | 높음 | — | +| IDEA-5 CRAG | ✅ crag_check LangGraph 노드 | 중간 | 높음 | — | +| IDEA-7 Auto-Eval | ✅ eval/auto_tune.py | 중간 | 중간 | — | +| IDEA-6 영수증 OCR | analyze_image 재활용 | 중간 | 높음 | 1순위 | +| IDEA-8 GraphRAG | 새 데이터 구조 | 높음 | 높음 | 7순위 | +| IDEA-9 PWA WebUI | 프론트엔드 재작성 | 높음 | 높음 | 8순위 | diff --git a/eval/auto_tune.py b/eval/auto_tune.py new file mode 100644 index 0000000..4f8b92c --- /dev/null +++ b/eval/auto_tune.py @@ -0,0 +1,191 @@ +"""RAG 파라미터 자동 튜닝 스크립트 (IDEA-7) + +API 서버 없이 RetrieverService를 직접 사용해 파라미터 조합별 context 품질을 비교한다. +평가 지표: context_precision, context_recall (RAGAS) + +실행: + python eval/auto_tune.py [--dataset eval/dataset.jsonl] + +출력: + eval/results/tune_YYYYMMDD_HHMMSS.json — 조합별 점수 및 추천 파라미터 +""" +from __future__ import annotations + +import argparse +import json +import os +import sys +from datetime import datetime +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +os.chdir(ROOT) + +from dotenv import load_dotenv +load_dotenv(ROOT / ".env") + +# ── Compatibility shim (run_ragas.py 동일) ───────────────────────────────────── +try: + import langchain_community.chat_models.vertexai # noqa: F401 +except ModuleNotFoundError: + try: + from langchain_google_vertexai import ChatVertexAI as _CV + _stub = type(sys)("langchain_community.chat_models.vertexai") + _stub.ChatVertexAI = _CV + sys.modules["langchain_community.chat_models.vertexai"] = _stub + except ImportError: + _stub = type(sys)("langchain_community.chat_models.vertexai") + _stub.ChatVertexAI = object + sys.modules["langchain_community.chat_models.vertexai"] = _stub + +from ragas import evaluate +from ragas.metrics import context_precision, context_recall +from ragas.embeddings import LangchainEmbeddingsWrapper +from ragas.llms import LangchainLLMWrapper +from datasets import Dataset +from ragas.run_config import RunConfig + +from container import Container +from services.rag.retriever_service import RetrieverService + +_container = Container() +_container.db_service().connect() +_container.db_service().init_schema() +_cfg = _container.config() + +# ── 튜닝 대상 파라미터 조합 ──────────────────────────────────────────────────── + +VARIANTS = [ + {"name": "baseline", "top_k": 3, "rerank_fetch_k": 10}, + {"name": "top_k_5", "top_k": 5, "rerank_fetch_k": 15}, + {"name": "top_k_2", "top_k": 2, "rerank_fetch_k": 6}, + {"name": "fetch_k_20", "top_k": 3, "rerank_fetch_k": 20}, +] + + +def _build_retriever(top_k: int, rerank_fetch_k: int) -> RetrieverService: + return RetrieverService( + embeddings=_container.embeddings(), + qdrant_url=_cfg.qdrant_url, + collection_name=_cfg.qdrant_collection, + top_k=top_k, + reranker=_container.reranker() if _cfg.reranker_enabled else None, + rerank_fetch_k=rerank_fetch_k, + sparse_embeddings=_container.sparse_embeddings() if _cfg.hybrid_search_enabled else None, + ) + + +def _build_evaluator(): + if os.getenv("OPENAI_API_KEY"): + from langchain_openai import ChatOpenAI + print("[AutoTune] 평가 LLM: OpenAI GPT-4o-mini") + return LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini", temperature=0)) + if os.getenv("ANTHROPIC_API_KEY"): + from langchain_anthropic import ChatAnthropic + print("[AutoTune] 평가 LLM: Claude Haiku") + return LangchainLLMWrapper(ChatAnthropic(model="claude-haiku-4-5-20251001", temperature=0)) + print("[AutoTune] 평가 LLM: 로컬 Qwen3") + return LangchainLLMWrapper(_container.chat_model()) + + +def run(dataset_path: str) -> None: + samples = [] + with open(dataset_path, encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + samples.append(json.loads(line)) + + if not samples: + print(f"[오류] 데이터셋이 비어 있습니다: {dataset_path}") + sys.exit(1) + + print(f"[AutoTune] 파라미터 튜닝 시작 — {len(samples)}개 질문, {len(VARIANTS)}개 조합\n") + + llm = _build_evaluator() + emb = LangchainEmbeddingsWrapper(_container.embeddings()) + run_cfg = RunConfig(timeout=300, max_retries=1, max_workers=1) + + results = [] + + for variant in VARIANTS: + name = variant["name"] + print(f"── {name} (top_k={variant['top_k']}, fetch_k={variant['rerank_fetch_k']}) ──") + retriever = _build_retriever(variant["top_k"], variant["rerank_fetch_k"]) + + questions, ground_truths, contexts = [], [], [] + for s in samples: + q = s["question"] + docs = retriever.search(q) + contexts.append([d.page_content for d in docs]) + questions.append(q) + ground_truths.append(s["ground_truth"]) + print(f" [{q[:40]}] → {len(docs)}개 청크") + + ds = Dataset.from_dict({ + "question": questions, + "contexts": contexts, + "ground_truth": ground_truths, + }) + + result = evaluate( + ds, + metrics=[context_precision, context_recall], + llm=llm, + embeddings=emb, + run_config=run_cfg, + raise_exceptions=False, + ) + df = result.to_pandas() + + def _score(col: str) -> float | None: + if col not in df.columns: + return None + val = df[col].dropna().mean() + return float(val) if val == val else None + + scores = { + "context_precision": _score("context_precision"), + "context_recall": _score("context_recall"), + } + avg = sum(v for v in scores.values() if v is not None) / max( + sum(1 for v in scores.values() if v is not None), 1 + ) + results.append({**variant, "scores": scores, "avg": avg}) + print(f" precision={scores['context_precision']}, recall={scores['context_recall']}, avg={avg:.3f}\n") + + # ── 결과 출력 ────────────────────────────────────────────────────────────── + best = max(results, key=lambda r: r["avg"]) + + print("=" * 60) + print("AutoTune 결과") + print("=" * 60) + header = f"{'조합':<14} {'precision':>10} {'recall':>10} {'avg':>8}" + print(header) + print("-" * 60) + for r in sorted(results, key=lambda x: x["avg"], reverse=True): + marker = " ★" if r["name"] == best["name"] else "" + prec = f"{r['scores']['context_precision']:.3f}" if r['scores']['context_precision'] else "N/A" + rec = f"{r['scores']['context_recall']:.3f}" if r['scores']['context_recall'] else "N/A" + print(f" {r['name']:<12} {prec:>10} {rec:>10} {r['avg']:>8.3f}{marker}") + print("=" * 60) + print(f"\n추천: top_k={best['top_k']}, rerank_fetch_k={best['rerank_fetch_k']} ({best['name']})") + print(f" .env에 RAG_TOP_K={best['top_k']}, RERANKER_FETCH_K={best['rerank_fetch_k']} 설정\n") + + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + results_dir = ROOT / "eval" / "results" + results_dir.mkdir(exist_ok=True) + out = results_dir / f"tune_{ts}.json" + out.write_text( + json.dumps({"timestamp": ts, "best": best, "all": results}, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + print(f"JSON 저장: {out}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="RAG 파라미터 자동 튜닝") + parser.add_argument("--dataset", default=str(ROOT / "eval" / "dataset.jsonl")) + args = parser.parse_args() + run(args.dataset) diff --git a/services/agent/agent_service.py b/services/agent/agent_service.py index f961987..0cd1143 100644 --- a/services/agent/agent_service.py +++ b/services/agent/agent_service.py @@ -1,16 +1,22 @@ +import asyncio import os import time import uuid -from typing import AsyncIterator +from typing import Annotated, AsyncIterator, TypedDict from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, SystemMessage from langchain_core.runnables import RunnableConfig from langgraph.checkpoint.memory import MemorySaver from langgraph.config import get_stream_writer -from langgraph.graph import END, START, MessagesState, StateGraph +from langgraph.graph import END, START, MessagesState, StateGraph, add_messages from langgraph.prebuilt import ToolNode -from services.agent.tools import get_current_date, make_memory_tools, make_retriever_tool, make_search_tool, make_vision_tool, web_search + +class AgentState(TypedDict): + messages: Annotated[list, add_messages] + crag_fallback_used: bool + +from services.agent.tools import get_current_date, make_memory_tools, make_reminder_tools, make_retriever_tool, make_search_tool, make_vision_tool, web_search class AgentService: @@ -31,6 +37,10 @@ class AgentService: query_rewrite_enabled: bool = False, user_profile_repository=None, conversation_repository=None, + reminder_repository=None, + ingestion_service=None, + crag_enabled: bool = False, + conv_rag_enabled: bool = False, user_id: str = "default", ): self._system_prompt = system_prompt @@ -47,6 +57,9 @@ class AgentService: self._pending_history: list = [] self._user_id = user_id self._last_run_id: str | None = None + self._ingestion_service = ingestion_service + self._crag_enabled = crag_enabled + self._conv_rag_enabled = conv_rag_enabled if conversation_repository: try: @@ -75,6 +88,9 @@ class AgentService: if user_profile_repository is not None: remember_tool, recall_tool = make_memory_tools(user_profile_repository, user_id) self._base_tools += [remember_tool, recall_tool] + if reminder_repository is not None: + set_reminder_tool, list_reminders_tool = make_reminder_tools(reminder_repository, user_id) + self._base_tools += [set_reminder_tool, list_reminders_tool] self._vision_model = None # set via set_vision_model() self._llm_with_tools = chat_model.bind_tools(self._base_tools) self._chat_model = chat_model @@ -206,7 +222,61 @@ class AgentService: ) return {"messages": [new_msg]} - def route_after_agent(state: MessagesState) -> str: + async def crag_check_node(state: AgentState) -> dict: + """검색 결과 없을 때 web_search 자동 fallback 주입 (CRAG).""" + if state.get("crag_fallback_used", False): + return {} + + messages = state["messages"] + # 마지막 search_documents 결과 탐색 + last_search_msg = None + for msg in reversed(messages): + if hasattr(msg, "name") and msg.name == "search_documents": + last_search_msg = msg + break + + if not last_search_msg or "관련 문서를 찾을 수 없습니다" not in last_search_msg.content: + return {} + + # 해당 ToolMessage의 tool_call_id로 원본 AIMessage에서 검색 쿼리 추출 + tool_call_id = getattr(last_search_msg, "tool_call_id", None) + query = "" + for msg in reversed(messages): + if isinstance(msg, AIMessage) and msg.tool_calls: + for tc in msg.tool_calls: + if tc.get("id") == tool_call_id: + query = tc.get("args", {}).get("query", "") + break + if query: + break + + if not query: + return {} + + fallback_msg = AIMessage( + content="", + tool_calls=[{ + "id": str(uuid.uuid4()), + "name": "web_search", + "args": {"query": query}, + "type": "tool_call", + }], + ) + try: + writer = get_stream_writer() + writer({"__meta": f'\n[CRAG] 문서 없음 → 웹 검색으로 전환... ("{query}")\n'}) + except Exception: + pass + return {"messages": [fallback_msg], "crag_fallback_used": True} + + def route_after_crag(state: AgentState) -> str: + last_msg = state["messages"][-1] if state["messages"] else None + if (isinstance(last_msg, AIMessage) and last_msg.tool_calls + and state.get("crag_fallback_used", False)): + return "tools" + return "agent" + + def route_after_agent(state: AgentState) -> str: last_msg = state["messages"][-1] if not (hasattr(last_msg, "tool_calls") and last_msg.tool_calls): return END @@ -215,14 +285,19 @@ class AgentService: return "query_rewrite" return "tools" - builder = StateGraph(MessagesState) + builder = StateGraph(AgentState) builder.add_node("agent", call_model) builder.add_node("query_rewrite", query_rewrite_node) builder.add_node("tools", ToolNode(self._base_tools)) builder.add_edge(START, "agent") builder.add_conditional_edges("agent", route_after_agent) builder.add_edge("query_rewrite", "tools") - builder.add_edge("tools", "agent") + if crag_enabled: + builder.add_node("crag_check", crag_check_node) + builder.add_edge("tools", "crag_check") + builder.add_conditional_edges("crag_check", route_after_crag) + else: + builder.add_edge("tools", "agent") self._agent = builder.compile(checkpointer=MemorySaver()) @@ -261,7 +336,7 @@ class AgentService: self._pending_history = [] else: all_messages = [HumanMessage(content=user_input)] - messages = {"messages": all_messages} + messages = {"messages": all_messages, "crag_fallback_used": False} response_content = "" # 실제 답변 내용만 누적 (MySQL 저장용) pending_tool_calls: dict = {} # tool_call_id → {name, args} prev_node: str = "" @@ -391,6 +466,10 @@ class AgentService: except Exception as e: print(f"[Agent] 대화 저장 실패: {e}") + # 대화 내용을 RAG에 비동기 인덱싱 (IDEA-1) + if self._conv_rag_enabled and self._ingestion_service and response_content: + asyncio.create_task(self._maybe_index_conversation(user_input, response_content)) + if self._rag_show_sources and self._source_buffer: sources = [] for src in self._source_buffer: @@ -400,6 +479,41 @@ class AgentService: sources.append(entry) yield {"__sources": sources} + async def _maybe_index_conversation(self, user_input: str, response: str) -> None: + """대화 내용이 RAG에 저장할 만한 정보를 포함하면 Qdrant에 비동기 인덱싱.""" + if len(response) < 80: + return + + prompt = ( + "다음 대화에서 육아·금융·건강 등 나중에 검색할 만한 유용한 정보가 있으면 " + "핵심만 2~4문장으로 간결하게 요약하세요. " + "단순 인사, 날짜 확인, 수치 계산은 '없음'이라고만 답하세요.\n\n" + f"질문: {user_input}\n" + f"답변: {response[:600]}\n\n" + "요약 (또는 '없음'):" + ) + try: + result = await self._chat_model.bind(enable_thinking=False).ainvoke( + [HumanMessage(content=prompt)] + ) + summary = result.content.strip() + if not summary or summary == "없음" or len(summary) < 20: + return + from datetime import datetime + metadata = { + "source": "conversation", + "user_id": self._user_id, + "question": user_input[:100], + "timestamp": datetime.now().isoformat(), + } + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, self._ingestion_service.store_text, summary, metadata + ) + print(f"[ConvRAG] 인덱싱 완료: {summary[:60]}...") + except Exception as e: + print(f"[ConvRAG] 인덱싱 실패: {e}") + def reset(self) -> None: """새 thread_id로 대화 히스토리를 초기화한다.""" self._thread_id = str(uuid.uuid4()) diff --git a/services/agent/tools.py b/services/agent/tools.py index 24bd765..c14b923 100644 --- a/services/agent/tools.py +++ b/services/agent/tools.py @@ -1,4 +1,4 @@ -from datetime import date +from datetime import date, datetime from langchain_core.tools import tool @@ -74,6 +74,35 @@ def make_memory_tools(profile_repo, user_id: str = "default"): return remember_user_info, recall_user_info +def make_reminder_tools(reminder_repo, user_id: str = "default"): + """알림 등록/조회 Tool 쌍을 반환한다.""" + + @tool + def set_reminder(remind_date: str, message: str) -> str: + """특정 날짜에 텔레그램으로 알림을 보냅니다. + 예방접종, 병원 예약, 기념일 등 기억해야 할 날짜를 등록하세요. + - remind_date: 알림 날짜 (YYYY-MM-DD 형식). 날짜를 모르면 get_current_date를 먼저 호출하세요. + - message: 알림 내용 (구체적으로 작성) + 등록 시 D-7(7일 전), D-1(하루 전), D-0(당일) 세 번 알림이 발송됩니다.""" + try: + parsed = datetime.strptime(remind_date, "%Y-%m-%d").date() + except ValueError: + return f"날짜 형식이 잘못되었습니다. YYYY-MM-DD 형식으로 입력해 주세요. (예: 2026-07-01)" + reminder_repo.add(user_id, parsed, message) + return f"알림이 등록되었습니다. {remind_date}에 '{message}' 알림을 보내드릴게요." + + @tool + def list_reminders() -> str: + """등록된 예정 알림 목록을 조회합니다. (향후 30일 이내)""" + items = reminder_repo.get_upcoming(user_id, days_ahead=30) + if not items: + return "등록된 예정 알림이 없습니다." + lines = [f"- {r['remind_date']}: {r['message']}" for r in items] + return "등록된 알림 목록:\n" + "\n".join(lines) + + return set_reminder, list_reminders + + def make_search_tool(retriever_service, source_buffer: list | None = None): """RetrieverService를 클로저로 감싼 문서 검색 Tool을 반환합니다. diff --git a/services/db/mysql_service.py b/services/db/mysql_service.py index ec55001..e625017 100644 --- a/services/db/mysql_service.py +++ b/services/db/mysql_service.py @@ -110,6 +110,18 @@ class DatabaseService: created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS td_reminders ( + id INT AUTO_INCREMENT PRIMARY KEY, + user_id VARCHAR(50) NOT NULL, + remind_date DATE NOT NULL, + message TEXT NOT NULL, + sent_d0 TINYINT(1) NOT NULL DEFAULT 0, + sent_d1 TINYINT(1) NOT NULL DEFAULT 0, + sent_d7 TINYINT(1) NOT NULL DEFAULT 0, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """) conn.commit() self._migrate_schema(conn) diff --git a/services/db/reminder_repository.py b/services/db/reminder_repository.py new file mode 100644 index 0000000..71adc3e --- /dev/null +++ b/services/db/reminder_repository.py @@ -0,0 +1,57 @@ +from __future__ import annotations +from datetime import date, timedelta + +from services.db.mysql_service import DatabaseService + + +class ReminderRepository: + """td_reminders 테이블을 통한 알림 저장소.""" + + def __init__(self, db: DatabaseService): + self._db = db + + def add(self, user_id: str, remind_date: date, message: str) -> int: + return self._db.execute_write( + "INSERT INTO td_reminders (user_id, remind_date, message) VALUES (%s, %s, %s)", + (user_id, remind_date.isoformat(), message), + ) + + def get_upcoming(self, user_id: str, days_ahead: int = 30) -> list[dict]: + today = date.today() + limit = today + timedelta(days=days_ahead) + return self._db.execute( + """SELECT id, remind_date, message + FROM td_reminders + WHERE user_id = %s AND remind_date >= %s AND remind_date <= %s + ORDER BY remind_date""", + (user_id, today.isoformat(), limit.isoformat()), + ) + + def get_due(self, today: date) -> list[dict]: + """D-0(당일), D-1(내일), D-7(7일 후) 미발송 알림 반환.""" + d1 = today + timedelta(days=1) + d7 = today + timedelta(days=7) + return self._db.execute( + """SELECT *, + CASE + WHEN remind_date = %s THEN 'd0' + WHEN remind_date = %s THEN 'd1' + WHEN remind_date = %s THEN 'd7' + END AS notify_type + FROM td_reminders + WHERE (remind_date = %s AND sent_d0 = 0) + OR (remind_date = %s AND sent_d1 = 0) + OR (remind_date = %s AND sent_d7 = 0)""", + ( + today.isoformat(), d1.isoformat(), d7.isoformat(), + today.isoformat(), d1.isoformat(), d7.isoformat(), + ), + ) + + def mark_sent(self, reminder_id: int, notify_type: str) -> None: + col = {"d0": "sent_d0", "d1": "sent_d1", "d7": "sent_d7"}.get(notify_type) + if col: + self._db.execute_write( + f"UPDATE td_reminders SET {col} = 1 WHERE id = %s", + (reminder_id,), + ) diff --git a/services/rag/ingestion_service.py b/services/rag/ingestion_service.py index fe83f2d..b6c415e 100644 --- a/services/rag/ingestion_service.py +++ b/services/rag/ingestion_service.py @@ -59,6 +59,21 @@ class IngestionService: except Exception: pass # 컬렉션이 없을 때(최초 수집) 무시 + def store_text(self, text: str, metadata: dict) -> None: + """단일 텍스트를 Qdrant에 직접 저장 (semantic chunking 없이).""" + from langchain_core.documents import Document + doc = Document(page_content=text, metadata=metadata) + kwargs = dict( + documents=[doc], + embedding=self._embeddings, + url=self._qdrant_url, + collection_name=self._collection_name, + ) + if self._sparse_embeddings: + kwargs["sparse_embedding"] = self._sparse_embeddings + kwargs["retrieval_mode"] = RetrievalMode.HYBRID + QdrantVectorStore.from_documents(**kwargs) + def ingest(self, file_paths: list[str]) -> int: self._ensure_collection_schema() docs = [] diff --git a/services/scheduler_service.py b/services/scheduler_service.py new file mode 100644 index 0000000..ddc5ab4 --- /dev/null +++ b/services/scheduler_service.py @@ -0,0 +1,83 @@ +from __future__ import annotations +import asyncio +import json +import logging +import urllib.error +import urllib.parse +import urllib.request +from datetime import date + +logger = logging.getLogger(__name__) + +_NOTIFY_PREFIX = { + "d0": "🔔 오늘 일정", + "d1": "📅 내일 일정", + "d7": "📆 7일 후 일정", +} + + +class SchedulerService: + """asyncio 태스크 기반 알림 스케줄러. + + 매 60초마다 td_reminders를 확인해 D-7/D-1/D-0 Telegram 알림을 발송한다. + TELEGRAM_BOT_TOKEN이 비어 있으면 발송 없이 로그만 출력한다. + """ + + def __init__(self, reminder_repo, bot_token: str, user_map_json: str): + self._repo = reminder_repo + self._token = bot_token + try: + self._user_map: dict[str, str] = json.loads(user_map_json) if user_map_json else {} + except Exception: + self._user_map = {} + self._task: asyncio.Task | None = None + + def start(self) -> None: + self._task = asyncio.create_task(self._loop()) + logger.info("[Scheduler] 알림 스케줄러 시작 (60초 간격)") + + def shutdown(self) -> None: + if self._task: + self._task.cancel() + logger.info("[Scheduler] 알림 스케줄러 종료") + + async def _loop(self) -> None: + while True: + try: + await self._check_reminders() + except Exception as e: + logger.error(f"[Scheduler] 알림 확인 중 오류: {e}") + await asyncio.sleep(60) + + async def _check_reminders(self) -> None: + today = date.today() + reminders = self._repo.get_due(today) + for r in reminders: + notify_type = r.get("notify_type") + if not notify_type: + continue + text = self._format_message(r["message"], notify_type, r["remind_date"]) + chat_id = self._user_map.get(r["user_id"]) + if self._token and chat_id: + try: + self._send_telegram(chat_id, text) + except Exception as e: + logger.error(f"[Scheduler] Telegram 발송 실패 (user={r['user_id']}): {e}") + continue + else: + logger.info(f"[Scheduler] 알림(Telegram 미설정): {text}") + try: + self._repo.mark_sent(r["id"], notify_type) + except Exception as e: + logger.error(f"[Scheduler] mark_sent 실패: {e}") + + def _format_message(self, message: str, notify_type: str, remind_date) -> str: + prefix = _NOTIFY_PREFIX.get(notify_type, "알림") + return f"[율봇 알림] {prefix}\n날짜: {remind_date}\n{message}" + + def _send_telegram(self, chat_id: str, text: str) -> None: + url = f"https://api.telegram.org/bot{self._token}/sendMessage" + data = urllib.parse.urlencode({"chat_id": chat_id, "text": text}).encode() + req = urllib.request.Request(url, data=data, method="POST") + with urllib.request.urlopen(req, timeout=10): + pass