import os import time import uuid from typing import AsyncIterator from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, SystemMessage from langchain_core.runnables import RunnableConfig from langgraph.checkpoint.memory import MemorySaver from langgraph.config import get_stream_writer from langgraph.graph import START, MessagesState, StateGraph from langgraph.prebuilt import ToolNode, tools_condition from services.agent.tools import get_current_date, make_memory_tools, make_retriever_tool, make_search_tool, web_search class AgentService: """LangGraph ReAct 에이전트 서비스. Tool Calling 루프, 대화 히스토리, 조건부 라우팅을 LangGraph가 담당한다. """ def __init__( self, chat_model, retriever_service, system_prompt: str, rag_verbose: bool = False, rag_show_sources: bool = False, langgraph_verbose: bool = False, think_verbose: bool = False, user_profile_repository=None, conversation_repository=None, user_id: str = "default", ): self._system_prompt = system_prompt self._rag_verbose = rag_verbose self._rag_show_sources = rag_show_sources self._langgraph_verbose = langgraph_verbose self._think_verbose = think_verbose self._source_buffer: list[dict] = [] self._thread_id = "default" self._profile_repo = user_profile_repository self._conv_repo = conversation_repository self._conv_id: int | None = None self._pending_history: list = [] self._user_id = user_id self._last_run_id: str | None = None if conversation_repository: try: self._conv_id = conversation_repository.get_latest_conversation_id(user_id) if self._conv_id is None: self._conv_id = conversation_repository.create_conversation(user_id) else: turns = conversation_repository.load_turns_after(self._conv_id, None, limit=10) for turn in turns: if turn["role"] == "user": self._pending_history.append(HumanMessage(content=turn["content"])) elif turn["role"] == "assistant": self._pending_history.append(AIMessage(content=turn["content"])) if self._pending_history: print(f"[Agent] 이전 대화 {len(self._pending_history) // 2}턴 복원") except Exception as e: print(f"[Agent] 이력 복원 실패: {e}") self._conv_id = None self._pending_history = [] if rag_show_sources: search_tool = make_search_tool(retriever_service, self._source_buffer) else: search_tool = make_retriever_tool(retriever_service) tools = [search_tool, web_search, get_current_date] if user_profile_repository is not None: remember_tool, recall_tool = make_memory_tools(user_profile_repository, user_id) tools += [remember_tool, recall_tool] llm_with_tools = chat_model.bind_tools(tools) async def call_model(state: MessagesState, config: RunnableConfig) -> dict: from datetime import date system_content = f"오늘 날짜: {date.today().isoformat()}\n\n" + self._system_prompt if self._profile_repo: profile = self._profile_repo.get_all(self._user_id) if profile: import re from datetime import date today = date.today() current_year = today.year _DATE_KEYS = ("생년월일", "생년", "생일") lines = [] for k, v in profile.items(): if any(term in k for term in _DATE_KEYS): full_date = re.search(r'(\d{4})[년\-/.]\s*(\d{1,2})[월\-/.]\s*(\d{1,2})', v) year_only = re.search(r'\b(19|20)\d{2}\b', v) age_key = re.sub(r'생년월일|생년|생일', '나이', k) if full_date: by, bm, bd = int(full_date.group(1)), int(full_date.group(2)), int(full_date.group(3)) korean_age = current_year - by + 1 intl_age = current_year - by - (1 if today < date(current_year, bm, bd) else 0) lines.append(f"- {age_key}: 한국 나이 {korean_age}세, 만 {intl_age}세") elif year_only: by = int(year_only.group()) korean_age = current_year - by + 1 intl_age = current_year - by lines.append(f"- {age_key}: 한국 나이 {korean_age}세, 만 {intl_age}~{intl_age - 1}세 (생일에 따라 다름)") else: lines.append(f"- {k}: {v}") else: lines.append(f"- {k}: {v}") system_content += f"\n\n## 사용자 정보 (이전 대화에서 기억된 내용)\n" + "\n".join(lines) msgs = [SystemMessage(content=system_content)] + state["messages"] thinking_acc, content_acc, tool_calls_acc = "", "", [] try: writer = get_stream_writer() except Exception: writer = None # 체크박스 값을 모델의 enable_thinking으로 전달 (런타임 오버라이드) show_thinking = config.get("configurable", {}).get("show_thinking", False) _llm = llm_with_tools.bind(enable_thinking=show_thinking) if show_thinking != chat_model.enable_thinking else llm_with_tools async for chunk in _llm.astream(msgs, config): t = chunk.additional_kwargs.get("thinking", "") if t: thinking_acc += t if writer: writer({"__thinking": t}) if chunk.content and isinstance(chunk.content, str): content_acc += chunk.content if chunk.tool_calls: tool_calls_acc.extend(chunk.tool_calls) extra = {"thinking": thinking_acc} if thinking_acc else {} return {"messages": [AIMessage( content=content_acc, tool_calls=tool_calls_acc, additional_kwargs=extra, )]} builder = StateGraph(MessagesState) builder.add_node("agent", call_model) builder.add_node("tools", ToolNode(tools)) builder.add_edge(START, "agent") builder.add_conditional_edges("agent", tools_condition) builder.add_edge("tools", "agent") self._agent = builder.compile(checkpointer=MemorySaver()) @property def last_run_id(self) -> str | None: return self._last_run_id def _make_config(self, show_thinking: bool = False) -> dict: return {"configurable": {"thread_id": self._thread_id, "show_thinking": show_thinking}} async def stream_response(self, user_input: str, show_thinking: bool | None = None) -> AsyncIterator[str]: """사용자 입력을 받아 응답 토큰을 순서대로 yield한다.""" _think_verbose = show_thinking if show_thinking is not None else self._think_verbose self._source_buffer.clear() run_id = uuid.uuid4() run_config = {**self._make_config(_think_verbose), "run_id": str(run_id)} # 재시작 후 첫 호출 시 MySQL 이력을 초기 상태에 주입 if self._pending_history: all_messages = self._pending_history + [HumanMessage(content=user_input)] self._pending_history = [] else: all_messages = [HumanMessage(content=user_input)] messages = {"messages": all_messages} response_content = "" # 실제 답변 내용만 누적 (MySQL 저장용) pending_tool_calls: dict = {} # tool_call_id → {name, args} prev_node: str = "" lg = self._langgraph_verbose thinking_open = False # [사고 과정] 헤더 출력 여부 content_started = False # 노드 당 레이블 1회 출력 제어 start_time = time.perf_counter() async for stream_event in self._agent.astream( messages, run_config, stream_mode=["messages", "custom"] ): mode, data = stream_event # ── custom 이벤트 — call_model writer가 emit한 thinking 토큰 ── if mode == "custom": if isinstance(data, dict) and "__thinking" in data: # thinking 첫 토큰 도착 시 agent 레이블 + prev_node 갱신 if "agent" != prev_node: if thinking_open: yield "\n[/사고 과정]\n" thinking_open = False content_started = False if lg: elapsed = time.perf_counter() - start_time label = "agent: 검색 결과 반영 중" if prev_node == "tools" else "agent: 질문 분석 중" yield f"\n[LangGraph → {label}] ({elapsed:.2f}s)\n" prev_node = "agent" if _think_verbose: if not thinking_open: yield "\n[사고 과정]\n" thinking_open = True yield data["__thinking"] continue # ── messages 이벤트 ────────────────────────────────────── chunk, metadata = data node = metadata.get("langgraph_node", "") # ── 노드 전환 시 플래그 리셋 + 레이블 출력 ────────────── # (agent 레이블은 custom 이벤트 핸들러에서 이미 처리될 수 있으므로 중복 방지) if node != prev_node: if thinking_open: yield "\n[/사고 과정]\n" thinking_open = False content_started = False if lg: if node == "agent": elapsed = time.perf_counter() - start_time label = "agent: 검색 결과 반영 중" if prev_node == "tools" else "agent: 질문 분석 중" yield f"\n[LangGraph → {label}] ({elapsed:.2f}s)\n" elif node == "tools": elapsed = time.perf_counter() - start_time yield f"\n[LangGraph → tools: 도구 실행 중] ({elapsed:.2f}s)\n" prev_node = node # ── agent 노드 — AIMessageChunk만 처리 (중복 방지) ────── if node == "agent" and isinstance(chunk, AIMessageChunk): if chunk.tool_calls: if thinking_open: yield "\n[/사고 과정]\n" thinking_open = False for tc in chunk.tool_calls: pending_tool_calls[tc["id"]] = tc if tc.get("name") == "search_documents": query = tc.get("args", {}).get("query", "") yield f'\n문서 검색 중... ("{query}")\n' if query else "\n문서 검색 중...\n" elif tc.get("name") == "web_search": query = tc.get("args", {}).get("query", "") yield f'\n웹 검색 중... ("{query}")\n' if query else "\n웹 검색 중...\n" elif lg: args_str = ", ".join(f'{k}="{v}"' for k, v in tc["args"].items()) yield f" [tool_call: {tc['name']}({args_str})]\n" elif chunk.content: if thinking_open: yield "\n[/사고 과정]\n" thinking_open = False if lg and not content_started: yield "\n[LangGraph → agent: 최종 답변 생성]\n\n" content_started = True response_content += chunk.content yield chunk.content # ── agent 노드 — AIMessage(최종 state) ────────────────── # 청크 스트리밍이 없었던 경우(edge case)에만 처리 elif node == "agent" and isinstance(chunk, AIMessage): if not content_started and not thinking_open: thinking = chunk.additional_kwargs.get("thinking", "") if thinking and _think_verbose: yield "\n[사고 과정]\n" yield thinking yield "\n[/사고 과정]\n" if chunk.content: if lg: yield "\n[LangGraph → agent: 최종 답변 생성]\n\n" response_content += chunk.content yield chunk.content # ── tools 노드 ─────────────────────────────────────────── elif node == "tools" and hasattr(chunk, "name") and chunk.name == "search_documents": if lg: result_lines = [b for b in chunk.content.split("\n\n") if b.strip()] yield f" [결과: {len(result_lines)}개 문서 반환 → agent 복귀]\n" if self._rag_verbose: tc = pending_tool_calls.get(chunk.tool_call_id, {}) query = tc.get("args", {}).get("query", "") yield f'\n[문서 검색: "{query}"]\n' for block in chunk.content.split("\n\n"): if block.strip(): preview = block.strip().replace("\n", " ")[:80] yield f" → {preview}\n" yield "\n" elif node == "tools" and hasattr(chunk, "name") and chunk.name == "web_search": if lg: result_lines = [b for b in chunk.content.split("\n\n") if b.strip()] yield f" [웹 검색 결과: {len(result_lines)}건 → agent 복귀]\n" if thinking_open: yield "\n[/사고 과정]\n" self._last_run_id = str(run_id) # 대화 내용을 MySQL에 저장 if self._conv_repo and self._conv_id and response_content: try: self._conv_repo.save_message(self._conv_id, "user", user_input) self._conv_repo.save_message(self._conv_id, "assistant", response_content) except Exception as e: print(f"[Agent] 대화 저장 실패: {e}") if self._rag_show_sources and self._source_buffer: yield "\n\n[참고 문서]\n" for src in self._source_buffer: filename = os.path.basename(src["source"]) page = f" {src['page']}페이지" if "page" in src else "" yield f"- {filename}{page}\n" def reset(self) -> None: """새 thread_id로 대화 히스토리를 초기화한다.""" self._thread_id = str(uuid.uuid4()) self._pending_history = [] if self._conv_repo: try: self._conv_id = self._conv_repo.create_conversation(self._user_id) except Exception: self._conv_id = None