Implement Phase 22: REST API (FastAPI + SSE streaming)
- api.py: FastAPI 앱 신규 생성 - GET /health, POST /chat (SSE), POST /reset, POST /ingest, GET/DELETE /documents - SSE 포맷: data: <JSON 토큰>\n\n / data: [DONE]\n\n - Bearer Token 인증 (API_TOKEN 미설정 시 개발 모드) - user_id 파라미터로 멀티유저 지원 (기존 AgentService·DB 구조 재사용) - config.py: api_token 필드 추가 - app.py: _get_agent에 query_rewrite_enabled 누락 수정 - requirements.txt: fastapi, uvicorn[standard], python-multipart 추가 - ROADMAP: Phase 22 ✅, Telegram Bot 클라이언트 예시 추가 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,128 @@
|
||||
"""율봇 REST API — Phase 22.
|
||||
|
||||
실행:
|
||||
uvicorn api:app --host 0.0.0.0 --port 8000
|
||||
|
||||
클라이언트 예시:
|
||||
import httpx, json
|
||||
headers = {"Authorization": "Bearer YOUR_TOKEN"}
|
||||
with httpx.Client() as c:
|
||||
with c.stream("POST", "http://localhost:8000/chat",
|
||||
json={"message": "안녕", "user_id": "홍길동"},
|
||||
headers=headers, timeout=120) as r:
|
||||
for line in r.iter_lines():
|
||||
if line.startswith("data: ") and line != "data: [DONE]":
|
||||
print(json.loads(line[6:]), end="", flush=True)
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
from fastapi import Depends, FastAPI, File, Header, HTTPException, UploadFile
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from container import Container
|
||||
from services.agent.agent_service import AgentService
|
||||
|
||||
app = FastAPI(title="율봇 API", version="1.0")
|
||||
|
||||
_container = Container()
|
||||
_container.db_service().connect()
|
||||
_container.db_service().init_schema()
|
||||
|
||||
_cfg = _container.config()
|
||||
_agent_cache: dict[str, AgentService] = {}
|
||||
|
||||
|
||||
def _get_agent(user_id: str) -> AgentService:
|
||||
if user_id not in _agent_cache:
|
||||
_agent_cache[user_id] = AgentService(
|
||||
chat_model=_container.chat_model(),
|
||||
retriever_service=_container.retriever_service(),
|
||||
system_prompt=_cfg.system_prompt,
|
||||
rag_verbose=_cfg.rag_verbose,
|
||||
rag_show_sources=_cfg.rag_show_sources,
|
||||
langgraph_verbose=_cfg.langgraph_verbose,
|
||||
think_verbose=_cfg.think_verbose,
|
||||
query_rewrite_enabled=_cfg.query_rewrite_enabled,
|
||||
user_profile_repository=_container.user_profile_repository(),
|
||||
conversation_repository=_container.conversation_repository(),
|
||||
user_id=user_id,
|
||||
)
|
||||
return _agent_cache[user_id]
|
||||
|
||||
|
||||
def _auth(authorization: str = Header(default="")):
|
||||
"""API_TOKEN 설정 시 Bearer 토큰 검증. 미설정 시 인증 스킵(개발 모드)."""
|
||||
token = _cfg.api_token
|
||||
if token and authorization != f"Bearer {token}":
|
||||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||||
|
||||
|
||||
# ── 요청/응답 모델 ────────────────────────────────────────────
|
||||
|
||||
|
||||
class ChatRequest(BaseModel):
|
||||
message: str
|
||||
user_id: str = "default"
|
||||
show_thinking: bool = False
|
||||
|
||||
|
||||
# ── 엔드포인트 ────────────────────────────────────────────────
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@app.post("/chat")
|
||||
async def chat(req: ChatRequest, _=Depends(_auth)):
|
||||
"""SSE 스트리밍 응답. 각 라인: `data: <JSON 토큰>\n\n`, 종료: `data: [DONE]\n\n`"""
|
||||
agent = _get_agent(req.user_id)
|
||||
|
||||
async def generate():
|
||||
async for token in agent.stream_response(req.message, show_thinking=req.show_thinking):
|
||||
yield f"data: {json.dumps(token, ensure_ascii=False)}\n\n"
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
return StreamingResponse(generate(), media_type="text/event-stream")
|
||||
|
||||
|
||||
@app.post("/reset")
|
||||
async def reset(user_id: str = "default", _=Depends(_auth)):
|
||||
"""대화 이력 초기화."""
|
||||
if user_id in _agent_cache:
|
||||
_agent_cache[user_id].reset()
|
||||
return {"reset": True, "user_id": user_id}
|
||||
|
||||
|
||||
@app.post("/ingest")
|
||||
async def ingest(file: UploadFile = File(...), _=Depends(_auth)):
|
||||
"""PDF 또는 TXT 파일을 업로드해 벡터DB에 수집."""
|
||||
suffix = os.path.splitext(file.filename or "")[1] or ".bin"
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
|
||||
f.write(await file.read())
|
||||
tmp_path = f.name
|
||||
try:
|
||||
count = _container.ingestion_service().ingest([tmp_path])
|
||||
return {"chunks": count, "filename": file.filename}
|
||||
finally:
|
||||
os.unlink(tmp_path)
|
||||
|
||||
|
||||
@app.get("/documents")
|
||||
async def list_documents(_=Depends(_auth)):
|
||||
"""등록된 문서 경로 목록 반환."""
|
||||
return {"documents": _container.retriever_service().list_documents()}
|
||||
|
||||
|
||||
@app.delete("/documents/{source:path}")
|
||||
async def delete_document(source: str, _=Depends(_auth)):
|
||||
"""source 경로에 해당하는 모든 청크 삭제."""
|
||||
_container.retriever_service().delete_document(source)
|
||||
return {"deleted": source}
|
||||
Reference in New Issue
Block a user