60 MINS intermediate
7. Bots & Agents Architecture
Module 07: Bot Architecture
Designing Bots That Don't Fall Apart
Building a bot that runs for 10 minutes is easy. Building one that runs reliably for 10 months — handling rate limits, platform API changes, network failures, authentication expiry, and unexpected edge cases — requires deliberate architectural design. This module covers the patterns that distinguish hobbyist scripts from production-grade bot systems.
🏗️ The Bot Architecture Stack
A production bot system consists of six layers, each with distinct responsibilities:
- Layer 1 — Transport: The communication channel to the platform. HTTP API, WebSocket connection, or message queue. Responsible for authentication, connection management, and retry logic.
- Layer 2 — Event Router: Receives all incoming events and routes them to the correct handler based on event type, content, or metadata. Acts as a dispatcher.
- Layer 3 — Command Parser: Interprets user input and extracts intent, entities, and parameters. Transforms raw text/events into structured commands.
- Layer 4 — Business Logic: The actual bot behavior. Pure functions that take structured inputs and produce actions. Should have zero I/O — making it fully testable.
- Layer 5 — State Management: Persists conversation context, user preferences, session data, and bot state. Usually a combination of in-memory cache (Redis) and persistent database (SQLite/PostgreSQL).
- Layer 6 — Action Executor: Performs the actual effects — sending messages, making API calls, writing to databases, triggering webhooks.
🔄 Event-Driven Architecture
Every bot is fundamentally an event processor. The correct mental model is a loop: receive event → classify → route → handle → respond → log.
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from enum import Enum
from datetime import datetime
class EventType(Enum):
    """Event categories; each member's value is its string identifier."""

    MESSAGE = "message"
    COMMAND = "command"
    CALLBACK = "callback"
    JOIN = "member_join"
    LEAVE = "member_leave"
    ERROR = "error"
@dataclass
class Event:
    """Record describing one event received from a chat platform.

    ``metadata`` and ``timestamp`` are optional: each instance gets its own
    empty dict and the value of ``datetime.now()`` at construction time.
    """

    type: EventType
    platform: str  # 'telegram', 'discord', 'slack'
    user_id: str
    chat_id: str
    content: Any
    # Built per instance via default_factory, so the dict is never shared.
    metadata: Dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class Action:
    """An effect to carry out, addressed to a chat or a user.

    ``payload`` is optional; each instance gets its own empty dict.
    """

    type: str  # 'send_message', 'edit_message', 'delete_message'
    target: str  # chat_id or user_id
    payload: Dict = field(default_factory=dict)
class EventRouter:
    """Routes events to async handlers registered per event type.

    Middleware registered with :meth:`use` runs before the handlers and may
    replace or drop the event.
    """

    def __init__(self):
        self._handlers: Dict[EventType, List[Callable]] = {}
        self._middleware: List[Callable] = []

    def on(self, event_type: EventType):
        """Return a decorator that registers a handler for ``event_type``.

        The decorated function is returned unchanged. Several handlers may be
        registered for the same type; they are kept in registration order.
        """

        def register(handler: Callable):
            self._handlers.setdefault(event_type, []).append(handler)
            return handler

        return register

    def use(self, middleware: Callable):
        """Append ``middleware`` to the chain run at the start of dispatch."""
        self._middleware.append(middleware)

    async def dispatch(self, event: Event) -> List[Action]:
        """Pass ``event`` through the middleware chain, then its handlers.

        Each middleware is awaited with the event returned by the previous
        one; a ``None`` result drops the event and an empty list is returned.
        Handlers registered for the resulting event's type are then awaited in
        order. Falsy handler results are skipped, list results are flattened
        into the output, and any other result is collected as one item.
        """
        current = event
        for middleware in self._middleware:
            current = await middleware(current)
            if current is None:
                return []  # middleware filtered this event

        collected = []
        for handler in self._handlers.get(current.type, ()):
            outcome = await handler(current)
            if not outcome:
                continue
            if isinstance(outcome, list):
                collected.extend(outcome)
            else:
                collected.append(outcome)
        return collected
# Usage
router = EventRouter()


@router.on(EventType.COMMAND)
async def handle_command(event: Event) -> Optional[Action]:
    """Reply to ``/start`` with a greeting; return ``None`` for anything else."""
    if event.content.get('command') != '/start':
        return None
    greeting = f'Hello {event.user_id}! Bot is online.'
    return Action(
        type='send_message',
        target=event.chat_id,
        payload={'text': greeting},
    )


# 💾 State Management Patterns
import json
from typing import Any, Optional
from datetime import datetime, timedelta
import sqlite3
class ConversationState:
    """Manages per-user conversation state with TTL support.

    Values are JSON-encoded and stored in a SQLite table keyed by
    ``(user_id, chat_id, key)``. Expired entries are removed lazily, when
    they are next read through :meth:`get`.

    The instance owns its database connection. Release it with
    :meth:`close`, or use the instance as a context manager::

        with ConversationState(':memory:') as state:
            state.set('user', 'chat', 'step', 'awaiting_email')
    """

    def __init__(self, db_path='bot_state.db'):
        # check_same_thread=False disables sqlite3's same-thread check.
        # NOTE(review): no locking is done here; confirm callers serialise
        # access if the instance is shared between threads.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def _init_db(self):
        """Create the ``user_state`` table if it does not exist yet."""
        # `with self.conn` commits on success and rolls back on error.
        with self.conn:
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS user_state (
                    user_id TEXT,
                    chat_id TEXT,
                    key TEXT,
                    value TEXT,
                    expires_at TEXT,
                    updated_at TEXT,
                    PRIMARY KEY (user_id, chat_id, key)
                )
            ''')

    def set(self, user_id: str, chat_id: str, key: str, value: Any, ttl_minutes: int = 30):
        """Store ``value`` under ``(user_id, chat_id, key)``, replacing any old entry.

        Args:
            value: Any JSON-serialisable object.
            ttl_minutes: Minutes from now after which the entry counts as expired.

        Raises:
            TypeError: If ``value`` cannot be JSON-encoded; nothing is written.
        """
        now = datetime.now()
        expires_at = (now + timedelta(minutes=ttl_minutes)).isoformat()
        encoded = json.dumps(value)  # encode first so a bad value never reaches the DB
        with self.conn:
            self.conn.execute('''
                INSERT OR REPLACE INTO user_state VALUES (?, ?, ?, ?, ?, ?)
            ''', (user_id, chat_id, key, encoded, expires_at, now.isoformat()))

    def get(self, user_id: str, chat_id: str, key: str) -> Optional[Any]:
        """Return the decoded value, or ``None`` if it is missing or expired.

        An expired entry is deleted as a side effect of the lookup.
        """
        row = self.conn.execute(
            'SELECT value, expires_at FROM user_state WHERE user_id=? AND chat_id=? AND key=?',
            (user_id, chat_id, key)
        ).fetchone()
        if not row:
            return None
        encoded, expires_at = row
        if datetime.now() > datetime.fromisoformat(expires_at):
            self.delete(user_id, chat_id, key)  # expired
            return None
        return json.loads(encoded)

    def delete(self, user_id: str, chat_id: str, key: str):
        """Remove the entry for ``(user_id, chat_id, key)`` if it exists."""
        with self.conn:
            self.conn.execute(
                'DELETE FROM user_state WHERE user_id=? AND chat_id=? AND key=?',
                (user_id, chat_id, key)
            )
# Usage: store the user's current step for 10 minutes, then read it back.
state = ConversationState()
state.set(
    user_id='user123',
    chat_id='chat456',
    key='step',
    value='awaiting_email',
    ttl_minutes=10,
)
current_step = state.get(user_id='user123', chat_id='chat456', key='step')
print(f'User is at step: {current_step}')


# 🛡️ Rate Limiting and Resilience
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta
from functools import wraps
from loguru import logger
class RateLimiter:
    """Per-key throttle that enforces a minimum interval between calls.

    Each key is limited independently to ``calls_per_second``: a call made
    sooner than ``1 / calls_per_second`` seconds after the previous call for
    the same key sleeps for the remainder. There is no burst allowance.

    Raises:
        ValueError: If ``calls_per_second`` is not positive.
    """

    def __init__(self, calls_per_second: float = 1.0):
        if calls_per_second <= 0:
            raise ValueError('calls_per_second must be positive')
        self.calls_per_second = calls_per_second
        self._locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        # Event-loop clock reading of the most recent call for each key.
        self._last_call: Dict[str, float] = {}

    async def acquire(self, key: str = 'default'):
        """Wait until a call for ``key`` is allowed, then record it.

        Callers using the same key are serialised by a per-key lock, so
        concurrent waiters are released one interval apart.
        """
        # The loop clock is monotonic, so wall-clock adjustments (NTP steps,
        # DST changes) cannot produce a negative or inflated elapsed time.
        clock = asyncio.get_running_loop().time
        min_interval = 1.0 / self.calls_per_second
        async with self._locks[key]:
            previous = self._last_call.get(key)
            if previous is not None:
                remaining = min_interval - (clock() - previous)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last_call[key] = clock()
def with_retry(max_retries=3, backoff=2.0, exceptions=(Exception,)):
    """Decorate an async function so failures are retried with exponential backoff.

    Args:
        max_retries: Total number of attempts, including the first call.
        backoff: Base of the delay; after the n-th failed attempt (1-based)
            the wrapper sleeps ``backoff ** (n - 1)`` seconds.
        exceptions: Exception types that trigger a retry. Any other
            exception propagates immediately.

    Returns:
        A decorator for coroutine functions. The wrapped coroutine returns
        the first successful result, or re-raises the exception from the
        final attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Without this check the
            wrapped function would never be called and would return ``None``.
    """
    if max_retries < 1:
        raise ValueError('max_retries must be at least 1')

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == final_attempt:
                        raise  # out of attempts: surface the last error unchanged
                    wait = backoff ** attempt
                    logger.warning(f'Attempt {attempt+1} failed: {e}. Retrying in {wait}s')
                    await asyncio.sleep(wait)

        return wrapper

    return decorator


# Knowledge Check
Ready to test your understanding of 7. Bots & Agents Architecture?