Skip to content

Lifespan

Lifespan handler inside EVKafka allows for an application to execute code before and after the main application cycle.

@asynccontextmanager
async def lifespan():
    async init_resources()
    yield
    async teardown_resources()

app = EVKafkaApp(lifespan=lifespan)

State

You may want to share some objects between the lifespan and certain endpoints. This might be a database connection or a producer instance which are used to communicate with other services.

The lifespan can yield these objects in a dictionary which is later accessible with a State object attached to a Request instance.

import aiohttp
from contextlib import asynccontextmanager
from evkafka import EVKafkaApp, Request


@asynccontextmanager
async def lifespan():
    async with aiohttp.ClientSession() as session:
        yield {'client_session': session}

config = {
    ...
}

app = EVKafkaApp(config=config, lifespan=lifespan)


@app.event('Event')
async def handle_event(event: dict, request: Request):
    session = request.state.client_session
    async with session.post('http://example.com', data=event) as resp:
        assert resp.status == 200


app.run()