Sending Messages to Kafka in Kubernetes

Sebastien Goasguen
Published in ITNEXT · 3 min read · Jun 18, 2021

In the previous blog I showed how to consume messages from Kafka declaratively. Today I am going to show you how to do the opposite: how to produce messages into Kafka, i.e. how to send messages to a Kafka topic declaratively.

Head over to the previous blog for context. In short, we are writing Kubernetes manifests to declare how we produce and consume messages to and from a Kafka topic. One point of confusion worth clearing up: a KafkaSink produces events to a Kafka topic and is therefore a Kafka source from the point of view of the Kafka cluster :) and likewise a KafkaSource consumes events from a Kafka topic and is therefore a Kafka sink from the Kafka cluster's point of view. It would have been too easy otherwise!

Why?

Because it is much easier than writing my own Kafka clients, then compiling, packaging, and deploying them. If I can write a bit of config and let a system like Kubernetes manage it for me, that is great.

How?

We install two controllers and a CRD for a new kind called KafkaSink, and for good measure (otherwise shit hits the fan) we install the Knative Eventing CRDs. (We will get that fixed, because it would be nice to avoid that extra step…)

kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-sink.yaml
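If you want to block until the control plane is actually up before moving on, something along these lines works (a sketch; the deployment names match the Pod listing below, but double-check them in your cluster):

kubectl wait deployment/kafka-controller deployment/kafka-sink-receiver \
  -n knative-eventing --for=condition=Available --timeout=120s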

Shortly after the apply, the Pods are running. Note that the listing below also shows the controller for KafkaSource:

kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
kafka-controller-86948cddc6-g5mnn 1/1 Running 0 2m18s
kafka-controller-manager-78c5588947-wt5bj 1/1 Running 0 5h29m
kafka-sink-receiver-7dcb7b9b44-5w6j7 1/1 Running 0 115s
kafka-webhook-eventing-fb6ddfd9d-jsmzm 1/1 Running 0 2m18s

And our new fancy CRD KafkaSink is in place

kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev 2021-06-18T08:44:51Z
kafkasinks.eventing.knative.dev 2021-06-18T14:12:12Z
kafkasources.sources.knative.dev 2021-06-18T08:44:52Z

The KafkaSink in Action

Using the same secret as in the previous blog, which allows us to talk to our Kafka cluster in Confluent Cloud, we can now set up a KafkaSink declaratively like so:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-topic
spec:
  auth:
    secret:
      ref:
        name: kafkahackathon
  bootstrapServers:
  - pkc-456q9.us-east4.gcp.confluent.cloud:9092
  topic: hackathon
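For reference, the kafkahackathon secret referenced above can be created along these lines (a sketch, assuming SASL/PLAIN authentication against Confluent Cloud; protocol, sasl.mechanism, user and password are the secret keys the Knative Kafka components look for, and the placeholder values stand in for your Confluent API key and secret):

kubectl create secret generic kafkahackathon \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=PLAIN \
  --from-literal=user=<CONFLUENT_API_KEY> \
  --from-literal=password=<CONFLUENT_API_SECRET>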

What this will do is set up an HTTP endpoint in my Kubernetes cluster where I can POST CloudEvents. These CloudEvents are then produced to my Kafka cluster. The KafkaSink looks like:

kubectl get kafkasink
NAME URL AGE READY REASON
my-kafka-topic http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic 12m True
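Since the KafkaSink is a Knative Addressable, you can also grab that URL straight from its status, for example (a sketch):

kubectl get kafkasink my-kafka-topic -o jsonpath='{.status.address.url}'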

Now that we have an HTTP endpoint acting as an HTTP proxy to a Kafka topic, we can simply POST a CloudEvent to this endpoint and the sink will "produce" the event to the topic. To do this we can get a quick shell in our cluster and craft a CloudEvent by hand with curl.
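One quick way to get such a shell (the curlimages/curl image is my choice here; any image that ships curl will do):

kubectl run curl --rm -it --image=curlimages/curl --restart=Never -- sh

Then, from inside that Pod: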

curl -v "http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic" \
  -X POST \
  -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: dev.knative.samples.helloworld" \
  -H "Ce-Source: dev.knative.samples/helloworldsource" \
  -H "Content-Type: application/json" \
  -d '{"msg":"Hello World from the curl pod."}'

Checking the logs of the display Pod, we see our CloudEvent being received via the KafkaSource that we set up in the previous blog:

☁️ cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.samples.helloworld
  source: dev.knative.samples/helloworldsource
  id: 536808d3-88be-4077-9d7a-a3f162705f79
  datacontenttype: application/json
Data,
  {
    "msg": "Hello World from the curl pod."
  }
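If the event does not show up, the logs of the sink receiver are a good place to look (the deployment name matches the Pod listing earlier):

kubectl logs -n knative-eventing deployment/kafka-sink-receiver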

The flow is complete: we can produce events anywhere as CloudEvents, get them into a Kafka topic, then consume them and send them to a Kubernetes workload, all of it defined declaratively.
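For completeness, the consuming side from the previous blog looks roughly like this (a sketch; the display Service name is an assumption, and the SASL/TLS settings that Confluent Cloud requires are omitted for brevity):

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: my-kafka-source
spec:
  bootstrapServers:
  - pkc-456q9.us-east4.gcp.confluent.cloud:9092
  topics:
  - hackathon
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: display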

Which opens the door to… drum roll… a declarative/Kubernetes way to configure Kafka sources and sinks. That will be the next blog.


@sebgoa, Kubernetes lover, R&D enthusiast, co-Founder of TriggerMesh, O’Reilly author, sports fan, husband and father…