# events/kafka_topics
from aiokafka.admin import NewTopic
KAFKA_TOPICS = [NewTopic("orders_created", num_partitions=3, replication_factor=1)]
# events/create_topics.py
class KafkaTopicManager:
def __init__(self, settings: Settings) -> None:
self.settings = settings
async def __aenter__(self) -> Self:
self.admin_client = AIOKafkaAdminClient(bootstrap_servers=self.settings.kafka.kafka_url)
await self.admin_client.start()
return self
async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Any
) -> None:
if self.admin_client:
await self.admin_client.close()
return None
async def create_topics(self, topics: list[NewTopic]) -> None:
if not self.admin_client:
raise RuntimeError("Admin client is not started")
existing_topics = await self.admin_client.list_topics()
topics_to_create = [topic for topic in topics if topic.name not in existing_topics]
if not topics_to_create:
logger.info("All topics already exist.")
return
await self.admin_client.create_topics(new_topics=topics_to_create)
logger.info(f"Created topics: {[topic.name for topic in topics_to_create]}")
return None
# main.py
from events.publisher import KafkaProducerBase
from events.create_topics import KafkaTopicManager
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from events.kafka_topics import KAFKA_TOPICS
from depends import container
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
async with KafkaTopicManager(settings) as manager:
await manager.create_topics(KAFKA_TOPICS)
kafka: KafkaProducerBase = container.resolve(KafkaProducerBase)
await kafka.start()
yield
# events/publisher.py
import logging
from aiokafka import AIOKafkaProducer
from events.schemas import KafkaEventBase, OrderCreateEvent
from settings import Settings
logger = logging.getLogger(__name__)
class KafkaProducerBase:
def __init__(self, settings: Settings):
self._settings = settings
self._producer = AIOKafkaProducer(bootstrap_servers=self._settings.kafka.kafka_url, acks=1)
async def start(self) -> None:
await self._producer.start()
logging.info("Kafka producer started")
return None
async def stop(self) -> None:
if self._producer:
await self._producer.stop()
logging.info("Kafka producer stopped")
return None
async def send(self, topic: str, message: KafkaEventBase) -> None:
if not self._producer:
raise RuntimeError("Kafka producer not started")
await self._producer.send_and_wait(topic, message.to_kafka_bytes())
return None
class OrderProducer:
def __init__(self, kafka: KafkaProducerBase) -> None:
self._kafka = kafka
async def publish_order(self, message: OrderCreateEvent) -> None:
await self._kafka.send("orders_created", message)
logging.info(f"Send event 'orders_created' {message.id}")
return None
async with self._producer.transaction():
await self._producer.send_and_wait(topic, message.to_kafka_bytes())
#services/order_service.py
from events.publisher import OrderProducer
from events.schemas import OrderCreateEvent
from repositories.orders_repository import OrdersRepository
from schemas.order import OrderCreateSchema, OrderResponse
class OrderService:
def __init__(self, repository: OrdersRepository, producer: OrderProducer):
self.repository = repository
self.producer = producer
async def create_order(self, data: OrderCreateSchema) -> OrderResponse:
result = await self.repository.create(data)
await self.producer.publish_order(
message=OrderCreateEvent(id=result.id, user_id=result.user_id, total_price=result.total_price)
)
return OrderResponse(**result.model_dump())
import logging
from email.message import EmailMessage
import aiosmtplib
from enums.events import SubscriberEventType
from events.schemas import OrderCreateEvent
from repositories.subscribers_reposiroties import SubscribersRepository
from schemas.email import OrderEmailMessage
from settings import Settings
logger = logging.getLogger(__name__)
class EmailSendError(Exception):
pass
class EmailService:
def __init__(self, settings: Settings, repository: SubscribersRepository):
self.settings = settings
self.repository = repository
async def _send_email(self, message: OrderEmailMessage) -> None:
logger.info("Sending email")
email = EmailMessage()
email["From"] = self.settings.email.sender
email["Subject"] = message.subject
email.set_content(message.body)
try:
result = await aiosmtplib.send(
email,
recipients=message.recipients,
hostname=self.settings.email.host,
port=self.settings.email.port,
username=self.settings.email.username,
password=self.settings.email.password,
use_tls=self.settings.email.use_tls,
)
logger.info(f"Email was sent: {result}")
except Exception as e:
raise EmailSendError(f"Failed to send email - {e}")
async def notification_on_order_create(self, event: OrderCreateEvent) -> None:
emails = await self.repository.get_emails_for_notification(SubscriberEventType.order_create)
await self._send_email(
OrderEmailMessage(
recipients=emails,
subject=f"Была создана заявка - {event.id}",
body=f"Создана заявка на сумму - {event.total_price}",
)
)
#events/handler.py
import asyncio
import json
import logging
from asyncio import AbstractEventLoop, Task
from typing import Callable, Dict, Coroutine, Any
from aiokafka import AIOKafkaConsumer
from pydantic import ValidationError
from events.schemas import OrderCreateEvent
from service.email import EmailSendError, EmailService
from settings import Settings
logger = logging.getLogger(__name__)
class KafkaConsumerBase:
def __init__(self, settings: Settings, loop: AbstractEventLoop) -> None:
# Инициализация базового Kafka-потребителя
self._settings = settings
self._loop = loop
self._consumer = AIOKafkaConsumer(
bootstrap_servers=self._settings.kafka.kafka_url, # Адреса Kafka-брокеров
group_id="fastapi-consumer", # ID группы потребителей
loop=self._loop, # Event loop для асинхронной работы
auto_offset_reset="earliest", # Стратегия чтения: начинать с самого раннего доступного сообщения
)
self._handlers: Dict[str, Callable[[dict[Any, Any]], Coroutine[Any, Any, None]]] = {}
# Словарь для хранения обработчиков по каждому топику
self._task: Task[None] | None = None # Задача для фонового чтения сообщений
def register_handler(self, topic: str, handler: Callable[[dict[Any, Any]], Coroutine[Any, Any, None]]) -> None:
# Регистрируем обработчик для указанного топика
self._handlers[topic] = handler
async def start(self) -> None:
# Запускаем потребителя и подписываемся на все топики, для которых зарегистрированы обработчики
await self._consumer.start()
await self._consumer.subscribe(topics=list(self._handlers.keys()))
logging.info("Kafka consumer started")
# Запускаем фоновую задачу для обработки сообщений
self._task = asyncio.create_task(self._consume())
async def stop(self) -> None:
# Останавливаем потребителя и завершаем фоновую задачу
if self._task:
self._task.cancel()
if self._consumer:
await self._consumer.stop()
logging.info("Kafka consumer stopped")
async def _consume(self) -> None:
# Основной цикл обработки входящих сообщений Kafka
try:
async for msg in self._consumer:
topic = msg.topic # Определяем из какого топика пришло сообщение
value = json.loads(msg.value.decode("utf-8")) # Декодируем сообщение в JSON
handler = self._handlers.get(topic)
if handler:
await handler(value) # Вызываем обработчик, передавая ему сообщение
else:
logging.info(f"No handler for topic {topic}")
except asyncio.CancelledError:
pass
class OrderConsumerService(KafkaConsumerBase):
# Конкретная реализация потребителя для событий заказов
def __init__(self, settings: Settings, loop: AbstractEventLoop, email_service: EmailService):
super().__init__(settings, loop)
self.email_service = email_service
# Регистрируем обработчик событий для топика "orders_created"
self.register_handler("orders_created", self.handle_order_created)
async def handle_order_created(self, data: dict) -> None:
logger.info("Event received")
try:
event = OrderCreateEvent.model_validate(data)
except ValidationError as e:
# Если данные некорректные -- логируем ошибку и пропускаем сообщение
logger.info(f"Error data for event order created - {e}")
return
try:
# Отправляем email-уведомление о создании заказа
await self.email_service.notification_on_order_create(event)
logger.info(f"Send email 'order_created' - {event.id}")