Consuming Kafka Messages in Kubernetes

Sebastien Goasguen
Jun 18, 2021

I will keep this short and to the point. There has been an increasing need for real-time data handling within the enterprise, whether that’s a data freshness issue, a change data capture issue, or an event-driven workflow issue. The latest very nice public example I saw of this is Shopify’s rearchitecture of their data warehouse using Debezium.

[Image: Franz Kafka]

So let’s take that for granted for a second. What do people do? They run Apache Kafka, arguably the leader in message brokering and data streaming.

In those enterprises that use Kafka, let’s sprinkle in a bit of Kubernetes, because it is now the de-facto platform for managing containerized workloads and because containers have become a really handy application artefact. So now you have Kafka and you have Kubernetes, and to marry the two, what do you need? You need a declarative API to describe what to do with your Kafka messages. Meaning that instead of writing your own Kafka consumers, packaging them up in your code, etc., wouldn’t it be nice if you could declaratively define a Kafka source (i.e., a Kafka topic is the source of a message) and set the target for that message in a Kubernetes manifest? Yes, that would be nice.

A Declarative Kafka Source

That’s where Knative comes to the rescue. Knative is about serverless, but what a lot of people miss is that serverless is mostly about handling events and building event-driven applications.

Knative gives us a Kubernetes API extension (i.e., a CRD), KafkaSource, and a controller to declaratively define how we consume messages from Kafka. Isn’t that handy? Yes, no more writing your own clients, just write config. What is even more handy is that this particular KafkaSource codebase can run without Knative, you just need the namespace :) (mind blown, I know; it just means that you don’t need to install all of core Knative, just the Kafka bits). So let’s do it:

kubectl create ns knative-eventing
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml

Easy enough, one apply and we have our CRDs and our controller. Let’s check it:

kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
kafka-controller-manager-78c5588947-wt5bj 1/1 Running 0 53s
kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev 2021-06-18T08:44:51Z
kafkasources.sources.knative.dev 2021-06-18T08:44:52Z

And now you can declare which Kafka topic you want to consume messages from, and where to send them, with a beautiful Kubernetes manifest that looks like this:

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: my-kafka
spec:
  bootstrapServers:
    - pkc-453q9.us-east4.gcp.confluent.cloud:9092
  net:
    sasl:
      enable: true
      password:
        secretKeyRef:
          key: password
          name: kafkahackathon
      type:
        secretKeyRef:
          key: sasl.mechanism
          name: kafkahackathon
      user:
        secretKeyRef:
          key: user
          name: kafkahackathon
  ...
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: display
  topics:
    - hackathon

For clarity I skipped the configuration of the TLS certificates. Note that in this example I am consuming messages from a Kafka cluster running in Confluent Cloud. You can find more complete documentation in the Knative docs. Or just reach out to me :)
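
In case you are wondering about the kafkahackathon secret referenced in the manifest, it just holds the SASL credentials for the cluster. Here is a minimal sketch of creating it and applying the source, assuming hypothetical Confluent Cloud API key/secret values and a manifest saved as kafka-source.yaml:

# hypothetical values, replace with your own Confluent Cloud API key and secret
kubectl create secret generic kafkahackathon \
  --from-literal=user=YOUR_API_KEY \
  --from-literal=password=YOUR_API_SECRET \
  --from-literal=sasl.mechanism=PLAIN

# apply the manifest and check that the source becomes ready
kubectl apply -f kafka-source.yaml
kubectl get kafkasource my-kafka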

To display the events we can deploy a super simple event Sink like this:

kubectl run display --image=gcr.io/knative-releases/knative.dev/eventing/cmd/event_display

kubectl expose pod display --port=80
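
Now we need some messages in the topic. If you don’t have a producer handy, here is a minimal sketch using the stock Kafka console producer, assuming a client.properties file that holds your SASL/TLS client settings:

# client.properties is assumed to contain your SASL/TLS configuration
bin/kafka-console-producer.sh \
  --bootstrap-server pkc-453q9.us-east4.gcp.confluent.cloud:9092 \
  --producer.config client.properties \
  --topic hackathon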

The target for the Kafka messages is defined in the sink section of the KafkaSource manifest. Whatever lands in your Kafka topic then gets consumed and sent over to the display service as a CloudEvent, which will look something like this:

kubectl logs -f display
☁️ cloudevents.Event
Context Attributes,
specversion: 1.0
type: dev.knative.kafka.event
source: /apis/v1/namespaces/default/kafkasources/my-kafka#hackathon
subject: partition:0#106
id: partition:0/offset:106
time: 2021-06-18T09:21:30.489Z
Extensions,
kafkaheaderkinesisapproximatearrivaltimestamp: 2021-06-18T09:21:30.489Z
kafkaheaderkinesispartitionkey: static
kafkaheaderkinesissequencenumber: 49618035385845691671998692582194447474413536493243990018
kafkaheaderkinesisshardid: shardId-000000000000
kafkaheaderkinesisstreamname: confluent
key: static
Data,
{"specversion":"1.0","id":"8dda1630-d016-11eb-8b1d-f6456a02a124","source":"https://github.com/sebgoa/transform","type":"dev.knative.source.github.issues","subject":"8","datacontenttype":"application/json","time":"2021-06-18T09:21:28.036834426Z","data":{"action":"reopened","issue":...

So there you have it: declarative Kafka message consumption, with the messages sent in the CloudEvent format to a Kubernetes workload.

In the next blog post I will show you how to do the opposite: send messages to a Kafka topic declaratively. And if I jump ahead a little bit, this means that you are able to declaratively define your Kafka sources and sinks with Kubernetes manifests, no more pesky jar compilation and uploading to configure Kafka Connect. Isn’t that nice?
