Guarding Data Quality with Kafka Schema Validation
Being aware of structural changes in your incoming data helps ensure data quality in your data platform. When ingesting data into Kafka from an external (or internal) source, the messages are received as-is. We want to be notified as soon as possible if the structure of the source data changes, ideally before it ends up in the data platform. If structurally different data makes it into the platform, it could cause errors in pipelines or, worse, silently corrupt data. In this blog, we’ll see how to avoid this using the Confluent Schema Registry and AVRO schemas.
Confluent Schema Registry
The Confluent Schema Registry provides a RESTful interface for storing and retrieving AVRO, JSON and Protobuf schemas, and it’s the tool that we’ll be using to store the schemas.
Confluent is a company founded by the creators of Kafka. They provide fully managed Kafka, but also develop open-source software that works alongside Kafka, such as the Confluent Schema Registry. There will be one schema per topic, so the messages within a topic should adhere to that topic’s schema. The Schema Registry provides versioning of schemas, and allows for the configuration of compatibility settings (backward, forward, full, etc.).
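These compatibility settings can also be managed through the registry’s REST API. As a minimal sketch using only Python’s standard library (the localhost URL and subject name are assumptions for a local setup, not something from the original post):

```python
import json
import urllib.request

# Assumption: a local Schema Registry on the default port; adjust as needed
SCHEMA_REGISTRY_URL = "http://localhost:8081"

def build_compatibility_request(subject: str, level: str) -> urllib.request.Request:
    """Build the PUT request that sets a subject's compatibility level."""
    body = json.dumps({"compatibility": level}).encode()
    return urllib.request.Request(
        url=f"{SCHEMA_REGISTRY_URL}/config/{subject}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )

# e.g. require new schema versions to be backward compatible:
request = build_compatibility_request("reviews-dev-value", "BACKWARD")
# urllib.request.urlopen(request)  # run this against a live registry
```

Valid levels include BACKWARD, FORWARD, FULL and NONE (plus their transitive variants).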
Creating the schema
We will create a topic called reviews-dev which will contain reviews. We’ll define the schema as a Python dictionary, and then convert it to an AVRO file using the fastavro library. Doing it this way has the advantage that fastavro validates the schema while parsing it, so we know it’s a valid AVRO schema.
The schema and the code to write it to an AVRO schema file looks like this:
Some things to note:
- Fields can have multiple types
- To make a field nullable, add the type ‘null’ to its list of types
- AVRO supports a variety of data types, including enums, but also nested structures. Naming these structs allows them to be reused later in the file (“ReviewType”, for example, could be reused later on as a type by simply putting “ReviewType” as the type).
- parse_schema both parses and validates the schema; passing _write_hint=False prevents the injection of a field called ‘__fastavro_parsed’ into the parsed schema
- The parsed schema is used to validate an example record
- Finally, the schema is written out as JSON
Using the schema
Now that the schema has been created, we want to store it somewhere where it can be used by others, be versioned, and have compatibility checks. To do this, we will use the Schema Registry.
For this demo we are running a local deployment of the Confluent Platform; see these for instructions on how to do that. The schema can be attached to a topic by navigating to the Confluent Control Center (in our case at http://localhost:9021), selecting your cluster and then the topic, clicking the “Schema” tab, and hitting “Set a Schema”. Paste the AVRO schema we created in the previous step; the uploaded schema will look like this:
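If you’d rather not click through the UI, the same upload can be done against the Schema Registry’s REST API. A sketch using only Python’s standard library (the localhost URL is an assumption for a local setup, and the tiny placeholder schema stands in for the real one):

```python
import json
import urllib.request

# Assumption: a local Schema Registry on the default port; adjust as needed
SCHEMA_REGISTRY_URL = "http://localhost:8081"

def build_register_request(subject: str, avro_schema: dict) -> urllib.request.Request:
    """Build the POST that registers a new schema version under a subject."""
    # The registry expects the AVRO schema itself serialized as a string
    # inside a JSON envelope: {"schema": "<schema JSON>"}
    body = json.dumps({"schema": json.dumps(avro_schema)}).encode()
    return urllib.request.Request(
        url=f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )

# The message-value schema of the reviews-dev topic lives under the
# 'reviews-dev-value' subject; the schema here is a tiny placeholder
request = build_register_request("reviews-dev-value", {"type": "string"})
# urllib.request.urlopen(request)  # run this against a live registry
```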
Having uploaded it to the Schema Registry, we can now fetch it in our code and validate new messages against the schema. The following Python packages are required for this method:
- pip install confluent-kafka
- pip install confluent-kafka[avro]
- pip install avro-python3==1.8.2
Note that the version for avro-python3 is pinned to 1.8.2, due to this bug.
The code then looks like this:
A couple of notes:
- We’re running a local deployment of the Confluent Platform in a VM, available at the IP specified in the Schema Registry URL. These instructions were used to set this up. In a production setting, credentials would be required in addition to this URL.
- The topic name is suffixed with ‘-value’ when fetching the schema. This is needed because there can actually be two schemas per topic: one for the message value (which we’re using) and one for the message key.
The validation passes silently if the message adheres to the schema (as is the case above), or raises an exception if it doesn’t. After validation, the message can be published to Kafka.
Now, say a field type is changed: the validation will catch it. For example, changing the type of the ‘time’ field from long to a string will raise a validation error, as will leaving a non-optional field (‘user_id’) out of the message entirely.
Validating messages before publishing them to Kafka and ingesting them into your data platform can be a powerful tool to ensure data quality. In this blog post we gave an example of how this can be accomplished using the Confluent Schema Registry and AVRO schemas.