55ea69d902
During streaming: _live_html (plain div) shows only the current line — no DOM reset, no closing issue. __thinking shows last non-empty line, __meta shows the full trimmed message. On completion: _thinking_html (<details>) shows all accumulated content collapsed, expands on click. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
387 lines
13 KiB
Python
387 lines
13 KiB
Python
"""율봇 WebUI — youlbot REST API를 호출하는 Gradio 프론트엔드.
|
|
|
|
실행:
|
|
python app.py
|
|
|
|
환경변수 (.env):
|
|
YOULBOT_API_URL=http://localhost:8000
|
|
YOULBOT_API_TOKEN= ← api.py에 API_TOKEN 설정 시 동일 값
|
|
"""
|
|
import asyncio
|
|
import html as _html
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
import tempfile
|
|
|
|
import gradio as gr
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
import api_client
|
|
|
|
USER_LABELS = ["아록", "근혜", "도율", "하율"]
|
|
DEFAULT_USER = "아록"
|
|
|
|
# ── STT (Whisper) — 로컬 실행 유지 ──────────────────────────────
|
|
_whisper_model = None
|
|
_WHISPER_SIZE = os.getenv("WHISPER_MODEL_SIZE", "small")
|
|
_TTS_VOICE = os.getenv("TTS_VOICE", "Yuna") # macOS say 보이스
|
|
_TTS_EDGE_VOICE = os.getenv("TTS_EDGE_VOICE", "ko-KR-SunHiNeural") # edge-tts 보이스
|
|
|
|
|
|
def _get_whisper():
|
|
global _whisper_model
|
|
if _whisper_model is None:
|
|
import whisper
|
|
_whisper_model = whisper.load_model(_WHISPER_SIZE)
|
|
return _whisper_model
|
|
|
|
|
|
def transcribe_audio(filepath: str) -> str:
|
|
if not filepath:
|
|
return ""
|
|
model = _get_whisper()
|
|
result = model.transcribe(filepath, language="ko")
|
|
return result["text"].strip()
|
|
|
|
|
|
async def tts_speak(text: str) -> str | None:
|
|
"""크로스플랫폼 TTS. macOS: say→edge-tts→pyttsx3 / Windows: edge-tts→pyttsx3"""
|
|
if not text:
|
|
return None
|
|
|
|
# macOS: say 우선 (오프라인, 내장 한국어)
|
|
if platform.system() == "Darwin":
|
|
try:
|
|
tmp = tempfile.NamedTemporaryFile(suffix=".aiff", delete=False)
|
|
tmp.close()
|
|
await asyncio.to_thread(
|
|
subprocess.run,
|
|
["say", "-v", _TTS_VOICE, "-o", tmp.name, text],
|
|
check=True,
|
|
capture_output=True,
|
|
)
|
|
return tmp.name
|
|
except Exception:
|
|
pass
|
|
|
|
# Windows 1순위 / macOS say 실패 시: edge-tts (온라인)
|
|
try:
|
|
import edge_tts
|
|
tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
|
|
tmp.close()
|
|
await edge_tts.Communicate(text, _TTS_EDGE_VOICE).save(tmp.name)
|
|
return tmp.name
|
|
except Exception:
|
|
pass
|
|
|
|
# 최종 폴백: pyttsx3 (오프라인)
|
|
try:
|
|
import pyttsx3
|
|
tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
|
|
tmp.close()
|
|
def _save():
|
|
engine = pyttsx3.init()
|
|
engine.save_to_file(text, tmp.name)
|
|
engine.runAndWait()
|
|
await asyncio.to_thread(_save)
|
|
return tmp.name
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ── 채팅 ─────────────────────────────────────────────────────────
|
|
|
|
async def respond(message, history, show_thinking, user_id, use_tts, run_ids):
|
|
if not message.strip():
|
|
yield history, "", None, run_ids, ""
|
|
return
|
|
|
|
history = list(history)
|
|
run_ids = list(run_ids)
|
|
history.append({"role": "user", "content": message})
|
|
history.append({"role": "assistant", "content": ""})
|
|
yield history, "", None, run_ids, "" # thinking_box 초기화
|
|
|
|
collected_run_id: str | None = None
|
|
tts_text = "" # 순수 답변만 누적 (TTS용)
|
|
thinking_acc = "" # 전체 누적 (완료 후 details용)
|
|
thinking_text = "" # __thinking 토큰만 (줄 감지용)
|
|
thinking_finalized = False
|
|
|
|
try:
|
|
async for token, run_id in api_client.chat(message, user_id, show_thinking):
|
|
if run_id is not None:
|
|
collected_run_id = run_id
|
|
break
|
|
|
|
# 즉시 상태 — thinking_acc에 누적 안 함
|
|
if isinstance(token, dict) and "__status" in token:
|
|
if not thinking_acc:
|
|
yield history, "", None, run_ids, _status_html(token["__status"])
|
|
continue
|
|
|
|
# 사고 과정(LLM thinking) — 현재 줄만 live_html로 표시
|
|
if isinstance(token, dict) and "__thinking" in token:
|
|
thinking_text += token["__thinking"]
|
|
thinking_acc += token["__thinking"]
|
|
yield history, "", None, run_ids, _live_html(_last_line(thinking_text))
|
|
continue
|
|
|
|
# 진행 로그(LangGraph, 검색 등) — 메시지 전체를 live_html로 표시
|
|
if isinstance(token, dict) and "__meta" in token:
|
|
thinking_acc += token["__meta"]
|
|
live = token["__meta"].strip()
|
|
if live:
|
|
yield history, "", None, run_ids, _live_html(live)
|
|
continue
|
|
|
|
# 첫 답변 토큰 도착 — 전체를 details로 전환 (접힌 상태)
|
|
if thinking_acc and not thinking_finalized:
|
|
thinking_finalized = True
|
|
yield history, "", None, run_ids, _thinking_html(thinking_acc)
|
|
|
|
tts_text += token
|
|
history[-1]["content"] += token
|
|
yield history, "", None, run_ids, gr.update()
|
|
|
|
except Exception as e:
|
|
history[-1]["content"] += f"\n\n[오류: {e}]"
|
|
yield history, "", None, run_ids, gr.update()
|
|
return
|
|
|
|
run_ids.append(collected_run_id)
|
|
|
|
if use_tts:
|
|
audio_path = await tts_speak(tts_text)
|
|
yield history, "", audio_path, run_ids, gr.update()
|
|
else:
|
|
yield history, "", None, run_ids, gr.update()
|
|
|
|
|
|
def handle_feedback(like_data: gr.LikeData, history, run_ids, user_id):
|
|
idx = like_data.index
|
|
if isinstance(idx, (list, tuple)):
|
|
idx = idx[0]
|
|
if not isinstance(idx, int) or idx < 0 or idx >= len(history):
|
|
return
|
|
if history[idx].get("role") != "assistant":
|
|
return
|
|
# idx 위치까지 등장한 assistant 메시지 수 = 이 메시지의 0-based 턴 번호
|
|
asst_turn = sum(1 for m in history[:idx] if m.get("role") == "assistant")
|
|
run_id = run_ids[asst_turn] if run_ids and asst_turn < len(run_ids) else None
|
|
|
|
user_msg = str(history[idx - 1]["content"]) if idx > 0 else ""
|
|
asst_msg = str(history[idx]["content"])
|
|
rating = 1 if like_data.liked else -1
|
|
|
|
try:
|
|
asyncio.run(api_client.save_feedback(user_id, user_msg, asst_msg, rating, run_id))
|
|
except Exception as e:
|
|
print(f"[Feedback] 저장 실패: {e}")
|
|
|
|
|
|
def switch_user(user_id):
|
|
return [], []
|
|
|
|
|
|
def reset_chat(user_id):
|
|
try:
|
|
asyncio.run(api_client.reset(user_id))
|
|
except Exception as e:
|
|
print(f"[Reset] 실패: {e}")
|
|
return [], []
|
|
|
|
|
|
# ── 문서 관리 ─────────────────────────────────────────────────────
|
|
|
|
def ingest_files(files):
|
|
if not files:
|
|
return "파일을 선택해주세요."
|
|
paths = [f if isinstance(f, str) else f.name for f in files]
|
|
results = []
|
|
for path in paths:
|
|
try:
|
|
result = asyncio.run(api_client.ingest(path))
|
|
name = os.path.basename(path)
|
|
results.append(f"{name} → {result.get('chunks', '?')}개 청크")
|
|
except Exception as e:
|
|
results.append(f"{os.path.basename(path)} 오류: {e}")
|
|
return "\n".join(results)
|
|
|
|
|
|
def list_docs():
|
|
try:
|
|
sources = asyncio.run(api_client.list_documents())
|
|
return [[os.path.basename(s), s] for s in sources]
|
|
except Exception as e:
|
|
return [[f"오류: {e}", ""]]
|
|
|
|
|
|
def delete_doc(source):
|
|
if not source.strip():
|
|
return "삭제할 파일 경로를 입력하세요.", list_docs()
|
|
try:
|
|
asyncio.run(api_client.delete_document(source.strip()))
|
|
return f"삭제 완료: {os.path.basename(source.strip())}", list_docs()
|
|
except Exception as e:
|
|
return f"오류: {e}", list_docs()
|
|
|
|
|
|
# ── UI 구성 ──────────────────────────────────────────────────────
|
|
|
|
_BOX_STYLE = (
|
|
"background:#f9f9f9;border-left:3px solid #bbb;border-radius:6px;"
|
|
"padding:8px 14px;margin-bottom:6px;"
|
|
)
|
|
_CONTENT_STYLE = (
|
|
"margin-top:6px;white-space:pre-wrap;font-size:0.85em;"
|
|
"color:#555;max-height:160px;overflow-y:auto;"
|
|
)
|
|
|
|
|
|
def _last_line(text: str) -> str:
|
|
"""현재 진행 중인 마지막 비어있지 않은 줄 반환."""
|
|
lines = [l for l in text.split("\n") if l.strip()]
|
|
return lines[-1] if lines else text.strip()
|
|
|
|
|
|
def _live_html(text: str) -> str:
|
|
"""스트리밍 중 현재 줄만 보여주는 단순 div (details 미사용 → 닫힘 현상 없음)."""
|
|
return (
|
|
f'<div style="{_BOX_STYLE}">'
|
|
f'<strong>🤔 분석 중...</strong>'
|
|
f'<div style="{_CONTENT_STYLE}">{_html.escape(text)} ▌</div>'
|
|
f'</div>'
|
|
)
|
|
|
|
|
|
def _thinking_html(text: str) -> str:
|
|
"""완료 후 전체 내용을 접기/펼치기로 표시."""
|
|
return (
|
|
f'<details style="{_BOX_STYLE}">'
|
|
f'<summary style="cursor:pointer;font-weight:bold;">💭 분석 완료</summary>'
|
|
f'<div style="{_CONTENT_STYLE}">{_html.escape(text)}</div>'
|
|
f'</details>'
|
|
)
|
|
|
|
|
|
def _status_html(status: str) -> str:
|
|
"""내용 없이 상태만 표시하는 단순 헤더."""
|
|
return (
|
|
f'<div style="{_BOX_STYLE}">'
|
|
f'<strong>🤔 {_html.escape(status)}</strong>'
|
|
f'</div>'
|
|
)
|
|
|
|
|
|
with gr.Blocks(title="율봇") as demo:
|
|
gr.Markdown("# 율봇\n육아·금융 전문 AI 상담 도우미")
|
|
|
|
user_state = gr.State(DEFAULT_USER)
|
|
run_ids_state = gr.State([])
|
|
|
|
with gr.Tab("대화"):
|
|
with gr.Row():
|
|
user_selector = gr.Dropdown(
|
|
choices=USER_LABELS,
|
|
value=DEFAULT_USER,
|
|
label="사용자",
|
|
scale=1,
|
|
)
|
|
|
|
thinking_box = gr.HTML(value="")
|
|
chatbot = gr.Chatbot(label="율봇", height=500)
|
|
with gr.Row():
|
|
msg_box = gr.Textbox(
|
|
placeholder="질문을 입력하세요... (Enter로 전송)",
|
|
label="",
|
|
scale=5,
|
|
autofocus=True,
|
|
)
|
|
send_btn = gr.Button("전송", variant="primary", scale=1)
|
|
|
|
with gr.Row():
|
|
audio_input = gr.Audio(
|
|
sources=["microphone"],
|
|
type="filepath",
|
|
label="음성으로 질문하기",
|
|
scale=4,
|
|
)
|
|
transcribe_btn = gr.Button("음성 → 텍스트 변환", scale=1)
|
|
|
|
with gr.Row():
|
|
show_thinking = gr.Checkbox(label="사고 과정 표시", value=True)
|
|
use_tts = gr.Checkbox(label="음성으로 답변 읽기 (TTS)", value=False)
|
|
reset_btn = gr.Button("대화 초기화", size="sm")
|
|
|
|
tts_output = gr.Audio(label="음성 답변", autoplay=True, visible=False)
|
|
use_tts.change(lambda v: gr.Audio(visible=v), inputs=[use_tts], outputs=[tts_output])
|
|
|
|
user_selector.change(
|
|
switch_user,
|
|
inputs=[user_selector],
|
|
outputs=[chatbot, run_ids_state],
|
|
).then(
|
|
lambda u: u, inputs=[user_selector], outputs=[user_state]
|
|
)
|
|
|
|
transcribe_btn.click(transcribe_audio, inputs=[audio_input], outputs=[msg_box])
|
|
|
|
send_btn.click(
|
|
respond,
|
|
inputs=[msg_box, chatbot, show_thinking, user_state, use_tts, run_ids_state],
|
|
outputs=[chatbot, msg_box, tts_output, run_ids_state, thinking_box],
|
|
)
|
|
msg_box.submit(
|
|
respond,
|
|
inputs=[msg_box, chatbot, show_thinking, user_state, use_tts, run_ids_state],
|
|
outputs=[chatbot, msg_box, tts_output, run_ids_state, thinking_box],
|
|
)
|
|
reset_btn.click(reset_chat, inputs=[user_state], outputs=[chatbot, run_ids_state])
|
|
|
|
chatbot.like(
|
|
handle_feedback,
|
|
inputs=[chatbot, run_ids_state, user_state],
|
|
outputs=[],
|
|
)
|
|
|
|
with gr.Tab("문서 등록"):
|
|
gr.Markdown("PDF 또는 TXT 파일을 업로드하면 율봇이 내용을 참고해 답변합니다.")
|
|
file_input = gr.File(
|
|
file_types=[".pdf", ".txt"],
|
|
file_count="multiple",
|
|
label="파일 선택",
|
|
)
|
|
ingest_btn = gr.Button("문서 수집", variant="primary")
|
|
ingest_status = gr.Textbox(label="결과", interactive=False)
|
|
ingest_btn.click(ingest_files, inputs=[file_input], outputs=[ingest_status])
|
|
|
|
with gr.Tab("문서 관리"):
|
|
gr.Markdown("Qdrant에 등록된 문서 목록입니다. 불필요한 문서를 삭제할 수 있습니다.")
|
|
doc_table = gr.Dataframe(
|
|
headers=["파일명", "전체 경로"],
|
|
label="등록된 문서",
|
|
interactive=False,
|
|
)
|
|
refresh_btn = gr.Button("새로고침")
|
|
gr.Markdown("---")
|
|
with gr.Row():
|
|
delete_source = gr.Textbox(
|
|
label="삭제할 파일 경로",
|
|
placeholder="위 표에서 전체 경로를 복사해 붙여넣으세요",
|
|
scale=4,
|
|
)
|
|
delete_btn = gr.Button("삭제", variant="stop", scale=1)
|
|
delete_status = gr.Textbox(label="결과", interactive=False)
|
|
|
|
refresh_btn.click(list_docs, outputs=[doc_table])
|
|
delete_btn.click(delete_doc, inputs=[delete_source], outputs=[delete_status, doc_table])
|
|
demo.load(list_docs, outputs=[doc_table])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
demo.launch(server_name="0.0.0.0", server_port=7860, theme=gr.themes.Soft())
|