145b0cc96f
- Phase 12: FeedbackRepository + td_feedback 테이블, Gradio 👍/👎 이벤트, run_id 추적, LangSmith create_feedback() 연동 - Phase 13: 커스텀 _SemanticSplitter 제거 → langchain_experimental.SemanticChunker 교체, buffer_size/threshold_type 환경변수 적용 - Phase 13-B: RerankService (Cross-Encoder), RetrieverService.search()에 reranker 통합, tools.py as_retriever() → search() 전환 - Bug 5: mlx_chat_model enable_thinking 런타임 오버라이드, agent_service stream_mode=["messages","custom"] 이중 스트림, thinking 토큰 custom 이벤트로 emit - ROADMAP: LLM 모델명 8B 반영, RAG에 Reranker 추가, 추천 진행 순서 갱신 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
135 lines
4.7 KiB
Python
135 lines
4.7 KiB
Python
from __future__ import annotations
|
|
import threading
|
|
from typing import Any
|
|
|
|
|
|
class DatabaseService:
    """Service that encapsulates MySQL access; skips gracefully when unconfigured.

    Each thread gets its own connection (stored in a ``threading.local``) so
    that LangGraph ToolNode's thread-pool execution does not share a single
    pymysql connection, which is not safe to use across threads.

    When no user is configured, every operation is a no-op: reads return
    ``[]``, writes return ``0`` and schema initialisation does nothing.
    """

    # DDL run by init_schema(). Every statement uses IF NOT EXISTS, so running
    # it repeatedly is harmless.
    _SCHEMA_STATEMENTS = (
        """
        CREATE TABLE IF NOT EXISTS td_conversations (
            id INT AUTO_INCREMENT PRIMARY KEY,
            user_id VARCHAR(50) NOT NULL DEFAULT 'default',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS td_messages (
            id INT AUTO_INCREMENT PRIMARY KEY,
            conversation_id INT NOT NULL,
            role VARCHAR(20) NOT NULL,
            content TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (conversation_id) REFERENCES td_conversations(id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS td_user_profile (
            id INT AUTO_INCREMENT PRIMARY KEY,
            user_id VARCHAR(50) NOT NULL DEFAULT 'default',
            key_name VARCHAR(100) NOT NULL,
            value TEXT NOT NULL,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            UNIQUE KEY uq_user_key (user_id, key_name)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS td_feedback (
            id INT AUTO_INCREMENT PRIMARY KEY,
            user_id VARCHAR(50) NOT NULL DEFAULT 'default',
            message TEXT,
            response TEXT,
            rating TINYINT,
            langsmith_run_id VARCHAR(100),
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """,
    )

    # Best-effort upgrades for tables created by an older schema version.
    # Each one is expected to fail once it has already been applied.
    _MIGRATION_STATEMENTS = (
        "ALTER TABLE td_conversations ADD COLUMN user_id VARCHAR(50) NOT NULL DEFAULT 'default'",
        "ALTER TABLE td_user_profile ADD COLUMN user_id VARCHAR(50) NOT NULL DEFAULT 'default'",
        "ALTER TABLE td_user_profile DROP INDEX key_name",
        "ALTER TABLE td_user_profile ADD UNIQUE KEY uq_user_key (user_id, key_name)",
    )

    def __init__(
        self,
        host: str,
        port: int,
        db: str,
        user: str,
        password: str,
    ) -> None:
        """Store the connection settings; no connection is opened yet.

        Args:
            host: MySQL server host.
            port: MySQL server port.
            db: Database (schema) name.
            user: Login user. A falsy value disables the service entirely.
            password: Login password.
        """
        self._config = dict(host=host, port=port, db=db, user=user, passwd=password)
        self._local = threading.local()

    # ── DB connection ──────────────────────────────────────────────────

    def _connect(self, failure_label: str):
        """Open a new connection for the current thread and cache it.

        Args:
            failure_label: Text placed in the log line when connecting fails.

        Returns:
            The new connection, or ``None`` if the connection attempt failed
            (the failure is printed, not raised).
        """
        # Imported lazily so an unconfigured service never needs pymysql.
        import pymysql

        try:
            self._local.conn = pymysql.connect(**self._config)
        except Exception as e:
            print(f"[DB] {failure_label}: {e}")
            return None
        return self._local.conn

    def _get_conn(self):
        """Return a live connection for the current thread, or ``None``.

        ``None`` is returned when no user is configured or when the server
        cannot be reached. A cached connection is pinged (with reconnect)
        before reuse; if the ping fails a fresh connection is attempted.
        """
        if not self._config["user"]:
            return None

        conn = getattr(self._local, "conn", None)
        if conn is None:
            return self._connect("연결 실패")
        try:
            conn.ping(reconnect=True)
        except Exception:
            return self._connect("재연결 실패")
        return conn

    def connect(self) -> None:
        """Eagerly open (or validate) the current thread's connection."""
        self._get_conn()

    def execute(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
        """Run a query and return its rows as dictionaries.

        Args:
            sql: SQL statement, using ``%s`` placeholders for parameters.
            params: Values bound to the placeholders.

        Returns:
            One ``{column_name: value}`` dict per row; ``[]`` when the service
            is unconfigured/unreachable or the statement yields no rows.
        """
        conn = self._get_conn()
        if conn is None:
            return []
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            columns = [column[0] for column in cursor.description or ()]
            rows = cursor.fetchall()
        # The config does not enable autocommit, so a read opens a transaction
        # that would otherwise stay open on this long-lived per-thread
        # connection and pin its snapshot, hiding rows committed later by other
        # threads. Ending the transaction here keeps subsequent reads fresh.
        conn.commit()
        return [dict(zip(columns, row)) for row in rows]

    def execute_write(self, sql: str, params: tuple = ()) -> int:
        """Run a data-modifying statement and commit it.

        Args:
            sql: SQL statement, using ``%s`` placeholders for parameters.
            params: Values bound to the placeholders.

        Returns:
            The cursor's ``lastrowid`` after the statement, or ``0`` when the
            service is unconfigured/unreachable.

        Raises:
            Exception: Whatever the driver raises for a failed statement; the
                transaction is rolled back before the error is re-raised.
        """
        conn = self._get_conn()
        if conn is None:
            return 0
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                last_row_id = cursor.lastrowid
            conn.commit()
        except Exception:
            # Do not leave a half-finished transaction on the cached
            # connection for the next caller on this thread.
            try:
                conn.rollback()
            except Exception:
                # The connection itself is probably dead; the original error
                # re-raised below is the one worth reporting.
                pass
            raise
        return last_row_id

    def init_schema(self) -> None:
        """Create the application tables if missing, then apply migrations.

        Does nothing when the service is unconfigured or unreachable.
        """
        conn = self._get_conn()
        if conn is None:
            return
        with conn.cursor() as cursor:
            for statement in self._SCHEMA_STATEMENTS:
                cursor.execute(statement)
        conn.commit()
        self._migrate_schema(conn)

    def _migrate_schema(self, conn) -> None:
        """Apply the legacy-schema upgrades on ``conn``, ignoring failures.

        Each statement is attempted and committed independently so that one
        failing step does not prevent the following ones from running.
        """
        with conn.cursor() as cursor:
            for statement in self._MIGRATION_STATEMENTS:
                try:
                    cursor.execute(statement)
                    conn.commit()
                except Exception:
                    # Deliberate best-effort: a step fails when it was already
                    # applied (column/index exists, old index already gone).
                    continue

    def close(self) -> None:
        """Close the current thread's connection, if one is open."""
        conn = getattr(self._local, "conn", None)
        if not conn:
            return
        try:
            conn.close()
        finally:
            # Always forget the handle, even if close() raised (e.g. it was
            # already closed), so the next call opens a fresh connection.
            self._local.conn = None
|