Route, Layer and Process Kafka Messages with Tansu Services
Tansu is an Apache-licensed, Kafka-compatible streaming platform written in Rust. It uses a Service trait to route, layer and process messages on the broker and client.
Broker
Building a Tansu broker stack starts with a TcpListenerLayer which, until cancelled, listens for incoming TCP connections and spawns a new task for each one. A TcpContextLayer is used to limit the maximum accepted frame size, and a TcpBytesLayer to read and write Bytes from the TcpStream. The BytesFrameLayer converts between Bytes and an Apache Kafka protocol Frame:
let token = CancellationToken::new();

let stack = (
    TcpListenerLayer::new(token),
    TcpContextLayer::new(TcpContext::default().maximum_frame_size(200_000)),
    TcpBytesLayer::<()>::default(),
    BytesFrameLayer,
);
In the real broker (and proxy), additional layers for security, rate limiting, frame hijacking, forwarding and modification, metrics, and request batching can be added to the stack depending on the configuration.
A BytesFrameLayer layers a service that takes a Frame and produces a Frame:
pub struct BytesFrameLayer;

impl<S> Layer<S> for BytesFrameLayer {
    type Service = BytesFrameService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        Self::Service { inner }
    }
}
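Tansu's Service and Layer traits follow the shape popularised by tower and rama. As a rough, synchronous sketch of the pattern (Tansu's serve is async and also takes a Context; the uppercase example below is purely illustrative, not Tansu code):

```rust
// Minimal sketch of the Service/Layer pattern (assumed shapes, not Tansu's exact traits).
trait Service<Req> {
    type Response;
    fn serve(&self, req: Req) -> Self::Response;
}

trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

// A toy layer that uppercases the request before handing it to the inner service.
struct UppercaseLayer;

struct UppercaseService<S> {
    inner: S,
}

impl<S> Layer<S> for UppercaseLayer {
    type Service = UppercaseService<S>;
    fn layer(&self, inner: S) -> Self::Service {
        UppercaseService { inner }
    }
}

impl<S> Service<String> for UppercaseService<S>
where
    S: Service<String, Response = String>,
{
    type Response = String;
    fn serve(&self, req: String) -> String {
        self.inner.serve(req.to_uppercase())
    }
}

// An innermost service that simply echoes its request.
struct Echo;

impl Service<String> for Echo {
    type Response = String;
    fn serve(&self, req: String) -> String {
        req
    }
}
```

A layer wraps an inner service and returns a new service, so stacks compose outside-in, exactly as the broker stack above does.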
Taking a closer look at the BytesFrameService shows how the Frame is deserialized from Bytes with Frame::request_from_bytes, passed to the inner service (which takes and returns a Frame), and then serialized back into Bytes with Frame::response:
pub struct BytesFrameService<S> {
    inner: S,
}

impl<S> Service<Bytes> for BytesFrameService<S>
where
    S: Service<Frame, Response = Frame>,
    S::Error: From<tansu_sans_io::Error> + Debug,
{
    type Response = Bytes;
    type Error = S::Error;

    async fn serve(&self, ctx: Context, req: Bytes) -> Result<Self::Response, Self::Error> {
        let req = Frame::request_from_bytes(req)?;
        let api_key = req.api_key()?;
        let api_version = req.api_version()?;
        let correlation_id = req.correlation_id()?;

        self.inner
            .serve(ctx, req)
            .await
            .and_then(|Frame { body, .. }| {
                Frame::response(
                    Header::Response { correlation_id },
                    body,
                    api_key,
                    api_version,
                )
                .map_err(Into::into)
            })
    }
}
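The api_key, api_version and correlation_id read above come from the fixed layout of a Kafka request header: a big-endian int16 API key, int16 API version and int32 correlation id. A minimal standalone sketch of that decoding (not Tansu's actual Frame::request_from_bytes):

```rust
// Decode the leading fields of a Kafka request header:
// big-endian int16 api_key, int16 api_version, int32 correlation_id.
fn parse_request_header(buf: &[u8]) -> Option<(i16, i16, i32)> {
    let api_key = i16::from_be_bytes(buf.get(0..2)?.try_into().ok()?);
    let api_version = i16::from_be_bytes(buf.get(2..4)?.try_into().ok()?);
    let correlation_id = i32::from_be_bytes(buf.get(4..8)?.try_into().ok()?);
    Some((api_key, api_version, correlation_id))
}
```

The correlation id is echoed back in the response header, which is why BytesFrameService captures it before calling the inner service.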
In Tansu, the Apache Kafka protocol is implemented with serde using proc macros to transform the JSON definitions into Rust data types at compilation time (as described in Apache Kafka protocol with serde, quote, syn and proc_macro2). This approach means that the generated code is a simple mapping of Kafka data types into the Serde data model.
A Frame is routed to services using a FrameRouteService. Each type of Request can be routed to a different service. In this example, a MetadataRequest is routed to a service that uses a StorageContainer to store and retrieve metadata about brokers, topics and partitions.
First, let's set up some in-memory storage for the example broker:
let storage = StorageContainer::builder()
    .cluster_id("tansu")
    .node_id(111)
    .advertised_listener(Url::parse("tcp://localhost:9092")?)
    .storage(Url::parse("memory://tansu/")?)
    .build()
    .await?;
StorageContainer provides an abstraction over Storage that can be configured to use memory (for ephemeral environments), S3, PostgreSQL, libSQL or Turso (alpha: currently feature locked).
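The backend is selected from the scheme of the storage URL, as with memory://tansu/ above. A toy sketch of that kind of dispatch (the non-memory scheme names here are assumptions for illustration, not necessarily the ones Tansu accepts):

```rust
// Pick a storage backend from a URL scheme (illustrative scheme names only).
fn backend_for(url: &str) -> Option<&'static str> {
    // everything before "://" is treated as the scheme
    let (scheme, _rest) = url.split_once("://")?;
    match scheme {
        "memory" => Some("in-memory"),
        "s3" => Some("S3"),
        "postgres" => Some("PostgreSQL"),
        _ => None,
    }
}
```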
The following metadata function takes a FrameRouteBuilder and a Storage implementation and returns a FrameRouteBuilder with the MetadataRequest routed to a MetadataService using the provided Storage. Note that MapStateLayer is used to make the Storage available to the inner MetadataService:
fn metadata<S>(
    builder: FrameRouteBuilder<(), Error>,
    storage: S,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
    S: Storage,
{
    builder
        .with_route(
            MetadataRequest::KEY,
            (
                MapErrLayer::new(Error::from),
                MapStateLayer::new(|_| storage),
                FrameRequestLayer::<MetadataRequest>::new(),
            )
                .into_layer(MetadataService)
                .boxed(),
        )
        .map_err(Into::into)
}
All services that require Storage are included in the following services function:
fn services<S>(
    builder: FrameRouteBuilder<(), Error>,
    storage: S,
) -> Result<FrameRouteBuilder<(), Error>, Error>
where
    S: Storage,
{
    [
        add_offsets_to_txn,
        consumer_group_describe,
        // ...lots more services dependent on storage...
        metadata,
        produce,
        txn_offset_commit_request,
    ]
    .iter()
    .try_fold(builder, |builder, service| {
        service(builder, storage.clone())
    })
}
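Conceptually, the builder is assembling a table from API key to boxed service, and routing is then a lookup by key. A plain-Rust toy of that dispatch (hypothetical names, with strings standing in for frames and no Context or error handling):

```rust
use std::collections::HashMap;

// A boxed handler stands in for a boxed service.
type Handler = Box<dyn Fn(&str) -> String>;

struct Router {
    routes: HashMap<i16, Handler>,
}

impl Router {
    fn new() -> Self {
        Self { routes: HashMap::new() }
    }

    // register a handler under an API key, builder-style
    fn with_route(mut self, api_key: i16, handler: Handler) -> Self {
        self.routes.insert(api_key, handler);
        self
    }

    // dispatch a request to the handler registered for its API key
    fn serve(&self, api_key: i16, req: &str) -> Option<String> {
        self.routes.get(&api_key).map(|handler| handler(req))
    }
}
```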
(Services that depend on a consumer group coordinator are set up separately using a very similar pattern.)
Create the routes for all storage dependent services:
let builder = FrameRouteService::<(), Error>::builder();

// add all services, and then build the routes:
let routes = services(builder, storage)
    .and_then(|builder| builder.build())?;
The FrameRouteService automatically implements a service that responds to an ApiVersionsRequest with an appropriate ApiVersionsResponse indicating the supported API keys and versions (using RootMessageMeta, which is also generated at compile time from the JSON message descriptors). Clients use this information to determine which API versions they can use when communicating with the broker.
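A client typically intersects its own supported version range for an API key with the (min, max) range the broker advertises, and picks the highest common version. A sketch of that negotiation (not Tansu's code):

```rust
// Given (min, max) inclusive version ranges from client and broker,
// return the highest version both support, if any.
fn negotiate(client: (i16, i16), broker: (i16, i16)) -> Option<i16> {
    let low = client.0.max(broker.0);
    let high = client.1.min(broker.1);
    (low <= high).then_some(high)
}
```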
Combining the layers and services into a complete stack to serve broker API requests:
// listen for incoming TCP connections:
let listener = TcpListener::bind("localhost:9092").await?;

// TCP layers, framing and routing to services:
let broker = stack.into_layer(routes);

// serve requests until cancelled:
broker.serve(Context::default(), listener).await?;
Client
We can now compose a client stack in a similar way, starting with a ConnectionManager that manages a Pool of Connections to a broker:
let origin = Url::parse("tcp://localhost:9092")?;

let pool = ConnectionManager::builder(origin)
    .client_id("tansu-client".into())
    .build()
    .await?;

let client = (
    RequestPoolLayer::new(pool.clone()),
    RequestConnectionLayer,
    FrameBytesLayer,
)
    .into_layer(BytesConnectionService);
In the above we layer:

- RequestPoolLayer: makes the connection Pool available to the inner service.
- RequestConnectionLayer: takes a Connection from the Pool, encloses the Request in a Frame using the latest API version supported by the broker, and calls the inner service with the Frame, using the Connection as Context.
- FrameBytesLayer: converts between Frame and Bytes.
- BytesConnectionService: writes a frame represented by Bytes to a Connection, returning the Bytes frame response.
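At the bottom of the stack, the frames themselves are length-delimited on the wire: Kafka prefixes each frame with a 4-byte big-endian size. A standalone sketch of that framing, using an in-memory buffer in place of a real Connection (not the actual BytesConnectionService):

```rust
use std::io::{Read, Write};

// Write a Kafka-style frame: 4-byte big-endian length prefix, then the payload.
fn write_frame<W: Write>(w: &mut W, frame: &[u8]) -> std::io::Result<()> {
    w.write_all(&(frame.len() as u32).to_be_bytes())?;
    w.write_all(frame)
}

// Read one length-prefixed frame back from the stream.
fn read_frame<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
    let mut len = [0u8; 4];
    r.read_exact(&mut len)?;
    let mut frame = vec![0u8; u32::from_be_bytes(len) as usize];
    r.read_exact(&mut frame)?;
    Ok(frame)
}
```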
Using the client to send a MetadataRequest to the broker, waiting for a MetadataResponse:
let response = client
    .serve(
        Context::default(),
        MetadataRequest::default()
            .topics(Some([].into()))
            .allow_auto_topic_creation(Some(false))
            .include_cluster_authorized_operations(Some(false))
            .include_topic_authorized_operations(Some(false)),
    )
    .await?;

// pull out the brokers from the response:
let brokers = response.brokers.as_deref().unwrap_or_default();

assert_eq!(1, brokers.len());
assert_eq!("localhost", brokers[0].host);
assert_eq!(9092, brokers[0].port);
assert_eq!(111, brokers[0].node_id);
assert!(brokers[0].rack.is_none());
In this article we have seen how Tansu uses the Service and Layer traits to route, layer and process Apache Kafka messages in a modular and composable way. We have explored the core concepts of the Tansu architecture, including how it implements the Apache Kafka protocol using serde and proc macros, and how services can be combined to create a flexible and efficient streaming platform, all the way through to storage.
Note: some of the code samples have been simplified for clarity. The full source code is available under the Apache license on GitHub, crates.io and docs.rs.