Apache Kafka protocol with serde, quote, syn and proc_macro2
Tansu uses serde, quote, syn and proc_macro2 to implement the Apache Kafka® protocol. Tansu is a drop-in replacement for Apache Kafka with PostgreSQL, S3 or memory storage engines. Licensed under the GNU AGPL, and written in 100% safe 🦺 async 🚀 Rust 🦀.
Overview
Since late 2018 (KAFKA-7609), the Apache Kafka® message protocol has been generated from JSON documents.
Each message has its own versioned schema.
The protocol has evolved over the years by introducing flexible types (using variable rather than fixed length integers) and tagged fields (via KIP-482), allowing additional data to be added without having to increment the version number of the message.
A separate protocol document describes the primitive types
that are supported. A grammar describing how the protocol operates (slightly dated) is available here.
The format of batches used by produce and fetch requests, with the Record type, is defined in this document. Similarly, the format of data used while joining and synchronizing as a member of a consumer group is defined as Bytes in the message schema, but is further defined in separate documents. Record and parts of the consumer group protocol have their own versioning system within those types.
The client chooses the version of the message to use. A well-behaved client might use an ApiVersionsRequest to determine the messages and associated versions supported by the broker, but this is completely optional. The version used is encoded into the request header; the response must use the same version or return an error.
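As a rough illustration (a sketch only, not Tansu's generated header type), the request header carries the API key and the chosen version ahead of the message body:
// Sketch of a Kafka request header (header version 1), not Tansu's generated
// type: the broker uses api_key and api_version to decide how to decode the
// body, and must reply using the same version.
struct RequestHeader {
    api_key: i16,              // which message, e.g. 18 for ApiVersions
    api_version: i16,          // the version the client chose
    correlation_id: i32,       // echoed back in the response
    client_id: Option<String>, // nullable string identifying the client
}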
Implementation
A DescribeGroupsRequest is valid for versions 0 through 5, using a flexible format from version 5 onwards:
{
  "apiKey": 15,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "DescribeGroupsRequest",
  "validVersions": "0-5",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The names of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
      "about": "Whether to include authorized operations." }
  ]
}
Looking at the Groups field, its type is []string. In versions 0-4 this field is encoded as an ARRAY[T]; in versions 5+ it is encoded as a COMPACT_ARRAY[T].
- ARRAY[T] has an i16 representing the length, with -1 representing null.
- COMPACT_ARRAY[T] has a variable length unsigned integer, with 0 representing the null array and N + 1 representing the length.
The elements of the array are also encoded differently: in versions 0-4 as a STRING and from 5+ as a COMPACT_STRING.
- STRING: the length N is given as an i16. Then N bytes follow which are the UTF-8 encoding of the character sequence. The length must not be negative.
- COMPACT_STRING: the length N + 1 is given as an unsigned variable length integer. Then N bytes follow which are the UTF-8 encoding of the character sequence.
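As a sketch of those two encodings (not Tansu's code; the unsigned varint here is the base-128 encoding, 7 bits per byte with the high bit marking continuation, that the protocol uses for variable length unsigned integers):
use std::io::{self, Write};

// Sketch only: an unsigned varint is written 7 bits at a time, least
// significant group first, with the high bit set on all but the last byte.
fn unsigned_varint(mut value: u64, writer: &mut impl Write) -> io::Result<()> {
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            return writer.write_all(&[byte]);
        }
        writer.write_all(&[byte | 0x80])?;
    }
}

// STRING: the length N as a big-endian i16, followed by the UTF-8 bytes.
fn string(value: &str, writer: &mut impl Write) -> io::Result<()> {
    let length = i16::try_from(value.len()).expect("length must fit in an i16");
    writer.write_all(&length.to_be_bytes())?;
    writer.write_all(value.as_bytes())
}

// COMPACT_STRING: the length N + 1 as an unsigned varint, then the UTF-8 bytes.
fn compact_string(value: &str, writer: &mut impl Write) -> io::Result<()> {
    unsigned_varint(value.len() as u64 + 1, writer)?;
    writer.write_all(value.as_bytes())
}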
To represent this protocol in serde we need the following:
- Map each JSON message into its appropriate types.
- Write a serializer and deserializer, adapting to use ARRAY[T] or COMPACT_ARRAY[T], STRING or COMPACT_STRING, depending on the version of the protocol being used by that message.
As a first step we transform DescribeGroupsRequest into a Rust structure:
#[derive(Deserialize, Serialize)]
struct DescribeGroupsRequest {
    groups: Option<Vec<String>>,
    include_authorized_operations: Option<bool>,
}
The groups field here is an Option because all arrays can be null. The include_authorized_operations field is also an Option because it is only valid in versions 3+ of this message.
When serializing or deserializing we need to know the version being used, so that we can use ARRAY[T] or COMPACT_ARRAY[T], and know whether a field is valid for that version.
Early versions of the Kafka protocol implementation in Tansu used an enumeration with variants representing each version. For example, the flexible variant for groups used Option<CompactArray<String>>, while the inflexible variant used Option<Array<String>>. While this method achieved a strong type mapping, it rapidly became unwieldy: with 18 versions of the fetch request (and therefore another 18 response versions), broker code ended up with long blocks of match arms covering all the cases. Tag fields that can be added into any flexible format message further misshaped the strong type model. A different approach was taken, where the mapping results in a single struct that can represent all message versions. Instead, the serializer is aware of the version of the message, using the ARRAY[T] representation for groups and omitting include_authorized_operations in versions 0-2. This decision enables simpler broker code, at the expense of more complex serialization and deserialization.
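To make the trade-off concrete, here is a rough sketch (with invented names, not the actual Tansu types) of what the earlier, per-version mapping looked like:
// Sketch only, with invented names: in the earlier approach the encoding was
// part of the type, with wrapper types selecting ARRAY[T] or COMPACT_ARRAY[T]
// and a variant for every message version.
struct Array<T>(Vec<T>);
struct CompactArray<T>(Vec<T>);

enum VersionedDescribeGroupsRequest {
    V0 { groups: Option<Array<String>> },
    // ...a variant for each of versions 1 through 4...
    V5 {
        groups: Option<CompactArray<String>>,
        include_authorized_operations: bool,
    },
}
Broker code then needed match arms covering every variant of every message; the single DescribeGroupsRequest struct shown earlier avoids that, leaving the version awareness to the serializer.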
Tagged Fields
Flexible format messages also introduced tagged fields. Tagged fields can appear at the top level of a message, or inside any structure, including the header. A tagged field can be added to an existing protocol version without incrementing the version number. Unexpected tagged fields are ignored by the receiver.
In Tansu, structures containing the encoded tag buffers are found in the mezzanine module. This creates a two-stage serialization (or deserialization): any tagged fields are encoded into tag_buffer, and then that whole structure is serialized. There are conversions using From<mezzanine::DescribeGroupsRequest> for DescribeGroupsRequest and From<DescribeGroupsRequest> for mezzanine::DescribeGroupsRequest that are generated as part of the type mapping.
Continuing with DescribeGroupsRequest, with tagged fields the structure looks like:
mod mezzanine {
    #[derive(Deserialize, Serialize)]
    struct DescribeGroupsRequest {
        groups: Option<Vec<String>>,
        include_authorized_operations: Option<bool>,
        tag_buffer: Option<TagBuffer>,
    }
}
A tag_buffer is always at the end of any mezzanine structure, because it is encoded/decoded after the "normal" fields.
In flexible versions of DescribeGroupsRequest the tag_buffer will be Some; in inflexible versions it will be None.
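A rough sketch of the conversion in one direction (the real conversions are generated as part of the type mapping; DescribeGroupsRequest declares no tagged fields of its own, so leaving the mezzanine form simply discards the tag buffer):
// Sketch only, using the DescribeGroupsRequest and mezzanine types shown
// above. The generated code also provides the reverse conversion, which
// supplies the tag buffer (Some for flexible versions, None otherwise).
impl From<mezzanine::DescribeGroupsRequest> for DescribeGroupsRequest {
    fn from(value: mezzanine::DescribeGroupsRequest) -> Self {
        Self {
            groups: value.groups,
            include_authorized_operations: value.include_authorized_operations,
        }
    }
}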
Both TagBuffer and TagField are defined using the newtype idiom so that custom implementations of Deserialize and Serialize are used:
struct TagBuffer(Vec<TagField>);
struct TagField(u32, Vec<u8>);
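As a sketch of the wire layout these newtypes represent (per KIP-482 a tag buffer is the count of tagged fields, then each field's tag, data length and raw data, all as unsigned varints; this is not Tansu's custom Serialize implementation, and it reuses the unsigned_varint helper sketched earlier):
use std::io::{self, Write};

// Sketch only, not Tansu's Serialize implementation.
// fn unsigned_varint(value: u64, writer: &mut impl Write) -> io::Result<()>
// is the helper sketched earlier in this post.
impl TagBuffer {
    fn encode(&self, writer: &mut impl Write) -> io::Result<()> {
        unsigned_varint(self.0.len() as u64, writer)?; // number of tagged fields
        for TagField(tag, data) in &self.0 {
            unsigned_varint(u64::from(*tag), writer)?;   // the tag itself
            unsigned_varint(data.len() as u64, writer)?; // length of the data
            writer.write_all(data)?;                     // the already encoded field
        }
        Ok(())
    }
}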
So not much change then? Actually... there is. Let's examine a subset of FetchRequest, which from version 12 has tagged fields:
{
  "validVersions": "0-17",
  "deprecatedVersions": "0-3",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
      "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
    { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1, "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
        "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
      { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
        "about": "The epoch of this follower, or -1 if not available." }
    ]},
    // ...lots more fields...
  ]
}
Note that both ClusterId and ReplicaState are tagged fields (they have a tag attribute), whereas ReplicaId does not.
The structure for FetchRequest:
#[derive(Deserialize, Serialize)]
struct FetchRequest {
    cluster_id: Option<String>,
    replica_id: Option<i32>,
    replica_state: Option<ReplicaState>,
    // ...lots more fields...
}
Both cluster_id and replica_state are Option because they are tagged fields, present only in versions 12+ and 15+ respectively (and even in those versions their presence is optional). The replica_id is also Option because it is only valid for versions 0-14 (it is essentially deprecated by the ReplicaId found in ReplicaState).
The mezzanine version of FetchRequest, where the tagged fields have been encoded into the tag_buffer, is:
mod mezzanine {
    #[derive(Deserialize, Serialize)]
    struct FetchRequest {
        replica_id: Option<i32>,
        // ...lots more fields...
        tag_buffer: Option<TagBuffer>,
    }
}
Again, tag_buffer in the mezzanine is an Option because in versions 0-11 it isn't present. Neither cluster_id nor replica_state is present, because they are already encoded into tag_buffer at this point.
The representation of ReplicaState also contains a tag_buffer, because tagged fields can be extended with further tagged fields:
mod mezzanine {
    #[derive(Deserialize, Serialize)]
    struct ReplicaState {
        replica_id: i32,
        replica_epoch: i64,
        tag_buffer: Option<TagBuffer>,
    }
}
The type mapping parses the original Apache Kafka® JSON descriptors, generating the appropriate types (both tagged and untagged) with helpers to convert from tagged to untagged (and vice versa). All the types also derive Clone, Debug, Default, Eq (where possible), Hash (where possible), Ord (where possible), PartialEq, PartialOrd, Deserialize and Serialize.
The code generation is done by tansu-kafka-sans-io/build.rs, with quote, syn and proc_macro2 doing all the heavy lifting. Finally! The generated code is incorporated into tansu-kafka-sans-io/lib.rs with:
include!(concat!(env!("OUT_DIR"), "/generate.rs"));
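To give a flavour of that step, here is a heavily simplified sketch (not the actual tansu-kafka-sans-io build script) of generating a struct with quote and syn and writing it into OUT_DIR:
// build.rs sketch only, heavily simplified: turn a name and some fields from
// a descriptor into a Rust struct definition with quote, then write the
// result into OUT_DIR so it can be picked up by include!.
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
use std::{env, fs, path::Path};

fn generate(name: &str, fields: &[(&str, &str)]) -> TokenStream {
    let name = format_ident!("{}", name);
    let fields = fields.iter().map(|(field, ty)| {
        let field = format_ident!("{}", field);
        let ty: syn::Type = syn::parse_str(ty).expect("valid type");
        quote! { pub #field: #ty }
    });

    quote! {
        #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
        pub struct #name {
            #(#fields,)*
        }
    }
}

fn main() {
    // In the real build script these come from the Apache Kafka JSON descriptors.
    let generated = generate(
        "DescribeGroupsRequest",
        &[
            ("groups", "Option<Vec<String>>"),
            ("include_authorized_operations", "Option<bool>"),
        ],
    );

    let out = Path::new(&env::var("OUT_DIR").expect("OUT_DIR")).join("generate.rs");
    fs::write(out, generated.to_string()).expect("write generated code");
}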
Once we have the generated types, we just need to write the Deserializer and Serializer to actually implement the Apache Kafka® protocol. For this article, we'll concentrate on serializing a String, looking at STRING and COMPACT_STRING. In Tansu the Serializer is an Encoder:
pub struct Encoder<'a> {
    writer: &'a mut dyn Write,
    // lots of other fields
}
With the following implementation for serializing either STRING or COMPACT_STRING:
impl Serializer for &mut Encoder<'_> {
    fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
        if self.in_header()
            && self.kind.is_some_and(|kind| kind == Kind::Request)
            && self.field.is_some_and(|field| field == "client_id")
        {
            v.len()
                .try_into()
                .map_err(Into::into)
                .and_then(|len| self.serialize_i16(len))
                .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
        } else if self.is_valid() {
            if self.is_flexible() {
                (v.len() + 1)
                    .try_into()
                    .map_err(Into::into)
                    .and_then(|len| self.unsigned_varint(len))
                    .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
            } else {
                v.len()
                    .try_into()
                    .map_err(Into::into)
                    .and_then(|len| self.serialize_i16(len))
                    .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
            }
        } else {
            Ok(())
        }
    }
}
The first part of the implementation deals with the client_id in any request header, which for backwards compatibility uses an i16 to encode its length. There are only two exceptions like this (AFAIK); the other is the response header of an ApiVersionsResponse, which for similar reasons is inflexible, followed by a flexible body!
The self.is_valid() call determines whether the field should be serialized in this version of the message. If it is, a COMPACT_STRING (flexible) encodes v.len() + 1 as an unsigned variable length integer, whereas STRING just uses a plain v.len() as an i16, both followed by the bytes in the field.
If you're wondering, when the field is null that case is handled separately by serialize_none.
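For a null string the idea is roughly this (a sketch of the encodings only, not how Tansu's serialize_none is written):
use std::io::{self, Write};

// Sketch only: NULLABLE_STRING writes a length of -1 for null, while
// COMPACT_NULLABLE_STRING writes a length of 0 (as an unsigned varint).
fn serialize_null_string(flexible: bool, writer: &mut impl Write) -> io::Result<()> {
    if flexible {
        writer.write_all(&[0])
    } else {
        writer.write_all(&(-1i16).to_be_bytes())
    }
}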
Testing
Tansu includes an Apache Kafka® Proxy used to capture traffic between various clients and brokers. The packets are then used in test cases to verify that encoding and decoding are correct.
In the following test case, expected is the raw bytes of an ApiVersionsRequest version 3, captured by the proxy. Firstly, the test deserializes the raw expected bytes into a protocol frame using Frame::request_from_bytes(&expected); that frame is then serialized again with Frame::request(frame.header, frame.body) and verified to be the same as the original expected bytes.
#[test]
fn api_versions_request_v3_000() -> Result<()> {
    let _guard = init_tracing()?;
    let expected = vec![
        0, 0, 0, 52, 0, 18, 0, 3, 0, 0, 0, 3, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112,
        114, 111, 100, 117, 99, 101, 114, 0, 18, 97, 112, 97, 99, 104, 101, 45, 107, 97, 102, 107,
        97, 45, 106, 97, 118, 97, 6, 51, 46, 54, 46, 49, 0,
    ];
    assert_eq!(
        expected,
        Frame::request_from_bytes(&expected)
            .and_then(|frame| Frame::request(frame.header, frame.body))?
    );
    Ok(())
}
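Reading those bytes by hand (a breakdown assuming the standard request header layout) shows how the inflexible client_id and the flexible body fit together:
// Breaking down the captured ApiVersionsRequest v3 bytes:
//
// 0, 0, 0, 52                  frame length: 52 bytes follow
// 0, 18                        api_key: 18 (ApiVersions)
// 0, 3                         api_version: 3
// 0, 0, 0, 3                   correlation_id: 3
// 0, 16, "console-producer"    client_id as a STRING (i16 length of 16)
// 0                            header tag buffer: no tagged fields
// 18, "apache-kafka-java"      client_software_name as a COMPACT_STRING (17 + 1)
// 6, "3.6.1"                   client_software_version as a COMPACT_STRING (5 + 1)
// 0                            body tag buffer: no tagged fields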
Conclusion
Tansu uses serde, quote, syn and proc_macro2 to implement the Apache Kafka® protocol. Tansu is a drop-in replacement for Apache Kafka® with PostgreSQL, S3 or memory storage engines. Licensed under the GNU AGPL, and written in 100% safe 🦺 async 🚀 Rust 🦀.