Effortlessly Convert Kafka Messages to Apache Parquet with Tansu: A Step-by-Step Guide
Tansu seamlessly writes Apache Parquet record batches for any topic backed by a Protocol Buffer, Apache Avro, or JSON schema. It automatically transforms structured data into columnar Parquet format, making it easy to process with other tools. Tansu is fully compatible with Apache Kafka, so you can continue using your existing client code without changes.
In this example, we'll use Protocol Buffer to represent taxi ride data and send it as Kafka messages into Tansu. Tansu automatically converts the structured data into Apache Parquet files by first mapping it to equivalent Apache Arrow types, then writing it as a Parquet record batch.
We will be using:
- Tansu, licensed under the GNU AGPL, written in Rust and available on GitHub
- DuckDB, to easily read the generated Apache Parquet
In this example, the Protocol Buffer schema is called taxi.proto and will be applied to the taxi topic:
syntax = "proto3";

enum Flag {
  N = 0;
  Y = 1;
}

message Value {
  int64 vendor_id = 1;
  int64 trip_id = 2;
  float trip_distance = 3;
  double fare_amount = 4;
  Flag store_and_fwd = 5;
}
Tansu supports broker-side validation for topics backed by a schema. If a client sends an invalid message to a schema-enforced topic, the broker will automatically reject it with an InvalidRecord error. This ensures that only schema-compliant, validated data is stored in the topic.
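For example, using the JSON representation that we will feed to tansu cat later in this guide, a record like the following (a hypothetical, deliberately malformed trip in which trip_distance is a string rather than a float) would fail validation and be rejected by the broker:

[{"value": {"vendor_id": 1, "trip_id": 1000375, "trip_distance": "not-a-number", "fare_amount": 15.32, "store_and_fwd": "N"}}]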
Schemas can be easily shared between clients and the broker using an S3 bucket. Tansu can be configured to load schemas from either S3 or a local file system directory.
Kafka messages consist of a key and/or a value component. The schema shown above defines the value structure expected on the topic. By including a Key message in the schema, Tansu can also validate the key component. For each topic, Tansu allows you to specify schemas for the Key, the Value, or both.
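As an illustration (not used in this example), a taxi.proto that also validates the key might add a Key message alongside Value, keying each trip by its trip_id:

syntax = "proto3";

message Key {
  int64 trip_id = 1;
}

message Value {
  int64 vendor_id = 1;
  float trip_distance = 2;
  double fare_amount = 3;
}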
Schemas are named after the topic they represent. Some examples:
- taxi.proto, a Protocol Buffer schema for the taxi topic
- observation.avsc, an Apache Avro schema for the observation topic
- person.json, a JSON schema for the person topic
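In this guide the schema registry is simply a local directory whose file names follow that convention; for our single topic it contains just the taxi schema:

etc/schema/
└── taxi.proto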
To simplify command-line usage, we'll create a .env file in the working directory. Tansu automatically reads and applies environment variables from a .env file located there.
STORAGE_ENGINE=s3://tansu/
DATA_LAKE=s3://lake/
SCHEMA_REGISTRY=file://./etc/schema
- STORAGE_ENGINE is the location of the Kafka metadata stored by Tansu
- DATA_LAKE is the location where Tansu will write Apache Parquet files
- SCHEMA_REGISTRY is the location of the schemas used by the broker to validate messages, and by the client to encode them
To start the broker:
tansu broker
Tansu will use the .env to apply the --storage-engine=s3://tansu/, --data-lake=s3://lake/ and --schema-registry=file://./etc/schema arguments to the broker command.
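This is equivalent to passing the flags explicitly when starting the broker:

tansu broker --storage-engine=s3://tansu/ --data-lake=s3://lake/ --schema-registry=file://./etc/schema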
Tansu supports S3, PostgreSQL and memory storage engines for Kafka messages
and related metadata. In this example we're using S3, and also using a
separate bucket to store Apache Parquet in our data lake.
Our schema registry is a directory on the local filesystem, but it could also be stored in S3.
By default, the broker listens on the standard Kafka port, tcp://[::]:9092.
We use tansu cat to encode and send our messages to Tansu. It directly supports the encoding and decoding of Protobuf, Avro and JSON schema messages. It uses a neutral JSON message format that is converted into Protobuf/Avro/JSON without requiring other tools.
tansu cat produce taxi etc/data/trips.json
Tansu will use the .env, applying --schema-registry=file://./etc/schema to the above command.
Our sample data is in etc/data/trips.json, containing a series of JSON objects within an array:
[{"value": {"vendor_id": 1, "trip_id": 1000371, "trip_distance": 1.8, "fare_amount": 15.32, "store_and_fwd": "N"}},
{"value": {"vendor_id": 2, "trip_id": 1000372, "trip_distance": 2.5, "fare_amount": 22.15, "store_and_fwd": "N"}},
{"value": {"vendor_id": 2, "trip_id": 1000373, "trip_distance": 0.9, "fare_amount": 9.01, "store_and_fwd": "N"}},
{"value": {"vendor_id": 1, "trip_id": 1000374, "trip_distance": 8.4, "fare_amount": 42.13, "store_and_fwd": "Y"}}]
The trips are piped into tansu cat produce, which converts each JSON object into the schema used for the taxi topic. Tansu is a single statically linked application providing a broker, proxy and various utility and administration subcommands (including cat).
Producing messages will create a Parquet file, which we can verify with a little DuckDB magic:
duckdb :memory: "SELECT * FROM 's3://lake/taxi/*/*.parquet'"
Giving the following rows:
┌───────────┬─────────┬───────────────┬─────────────┬───────────────┐
│ vendor_id │ trip_id │ trip_distance │ fare_amount │ store_and_fwd │
│   int64   │  int64  │     float     │   double    │     int32     │
├───────────┼─────────┼───────────────┼─────────────┼───────────────┤
│         1 │ 1000371 │           1.8 │       15.32 │             0 │
│         2 │ 1000372 │           2.5 │       22.15 │             0 │
│         2 │ 1000373 │           0.9 │        9.01 │             0 │
│         1 │ 1000374 │           8.4 │       42.13 │             1 │
└───────────┴─────────┴───────────────┴─────────────┴───────────────┘
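Because the output is plain Parquet, any engine that reads Parquet can now work with the data. For instance, still in DuckDB, an illustrative query totalling the fares per vendor:

duckdb :memory: "SELECT vendor_id, count(*) AS trips, sum(fare_amount) AS total_fares FROM 's3://lake/taxi/*/*.parquet' GROUP BY vendor_id ORDER BY vendor_id"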
The Tansu broker maps the Protobuf-encoded data onto equivalent Apache Arrow data types, creating an Apache Arrow record batch for each Kafka message batch. The final step is to write that record batch out as an Apache Parquet file. In this example, the produce creates a single Kafka message batch of 4 records, which is written as a Parquet file named after the topic, partition and starting Kafka offset.
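To make that mapping concrete, here is a minimal sketch of the final hop using the Rust arrow and parquet crates. This is not Tansu's actual code: it simply builds an Arrow record batch whose columns mirror the taxi schema (with the Flag enum represented as int32, as in the DuckDB output above) and writes it to a local Parquet file with a made-up name:

use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Arrow schema mirroring the Protocol Buffer Value message.
    let schema = Arc::new(Schema::new(vec![
        Field::new("vendor_id", DataType::Int64, false),
        Field::new("trip_id", DataType::Int64, false),
        Field::new("trip_distance", DataType::Float32, false),
        Field::new("fare_amount", DataType::Float64, false),
        Field::new("store_and_fwd", DataType::Int32, false),
    ]));

    // One Arrow record batch per Kafka message batch; here, the four sample trips.
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 2, 1])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1000371, 1000372, 1000373, 1000374])) as ArrayRef,
            Arc::new(Float32Array::from(vec![1.8, 2.5, 0.9, 8.4])) as ArrayRef,
            Arc::new(Float64Array::from(vec![15.32, 22.15, 9.01, 42.13])) as ArrayRef,
            Arc::new(Int32Array::from(vec![0, 0, 0, 1])) as ArrayRef,
        ],
    )?;

    // Write the record batch out as a Parquet file.
    let mut writer = ArrowWriter::try_new(File::create("trips.parquet")?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    Ok(())
}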
We can consume from the topic using the Kafka API:
tansu cat consume taxi
Tansu will use the .env, applying --schema-registry=file://./etc/schema to the above command.
The command above consumes messages from the taxi topic and automatically converts the binary Protobuf data into human-readable JSON. Since the Kafka message doesn't include a key, the key field is shown as null. Note that, following the standard proto3 JSON mapping, int64 values are rendered as JSON strings and fields left at their default value are omitted, which is why storeAndFwd only appears on the last trip (the only one flagged Y).
[{"key": null,
"value":{
"fareAmount": 15.32,
"tripDistance": 1.8,
"tripId": "1000371",
"vendorId": "1"}},
{"key": null,
"value":{
"fareAmount": 22.15,
"tripDistance": 2.5,
"tripId": "1000372",
"vendorId": "2"}},
{"key": null,
"value":{
"fareAmount":9.01,
"tripDistance":0.9,
"tripId":"1000373",
"vendorId":"2"}},
{"key":null,
"value":{
"fareAmount":42.13,
"storeAndFwd":"Y",
"tripDistance":8.4,
"tripId":"1000374",
"vendorId":"1"}}]
In this article, we've used Tansu to validate and automatically convert structured Kafka messages into Apache Parquet format. While we demonstrated this with a Protobuf schema, Tansu also supports Apache Avro and JSON schemas.