Route, Layer and Process Kafka Messages with Tansu Services

Tansu is an Apache-licensed, Kafka-compatible streaming platform written in Rust. It uses a Service trait to route, layer and process messages on both the broker and the client.
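Before diving into the broker, it may help to see the Service/Layer pattern in miniature. The sketch below is a simplified, synchronous rendition of the idea (the real Tansu traits are async and carry a Context; the names here are illustrative, not the actual Tansu API): a Service turns a request into a response, and a Layer wraps an inner service to produce a new one.

```rust
// A minimal, synchronous sketch of the Service/Layer pattern.
// Illustrative only: the real Tansu traits are async and take a Context.

/// A service takes a request and produces a response.
trait Service<Request> {
    type Response;
    fn serve(&self, req: Request) -> Self::Response;
}

/// A layer wraps an inner service, producing a new service.
trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

/// A leaf service that returns the length of its input.
struct LengthService;

impl Service<String> for LengthService {
    type Response = usize;

    fn serve(&self, req: String) -> usize {
        req.len()
    }
}

/// A layer that upper-cases the request before the inner service sees it.
struct UppercaseLayer;

struct UppercaseService<S> {
    inner: S,
}

impl<S> Layer<S> for UppercaseLayer {
    type Service = UppercaseService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        UppercaseService { inner }
    }
}

impl<S> Service<String> for UppercaseService<S>
where
    S: Service<String>,
{
    type Response = S::Response;

    fn serve(&self, req: String) -> S::Response {
        // transform the request, then delegate to the inner service
        self.inner.serve(req.to_uppercase())
    }
}
```

Stacks of layers compose in exactly this way: each layer returns a new service that wraps the one beneath it, so a tuple of layers applied to a leaf service yields a single composed service.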

Broker

Building a Tansu broker stack starts with a TcpListenerLayer, which, until cancelled, listens for incoming TCP connections and spawns a new task for each one. A TcpContextLayer limits the maximum accepted frame size, and a TcpBytesLayer reads and writes Bytes over the TcpStream. The BytesFrameLayer converts between Bytes and an Apache Kafka protocol Frame:

let token = CancellationToken::new();

let stack = (
    TcpListenerLayer::new(token),
    TcpContextLayer::new(
        TcpContext::default()
        .maximum_frame_size(200_000)),
    TcpBytesLayer::<()>::default(),
    BytesFrameLayer,
);

In the real broker (and proxy), additional layers for security, rate limiting, frame hijacking, forwarding and modification, metrics, and request batching can be added to the stack depending on the configuration.

A BytesFrameLayer layers a service that takes a Frame and produces a Frame:

pub struct BytesFrameLayer;

impl<S> Layer<S> for BytesFrameLayer {
    type Service = BytesFrameService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        Self::Service { inner }
    }
}

Taking a closer look at the BytesFrameService shows how the Frame is deserialized from Bytes with Frame::request_from_bytes, passed to the inner service (which takes and returns a Frame), and then serialized back into Bytes with Frame::response:

pub struct BytesFrameService<S> {
    inner: S,
}

impl<S> Service<Bytes> for BytesFrameService<S>
where
    S: Service<Frame, Response = Frame>,
    S::Error: From<tansu_sans_io::Error> + Debug,
{
    type Response = Bytes;
    type Error = S::Error;

    async fn serve(&self, ctx: Context, req: Bytes) -> Result<Self::Response, Self::Error> {
        let req = Frame::request_from_bytes(req)?;
        let api_key = req.api_key()?;
        let api_version = req.api_version()?;
        let correlation_id = req.correlation_id()?;

        self.inner
            .serve(ctx, req)
            .await
            .and_then(|Frame { body, .. }| {
                Frame::response(
                    Header::Response { correlation_id },
                    body,
                    api_key,
                    api_version,
                )
                .map_err(Into::into)
            })
    }
}

In Tansu, the Apache Kafka protocol is implemented with serde using proc macros to transform the JSON definitions into Rust data types at compilation time (as described in Apache Kafka protocol with serde, quote, syn and proc_macro2). This approach means that the generated code is a simple mapping of Kafka data types into the Serde data model.
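For a feel of what that generated mapping looks like, the sketch below is an illustrative approximation of a MetadataRequest (the real types are generated into tansu_sans_io by the proc macros, with serde derives and version handling omitted here): nullable Kafka fields become Options, arrays become Vecs, and version-gated fields are optional.

```rust
// Illustrative approximation only: the real generated types live in
// tansu_sans_io and are produced at compile time from the Kafka JSON
// message descriptors (serde derives and tagged-field handling omitted).

#[derive(Debug, Default, Clone, PartialEq)]
pub struct MetadataRequestTopic {
    pub topic_id: Option<[u8; 16]>, // Kafka uuid
    pub name: Option<String>,       // nullable string
}

#[derive(Debug, Default, Clone, PartialEq)]
pub struct MetadataRequest {
    pub topics: Option<Vec<MetadataRequestTopic>>, // nullable array
    pub allow_auto_topic_creation: Option<bool>,   // version-gated field
    pub include_cluster_authorized_operations: Option<bool>,
    pub include_topic_authorized_operations: Option<bool>,
}
```

Because the structs are plain data, serialization is just a walk of the Serde data model in the wire order defined by the message descriptor for the negotiated API version.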

A Frame is routed to services using a FrameRouteService. Each type of Request can be routed to a different service. In this example, a MetadataRequest is routed to a service that uses a StorageContainer to store and retrieve metadata about brokers, topics and partitions.

First, let's set up some in-memory storage for the example broker:

let storage = StorageContainer::builder()
    .cluster_id("tansu")
    .node_id(111)
    .advertised_listener(Url::parse("tcp://localhost:9092")?)
    .storage(Url::parse("memory://tansu/")?)
    .build()
    .await?;

StorageContainer provides an abstraction over Storage that can be configured to use memory (for ephemeral environments), S3, PostgreSQL, libSQL or Turso (alpha: currently feature locked).
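The backend is selected by the scheme of the storage URL. The helper below is a hypothetical sketch of that dispatch, not the StorageContainer internals (which parse a proper Url and construct the backend asynchronously); the scheme names are assumptions based on the memory:// example above.

```rust
// Hypothetical sketch of scheme-based backend selection; the real
// StorageContainer builder parses a Url and builds the backend async.

#[derive(Debug, PartialEq)]
enum Backend {
    Memory,
    S3,
    Postgres,
}

fn backend_for(storage_url: &str) -> Option<Backend> {
    // take everything before "://" as the scheme
    let scheme = storage_url.split("://").next()?;
    match scheme {
        "memory" => Some(Backend::Memory),
        "s3" => Some(Backend::S3),
        "postgres" | "postgresql" => Some(Backend::Postgres),
        _ => None, // unknown scheme: configuration error
    }
}
```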

The following metadata function takes a FrameRouteBuilder and a Storage implementation, and returns a FrameRouteBuilder with the MetadataRequest routed to a MetadataService using the provided Storage. Note that MapStateLayer is used to make the Storage available to the inner MetadataService:

fn metadata<S>(
    builder: FrameRouteBuilder<(), Error>,
    storage: S,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
    S: Storage,
{
    builder
        .with_route(
            MetadataRequest::KEY,
            (
                MapErrLayer::new(Error::from),
                MapStateLayer::new(|_| storage),
                FrameRequestLayer::<MetadataRequest>::new(),
            )
                .into_layer(MetadataService)
                .boxed(),
        )
        .map_err(Into::into)
}

All services that require Storage are included in the following services function:

fn services<S>(
    builder: FrameRouteBuilder<(), Error>,
    storage: S,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
    S: Storage,
{
    [
        add_offsets_to_txn,
        consumer_group_describe,
        ...lots more services dependent on storage...,
        metadata,
        produce,
        txn_offset_commit_request,
    ]
    .iter()
    .try_fold(builder, |builder, service| {
        service(builder, storage.clone())
    })
}

(Services that depend on a consumer group coordinator are set up separately, using a very similar pattern.)
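The registration pattern above, folding a builder through a slice of fallible registrar functions, can be sketched standalone. The RouteBuilder and registrars below are simplified stand-ins, not the Tansu FrameRouteBuilder:

```rust
// Standalone sketch of folding a builder through fallible registrar
// functions, as the services function does with FrameRouteBuilder.
// RouteBuilder and the registrars here are illustrative only.

#[derive(Default)]
struct RouteBuilder {
    routes: Vec<&'static str>,
}

type Registrar = fn(RouteBuilder) -> Result<RouteBuilder, String>;

fn metadata(mut b: RouteBuilder) -> Result<RouteBuilder, String> {
    b.routes.push("metadata");
    Ok(b)
}

fn produce(mut b: RouteBuilder) -> Result<RouteBuilder, String> {
    b.routes.push("produce");
    Ok(b)
}

fn register_all(builder: RouteBuilder) -> Result<RouteBuilder, String> {
    // try_fold threads the builder through each registrar,
    // short-circuiting on the first error
    [metadata as Registrar, produce]
        .iter()
        .try_fold(builder, |b, registrar| registrar(b))
}
```

try_fold short-circuits on the first Err, so a single failing route registration aborts the whole broker setup rather than leaving a partially routed service.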

Create the routes for all storage dependent services:

let builder = FrameRouteService::<(), Error>::builder();

// add all services, and then build the routes:
let routes = services(builder, storage)
    .and_then(|builder| builder.build())?;

The FrameRouteService service automatically implements a service that responds to ApiVersionsRequest with an appropriate ApiVersionsResponse indicating the supported API keys and versions (using RootMessageMeta which is also generated at compile time from the JSON message descriptors). Clients use this information to determine which API versions they can use when communicating with the broker.
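A client typically intersects its own supported version range for each API key with the range the broker advertises, then uses the highest common version. The function below is a simplified sketch of that negotiation, not Tansu's implementation:

```rust
/// Pick the highest API version supported by both client and broker,
/// given inclusive (min, max) version ranges for a single API key.
/// Returns None when the ranges do not overlap.
/// A simplified sketch of version negotiation, not the Tansu code.
fn negotiate(client: (i16, i16), broker: (i16, i16)) -> Option<i16> {
    let min = client.0.max(broker.0);
    let max = client.1.min(broker.1);
    (min <= max).then_some(max)
}
```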

Combining the layers and services into a complete stack to serve broker API requests:

// listen for incoming TCP connections:
let listener = TcpListener::bind("localhost:9092").await?;

// TCP layers, framing and routing to services:
let broker = stack.into_layer(routes);

// serve requests until cancelled:
broker.serve(Context::default(), listener).await?;

Client

We can now compose a client stack in a similar way, starting with a ConnectionManager that manages a Pool of Connections to a broker:

let origin = Url::parse("tcp://localhost:9092")?;

let pool = ConnectionManager::builder(origin)
    .client_id("tansu-client".into())
    .build()
    .await?;

let client = (
    RequestPoolLayer::new(pool.clone()),
    RequestConnectionLayer,
    FrameBytesLayer,
)
    .into_layer(BytesConnectionService);

In the above stack, RequestPoolLayer, RequestConnectionLayer and FrameBytesLayer are layered over a BytesConnectionService: the request Frame is serialized into Bytes (mirroring the broker's BytesFrameLayer, in the opposite direction) and written to a Connection checked out of the pool, with the response Bytes deserialized back into a Frame on the way out.

Using the client to send a MetadataRequest to the broker, waiting for a MetadataResponse:

let response = client
    .serve(
        Context::default(),
        MetadataRequest::default()
            .topics(Some([].into()))
            .allow_auto_topic_creation(Some(false))
            .include_cluster_authorized_operations(Some(false))
            .include_topic_authorized_operations(Some(false)),
    )
    .await?;

// pull out the brokers from the response:
let brokers = response.brokers.as_deref().unwrap_or_default();

assert_eq!(1, brokers.len());

assert_eq!("localhost", brokers[0].host);
assert_eq!(9092, brokers[0].port);
assert_eq!(111, brokers[0].node_id);
assert!(brokers[0].rack.is_none());

In this article we have seen how Tansu uses the Service and Layer traits to route, layer and process Apache Kafka messages in a modular and composable way. We have explored the core concepts of the Tansu architecture, including how it implements the Apache Kafka protocol using serde and proc macros, and how services can be combined, all the way through to storage, to create a flexible and efficient streaming platform.

Note that some of the code samples have been simplified for clarity. The full source code is available under the Apache license on GitHub, crates.io and docs.rs.