Consuming Kafka Messages in Kubernetes
I will keep this short and to the point. There has been an increasing need for real-time data handling within the enterprise, whether that is about data freshness, change data capture, or event-driven workflows. The latest very nice public example I saw of this is Shopify's re-architecture of their data warehouse using Debezium.
So let's take that for granted for a second. What do people do? They run Apache Kafka, arguably the leader in message brokering and data streaming.
In those enterprises that use Kafka, let's sprinkle in a bit of Kubernetes, because it is now the de-facto platform for managing containerized workloads and because containers have become a really handy application artefact. So now you have Kafka and you have Kubernetes, and to marry the two, what do you need? You need a declarative API to describe what to do with your Kafka messages. Instead of writing your own Kafka consumers, packaging them in your code and so on, wouldn't it be nice if you could declaratively define a Kafka source (i.e. a Kafka topic is the source of a message) and set the target for those messages in a Kubernetes manifest? Yes, that would be nice.
A Declarative Kafka Source
That's where Knative comes to the rescue. Knative is about serverless, but what a lot of people misinterpret is that serverless is mostly about handling events and building event-driven applications.
Knative gives us a Kubernetes API extension (i.e. a CRD), KafkaSource, and a controller to declaratively define how we consume messages from Kafka. Isn't that handy? Yes, no more writing your own clients, just write config. What is even more handy is that this particular KafkaSource codebase can run without Knative, you just need the namespace :) (mind blown, I know; it just means that you don't need to install all of core Knative, just the Kafka bits). So let's do it:
kubectl create ns knative-eventing
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml
Easy enough, one apply and we have our CRD and our controller. Let’s check it:
kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
kafka-controller-manager-78c5588947-wt5bj 1/1 Running 0 53s
kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev 2021-06-18T08:44:51Z
kafkasources.sources.knative.dev 2021-06-18T08:44:52Z
And now you can write which Kafka topic you want to consume messages from and where to send them with a beautiful Kubernetes manifest that looks like this:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: my-kafka
spec:
  bootstrapServers:
    - pkc-453q9.us-east4.gcp.confluent.cloud:9092
  net:
    sasl:
      enable: true
      password:
        secretKeyRef:
          key: password
          name: kafkahackathon
      type:
        secretKeyRef:
          key: sasl.mechanism
          name: kafkahackathon
      user:
        secretKeyRef:
          key: user
          name: kafkahackathon
    ...
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: display
  topics:
    - hackathon
For clarity I skipped the configuration of the TLS certificates. Note that in this example I am consuming messages from a Kafka cluster running in Confluent Cloud. You can find more complete documentation in the Knative docs. Or just reach out to me :)
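The manifest references a Kubernetes Secret called kafkahackathon holding the SASL credentials. As a minimal sketch, assuming a Confluent Cloud API key/secret and the PLAIN mechanism (the placeholder values are mine, substitute your own), you could create it like this:
kubectl create secret generic kafkahackathon \
  --from-literal=user=<CONFLUENT_API_KEY> \
  --from-literal=password=<CONFLUENT_API_SECRET> \
  --from-literal=sasl.mechanism=PLAIN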
To display the events we can deploy a super simple event Sink like this:
kubectl run display --image=gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
kubectl expose pod display --port=80
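With the display sink running, save the KafkaSource manifest above to a file (I will call it kafkasource.yaml here, the name is up to you), apply it, and check that the source becomes ready:
kubectl apply -f kafkasource.yaml
kubectl get kafkasource my-kafka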
The target for the Kafka messages is defined in the sink section of the KafkaSource manifest. What we then see is that whatever you have in your Kafka topic will get consumed, sent over to the display as a CloudEvent, and will look something like:
kubectl logs -f display
☁️ cloudevents.Event
Context Attributes,
specversion: 1.0
type: dev.knative.kafka.event
source: /apis/v1/namespaces/default/kafkasources/my-kafka#hackathon
subject: partition:0#106
id: partition:0/offset:106
time: 2021-06-18T09:21:30.489Z
Extensions,
kafkaheaderkinesisapproximatearrivaltimestamp: 2021-06-18T09:21:30.489Z
kafkaheaderkinesispartitionkey: static
kafkaheaderkinesissequencenumber: 49618035385845691671998692582194447474413536493243990018
kafkaheaderkinesisshardid: shardId-000000000000
kafkaheaderkinesisstreamname: confluent
key: static
Data,
{"specversion":"1.0","id":"8dda1630-d016-11eb-8b1d-f6456a02a124","source":"https://github.com/sebgoa/transform","type":"dev.knative.source.github.issues","subject":"8","datacontenttype":"application/json","time":"2021-06-18T09:21:28.036834426Z","data":{"action":"reopened","issue":...
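If your topic is empty and you want to see the flow end to end, you can produce a test message with the kafka-console-producer that ships with Apache Kafka. A quick sketch, assuming a client.properties file that mirrors the credentials stored in the kafkahackathon secret:
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CONFLUENT_API_KEY>" password="<CONFLUENT_API_SECRET>";
EOF
kafka-console-producer --bootstrap-server pkc-453q9.us-east4.gcp.confluent.cloud:9092 \
  --producer.config client.properties \
  --topic hackathon
Type a line, hit enter, and it should show up in the display logs as a CloudEvent.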
So there you have it: declaratively defining your Kafka message consumption and sending the messages in the CloudEvent format to a Kubernetes workload.
In the next blog I will show you how to do the opposite: send messages to a Kafka topic declaratively. And if I jump ahead a little bit, this means that you are able to declaratively define your Kafka sources and sinks with Kubernetes manifests, no more pesky jar compilation and uploading to configure Kafka Connect. Isn't that nice?