Sending Messages to Kafka in Kubernetes
In the previous blog I showed how to consume messages from Kafka declaratively. Today I am going to show you the opposite: how to produce messages into Kafka, i.e. how to send messages to a Kafka topic declaratively.
Head over to the previous blog for context. Basically, we are writing Kubernetes manifests to declare how we send and receive (produce/consume) messages to and from a Kafka topic. The diagram above shows it all. To clear up the confusing part of this blog: a KafkaSink
produces an event to a Kafka topic and is therefore a Kafka source from the point of view of the Kafka cluster :) while a KafkaSource
consumes an event from a Kafka topic and is therefore a Kafka sink from the Kafka cluster's point of view. It would have been too easy otherwise!!!
Why?
Because it is much easier than writing my own Kafka clients and then compiling, packaging, and deploying them. If I can just write a bit of config and let a system like Kubernetes manage it for me, that is great.
How?
We install two controllers and a CRD for a new kind called KafkaSink,
and for good measure (otherwise things go sideways) we also install the Knative Eventing CRDs (we will get that fixed, because it would be nice to avoid that extra step…).
kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-sink.yaml
Shortly after the apply, the Pods are running. Note that the output below also shows the controller for KafkaSource.
kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
kafka-controller-86948cddc6-g5mnn 1/1 Running 0 2m18s
kafka-controller-manager-78c5588947-wt5bj 1/1 Running 0 5h29m
kafka-sink-receiver-7dcb7b9b44-5w6j7 1/1 Running 0 115s
kafka-webhook-eventing-fb6ddfd9d-jsmzm 1/1 Running 0 2m18s
And our fancy new CRD KafkaSink
is in place:
kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev 2021-06-18T08:44:51Z
kafkasinks.eventing.knative.dev 2021-06-18T14:12:12Z
kafkasources.sources.knative.dev 2021-06-18T08:44:52Z
The KafkaSink in Action
Using the same secret that allows us to talk to our Kafka cluster in Confluent Cloud as in the previous blog, we can now set up a Kafka sink declaratively like so:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-topic
spec:
  auth:
    secret:
      ref:
        name: kafkahackathon
  bootstrapServers:
    - pkc-456q9.us-east4.gcp.confluent.cloud:9092
  topic: hackathon
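For reference, the kafkahackathon secret referenced in spec.auth.secret.ref could look roughly like the sketch below for SASL over TLS against Confluent Cloud. The exact key names the Kafka controller expects may vary by release (check the Knative eventing-kafka-broker docs for your version), and the placeholder values are obviously assumptions:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafkahackathon
stringData:
  # Assumed key names for SASL/PLAIN over TLS; verify against your
  # eventing-kafka-broker release before using.
  protocol: SASL_SSL
  sasl.mechanism: PLAIN
  user: <confluent-api-key>
  password: <confluent-api-secret>
```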
What this will do is set up an HTTP endpoint in my Kubernetes cluster where I can POST CloudEvents; these CloudEvents are then produced to my Kafka cluster. The KafkaSink
looks like:
kubectl get kafkasink
NAME URL AGE READY REASON
my-kafka-topic http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic 12m True
Now that we have an HTTP endpoint acting as an HTTP proxy to a Kafka topic, we can simply POST a CloudEvent to this endpoint and the Sink will "produce" the event to the topic. To do this we can get a quick shell in our cluster and craft a CloudEvent by hand with curl, like so:
curl -v "http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic" \
> -X POST \
> -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \
> -H "Ce-Specversion: 1.0" \
> -H "Ce-Type: dev.knative.samples.helloworld" \
> -H "Ce-Source: dev.knative.samples/helloworldsource" \
> -H "Content-Type: application/json" \
> -d '{"msg":"Hello World from the curl pod."}'
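If you would rather send the same CloudEvent from code than from curl, here is a minimal sketch using only Python's standard library. The helper name is mine, the endpoint URL is the in-cluster KafkaSink ingress (only resolvable from inside the cluster), and a generated UUID stands in for the hard-coded Ce-Id:

```python
import json
import urllib.request
import uuid

def build_cloudevent_request(url: str, data: dict) -> urllib.request.Request:
    """Build the same binary-mode CloudEvent POST the curl command sends."""
    body = json.dumps(data).encode("utf-8")
    headers = {
        "Ce-Id": str(uuid.uuid4()),          # unique event id
        "Ce-Specversion": "1.0",
        "Ce-Type": "dev.knative.samples.helloworld",
        "Ce-Source": "dev.knative.samples/helloworldsource",
        "Content-Type": "application/json",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")

# From inside the cluster you would then send it with:
#   urllib.request.urlopen(build_cloudevent_request(url, {"msg": "hello"}))
req = build_cloudevent_request(
    "http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic",
    {"msg": "Hello World from the curl pod."},
)
```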
Checking the logs of the display Pod, we see our CloudEvent being received by the KafkaSource
that we set up in the previous blog.
☁️ cloudevents.Event
Context Attributes,
specversion: 1.0
type: dev.knative.samples.helloworld
source: dev.knative.samples/helloworldsource
id: 536808d3-88be-4077-9d7a-a3f162705f79
datacontenttype: application/json
Data,
{
"msg": "Hello World from the curl pod."
}
The flow is complete: we can produce events from anywhere as CloudEvents, get them into a Kafka topic, then consume them and send them to a Kubernetes workload, all of it defined declaratively.
Which opens the door for …drum roll… a declarative, Kubernetes-native way to configure Kafka sources and sinks. That will be the next blog.