Files
youlbot/services/events/event_bus.py
T
shinalok cd41e9e33e - **Bootstrap IoC-based architecture with modular services.**
- **Implement `MlxModelService` for local LLM backend.**
- **Introduce `DatabaseService` for MySQL integration.**
- **Add `HistoryService` to manage conversation context.**
- **Set up CLI interface via `CliUiService`.**
- **Establish EventBus for token streaming.**
- **Include conversation repository for data persistence.**
- **Add environment-based configuration management.**
- **Draft IoC architectural plan.**
2026-04-25 01:14:37 +09:00

20 lines
596 B
Python

from collections import defaultdict
from typing import Callable
class EventBus:
"""Observer 패턴 기반 이벤트 버스."""
def __init__(self):
self._handlers: dict[str, list[Callable]] = defaultdict(list)
def subscribe(self, event: str, handler: Callable) -> None:
self._handlers[event].append(handler)
def unsubscribe(self, event: str, handler: Callable) -> None:
self._handlers[event].remove(handler)
def publish(self, event: str, *args, **kwargs) -> None:
for handler in self._handlers[event]:
handler(*args, **kwargs)