Apache Kafka protocol with serde, quote, syn and proc_macro2
Tansu uses serde, quote, syn and proc_macro2 to implement the Apache Kafka® protocol. Tansu is a drop-in replacement for Apache Kafka with PostgreSQL, S3 or memory storage engines. Licensed under the Apache License, and written in 100% safe 🦺 async 🚀 Rust 🦀.
Overview
Since late 2018, with the introduction of KAFKA-7609,
the Apache Kafka® message protocol has been generated from
JSON documents.
Each message has its own versioned schema.
The protocol has evolved over the years, introducing flexible types (which use variable- rather than fixed-length integers) and
tagged fields (via KIP-482),
allowing additional data to be introduced without having to increment the version number of the message.
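Flexible types lean on unsigned varints for lengths and tags. As a rough illustration (a sketch, not Tansu's implementation), a ULEB128-style encoder looks something like this:
// sketch only: how flexible types encode an unsigned varint (ULEB128 style)
fn unsigned_varint(mut value: u32, out: &mut Vec<u8>) {
    // emit seven bits at a time, setting the continuation bit while more bytes follow
    while value >= 0x80 {
        out.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

let mut out = Vec::new();
unsigned_varint(300, &mut out);
assert_eq!(vec![0xac, 0x02], out);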
A separate protocol document describes the primitive types
that are supported. A grammar describing how the protocol operates (slightly dated) is available here.
The format of batches used by produce and fetch requests is the Record type, which is
defined in this document. Similarly, the format of the data used while
joining and synchronizing as a member of a consumer group is defined as Bytes in the message schema, but is
further defined in separate documents.
Record and parts of the consumer group protocol have their own versioning systems within those types.
The client chooses the version of the message to use. A well behaved client might use an ApiVersionsRequest
to determine the messages and associated versions supported by the broker, but this is completely optional.
The version used is encoded into the request header; the response must use the same version or return an error.
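For context, a request header carries the API key, the chosen version and a correlation id (the exact layout depends on the header version). A simplified sketch, not Tansu's generated type:
// simplified sketch of a version 1 request header; illustrative only
struct RequestHeader {
    api_key: i16,              // which message, e.g. 15 for DescribeGroups
    api_version: i16,          // the version chosen by the client
    correlation_id: i32,       // echoed back in the matching response
    client_id: Option<String>, // nullable client identifier
}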
Implementation
A DescribeGroupsRequest is valid for versions: 0 through 5, using a flexible format from version 5 onwards:
{
  "apiKey": 15,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "DescribeGroupsRequest",
  "validVersions": "0-5",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The names of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
      "about": "Whether to include authorized operations." }
  ]
}
Looking at the Groups field, its type is []string. In versions 0-4 this field is encoded
as an ARRAY[T]. In versions 5+ this field is encoded as a COMPACT_ARRAY[T].
ARRAY[T] has an i32 representing the length, with -1 representing null. COMPACT_ARRAY[T] has a variable length unsigned integer, with 0 representing the null array and N + 1 representing the length.
The elements of the array are also encoded differently: in versions 0-4 as a STRING, and in versions 5+ as a COMPACT_STRING.
STRING: the length N is given as an i16. Then N bytes follow which are the UTF-8 encoding of the character sequence. The length must not be negative. COMPACT_STRING: the length N + 1 is given as an unsigned variable length integer. Then N bytes follow which are the UTF-8 encoding of the character sequence.
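As a concrete illustration, the group id "test" would appear on the wire as follows:
// illustration only: the same string in the two wire formats
//
// STRING (versions 0-4): big-endian i16 length, then the UTF-8 bytes
let string = vec![0x00, 0x04, b't', b'e', b's', b't'];

// COMPACT_STRING (versions 5+): unsigned varint of length + 1, then the bytes
let compact_string = vec![0x05, b't', b'e', b's', b't'];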
To represent this protocol in serde we need the following:
- Map each JSON message into its appropriate types.
- Write a serializer and deserializer, adapting to use ARRAY[T] or COMPACT_ARRAY[T], STRING or COMPACT_STRING, depending on the version of the protocol being used by that message.
As a first step we transform DescribeGroupsRequest into a Rust structure:
#[derive(Deserialize, Serialize)]
struct DescribeGroupsRequest {
    groups: Option<Vec<String>>,
    include_authorized_operations: Option<bool>,
}
The groups field here is an Option because all arrays can be null. The include_authorized_operations field is also an Option because it is only valid in versions 3+ of this message.
When serializing or deserializing we need to know the version being used so that we can use ARRAY[T] or COMPACT_ARRAY[T], and whether a field is valid for that version.
Early versions of the Kafka protocol implementation in Tansu used an enumeration with variants representing each version. For example, the flexible variant for
groups used Option<CompactArray<String>>, and the inflexible variant used Option<Array<String>>. While this method achieved a strong type mapping, it rapidly became unwieldy: with 18 versions of the fetch request (also meaning another 18 response versions), the broker code ended up with long blocks of match arms covering all the cases. Tagged fields, which can be added to any flexible format message, further misshaped the strong type model. A different approach was taken, where the mapping results in a single struct that can represent all message versions. Instead, the serializer is aware of the version of the message, choosing the ARRAY[T] or COMPACT_ARRAY[T] representation for groups and omitting include_authorized_operations in versions 0-2. This decision enables simpler broker code, at the expense of more complex serialization and deserialization.
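A sketch of the idea (the encode function here is hypothetical, not Tansu's exact API): the same struct is handed to the encoder together with the version, and the encoder decides the wire representation:
// hypothetical API, shown only to illustrate the single-struct approach
let request = DescribeGroupsRequest {
    groups: Some(vec!["example-group".into()]),
    // only serialized in versions 3+, skipped by the encoder otherwise
    include_authorized_operations: Some(false),
};

// version 2: groups written as ARRAY[STRING], include_authorized_operations omitted
let v2 = encode(api_key, 2, &request)?;

// version 5: groups written as COMPACT_ARRAY[COMPACT_STRING]
let v5 = encode(api_key, 5, &request)?;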
Tagged Fields
Flexible format messages also introduced tagged fields. Tagged fields can appear at the top level of a message, or inside any structure, including the header. A tagged field can be added to an existing protocol version without an increment. Unexpected tagged fields are ignored by the receiver.
In Tansu, structures containing the encoded tag buffers are found in the mezzanine module.
This creates a two-stage serialization (or deserialization): any tagged fields are encoded
into tag_buffer and then that whole structure is serialized. There are conversions using
From<mezzanine::DescribeGroupsRequest> for DescribeGroupsRequest
and From<DescribeGroupsRequest> for mezzanine::DescribeGroupsRequest that are generated
as part of the type mapping.
Continuing with DescribeGroupsRequest, with tagged fields the structure looks like:
mod mezzanine {
    #[derive(Deserialize, Serialize)]
    struct DescribeGroupsRequest {
        groups: Option<Vec<String>>,
        include_authorized_operations: Option<bool>,
        tag_buffer: Option<TagBuffer>,
    }
}
A tag_buffer is always at the end of any mezzanine structure, because it is
encoded/decoded after the "normal" fields.
In flexible versions of DescribeGroupsRequest the tag_buffer will
be Some; in inflexible versions it will be None.
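One direction of that conversion might look roughly like this (a sketch, not the generated code): for DescribeGroupsRequest, which has no tagged fields of its own, moving from the mezzanine type back to the public type amounts to dropping the tag buffer:
// sketch only: the generated conversions follow roughly this shape
impl From<mezzanine::DescribeGroupsRequest> for DescribeGroupsRequest {
    fn from(value: mezzanine::DescribeGroupsRequest) -> Self {
        Self {
            groups: value.groups,
            include_authorized_operations: value.include_authorized_operations,
            // any tag_buffer is dropped: DescribeGroupsRequest has no tagged fields of its own
        }
    }
}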
Both TagBuffer and TagField are defined using the
new type idiom
so that custom implementations of Deserialize and Serialize are used:
struct TagBuffer(Vec<TagField>);
struct TagField(u32, Vec<u8>);
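On the wire (per KIP-482) a tag buffer is the number of tagged fields, followed by each field's tag, the size of its value, and the raw value bytes, with the integers all encoded as unsigned varints. A sketch of that layout, reusing the unsigned_varint helper from earlier (not Tansu's custom Serialize implementation):
// sketch of the KIP-482 wire layout, not Tansu's Serialize impl
fn encode_tag_buffer(buffer: &TagBuffer, out: &mut Vec<u8>) {
    unsigned_varint(buffer.0.len() as u32, out); // number of tagged fields
    for TagField(tag, value) in &buffer.0 {
        unsigned_varint(*tag, out);               // the tag number
        unsigned_varint(value.len() as u32, out); // the size of the value
        out.extend_from_slice(value);             // the raw value bytes
    }
}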
So not much change then? Actually... there is. Let's examine a subset of FetchRequest,
which from version 12 has tagged fields:
{
  "validVersions": "0-17",
  "deprecatedVersions": "0-3",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
      "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
    { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1, "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
        "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
      { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
        "about": "The epoch of this follower, or -1 if not available." }
    ]},
    // ...lots more fields...
  ]
}
Note that both ClusterId and ReplicaState are tagged fields (have a tag attribute), whereas ReplicaId
does not.
The structure for FetchRequest:
#[derive(Deserialize, Serialize)]
struct FetchRequest {
    cluster_id: Option<String>,
    replica_id: Option<i32>,
    replica_state: Option<ReplicaState>,
    // ...lots more fields...
}
Both cluster_id and replica_state are Option because they are tagged fields,
present only in versions 12+ and 15+ respectively (and even in those versions their presence is optional).
The replica_id is also Option because it is only valid for versions 0-14 (essentially
being deprecated by the ReplicaId found in the ReplicaState).
The mezzanine version of FetchRequest, where the tagged fields have
been encoded into the tag_buffer is:
mod mezzanine {
    #[derive(Deserialize, Serialize)]
    struct FetchRequest {
        replica_id: Option<i32>,
        // ...lots more fields...
        tag_buffer: Option<TagBuffer>,
    }
}
Again, tag_buffer in the mezzanine is an Option because in versions 0-11 it isn't present. Neither cluster_id nor replica_state appears because
they have already been encoded into tag_buffer at this point.
The representation of ReplicaState also contains a tag_buffer,
because tagged fields can be extended with further tagged fields:
mod mezzanine {
    #[derive(Deserialize, Serialize)]
    struct ReplicaState {
        replica_id: i32,
        replica_epoch: i64,
        tag_buffer: Option<TagBuffer>,
    }
}
The type mapping parses the original Apache Kafka® JSON
descriptors, generating the appropriate types (both tagged and untagged) with
helpers to convert from tagged to untagged (and vice versa). All the types
also derive Clone, Debug, Default, Eq (where possible),
Hash (where possible), Ord (where possible), PartialEq,
PartialOrd, Deserialize and Serialize.
The code generation is done by tansu-kafka-sans-io/build.rs, with quote, syn and proc_macro2 doing all the heavy lifting. Finally! The generated code is incorporated into tansu-kafka-sans-io/lib.rs with:
include!(concat!(env!("OUT_DIR"), "/generate.rs"));
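As a rough illustration of the generation step (a sketch, not Tansu's actual build.rs), quote can turn a parsed message descriptor into a struct definition, and the resulting token stream is then written out to generate.rs in OUT_DIR:
// sketch only: emitting a struct definition with quote from a parsed descriptor
use proc_macro2::TokenStream;
use quote::{format_ident, quote};

fn generate_struct(name: &str, fields: &[(String, TokenStream)]) -> TokenStream {
    let name = format_ident!("{}", name);
    let fields = fields.iter().map(|(field, ty)| {
        let field = format_ident!("{}", field);
        quote! { pub #field: #ty }
    });

    quote! {
        #[derive(Clone, Debug, Default, Deserialize, Serialize)]
        pub struct #name {
            #(#fields),*
        }
    }
}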
Once we have the generated types, we just need to write the
Deserializer and Serializer to actually implement the Apache Kafka® protocol. For this article, we'll concentrate
on serializing a String, looking at STRING and COMPACT_STRING. In Tansu the Serializer is an Encoder:
pub struct Encoder<'a> {
    writer: &'a mut dyn Write,
    // lots of other fields
}
With the following implementation for serializing either STRING or COMPACT_STRING:
impl Serializer for &mut Encoder<'_> {
    fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
        if self.in_header()
            && self.kind.is_some_and(|kind| kind == Kind::Request)
            && self.field.is_some_and(|field| field == "client_id")
        {
            v.len()
                .try_into()
                .map_err(Into::into)
                .and_then(|len| self.serialize_i16(len))
                .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
        } else if self.is_valid() {
            if self.is_flexible() {
                (v.len() + 1)
                    .try_into()
                    .map_err(Into::into)
                    .and_then(|len| self.unsigned_varint(len))
                    .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
            } else {
                v.len()
                    .try_into()
                    .map_err(Into::into)
                    .and_then(|len| self.serialize_i16(len))
                    .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
            }
        } else {
            Ok(())
        }
    }
}
The first part of the implementation deals with the client_id in any request header,
which for backwards compatibility uses an i16 to
encode its length. There are only two exceptions like this (AFAIK); the other is the header of an
ApiVersionsResponse, which for similar reasons has an inflexible header followed by a flexible body!
The self.is_valid() call determines whether the field should be serialized in this version of the message.
If it is, a COMPACT_STRING (flexible) encodes v.len() + 1 as an unsigned variable length integer,
whereas STRING just encodes v.len() as an i16; both are followed by the bytes of the field.
If you're wondering, when the field is null that is handled separately by serialize_none.
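A sketch of that path (simplified; the real Encoder also has version and field bookkeeping, and other nullable types to handle): a null STRING is written as a length of -1, while a null COMPACT_STRING is written as a length of 0:
// sketch only: null handling for nullable strings, simplified from the Encoder
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
    if self.is_flexible() {
        self.unsigned_varint(0) // COMPACT_NULLABLE_STRING: 0 means null
    } else {
        self.serialize_i16(-1) // NULLABLE_STRING: -1 means null
    }
}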
Testing
Tansu includes an Apache Kafka® Proxy used to capture traffic between various clients and brokers. The packets are then used in test cases to verify that encoding and decoding are correct.
In the following test case expected is the raw bytes from an ApiVersionsRequest version 3, captured by the proxy.
Firstly, the test deserializes the raw expected bytes into a protocol frame using Frame::request_from_bytes(&expected);
that frame is then serialized again with Frame::request(frame.header, frame.body) and verified to be the same
as the original expected bytes.
#[test]
fn api_versions_request_v3_000() -> Result<()> {
    let _guard = init_tracing()?;

    let expected = vec![
        0, 0, 0, 52, 0, 18, 0, 3, 0, 0, 0, 3, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112,
        114, 111, 100, 117, 99, 101, 114, 0, 18, 97, 112, 97, 99, 104, 101, 45, 107, 97, 102, 107,
        97, 45, 106, 97, 118, 97, 6, 51, 46, 54, 46, 49, 0,
    ];

    assert_eq!(
        expected,
        Frame::request_from_bytes(&expected)
            .and_then(|frame| Frame::request(frame.header, frame.body))?
    );

    Ok(())
}
Conclusion
Tansu uses serde, quote, syn and proc_macro2 to implement the Apache Kafka® protocol. Tansu is a drop-in replacement for Apache Kafka® with PostgreSQL, S3 or memory storage engines. Licensed under the Apache License, and written in 100% safe 🦺 async 🚀 Rust 🦀.