Skip to content

Producers

Producer configuration

Producer configuration is a dictionary containing producer settings.

config = {
    "bootstrap_servers": "kafka:9092",

}
EVKafkaProducer is a tiny wrapper around AIOKafkaProducer. Full list of config options can be found in the official docs at the AIOKafkaProducer page.

Standalone producer

from evkafka import EVKafkaProducer
from pydantic import BaseModel


async def produce(event: BaseModel, event_type: str):
    config = {
        "topic": "topic", 
        "bootstrap_servers": "kafka:9092"
    }

    async with EVKafkaProducer(config) as producer:
        await producer.send_event(
            event=event,
            event_type=event_type,
        )
If events are being sent to a different topics you may skip adding a default topic to config and pass it to send_event call directly.
    await producer.send_event(
        event=event,
        event_type=event_type,
        topic=topic,
    )

Standalone producer lifetime

Usually you don't want to instantiate a producer every time you need to produce an event. A common way is to run async context manager once at application lifespan. This might be the EVKafka lifespan or FastAPI/Starlette.