Add Telegram bot and API client integration with SSE support

This commit is contained in:
sal
2026-05-30 23:45:45 +09:00
commit 8c99562e0b
4 changed files with 217 additions and 0 deletions
+11
View File
@@ -0,0 +1,11 @@
TELEGRAM_BOT_TOKEN=your_bot_token_here
YOULBOT_API_URL=http://192.168.10.x:8000
YOULBOT_API_TOKEN=your_api_token_here
# Telegram numeric ID → youlbot user_id 매핑
# BotFather에서 봇 생성 후, 각 가족의 Telegram 계정 ID 입력
# ID 확인: 봇에 /start 전송 후 로그에서 확인
USER_아록_TELEGRAM_ID=
USER_근혜_TELEGRAM_ID=
USER_도율_TELEGRAM_ID=
USER_하율_TELEGRAM_ID=
+61
View File
@@ -0,0 +1,61 @@
"""율봇 API 클라이언트 — youlbot REST API를 httpx로 호출."""
import json
import os
from typing import AsyncIterator
import httpx
from dotenv import load_dotenv
load_dotenv()
_API_URL = os.getenv("YOULBOT_API_URL", "http://localhost:8000").rstrip("/")
_API_TOKEN = os.getenv("YOULBOT_API_TOKEN", "")
def _headers() -> dict:
if _API_TOKEN:
return {"Authorization": f"Bearer {_API_TOKEN}"}
return {}
async def chat(
message: str,
user_id: str = "default",
show_thinking: bool = False,
) -> AsyncIterator[tuple[str, str | None]]:
"""SSE 스트림을 읽어 (token, run_id) 튜플을 yield.
- 일반 토큰: (token_str, None)
- 스트림 종료: ("", run_id_or_None) ← __done 이벤트
"""
async with httpx.AsyncClient(timeout=180) as client:
async with client.stream(
"POST",
f"{_API_URL}/chat",
json={"message": message, "user_id": user_id, "show_thinking": show_thinking},
headers=_headers(),
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if not line.startswith("data: "):
continue
raw = line[6:]
try:
payload = json.loads(raw)
except json.JSONDecodeError:
yield raw, None
continue
if isinstance(payload, dict) and payload.get("__done"):
yield "", payload.get("run_id")
return
yield payload, None
async def reset(user_id: str = "default") -> None:
async with httpx.AsyncClient(timeout=30) as client:
r = await client.post(
f"{_API_URL}/reset",
params={"user_id": user_id},
headers=_headers(),
)
r.raise_for_status()
+142
View File
@@ -0,0 +1,142 @@
"""율봇 Telegram Bot — youlbot REST API를 호출하는 독립 클라이언트."""
import asyncio
import logging
import os
import time
from dotenv import load_dotenv
from telegram import Update
from telegram.constants import ChatAction
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
import api_client
load_dotenv()
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
# Telegram numeric ID → youlbot user_id 매핑
TELEGRAM_USER_MAP: dict[str, str] = {
k: v
for k, v in {
os.getenv("USER_아록_TELEGRAM_ID"): "아록",
os.getenv("USER_근혜_TELEGRAM_ID"): "근혜",
os.getenv("USER_도율_TELEGRAM_ID"): "도율",
os.getenv("USER_하율_TELEGRAM_ID"): "하율",
}.items()
if k # 값이 설정된 항목만 포함
}
# 스트리밍 편집 주기 (초) — Telegram은 chat당 1msg/s 제한
_EDIT_INTERVAL = 0.6
# 편집 트리거 누적 문자 수
_EDIT_THRESHOLD = 80
def _get_user_id(telegram_id: int) -> str | None:
return TELEGRAM_USER_MAP.get(str(telegram_id))
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = _get_user_id(update.effective_user.id)
if user_id:
text = (
f"안녕하세요! 율봇입니다.\n"
f"현재 사용자: *{user_id}*\n\n"
"질문을 입력하거나 아래 명령어를 사용하세요:\n"
"/reset — 대화 이력 초기화"
)
else:
text = (
"등록되지 않은 사용자입니다. 관리자에게 문의하세요.\n"
f"내 Telegram ID: `{update.effective_user.id}`"
)
await update.message.reply_text(text, parse_mode="Markdown")
async def cmd_reset(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = _get_user_id(update.effective_user.id)
if not user_id:
await update.message.reply_text(
f"등록되지 않은 사용자입니다. (ID: {update.effective_user.id})"
)
return
try:
await api_client.reset(user_id)
await update.message.reply_text(f"*{user_id}*님의 대화 이력을 초기화했습니다.", parse_mode="Markdown")
except Exception as e:
logger.error("reset error: %s", e)
await update.message.reply_text("초기화 중 오류가 발생했습니다. 잠시 후 다시 시도하세요.")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = _get_user_id(update.effective_user.id)
if not user_id:
await update.message.reply_text(
"등록되지 않은 사용자입니다. 관리자에게 문의하세요.\n"
f"내 Telegram ID: `{update.effective_user.id}`",
parse_mode="Markdown",
)
return
await update.effective_chat.send_action(ChatAction.TYPING)
# 응답 메시지 플레이스홀더 전송
reply_msg = await update.message.reply_text("...")
accumulated = ""
last_edit_time = time.monotonic()
last_edit_text = ""
try:
async for token, run_id in api_client.chat(update.message.text, user_id):
if run_id is not None:
# 스트림 종료 — 최종 텍스트로 한 번 더 편집
if accumulated and accumulated != last_edit_text:
await reply_msg.edit_text(accumulated)
break
accumulated += token
now = time.monotonic()
chars_since_edit = len(accumulated) - len(last_edit_text)
if (now - last_edit_time >= _EDIT_INTERVAL or chars_since_edit >= _EDIT_THRESHOLD) and accumulated:
try:
await reply_msg.edit_text(accumulated)
last_edit_text = accumulated
last_edit_time = now
except Exception:
pass # FloodWait 등 일시적 오류 무시 — 다음 주기에 재시도
# 빈 응답 처리
if not accumulated:
await reply_msg.edit_text("(응답 없음)")
except Exception as e:
logger.error("chat error for user=%s: %s", user_id, e)
await reply_msg.edit_text("오류가 발생했습니다. API 서버 상태를 확인하세요.")
def main() -> None:
if not _BOT_TOKEN:
raise ValueError("TELEGRAM_BOT_TOKEN이 설정되지 않았습니다. .env 파일을 확인하세요.")
logger.info("Telegram 유저 매핑: %s", {v: k for k, v in TELEGRAM_USER_MAP.items()})
app = Application.builder().token(_BOT_TOKEN).build()
app.add_handler(CommandHandler("start", cmd_start))
app.add_handler(CommandHandler("reset", cmd_reset))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logger.info("율봇 Telegram Bot 시작")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()
+3
View File
@@ -0,0 +1,3 @@
python-telegram-bot>=20.0
httpx>=0.27.0
python-dotenv>=1.0.0