IDEA-2/1/5/7: 스마트 알림, 대화 기반 RAG, CRAG, 파라미터 자동 튜닝

- IDEA-2 스마트 알림: td_reminders 테이블, set_reminder/list_reminders 도구,
  SchedulerService(asyncio 60초 루프, D-7/D-1/D-0 Telegram push),
  FastAPI lifespan 연동, GET /reminders/{user_id} 엔드포인트

- IDEA-1 대화 기반 RAG: IngestionService.store_text() 추가,
  AgentService._maybe_index_conversation() — 응답 후 LLM 판단 → Qdrant 저장
  (CONV_RAG_ENABLED=true 활성화, background task로 응답 속도 무관)

- IDEA-5 CRAG: AgentState에 crag_fallback_used 플래그 추가,
  crag_check LangGraph 노드 — search_documents 결과 없으면 web_search 자동 주입,
  route_after_crag으로 fallback 1회 루프 제어 (CRAG_ENABLED=true 활성화)

- IDEA-7 RAG Auto-Eval: eval/auto_tune.py — API 서버 없이 파라미터 조합별
  context_precision/recall 비교, 최적 설정 추천

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sal
2026-06-04 10:04:05 +09:00
parent c264573a67
commit 0b50444e43
11 changed files with 715 additions and 11 deletions
+121 -7
View File
@@ -1,16 +1,22 @@
import asyncio
import os
import time
import uuid
from typing import AsyncIterator
from typing import Annotated, AsyncIterator, TypedDict
from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.config import get_stream_writer
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.graph import END, START, MessagesState, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from services.agent.tools import get_current_date, make_memory_tools, make_retriever_tool, make_search_tool, make_vision_tool, web_search
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
crag_fallback_used: bool
from services.agent.tools import get_current_date, make_memory_tools, make_reminder_tools, make_retriever_tool, make_search_tool, make_vision_tool, web_search
class AgentService:
@@ -31,6 +37,10 @@ class AgentService:
query_rewrite_enabled: bool = False,
user_profile_repository=None,
conversation_repository=None,
reminder_repository=None,
ingestion_service=None,
crag_enabled: bool = False,
conv_rag_enabled: bool = False,
user_id: str = "default",
):
self._system_prompt = system_prompt
@@ -47,6 +57,9 @@ class AgentService:
self._pending_history: list = []
self._user_id = user_id
self._last_run_id: str | None = None
self._ingestion_service = ingestion_service
self._crag_enabled = crag_enabled
self._conv_rag_enabled = conv_rag_enabled
if conversation_repository:
try:
@@ -75,6 +88,9 @@ class AgentService:
if user_profile_repository is not None:
remember_tool, recall_tool = make_memory_tools(user_profile_repository, user_id)
self._base_tools += [remember_tool, recall_tool]
if reminder_repository is not None:
set_reminder_tool, list_reminders_tool = make_reminder_tools(reminder_repository, user_id)
self._base_tools += [set_reminder_tool, list_reminders_tool]
self._vision_model = None # set via set_vision_model()
self._llm_with_tools = chat_model.bind_tools(self._base_tools)
self._chat_model = chat_model
@@ -206,7 +222,61 @@ class AgentService:
)
return {"messages": [new_msg]}
def route_after_agent(state: MessagesState) -> str:
async def crag_check_node(state: AgentState) -> dict:
"""검색 결과 없을 때 web_search 자동 fallback 주입 (CRAG)."""
if state.get("crag_fallback_used", False):
return {}
messages = state["messages"]
# 마지막 search_documents 결과 탐색
last_search_msg = None
for msg in reversed(messages):
if hasattr(msg, "name") and msg.name == "search_documents":
last_search_msg = msg
break
if not last_search_msg or "관련 문서를 찾을 수 없습니다" not in last_search_msg.content:
return {}
# 해당 ToolMessage의 tool_call_id로 원본 AIMessage에서 검색 쿼리 추출
tool_call_id = getattr(last_search_msg, "tool_call_id", None)
query = ""
for msg in reversed(messages):
if isinstance(msg, AIMessage) and msg.tool_calls:
for tc in msg.tool_calls:
if tc.get("id") == tool_call_id:
query = tc.get("args", {}).get("query", "")
break
if query:
break
if not query:
return {}
fallback_msg = AIMessage(
content="",
tool_calls=[{
"id": str(uuid.uuid4()),
"name": "web_search",
"args": {"query": query},
"type": "tool_call",
}],
)
try:
writer = get_stream_writer()
writer({"__meta": f'\n[CRAG] 문서 없음 → 웹 검색으로 전환... ("{query}")\n'})
except Exception:
pass
return {"messages": [fallback_msg], "crag_fallback_used": True}
def route_after_crag(state: AgentState) -> str:
last_msg = state["messages"][-1] if state["messages"] else None
if (isinstance(last_msg, AIMessage) and last_msg.tool_calls
and state.get("crag_fallback_used", False)):
return "tools"
return "agent"
def route_after_agent(state: AgentState) -> str:
last_msg = state["messages"][-1]
if not (hasattr(last_msg, "tool_calls") and last_msg.tool_calls):
return END
@@ -215,14 +285,19 @@ class AgentService:
return "query_rewrite"
return "tools"
builder = StateGraph(MessagesState)
builder = StateGraph(AgentState)
builder.add_node("agent", call_model)
builder.add_node("query_rewrite", query_rewrite_node)
builder.add_node("tools", ToolNode(self._base_tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", route_after_agent)
builder.add_edge("query_rewrite", "tools")
builder.add_edge("tools", "agent")
if crag_enabled:
builder.add_node("crag_check", crag_check_node)
builder.add_edge("tools", "crag_check")
builder.add_conditional_edges("crag_check", route_after_crag)
else:
builder.add_edge("tools", "agent")
self._agent = builder.compile(checkpointer=MemorySaver())
@@ -261,7 +336,7 @@ class AgentService:
self._pending_history = []
else:
all_messages = [HumanMessage(content=user_input)]
messages = {"messages": all_messages}
messages = {"messages": all_messages, "crag_fallback_used": False}
response_content = "" # 실제 답변 내용만 누적 (MySQL 저장용)
pending_tool_calls: dict = {} # tool_call_id → {name, args}
prev_node: str = ""
@@ -391,6 +466,10 @@ class AgentService:
except Exception as e:
print(f"[Agent] 대화 저장 실패: {e}")
# 대화 내용을 RAG에 비동기 인덱싱 (IDEA-1)
if self._conv_rag_enabled and self._ingestion_service and response_content:
asyncio.create_task(self._maybe_index_conversation(user_input, response_content))
if self._rag_show_sources and self._source_buffer:
sources = []
for src in self._source_buffer:
@@ -400,6 +479,41 @@ class AgentService:
sources.append(entry)
yield {"__sources": sources}
async def _maybe_index_conversation(self, user_input: str, response: str) -> None:
"""대화 내용이 RAG에 저장할 만한 정보를 포함하면 Qdrant에 비동기 인덱싱."""
if len(response) < 80:
return
prompt = (
"다음 대화에서 육아·금융·건강 등 나중에 검색할 만한 유용한 정보가 있으면 "
"핵심만 2~4문장으로 간결하게 요약하세요. "
"단순 인사, 날짜 확인, 수치 계산은 '없음'이라고만 답하세요.\n\n"
f"질문: {user_input}\n"
f"답변: {response[:600]}\n\n"
"요약 (또는 '없음'):"
)
try:
result = await self._chat_model.bind(enable_thinking=False).ainvoke(
[HumanMessage(content=prompt)]
)
summary = result.content.strip()
if not summary or summary == "없음" or len(summary) < 20:
return
from datetime import datetime
metadata = {
"source": "conversation",
"user_id": self._user_id,
"question": user_input[:100],
"timestamp": datetime.now().isoformat(),
}
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None, self._ingestion_service.store_text, summary, metadata
)
print(f"[ConvRAG] 인덱싱 완료: {summary[:60]}...")
except Exception as e:
print(f"[ConvRAG] 인덱싱 실패: {e}")
def reset(self) -> None:
"""새 thread_id로 대화 히스토리를 초기화한다."""
self._thread_id = str(uuid.uuid4())
+30 -1
View File
@@ -1,4 +1,4 @@
from datetime import date
from datetime import date, datetime
from langchain_core.tools import tool
@@ -74,6 +74,35 @@ def make_memory_tools(profile_repo, user_id: str = "default"):
return remember_user_info, recall_user_info
def make_reminder_tools(reminder_repo, user_id: str = "default"):
"""알림 등록/조회 Tool 쌍을 반환한다."""
@tool
def set_reminder(remind_date: str, message: str) -> str:
"""특정 날짜에 텔레그램으로 알림을 보냅니다.
예방접종, 병원 예약, 기념일 등 기억해야 할 날짜를 등록하세요.
- remind_date: 알림 날짜 (YYYY-MM-DD 형식). 날짜를 모르면 get_current_date를 먼저 호출하세요.
- message: 알림 내용 (구체적으로 작성)
등록 시 D-7(7일 전), D-1(하루 전), D-0(당일) 세 번 알림이 발송됩니다."""
try:
parsed = datetime.strptime(remind_date, "%Y-%m-%d").date()
except ValueError:
return f"날짜 형식이 잘못되었습니다. YYYY-MM-DD 형식으로 입력해 주세요. (예: 2026-07-01)"
reminder_repo.add(user_id, parsed, message)
return f"알림이 등록되었습니다. {remind_date}'{message}' 알림을 보내드릴게요."
@tool
def list_reminders() -> str:
"""등록된 예정 알림 목록을 조회합니다. (향후 30일 이내)"""
items = reminder_repo.get_upcoming(user_id, days_ahead=30)
if not items:
return "등록된 예정 알림이 없습니다."
lines = [f"- {r['remind_date']}: {r['message']}" for r in items]
return "등록된 알림 목록:\n" + "\n".join(lines)
return set_reminder, list_reminders
def make_search_tool(retriever_service, source_buffer: list | None = None):
"""RetrieverService를 클로저로 감싼 문서 검색 Tool을 반환합니다.
+12
View File
@@ -110,6 +110,18 @@ class DatabaseService:
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS td_reminders (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id VARCHAR(50) NOT NULL,
remind_date DATE NOT NULL,
message TEXT NOT NULL,
sent_d0 TINYINT(1) NOT NULL DEFAULT 0,
sent_d1 TINYINT(1) NOT NULL DEFAULT 0,
sent_d7 TINYINT(1) NOT NULL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
self._migrate_schema(conn)
+57
View File
@@ -0,0 +1,57 @@
from __future__ import annotations
from datetime import date, timedelta
from services.db.mysql_service import DatabaseService
class ReminderRepository:
"""td_reminders 테이블을 통한 알림 저장소."""
def __init__(self, db: DatabaseService):
self._db = db
def add(self, user_id: str, remind_date: date, message: str) -> int:
return self._db.execute_write(
"INSERT INTO td_reminders (user_id, remind_date, message) VALUES (%s, %s, %s)",
(user_id, remind_date.isoformat(), message),
)
def get_upcoming(self, user_id: str, days_ahead: int = 30) -> list[dict]:
today = date.today()
limit = today + timedelta(days=days_ahead)
return self._db.execute(
"""SELECT id, remind_date, message
FROM td_reminders
WHERE user_id = %s AND remind_date >= %s AND remind_date <= %s
ORDER BY remind_date""",
(user_id, today.isoformat(), limit.isoformat()),
)
def get_due(self, today: date) -> list[dict]:
"""D-0(당일), D-1(내일), D-7(7일 후) 미발송 알림 반환."""
d1 = today + timedelta(days=1)
d7 = today + timedelta(days=7)
return self._db.execute(
"""SELECT *,
CASE
WHEN remind_date = %s THEN 'd0'
WHEN remind_date = %s THEN 'd1'
WHEN remind_date = %s THEN 'd7'
END AS notify_type
FROM td_reminders
WHERE (remind_date = %s AND sent_d0 = 0)
OR (remind_date = %s AND sent_d1 = 0)
OR (remind_date = %s AND sent_d7 = 0)""",
(
today.isoformat(), d1.isoformat(), d7.isoformat(),
today.isoformat(), d1.isoformat(), d7.isoformat(),
),
)
def mark_sent(self, reminder_id: int, notify_type: str) -> None:
col = {"d0": "sent_d0", "d1": "sent_d1", "d7": "sent_d7"}.get(notify_type)
if col:
self._db.execute_write(
f"UPDATE td_reminders SET {col} = 1 WHERE id = %s",
(reminder_id,),
)
+15
View File
@@ -59,6 +59,21 @@ class IngestionService:
except Exception:
pass # 컬렉션이 없을 때(최초 수집) 무시
def store_text(self, text: str, metadata: dict) -> None:
"""단일 텍스트를 Qdrant에 직접 저장 (semantic chunking 없이)."""
from langchain_core.documents import Document
doc = Document(page_content=text, metadata=metadata)
kwargs = dict(
documents=[doc],
embedding=self._embeddings,
url=self._qdrant_url,
collection_name=self._collection_name,
)
if self._sparse_embeddings:
kwargs["sparse_embedding"] = self._sparse_embeddings
kwargs["retrieval_mode"] = RetrievalMode.HYBRID
QdrantVectorStore.from_documents(**kwargs)
def ingest(self, file_paths: list[str]) -> int:
self._ensure_collection_schema()
docs = []
+83
View File
@@ -0,0 +1,83 @@
from __future__ import annotations
import asyncio
import json
import logging
import urllib.error
import urllib.parse
import urllib.request
from datetime import date
logger = logging.getLogger(__name__)
_NOTIFY_PREFIX = {
"d0": "🔔 오늘 일정",
"d1": "📅 내일 일정",
"d7": "📆 7일 후 일정",
}
class SchedulerService:
"""asyncio 태스크 기반 알림 스케줄러.
매 60초마다 td_reminders를 확인해 D-7/D-1/D-0 Telegram 알림을 발송한다.
TELEGRAM_BOT_TOKEN이 비어 있으면 발송 없이 로그만 출력한다.
"""
def __init__(self, reminder_repo, bot_token: str, user_map_json: str):
self._repo = reminder_repo
self._token = bot_token
try:
self._user_map: dict[str, str] = json.loads(user_map_json) if user_map_json else {}
except Exception:
self._user_map = {}
self._task: asyncio.Task | None = None
def start(self) -> None:
self._task = asyncio.create_task(self._loop())
logger.info("[Scheduler] 알림 스케줄러 시작 (60초 간격)")
def shutdown(self) -> None:
if self._task:
self._task.cancel()
logger.info("[Scheduler] 알림 스케줄러 종료")
async def _loop(self) -> None:
while True:
try:
await self._check_reminders()
except Exception as e:
logger.error(f"[Scheduler] 알림 확인 중 오류: {e}")
await asyncio.sleep(60)
async def _check_reminders(self) -> None:
today = date.today()
reminders = self._repo.get_due(today)
for r in reminders:
notify_type = r.get("notify_type")
if not notify_type:
continue
text = self._format_message(r["message"], notify_type, r["remind_date"])
chat_id = self._user_map.get(r["user_id"])
if self._token and chat_id:
try:
self._send_telegram(chat_id, text)
except Exception as e:
logger.error(f"[Scheduler] Telegram 발송 실패 (user={r['user_id']}): {e}")
continue
else:
logger.info(f"[Scheduler] 알림(Telegram 미설정): {text}")
try:
self._repo.mark_sent(r["id"], notify_type)
except Exception as e:
logger.error(f"[Scheduler] mark_sent 실패: {e}")
def _format_message(self, message: str, notify_type: str, remind_date) -> str:
prefix = _NOTIFY_PREFIX.get(notify_type, "알림")
return f"[율봇 알림] {prefix}\n날짜: {remind_date}\n{message}"
def _send_telegram(self, chat_id: str, text: str) -> None:
url = f"https://api.telegram.org/bot{self._token}/sendMessage"
data = urllib.parse.urlencode({"chat_id": chat_id, "text": text}).encode()
req = urllib.request.Request(url, data=data, method="POST")
with urllib.request.urlopen(req, timeout=10):
pass