d81a2f5888
- config.py: APIConfig + AppConfig dataclasses, env vars centralized - api_client.py: APIClientProtocol (Protocol) + HTTPAPIClient class, remove module-level globals - services.py: ChatService, DocumentService, TTSService (TTS moved from app.py) - container.py: manual DI container with lazy singleton properties - app.py: all callbacks converted to async, asyncio.run() fully removed, container wired in - .env.example: add TTS_EDGE_VOICE entry - ROADMAP.md: P0/P1 checklist updated to reflect completed work Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
364 lines
13 KiB
Python
364 lines
13 KiB
Python
"""율봇 WebUI — youlbot REST API를 호출하는 Gradio 프론트엔드.
|
|
|
|
실행:
|
|
python app.py
|
|
|
|
환경변수 (.env):
|
|
YOULBOT_API_URL=http://localhost:8000
|
|
YOULBOT_API_TOKEN= ← api.py에 API_TOKEN 설정 시 동일 값
|
|
"""
|
|
import html as _html
|
|
import os
|
|
|
|
import gradio as gr
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
from config import AppConfig
|
|
from container import Container
|
|
|
|
container = Container(AppConfig())
|
|
|
|
USER_LABELS = ["아록", "근혜", "도율", "하율"]
|
|
DEFAULT_USER = "아록"
|
|
|
|
# ── STT (Whisper) — 로컬 실행 유지 ──────────────────────────────
|
|
_whisper_model = None
|
|
|
|
|
|
def _get_whisper():
|
|
global _whisper_model
|
|
if _whisper_model is None:
|
|
import whisper
|
|
_whisper_model = whisper.load_model(container.config.whisper_model_size)
|
|
return _whisper_model
|
|
|
|
|
|
def transcribe_audio(filepath: str) -> str:
|
|
if not filepath:
|
|
return ""
|
|
model = _get_whisper()
|
|
result = model.transcribe(filepath, language="ko")
|
|
return result["text"].strip()
|
|
|
|
|
|
# ── 채팅 ─────────────────────────────────────────────────────────
|
|
|
|
async def respond(message, history, show_thinking, user_id, use_tts, run_ids):
|
|
if not message.strip():
|
|
yield history, "", None, run_ids, "", ""
|
|
return
|
|
|
|
history = list(history)
|
|
run_ids = list(run_ids)
|
|
history.append({"role": "user", "content": message})
|
|
history.append({"role": "assistant", "content": ""})
|
|
yield history, "", None, run_ids, "", "" # thinking_box + source_box 초기화
|
|
|
|
collected_run_id: str | None = None
|
|
tts_text = "" # 순수 답변만 누적 (TTS용)
|
|
thinking_acc = "" # 전체 누적 (완료 후 details용)
|
|
thinking_text = "" # __thinking 토큰만 (줄 감지용)
|
|
thinking_finalized = False
|
|
|
|
try:
|
|
async for token, run_id in container.chat_service.chat(message, user_id, show_thinking):
|
|
if run_id is not None:
|
|
collected_run_id = run_id
|
|
break
|
|
|
|
# 즉시 상태 — thinking_acc에 누적 안 함
|
|
if isinstance(token, dict) and "__status" in token:
|
|
if not thinking_acc:
|
|
yield history, "", None, run_ids, _status_html(token["__status"]), gr.update()
|
|
continue
|
|
|
|
# 사고 과정(LLM thinking) — 현재 줄만 live_html로 표시
|
|
if isinstance(token, dict) and "__thinking" in token:
|
|
thinking_text += token["__thinking"]
|
|
thinking_acc += token["__thinking"]
|
|
yield history, "", None, run_ids, _live_html(_last_line(thinking_text)), gr.update()
|
|
continue
|
|
|
|
# 진행 로그(LangGraph, 검색 등) — 메시지 전체를 live_html로 표시
|
|
if isinstance(token, dict) and "__meta" in token:
|
|
thinking_acc += token["__meta"]
|
|
live = token["__meta"].strip()
|
|
if live:
|
|
yield history, "", None, run_ids, _live_html(live), gr.update()
|
|
continue
|
|
|
|
# RAG 출처 — 별도 source_box로 표시
|
|
if isinstance(token, dict) and "__sources" in token:
|
|
source_box_html = _sources_html(token["__sources"])
|
|
yield history, "", None, run_ids, gr.update(), source_box_html
|
|
continue
|
|
|
|
# 첫 답변 토큰 도착 — 전체를 details로 전환 (접힌 상태)
|
|
if thinking_acc and not thinking_finalized:
|
|
thinking_finalized = True
|
|
yield history, "", None, run_ids, _thinking_html(thinking_acc), gr.update()
|
|
|
|
tts_text += token
|
|
history[-1]["content"] += token
|
|
yield history, "", None, run_ids, gr.update(), gr.update()
|
|
|
|
except Exception as e:
|
|
history[-1]["content"] += f"\n\n[오류: {e}]"
|
|
yield history, "", None, run_ids, gr.update(), gr.update()
|
|
return
|
|
|
|
run_ids.append(collected_run_id)
|
|
|
|
if use_tts:
|
|
audio_path = await container.tts_service.speak(tts_text)
|
|
yield history, "", audio_path, run_ids, gr.update(), gr.update()
|
|
else:
|
|
yield history, "", None, run_ids, gr.update(), gr.update()
|
|
|
|
|
|
async def handle_feedback(like_data: gr.LikeData, history, run_ids, user_id):
|
|
idx = like_data.index
|
|
if isinstance(idx, (list, tuple)):
|
|
idx = idx[0]
|
|
if not isinstance(idx, int) or idx < 0 or idx >= len(history):
|
|
return
|
|
if history[idx].get("role") != "assistant":
|
|
return
|
|
asst_turn = sum(1 for m in history[:idx] if m.get("role") == "assistant")
|
|
run_id = run_ids[asst_turn] if run_ids and asst_turn < len(run_ids) else None
|
|
|
|
user_msg = str(history[idx - 1]["content"]) if idx > 0 else ""
|
|
asst_msg = str(history[idx]["content"])
|
|
rating = 1 if like_data.liked else -1
|
|
|
|
try:
|
|
await container.chat_service.save_feedback(user_id, user_msg, asst_msg, rating, run_id)
|
|
except Exception as e:
|
|
print(f"[Feedback] 저장 실패: {e}")
|
|
|
|
|
|
def switch_user(user_id):
|
|
return [], []
|
|
|
|
|
|
async def reset_chat(user_id):
|
|
try:
|
|
await container.chat_service.reset(user_id)
|
|
except Exception as e:
|
|
print(f"[Reset] 실패: {e}")
|
|
return [], []
|
|
|
|
|
|
# ── 문서 관리 ─────────────────────────────────────────────────────
|
|
|
|
async def ingest_files(files):
|
|
if not files:
|
|
return "파일을 선택해주세요."
|
|
paths = [f if isinstance(f, str) else f.name for f in files]
|
|
results = []
|
|
for path in paths:
|
|
try:
|
|
result = await container.document_service.ingest(path)
|
|
name = os.path.basename(path)
|
|
results.append(f"{name} → {result.get('chunks', '?')}개 청크")
|
|
except Exception as e:
|
|
results.append(f"{os.path.basename(path)} 오류: {e}")
|
|
return "\n".join(results)
|
|
|
|
|
|
async def list_docs():
|
|
try:
|
|
sources = await container.document_service.list_documents()
|
|
return [[os.path.basename(s), s] for s in sources]
|
|
except Exception as e:
|
|
return [[f"오류: {e}", ""]]
|
|
|
|
|
|
async def delete_doc(source):
|
|
if not source.strip():
|
|
return "삭제할 파일 경로를 입력하세요.", await list_docs()
|
|
try:
|
|
await container.document_service.delete_document(source.strip())
|
|
return f"삭제 완료: {os.path.basename(source.strip())}", await list_docs()
|
|
except Exception as e:
|
|
return f"오류: {e}", await list_docs()
|
|
|
|
|
|
# ── UI 구성 ──────────────────────────────────────────────────────
|
|
|
|
_BOX_STYLE = (
|
|
"background:#f9f9f9;border-left:3px solid #bbb;border-radius:6px;"
|
|
"padding:8px 14px;margin-bottom:6px;"
|
|
)
|
|
_CONTENT_STYLE = (
|
|
"margin-top:6px;white-space:pre-wrap;font-size:0.85em;"
|
|
"color:#555;max-height:160px;overflow-y:auto;"
|
|
)
|
|
|
|
|
|
def _last_line(text: str) -> str:
|
|
"""현재 진행 중인 마지막 비어있지 않은 줄 반환."""
|
|
lines = [l for l in text.split("\n") if l.strip()]
|
|
return lines[-1] if lines else text.strip()
|
|
|
|
|
|
def _live_html(text: str) -> str:
|
|
"""스트리밍 중 현재 줄만 보여주는 단순 div (details 미사용 → 닫힘 현상 없음)."""
|
|
return (
|
|
f'<div style="{_BOX_STYLE}">'
|
|
f'<strong>🤔 분석 중...</strong>'
|
|
f'<div style="{_CONTENT_STYLE}">{_html.escape(text)} ▌</div>'
|
|
f'</div>'
|
|
)
|
|
|
|
|
|
def _thinking_html(text: str) -> str:
|
|
"""완료 후 전체 내용을 접기/펼치기로 표시."""
|
|
return (
|
|
f'<details style="{_BOX_STYLE}">'
|
|
f'<summary style="cursor:pointer;font-weight:bold;">💭 분석 완료</summary>'
|
|
f'<div style="{_CONTENT_STYLE}">{_html.escape(text)}</div>'
|
|
f'</details>'
|
|
)
|
|
|
|
|
|
def _status_html(status: str) -> str:
|
|
"""내용 없이 상태만 표시하는 단순 헤더."""
|
|
return (
|
|
f'<div style="{_BOX_STYLE}">'
|
|
f'<strong>🤔 {_html.escape(status)}</strong>'
|
|
f'</div>'
|
|
)
|
|
|
|
|
|
def _sources_html(sources: list) -> str:
|
|
"""RAG 출처 목록을 접기/펼치기로 표시."""
|
|
items = "".join(
|
|
f"<li>{_html.escape(s['filename'])}"
|
|
+ (f" — {s['page']}페이지" if "page" in s else "")
|
|
+ "</li>"
|
|
for s in sources
|
|
)
|
|
return (
|
|
f'<details style="{_BOX_STYLE}">'
|
|
f'<summary style="cursor:pointer;font-weight:bold;">📄 출처 ({len(sources)}개)</summary>'
|
|
f'<ul style="margin:6px 0;padding-left:18px;font-size:0.85em;color:#555;">{items}</ul>'
|
|
f'</details>'
|
|
)
|
|
|
|
|
|
with gr.Blocks(title="율봇") as demo:
|
|
gr.Markdown("# 율봇\n육아·금융 전문 AI 상담 도우미")
|
|
|
|
user_state = gr.State(DEFAULT_USER)
|
|
run_ids_state = gr.State([])
|
|
|
|
with gr.Tab("대화"):
|
|
with gr.Row():
|
|
user_selector = gr.Dropdown(
|
|
choices=USER_LABELS,
|
|
value=DEFAULT_USER,
|
|
label="사용자",
|
|
scale=1,
|
|
)
|
|
|
|
thinking_box = gr.HTML(value="")
|
|
chatbot = gr.Chatbot(label="율봇", height=500)
|
|
source_box = gr.HTML(value="")
|
|
with gr.Row():
|
|
msg_box = gr.Textbox(
|
|
placeholder="질문을 입력하세요... (Enter로 전송)",
|
|
label="",
|
|
scale=5,
|
|
autofocus=True,
|
|
)
|
|
send_btn = gr.Button("전송", variant="primary", scale=1)
|
|
|
|
with gr.Row():
|
|
audio_input = gr.Audio(
|
|
sources=["microphone"],
|
|
type="filepath",
|
|
label="음성으로 질문하기",
|
|
scale=4,
|
|
)
|
|
transcribe_btn = gr.Button("음성 → 텍스트 변환", scale=1)
|
|
|
|
with gr.Row():
|
|
show_thinking = gr.Checkbox(label="사고 과정 표시", value=True)
|
|
use_tts = gr.Checkbox(label="음성으로 답변 읽기 (TTS)", value=False)
|
|
reset_btn = gr.Button("대화 초기화", size="sm")
|
|
|
|
tts_output = gr.Audio(label="음성 답변", autoplay=True, visible=False)
|
|
use_tts.change(lambda v: gr.Audio(visible=v), inputs=[use_tts], outputs=[tts_output])
|
|
|
|
user_selector.change(
|
|
switch_user,
|
|
inputs=[user_selector],
|
|
outputs=[chatbot, run_ids_state],
|
|
).then(
|
|
lambda u: u, inputs=[user_selector], outputs=[user_state]
|
|
)
|
|
|
|
transcribe_btn.click(transcribe_audio, inputs=[audio_input], outputs=[msg_box])
|
|
|
|
send_btn.click(
|
|
respond,
|
|
inputs=[msg_box, chatbot, show_thinking, user_state, use_tts, run_ids_state],
|
|
outputs=[chatbot, msg_box, tts_output, run_ids_state, thinking_box, source_box],
|
|
)
|
|
msg_box.submit(
|
|
respond,
|
|
inputs=[msg_box, chatbot, show_thinking, user_state, use_tts, run_ids_state],
|
|
outputs=[chatbot, msg_box, tts_output, run_ids_state, thinking_box, source_box],
|
|
)
|
|
reset_btn.click(reset_chat, inputs=[user_state], outputs=[chatbot, run_ids_state])
|
|
|
|
chatbot.like(
|
|
handle_feedback,
|
|
inputs=[chatbot, run_ids_state, user_state],
|
|
outputs=[],
|
|
)
|
|
|
|
with gr.Tab("문서 등록"):
|
|
gr.Markdown("PDF 또는 TXT 파일을 업로드하면 율봇이 내용을 참고해 답변합니다.")
|
|
file_input = gr.File(
|
|
file_types=[".pdf", ".txt"],
|
|
file_count="multiple",
|
|
label="파일 선택",
|
|
)
|
|
ingest_btn = gr.Button("문서 수집", variant="primary")
|
|
ingest_status = gr.Textbox(label="결과", interactive=False)
|
|
ingest_btn.click(ingest_files, inputs=[file_input], outputs=[ingest_status])
|
|
|
|
with gr.Tab("문서 관리"):
|
|
gr.Markdown("Qdrant에 등록된 문서 목록입니다. 불필요한 문서를 삭제할 수 있습니다.")
|
|
doc_table = gr.Dataframe(
|
|
headers=["파일명", "전체 경로"],
|
|
label="등록된 문서",
|
|
interactive=False,
|
|
)
|
|
refresh_btn = gr.Button("새로고침")
|
|
gr.Markdown("---")
|
|
with gr.Row():
|
|
delete_source = gr.Textbox(
|
|
label="삭제할 파일 경로",
|
|
placeholder="위 표에서 전체 경로를 복사해 붙여넣으세요",
|
|
scale=4,
|
|
)
|
|
delete_btn = gr.Button("삭제", variant="stop", scale=1)
|
|
delete_status = gr.Textbox(label="결과", interactive=False)
|
|
|
|
refresh_btn.click(list_docs, outputs=[doc_table])
|
|
delete_btn.click(delete_doc, inputs=[delete_source], outputs=[delete_status, doc_table])
|
|
demo.load(list_docs, outputs=[doc_table])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
demo.launch(
|
|
server_name=container.config.server_host,
|
|
server_port=container.config.server_port,
|
|
theme=gr.themes.Soft(),
|
|
)
|