Files
youlbot/api.py
T
shinalok e9a6d00059 Phase 23: WebUI separation — api.py + youlbot-webui project
- api.py: /chat SSE done event now includes run_id for feedback linking
  (format: data: {"__done": true, "run_id": "uuid"})
- api.py: Add POST /feedback endpoint with LangSmith integration
- ROADMAP.md: Add Phase 23 documentation

Note: youlbot-webui/ created at /Users/sal/workspace/youlbot-webui/
(separate project, tracked independently)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 22:00:22 +09:00

156 lines
5.3 KiB
Python

"""율봇 REST API — Phase 22.
Run:
    uvicorn api:app --host 0.0.0.0 --port 8000

Client example:
    import httpx, json
    headers = {"Authorization": "Bearer YOUR_TOKEN"}
    with httpx.Client() as c:
        with c.stream("POST", "http://localhost:8000/chat",
                      json={"message": "안녕", "user_id": "홍길동"},
                      headers=headers, timeout=120) as r:
            for line in r.iter_lines():
                if not line.startswith("data: "):
                    continue
                payload = json.loads(line[6:])
                if isinstance(payload, dict) and payload.get("__done"):
                    break  # run_id = payload["run_id"]
                print(payload, end="", flush=True)
"""
import json
import os
import tempfile
from dotenv import load_dotenv
load_dotenv()
from fastapi import Depends, FastAPI, File, Header, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from container import Container
from services.agent.agent_service import AgentService
# FastAPI application object; the route decorators in this module register on it.
app = FastAPI(title="율봇 API", version="1.0")
# Service container used throughout this module to obtain services,
# repositories and configuration.
_container = Container()
# Import-time side effects: connect to the database and initialise its schema.
# NOTE(review): this assumes db_service() hands back the same instance on both
# calls (singleton provider) — confirm in the Container definition.
_container.db_service().connect()
_container.db_service().init_schema()
# Configuration object read by _get_agent() and _auth().
_cfg = _container.config()
# AgentService instances keyed by user_id, filled lazily by _get_agent().
# Nothing in the visible code evicts entries from this dict.
_agent_cache: dict[str, AgentService] = {}
def _get_agent(user_id: str) -> AgentService:
    """Return the agent bound to ``user_id``, creating and caching it on first use.

    Args:
        user_id: Key into the module-level ``_agent_cache``; also passed to
            the new ``AgentService`` as its ``user_id``.

    Returns:
        The cached ``AgentService`` for this user.
    """
    existing = _agent_cache.get(user_id)
    if existing is not None:
        return existing

    # First request for this user: build the agent from container-provided
    # collaborators and the shared configuration, then remember it.
    created = AgentService(
        chat_model=_container.chat_model(),
        retriever_service=_container.retriever_service(),
        system_prompt=_cfg.system_prompt,
        rag_verbose=_cfg.rag_verbose,
        rag_show_sources=_cfg.rag_show_sources,
        langgraph_verbose=_cfg.langgraph_verbose,
        think_verbose=_cfg.think_verbose,
        query_rewrite_enabled=_cfg.query_rewrite_enabled,
        user_profile_repository=_container.user_profile_repository(),
        conversation_repository=_container.conversation_repository(),
        user_id=user_id,
    )
    _agent_cache[user_id] = created
    return created
def _auth(authorization: str = Header(default="")):
    """Check the ``Authorization`` header against the configured API token.

    When ``_cfg.api_token`` is falsy (not configured), authentication is
    skipped entirely (development mode). Otherwise the header must be exactly
    ``Bearer <token>``.

    Args:
        authorization: Raw ``Authorization`` request header; empty string
            when the header is absent.

    Raises:
        HTTPException: 401 when a token is configured and the header does
            not match it.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    token = _cfg.api_token
    if not token:
        return

    expected = f"Bearer {token}"
    # Compare in constant time so response timing does not leak how many
    # leading characters of the secret a guess got right. Bytes are used
    # because compare_digest() rejects non-ASCII str arguments.
    if not hmac.compare_digest(authorization.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Unauthorized")
# ── 요청/응답 모델 ────────────────────────────────────────────
class ChatRequest(BaseModel):
    """Request body for ``POST /chat``."""

    # User message handed to the agent's stream_response().
    message: str
    # Selects which cached agent handles the request (see _get_agent).
    user_id: str = "default"
    # Forwarded to stream_response() as its show_thinking flag.
    show_thinking: bool = False
class FeedbackRequest(BaseModel):
    """Request body for ``POST /feedback``."""

    # User the feedback is stored under.
    user_id: str = "default"
    # The user message and assistant reply being rated.
    user_msg: str
    asst_msg: str
    # Stored as-is and also sent to LangSmith as the feedback score.
    # NOTE(review): the accepted value range is not enforced here — confirm
    # what the repository / UI expect (thumbs up/down per the endpoint docs).
    rating: int
    # Run identifier from the /chat "__done" event; when present (and tracing
    # is enabled) the feedback is also attached to that LangSmith run.
    run_id: str | None = None
# ── 엔드포인트 ────────────────────────────────────────────────
@app.get("/health")
async def health():
    """Liveness check; always answers with a fixed ``{"status": "ok"}`` body."""
    return dict(status="ok")
@app.post("/chat")
async def chat(req: ChatRequest, _=Depends(_auth)):
    """Stream the agent's reply as Server-Sent Events.

    Every event is a ``data: <JSON>`` line followed by a blank line. Each
    item yielded by the agent is sent as one JSON-encoded event; the stream
    ends with a final event carrying the object
    ``{"__done": true, "run_id": ...}``, where ``run_id`` is read from the
    agent's ``last_run_id`` after streaming has finished.
    """
    agent = _get_agent(req.user_id)

    def _sse(payload) -> str:
        # One SSE frame; ensure_ascii=False keeps non-ASCII text readable.
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async def generate():
        chunks = agent.stream_response(req.message, show_thinking=req.show_thinking)
        async for chunk in chunks:
            yield _sse(chunk)
        # last_run_id is read only after the stream is exhausted.
        yield _sse({'__done': True, 'run_id': agent.last_run_id})

    return StreamingResponse(generate(), media_type="text/event-stream")
@app.post("/feedback")
async def save_feedback(req: FeedbackRequest, _=Depends(_auth)):
    """Store thumbs-up/down feedback and, when tracing is on, mirror it to LangSmith.

    The feedback is always written through the feedback repository. If the
    request carries a ``run_id`` and the ``LANGCHAIN_TRACING_V2`` environment
    variable equals ``"true"``, the rating is additionally sent to LangSmith
    as ``user_feedback`` for that run.

    Returns:
        ``{"saved": True}`` once the repository write has completed, whether
        or not the LangSmith call succeeded.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    _container.feedback_repository().save_feedback(
        req.user_id, req.user_msg, req.asst_msg, req.rating, req.run_id
    )
    if req.run_id and os.getenv("LANGCHAIN_TRACING_V2") == "true":
        try:
            from langsmith import Client

            Client().create_feedback(run_id=req.run_id, key="user_feedback", score=req.rating)
        except Exception:
            # Forwarding stays best-effort (the local save already succeeded),
            # but the failure is logged rather than silently discarded.
            logging.getLogger(__name__).warning(
                "LangSmith feedback forwarding failed for run_id=%s",
                req.run_id,
                exc_info=True,
            )
    return {"saved": True}
@app.post("/reset")
async def reset(user_id: str = "default", _=Depends(_auth)):
    """Reset the cached agent for ``user_id``, if one exists.

    Users with no cached agent are left untouched; the response is the same
    in both cases.
    """
    agent = _agent_cache.get(user_id)
    if agent is not None:
        agent.reset()
    return {"reset": True, "user_id": user_id}
@app.post("/ingest")
async def ingest(file: UploadFile = File(...), _=Depends(_auth)):
    """Ingest an uploaded file (intended for PDF or TXT) into the vector store.

    The upload is written to a temporary file that keeps the original
    extension (``.bin`` when there is none), and that path is handed to the
    ingestion service. The temporary file is always removed afterwards.

    Returns:
        ``{"chunks": <ingest() result>, "filename": <uploaded filename>}``.
    """
    # NOTE(review): the ingestion service only sees the temporary path, not
    # the original filename — confirm what ends up recorded as the source.
    suffix = os.path.splitext(file.filename or "")[1] or ".bin"
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    tmp_path = tmp.name
    try:
        # Writing happens inside the try so a failed read/write cannot leave
        # the delete=False temp file behind; the handle is closed before
        # ingestion and before the unlink below.
        with tmp:
            tmp.write(await file.read())
        count = _container.ingestion_service().ingest([tmp_path])
        return {"chunks": count, "filename": file.filename}
    finally:
        os.unlink(tmp_path)
@app.get("/documents")
async def list_documents(_=Depends(_auth)):
    """Return the retriever service's list of registered documents."""
    retriever = _container.retriever_service()
    documents = retriever.list_documents()
    return {"documents": documents}
@app.delete("/documents/{source:path}")
async def delete_document(source: str, _=Depends(_auth)):
    """Ask the retriever service to delete the document identified by ``source``.

    ``source`` is taken from the rest of the URL path, so it may itself
    contain slashes.
    """
    retriever = _container.retriever_service()
    retriever.delete_document(source)
    return dict(deleted=source)