c264573a67
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
412 lines
21 KiB
Python
412 lines
21 KiB
Python
import os
|
|
import time
|
|
import uuid
|
|
from typing import AsyncIterator
|
|
|
|
from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, SystemMessage
|
|
from langchain_core.runnables import RunnableConfig
|
|
from langgraph.checkpoint.memory import MemorySaver
|
|
from langgraph.config import get_stream_writer
|
|
from langgraph.graph import END, START, MessagesState, StateGraph
|
|
from langgraph.prebuilt import ToolNode
|
|
|
|
from services.agent.tools import get_current_date, make_memory_tools, make_retriever_tool, make_search_tool, make_vision_tool, web_search
|
|
|
|
|
|
class AgentService:
|
|
"""LangGraph ReAct 에이전트 서비스.
|
|
|
|
Tool Calling 루프, 대화 히스토리, 조건부 라우팅을 LangGraph가 담당한다.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
chat_model,
|
|
retriever_service,
|
|
system_prompt: str,
|
|
rag_verbose: bool = False,
|
|
rag_show_sources: bool = False,
|
|
langgraph_verbose: bool = False,
|
|
think_verbose: bool = False,
|
|
query_rewrite_enabled: bool = False,
|
|
user_profile_repository=None,
|
|
conversation_repository=None,
|
|
user_id: str = "default",
|
|
):
|
|
self._system_prompt = system_prompt
|
|
self._rag_verbose = rag_verbose
|
|
self._rag_show_sources = rag_show_sources
|
|
self._langgraph_verbose = langgraph_verbose
|
|
self._think_verbose = think_verbose
|
|
self._query_rewrite_enabled = query_rewrite_enabled
|
|
self._source_buffer: list[dict] = []
|
|
self._thread_id = "default"
|
|
self._profile_repo = user_profile_repository
|
|
self._conv_repo = conversation_repository
|
|
self._conv_id: int | None = None
|
|
self._pending_history: list = []
|
|
self._user_id = user_id
|
|
self._last_run_id: str | None = None
|
|
|
|
if conversation_repository:
|
|
try:
|
|
self._conv_id = conversation_repository.get_latest_conversation_id(user_id)
|
|
if self._conv_id is None:
|
|
self._conv_id = conversation_repository.create_conversation(user_id)
|
|
else:
|
|
turns = conversation_repository.load_turns_after(self._conv_id, None, limit=10)
|
|
for turn in turns:
|
|
if turn["role"] == "user":
|
|
self._pending_history.append(HumanMessage(content=turn["content"]))
|
|
elif turn["role"] == "assistant":
|
|
self._pending_history.append(AIMessage(content=turn["content"]))
|
|
if self._pending_history:
|
|
print(f"[Agent] 이전 대화 {len(self._pending_history) // 2}턴 복원")
|
|
except Exception as e:
|
|
print(f"[Agent] 이력 복원 실패: {e}")
|
|
self._conv_id = None
|
|
self._pending_history = []
|
|
|
|
if rag_show_sources:
|
|
search_tool = make_search_tool(retriever_service, self._source_buffer)
|
|
else:
|
|
search_tool = make_retriever_tool(retriever_service)
|
|
self._base_tools = [search_tool, web_search, get_current_date]
|
|
if user_profile_repository is not None:
|
|
remember_tool, recall_tool = make_memory_tools(user_profile_repository, user_id)
|
|
self._base_tools += [remember_tool, recall_tool]
|
|
self._vision_model = None # set via set_vision_model()
|
|
self._llm_with_tools = chat_model.bind_tools(self._base_tools)
|
|
self._chat_model = chat_model
|
|
|
|
async def call_model(state: MessagesState, config: RunnableConfig) -> dict:
|
|
from datetime import date
|
|
system_content = (
|
|
"【언어 규칙】모든 사고 과정(thinking)과 답변을 반드시 한국어로 작성하세요. "
|
|
"영어 사용 금지. Think in Korean only.\n\n"
|
|
f"오늘 날짜: {date.today().isoformat()}\n\n"
|
|
+ self._system_prompt
|
|
)
|
|
if self._profile_repo:
|
|
profile = self._profile_repo.get_all(self._user_id)
|
|
if profile:
|
|
import re
|
|
from datetime import date
|
|
today = date.today()
|
|
current_year = today.year
|
|
_DATE_KEYS = ("생년월일", "생년", "생일")
|
|
lines = []
|
|
for k, v in profile.items():
|
|
if any(term in k for term in _DATE_KEYS):
|
|
full_date = re.search(r'(\d{4})[년\-/.]\s*(\d{1,2})[월\-/.]\s*(\d{1,2})', v)
|
|
year_only = re.search(r'\b(19|20)\d{2}\b', v)
|
|
age_key = re.sub(r'생년월일|생년|생일', '나이', k)
|
|
if full_date:
|
|
by, bm, bd = int(full_date.group(1)), int(full_date.group(2)), int(full_date.group(3))
|
|
korean_age = current_year - by + 1
|
|
intl_age = current_year - by - (1 if today < date(current_year, bm, bd) else 0)
|
|
lines.append(f"- {age_key}: 한국 나이 {korean_age}세, 만 {intl_age}세")
|
|
elif year_only:
|
|
by = int(year_only.group())
|
|
korean_age = current_year - by + 1
|
|
intl_age = current_year - by
|
|
lines.append(f"- {age_key}: 한국 나이 {korean_age}세, 만 {intl_age}~{intl_age - 1}세 (생일에 따라 다름)")
|
|
else:
|
|
lines.append(f"- {k}: {v}")
|
|
else:
|
|
lines.append(f"- {k}: {v}")
|
|
system_content += f"\n\n## 사용자 정보 (이전 대화에서 기억된 내용)\n" + "\n".join(lines)
|
|
msgs = [SystemMessage(content=system_content)] + state["messages"]
|
|
thinking_acc, content_acc, tool_calls_acc = "", "", []
|
|
try:
|
|
writer = get_stream_writer()
|
|
except Exception:
|
|
writer = None
|
|
# LLM 추론 시작 직전에 즉시 신호 emit — UI에 "분석 중" 표시
|
|
if writer:
|
|
writer({"__start": True})
|
|
# 이미지 첨부 시 vision tool 동적 추가 (요청별로 독립적으로 바인딩)
|
|
cfg = config.get("configurable", {})
|
|
show_thinking = cfg.get("show_thinking", False)
|
|
image_path = cfg.get("image_path")
|
|
if image_path and self._vision_model:
|
|
tools_for_req = self._base_tools + [make_vision_tool(self._vision_model, image_path)]
|
|
_llm_base = self._chat_model.bind_tools(tools_for_req)
|
|
else:
|
|
_llm_base = self._llm_with_tools
|
|
_llm = _llm_base.bind(enable_thinking=show_thinking) if show_thinking != chat_model.enable_thinking else _llm_base
|
|
async for chunk in _llm.astream(msgs, config):
|
|
t = chunk.additional_kwargs.get("thinking", "")
|
|
if t:
|
|
thinking_acc += t
|
|
if writer:
|
|
writer({"__thinking": t})
|
|
if chunk.content and isinstance(chunk.content, str):
|
|
content_acc += chunk.content
|
|
if chunk.tool_calls:
|
|
tool_calls_acc.extend(chunk.tool_calls)
|
|
extra = {"thinking": thinking_acc} if thinking_acc else {}
|
|
return {"messages": [AIMessage(
|
|
content=content_acc,
|
|
tool_calls=tool_calls_acc,
|
|
additional_kwargs=extra,
|
|
)]}
|
|
|
|
async def query_rewrite_node(state: MessagesState, config: RunnableConfig) -> dict:
|
|
last_msg = state["messages"][-1]
|
|
if not (hasattr(last_msg, "tool_calls") and last_msg.tool_calls):
|
|
return {}
|
|
|
|
# 최근 사용자 메시지 2개를 컨텍스트로 활용 (대명사·지시어 해소)
|
|
recent_human = [m.content for m in state["messages"][:-1]
|
|
if isinstance(m, HumanMessage)][-2:]
|
|
ctx = ("\n\n이전 대화 컨텍스트:\n" + "\n".join(f"- {m}" for m in recent_human)
|
|
if recent_human else "")
|
|
|
|
try:
|
|
writer = get_stream_writer()
|
|
except Exception:
|
|
writer = None
|
|
|
|
_rewrite_llm = chat_model.bind(enable_thinking=False)
|
|
new_tool_calls = []
|
|
for tc in last_msg.tool_calls:
|
|
if tc["name"] == "search_documents":
|
|
original = tc["args"].get("query", "")
|
|
prompt = (
|
|
f"다음 구어체 질문을 문서 검색에 최적화된 키워드 중심 문장으로 변환하세요.{ctx}\n\n"
|
|
f"규칙:\n"
|
|
f"- 핵심 개념과 전문용어를 포함하세요\n"
|
|
f"- 대명사(이것, 그것, 그 논문 등)는 구체적인 명칭으로 교체하세요\n"
|
|
f"- 변환된 질문만 한 문장으로 출력하세요. 부가 설명 없이 질문만 출력하세요\n\n"
|
|
f"원본 질문: {original}\n최적화된 질문:"
|
|
)
|
|
try:
|
|
result = await _rewrite_llm.ainvoke([HumanMessage(content=prompt)])
|
|
rewritten = result.content.strip()
|
|
except Exception as e:
|
|
print(f"[QueryRewrite] 실패: {e}")
|
|
rewritten = original
|
|
if rewritten and rewritten != original:
|
|
new_tool_calls.append({**tc, "args": {**tc["args"], "query": rewritten}})
|
|
if writer:
|
|
writer({"__query_rewrite": {"original": original, "rewritten": rewritten}})
|
|
else:
|
|
new_tool_calls.append(tc)
|
|
else:
|
|
new_tool_calls.append(tc)
|
|
|
|
if not last_msg.id:
|
|
return {}
|
|
new_msg = AIMessage(
|
|
id=last_msg.id,
|
|
content=last_msg.content,
|
|
tool_calls=new_tool_calls,
|
|
additional_kwargs=last_msg.additional_kwargs,
|
|
)
|
|
return {"messages": [new_msg]}
|
|
|
|
def route_after_agent(state: MessagesState) -> str:
|
|
last_msg = state["messages"][-1]
|
|
if not (hasattr(last_msg, "tool_calls") and last_msg.tool_calls):
|
|
return END
|
|
if self._query_rewrite_enabled:
|
|
if any(tc["name"] == "search_documents" for tc in last_msg.tool_calls):
|
|
return "query_rewrite"
|
|
return "tools"
|
|
|
|
builder = StateGraph(MessagesState)
|
|
builder.add_node("agent", call_model)
|
|
builder.add_node("query_rewrite", query_rewrite_node)
|
|
builder.add_node("tools", ToolNode(self._base_tools))
|
|
builder.add_edge(START, "agent")
|
|
builder.add_conditional_edges("agent", route_after_agent)
|
|
builder.add_edge("query_rewrite", "tools")
|
|
builder.add_edge("tools", "agent")
|
|
|
|
self._agent = builder.compile(checkpointer=MemorySaver())
|
|
|
|
@property
|
|
def last_run_id(self) -> str | None:
|
|
return self._last_run_id
|
|
|
|
def set_vision_model(self, vision_model) -> None:
|
|
self._vision_model = vision_model
|
|
|
|
def _make_config(self, show_thinking: bool = False, image_path: str | None = None) -> dict:
|
|
cfg: dict = {"thread_id": self._thread_id, "show_thinking": show_thinking}
|
|
if image_path:
|
|
cfg["image_path"] = image_path
|
|
return {"configurable": cfg}
|
|
|
|
async def stream_response(
|
|
self,
|
|
user_input: str,
|
|
show_thinking: bool | None = None,
|
|
image_path: str | None = None,
|
|
) -> AsyncIterator[str | dict]:
|
|
"""사용자 입력을 받아 응답 토큰을 순서대로 yield한다.
|
|
|
|
실제 답변: plain str
|
|
진행/thinking/출처 메타데이터: {"__meta": str} ← 소비자가 TTS 등에서 필터링 가능
|
|
"""
|
|
_think_verbose = show_thinking if show_thinking is not None else self._think_verbose
|
|
self._source_buffer.clear()
|
|
run_id = uuid.uuid4()
|
|
run_config = {**self._make_config(_think_verbose, image_path=image_path), "run_id": str(run_id)}
|
|
|
|
# 재시작 후 첫 호출 시 MySQL 이력을 초기 상태에 주입
|
|
if self._pending_history:
|
|
all_messages = self._pending_history + [HumanMessage(content=user_input)]
|
|
self._pending_history = []
|
|
else:
|
|
all_messages = [HumanMessage(content=user_input)]
|
|
messages = {"messages": all_messages}
|
|
response_content = "" # 실제 답변 내용만 누적 (MySQL 저장용)
|
|
pending_tool_calls: dict = {} # tool_call_id → {name, args}
|
|
prev_node: str = ""
|
|
lg = self._langgraph_verbose
|
|
thinking_open = False # [사고 과정] 헤더 출력 여부
|
|
content_started = False # 노드 당 레이블 1회 출력 제어
|
|
start_time = time.perf_counter()
|
|
|
|
async for stream_event in self._agent.astream(
|
|
messages, run_config, stream_mode=["messages", "custom"]
|
|
):
|
|
mode, data = stream_event
|
|
|
|
# ── custom 이벤트 ────────────────────────────────────────────
|
|
if mode == "custom":
|
|
if isinstance(data, dict) and "__start" in data:
|
|
# call_model 시작 즉시 emit — LLM 추론 전에 상태 표시
|
|
label = "검색 결과를 분석하고 있습니다..." if prev_node == "tools" else "질문을 분석하고 있습니다..."
|
|
yield {"__status": label}
|
|
continue
|
|
if isinstance(data, dict) and "__query_rewrite" in data:
|
|
info = data["__query_rewrite"]
|
|
if lg or self._rag_verbose:
|
|
yield {"__meta": f'\n쿼리 최적화: "{info["original"]}" → "{info["rewritten"]}"\n'}
|
|
continue
|
|
if isinstance(data, dict) and "__thinking" in data:
|
|
# thinking 첫 토큰 도착 시 agent 레이블 + prev_node 갱신
|
|
if "agent" != prev_node:
|
|
thinking_open = False
|
|
content_started = False
|
|
if lg:
|
|
elapsed = time.perf_counter() - start_time
|
|
label = "agent: 검색 결과 반영 중" if prev_node == "tools" else "agent: 질문 분석 중"
|
|
yield {"__meta": f"\n[LangGraph → {label}] ({elapsed:.2f}s)\n"}
|
|
prev_node = "agent"
|
|
if _think_verbose:
|
|
thinking_open = True
|
|
yield {"__thinking": data["__thinking"]}
|
|
continue
|
|
|
|
# ── messages 이벤트 ──────────────────────────────────────
|
|
chunk, metadata = data
|
|
node = metadata.get("langgraph_node", "")
|
|
|
|
# ── 노드 전환 시 플래그 리셋 + 레이블 출력 ──────────────
|
|
# (agent 레이블은 custom 이벤트 핸들러에서 이미 처리될 수 있으므로 중복 방지)
|
|
if node != prev_node:
|
|
thinking_open = False
|
|
content_started = False
|
|
if lg:
|
|
elapsed = time.perf_counter() - start_time
|
|
if node == "agent":
|
|
label = "agent: 검색 결과 반영 중" if prev_node == "tools" else "agent: 질문 분석 중"
|
|
yield {"__meta": f"\n[LangGraph → {label}] ({elapsed:.2f}s)\n"}
|
|
elif node == "query_rewrite":
|
|
yield {"__meta": f"\n[LangGraph → query_rewrite: 쿼리 최적화 중] ({elapsed:.2f}s)\n"}
|
|
elif node == "tools":
|
|
yield {"__meta": f"\n[LangGraph → tools: 도구 실행 중] ({elapsed:.2f}s)\n"}
|
|
prev_node = node
|
|
|
|
# ── agent 노드 — AIMessageChunk만 처리 (중복 방지) ──────
|
|
if node == "agent" and isinstance(chunk, AIMessageChunk):
|
|
if chunk.tool_calls:
|
|
thinking_open = False
|
|
for tc in chunk.tool_calls:
|
|
pending_tool_calls[tc["id"]] = tc
|
|
if tc.get("name") == "search_documents":
|
|
query = tc.get("args", {}).get("query", "")
|
|
yield {"__meta": f'\n문서 검색 중... ("{query}")\n'} if query else {"__meta": "\n문서 검색 중...\n"}
|
|
elif tc.get("name") == "web_search":
|
|
query = tc.get("args", {}).get("query", "")
|
|
yield {"__meta": f'\n웹 검색 중... ("{query}")\n'} if query else {"__meta": "\n웹 검색 중...\n"}
|
|
elif lg:
|
|
args_str = ", ".join(f'{k}="{v}"' for k, v in tc["args"].items())
|
|
yield {"__meta": f" [tool_call: {tc['name']}({args_str})]\n"}
|
|
|
|
elif chunk.content:
|
|
thinking_open = False
|
|
if lg and not content_started:
|
|
yield {"__meta": "\n[LangGraph → agent: 최종 답변 생성]\n\n"}
|
|
content_started = True
|
|
response_content += chunk.content
|
|
yield chunk.content
|
|
|
|
# ── agent 노드 — AIMessage(최종 state) ──────────────────
|
|
# 청크 스트리밍이 없었던 경우(edge case)에만 처리
|
|
elif node == "agent" and isinstance(chunk, AIMessage):
|
|
if not content_started and not thinking_open:
|
|
thinking = chunk.additional_kwargs.get("thinking", "")
|
|
if thinking and _think_verbose:
|
|
yield {"__thinking": thinking}
|
|
if chunk.content:
|
|
if lg:
|
|
yield {"__meta": "\n[LangGraph → agent: 최종 답변 생성]\n\n"}
|
|
response_content += chunk.content
|
|
yield chunk.content
|
|
|
|
# ── tools 노드 ───────────────────────────────────────────
|
|
elif node == "tools" and hasattr(chunk, "name") and chunk.name == "search_documents":
|
|
if lg:
|
|
result_lines = [b for b in chunk.content.split("\n\n") if b.strip()]
|
|
yield {"__meta": f" [결과: {len(result_lines)}개 문서 반환 → agent 복귀]\n"}
|
|
|
|
if self._rag_verbose:
|
|
tc = pending_tool_calls.get(chunk.tool_call_id, {})
|
|
query = tc.get("args", {}).get("query", "")
|
|
yield {"__meta": f'\n[문서 검색: "{query}"]\n'}
|
|
for block in chunk.content.split("\n\n"):
|
|
if block.strip():
|
|
preview = block.strip().replace("\n", " ")[:80]
|
|
yield {"__meta": f" → {preview}\n"}
|
|
yield {"__meta": "\n"}
|
|
|
|
elif node == "tools" and hasattr(chunk, "name") and chunk.name == "web_search":
|
|
if lg:
|
|
result_lines = [b for b in chunk.content.split("\n\n") if b.strip()]
|
|
yield {"__meta": f" [웹 검색 결과: {len(result_lines)}건 → agent 복귀]\n"}
|
|
|
|
thinking_open = False
|
|
self._last_run_id = str(run_id)
|
|
|
|
# 대화 내용을 MySQL에 저장
|
|
if self._conv_repo and self._conv_id and response_content:
|
|
try:
|
|
self._conv_repo.save_message(self._conv_id, "user", user_input)
|
|
self._conv_repo.save_message(self._conv_id, "assistant", response_content)
|
|
except Exception as e:
|
|
print(f"[Agent] 대화 저장 실패: {e}")
|
|
|
|
if self._rag_show_sources and self._source_buffer:
|
|
sources = []
|
|
for src in self._source_buffer:
|
|
entry = {"filename": os.path.basename(src["source"])}
|
|
if "page" in src:
|
|
entry["page"] = src["page"]
|
|
sources.append(entry)
|
|
yield {"__sources": sources}
|
|
|
|
def reset(self) -> None:
|
|
"""새 thread_id로 대화 히스토리를 초기화한다."""
|
|
self._thread_id = str(uuid.uuid4())
|
|
self._pending_history = []
|
|
if self._conv_repo:
|
|
try:
|
|
self._conv_id = self._conv_repo.create_conversation(self._user_id)
|
|
except Exception:
|
|
self._conv_id = None
|