Apache Kafka protocol with serde, quote, syn and proc_macro2

Tansu uses serde, quote, syn and proc_macro2 to implement the Apache Kafka® protocol. Tansu is a drop-in replacement for Apache Kafka with PostgreSQL, S3 or memory storage engines. Licensed under the GNU AGPL, and written in 100% safe 🦺 async 🚀 Rust 🦀.


Overview

Implemented by KAFKA-7609, since late 2018, the Apache Kafka® message protocol is generated from JSON documents. Each message has its own versioned schema. The protocol has evolved over the years, by introducing flexible types (using variable rather than fixed length integers) and tagged fields (via KIP-482) allowing additional data to be introduced without having to increment the version number of the message.

A separate protocol document describes the primitive types that are supported. A grammar describing how the protocol operates (slightly dated) is available here. The format of batches used by produce and fetch requests with the Record type that is defined in this document. Similarly, the format of data used while joining and synchronizing as a member of a consumer group, is defined as Bytes in the message schema, but is also further defined in separate documents. Record and parts of consumer group protocol have their own versioning system within those types.

The client chooses the version of the message to use. A well behaved client might use an ApiVersionsRequest to determine the messages and associated versions supported by the broker, but is completely optional. The version used is encoded into the request header, the response must use the same version or return an error.

Implementation

A DescribeGroupsRequest is valid for versions: 0 through 5, using a flexible format from version 5 onwards:

{
  "apiKey": 15,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "DescribeGroupsRequest",
  "validVersions": "0-5",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The names of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
      "about": "Whether to include authorized operations." }
  ]
}

Looking at the Groups field its type is []string. In versions 0-4 this field would be encoded as an ARRAY[T]. In versions 5+ this field is encoded as a COMPACT_ARRAY[T].

  • ARRAY[T] has an i16 representing the length, with -1 representing null.
  • COMPACT_ARRAY[T] has a variable length unsigned integer, with 0 representing the null array and N + 1 representing the length.

The elements of the array are also encoded differently, in versions 0-4 encoded as a STRING and from 5+ as a COMPACT_STRING.

  • STRING the length N is given as an i16. Then N bytes follow which are the UTF-8 encoding of the character sequence. Length must not be negative.
  • COMPACT_STRING the length N + 1 is given as an unsigned variable length integer. Then N bytes follow which are the UTF-8 encoding of the character sequence.

To represent this protocol in serde we need the following:

  • Map each JSON message into its appropriate types.
  • Write a serializer and deserializer, adapting to use ARRAY[T] or COMPACT_ARRAY[T], STRING or COMPACT_STRING, depending on the version of protocol being used by that message.

As a first step we transform DescribeGroupsRequest into a Rust structure:

#[derive(Deserialize, Serialize)]
struct DescribeGroupsRequest {
        groups: Option<Vec<String>>,
        include_authorized_operations: Option<bool>,
}

The groups field here is an Option because all arrays can be null. The include_authorized_operations field is also an Option because it is only valid in versions 3+ of this message. When serializing or deserializing we need to know the version being used so that we can use ARRAY[T] or COMPACT_ARRAY[T], and whether a field is valid for that version.

Early versions of the Kafka protocol implementation in Tansu, used an enumeration with variants representing each version. For example, the flexible variant for groups used Option<CompactArray<String>>, the inflexible variant used Option<Array<String>>. While this method achieved a strong type mapping, it rapidly became unwieldy, with 18 versions of fetch request (also meaning another 18 response versions), resulting in long broker code blocks with match arms covering all the cases. Tag fields that can be added into any flexible format message further misshaped the strong type model. A different approach was taken, where the mapping resulted in a single struct that could represent all message versions. Instead, the serializer is aware of the version of message, using the ARRAY[T] representation for groups and not using the include_authorized_operations in versions 0-2. This decision enables simpler broker code, at the expense of more complex serialization and deserialization.

Tagged Fields

Flexible format messages also introduced tagged fields. Tagged fields can appear at the top level of a message, or inside any structure, including the header. A tagged field can be added to an existing protocol version without an increment. Unexpected tagged fields are ignored by the receiver.

In Tansu, structures containing the encoded tag buffers are found in the mezzanine module. This creates a two stage serialization (or deserialization), any tagged fields are encoded into tag_buffer and then that whole structure is serialized. There are conversions using From<mezzanine::DescribeGroupsRequest> for DescribeGroupsRequest and From<DescribeGroupsRequest> for mezzanine::DescribeGroupsRequest that are generated as part of the type mapping.

Continuing with DescribeGroupsRequest, with tagged fields the structure looks like:

mod mezzanine {
  #[derive(Deserialize, Serialize)]
  struct DescribeGroupsRequest {
    groups: Option<Vec<String>>,
    include_authorized_operations: Option<bool>,
    tag_buffer: Option<TagBuffer>,
  }
}

A tag_buffer is always at the end of any mezzanine structure, because it is encoded/decoded after the "normal" fields.

In flexible versions of DescribeGroupsRequest the tag_buffer will be present with Some, inflexible versions will be None.

Both TagBuffer and TagField are defined using the new type idiom so that custom implementations of Deserialize and Serialize are used:

struct TagBuffer(Vec<TagField>);
struct TagField(u32, Vec<u8>);

So not much change then? Actually... there is, let's examine a subset of FetchRequest, which from version 12 has tagged fields:

{
  "validVersions": "0-17",
  "deprecatedVersions": "0-3",
  "flexibleVersions": "12+",    
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
      "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },

    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
      
    { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1, "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
        "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
      { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
        "about": "The epoch of this follower, or -1 if not available." }
    ]},
    // ...lots more fields...
 ]
}

Note that both ClusterId and ReplicaState are tagged fields (have a tag attribute), whereas ReplicaId does not.

The structure for FetchRequest:

#[derive(Deserialize, Serialize)]
struct FetchRequest {
  cluster_id: Option<String>,
  replica_id: Option<i32>,
  replica_state: Option<ReplicaState>,
  // ...lots more fields...
}

Both cluster_id and replica_state are Option because they are tagged fields, present only versions 12+ and 15+ respectfully (and even in those versions their presence is optional). The replica_id is also Option because it is only valid for versions 0-14 (essentially being deprecated by the ReplicaId found in the ReplicaState).

The mezzanine version of FetchRequest, where the tagged fields have been encoded into the tag_buffer is:

mod mezzanine {
  #[derive(Deserialize, Serialize)]
  struct FetchRequest {
    replica_id: Option<i32>,
    // ...lots more fields...
    tag_buffer: Option<TagBuffer>,
  }
}

Again, tag_buffer in the mezzanine is an Option because in versions 0-11 it isn't present. Both cluster_id and replica_state are not present because they are already encoded into tag_buffer at this point.

The representation of ReplicaState also contains a tag_buffer, because tagged fields can be extended with further tagged fields:

mod mezzanine {
  #[derive(Deserialize, Serialize)]
  struct ReplicaState {
    replica_id: i32,
    replica_epoch: i64,
    tag_buffer: Option<TagBuffer>,
  }
}

All the type mapping parses the original Apache Kafka® JSON descriptors generating the appropriate types (both tagged and untagged) with helpers to convert from tagged to untagged (and vice versa). All the types also derive Clone, Debug, Default, Eq (where possible), Hash (where possible), Ord (where possible), PartialEq, PartialOrd, Deserialize and Serialize.

The code generation is done by tansu-kafka-sans-io/build.rs, with quote, syn and proc_macro2 doing all the heavy lifting. Finally! The generated code is incorporated into tansu-kafka-sans-io/lib.rs with:

include!(concat!(env!("OUT_DIR"), "/generate.rs"));

Once we have the generated types, we just need to write the Deserializer and Serializer to actually implement the Apache Kafka® protocol. For this article, we'll concentrate on serializing a String looking at STRING and COMPACT_STRING. In Tansu the Serializer is an Encoder:

pub struct Encoder<'a> {
    writer: &'a mut dyn Write,
    // lots of other fields
}

With the following implementation for serializing either STRING or COMPACT_STRING:

impl Serializer for &mut Encoder<'_> {
    fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
        if self.in_header()
            && self.kind.is_some_and(|kind| kind == Kind::Request)
            && self.field.is_some_and(|field| field == "client_id")
        {
            v.len()
                .try_into()
                .map_err(Into::into)
                .and_then(|len| self.serialize_i16(len))
                .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
        } else if self.is_valid() {
            if self.is_flexible() {
                (v.len() + 1)
                    .try_into()
                    .map_err(Into::into)
                    .and_then(|len| self.unsigned_varint(len))
                    .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
            } else {
                v.len()
                    .try_into()
                    .map_err(Into::into)
                    .and_then(|len| self.serialize_i16(len))
                    .and(self.writer.write_all(v.as_bytes()).map_err(Into::into))
            }
        } else {
            Ok(())
        }
    }
}

The first part of the implementation deals with the client_id in any request header, that for backwards compatibility uses an i16 to encode its length. There are only 2 exceptions like this (AFAIK), the other is in the header response to an ApiVersionsResponse which for similar reasons has an inflexible header, followed by a flexible body!

The self.is_valid() determines whether the field should be serialized in this version of the message. If it is, a COMPACT_STRING (flexible) has v.len() + 1 using an unsigned variable length integer, whereas STRING just uses a plain v.len() as i16, both followed by the bytes in the field. If you're wondering, when the field is null that is handled separately by serialize_none.

Testing

Tansu includes an Apache Kafka® Proxy used to capture traffic between various clients and brokers. The packets are then used in test cases to verify that encoding and decoding are correct.

In the following test case expected is the raw bytes from an ApiVersionsRequest version 3, captured by the proxy. Firstly, the test deserializes the raw expected bytes into a protocol frame using Frame::request_from_bytes(&expected), that frame is then serialized again with Frame::request(frame.header, frame.body) and verified that it is the same as the original expected bytes.

#[test]
fn api_versions_request_v3_000() -> Result<()> {
    let _guard = init_tracing()?;

    let expected = vec![
        0, 0, 0, 52, 0, 18, 0, 3, 0, 0, 0, 3, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 112,
        114, 111, 100, 117, 99, 101, 114, 0, 18, 97, 112, 97, 99, 104, 101, 45, 107, 97, 102, 107,
        97, 45, 106, 97, 118, 97, 6, 51, 46, 54, 46, 49, 0,
    ];

    assert_eq!(
        expected,
        Frame::request_from_bytes(&expected)
            .and_then(|frame| Frame::request(frame.header, frame.body))?
    );

    Ok(())
}

Conclusion

Tansu uses serde, quote, syn and proc_macro2 to implement the Apache Kafka® protocol. Tansu is a drop-in replacement for Apache Kafka® with PostgreSQL, S3 or memory storage engines. Licensed under the GNU AGPL, and written in 100% safe 🦺 async 🚀 Rust 🦀.