# Consuming Data in Kafka

* [Start a Kafka environment](#start-a-kafka-environment)
* [Building a consumer in Go](#building-a-consumer-in-go)
* [Starting our Kafka Consumer](#starting-our-kafka-consumer)
* [Consume Messages in Random Order](#consume-messages-in-random-order)
* [Consume Messages in Order](#consume-messages-in-order)

## Start a Kafka environment

Let's start our Kafka components:

```
cd messaging/kafka

# only start the Kafka containers, not everything!
docker compose up zookeeper-1 kafka-1 kafka-2 kafka-3

# ensure it's running!
docker ps
```

### Create a Topic: Orders

To create a topic, we can `exec` into any container on our Kafka network and create it.
We'll need a Topic for our orders:

```
docker exec -it zookeeper-1 bash

# create
/kafka/bin/kafka-topics.sh \
--create \
--zookeeper zookeeper-1:2181 \
--replication-factor 1 \
--partitions 3 \
--topic Orders

# describe
/kafka/bin/kafka-topics.sh \
--describe \
--zookeeper zookeeper-1:2181 \
--topic Orders

exit
```

Now that we have our Kafka infrastructure, let's create our consumer.

## Building a consumer in Go

Example code: [Sarama Library](https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go)

We start with `main.go`:

```
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"gopkg.in/Shopify/sarama.v1"
)

var kafkaBrokers = os.Getenv("KAFKA_PEERS")
var kafkaTopic = os.Getenv("KAFKA_TOPIC")
var kafkaVersion = os.Getenv("KAFKA_VERSION")
var kafkaGroup = os.Getenv("KAFKA_GROUP")

func main() {

}
```

Above we define our dependencies and connection details for Kafka.
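Since all connection details come from the environment, running the consumer outside Docker means exporting each variable first. As a convenience, a small helper could fall back to defaults when a variable is unset. This is a sketch of our own, not part of the Sarama example; the `getEnv` helper is hypothetical and the default values assume the docker-compose setup shown later:

```
// getEnv returns the environment variable `key`,
// or `fallback` when it is unset or empty.
func getEnv(key, fallback string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return fallback
}

// Hypothetical usage, with defaults matching our docker compose services:
var kafkaBrokers = getEnv("KAFKA_PEERS", "kafka-1:9092,kafka-2:9092,kafka-3:9092")
var kafkaTopic = getEnv("KAFKA_TOPIC", "Orders")
var kafkaVersion = getEnv("KAFKA_VERSION", "2.7.0")
var kafkaGroup = getEnv("KAFKA_GROUP", "orders")
```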
Let's define our client config:

```
version, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
	fmt.Printf("Failed to parse Kafka version: %s", err)
	panic(err)
}

config := sarama.NewConfig()
config.Version = version
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest

ctx, cancel := context.WithCancel(context.Background())

client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config)
if err != nil {
	fmt.Printf("Failed to init Kafka consumer group: %s", err)
	panic(err)
}
```

Now that we have a client and consumer group connection with Kafka, we can proceed to define our consumer:

```
// in main.go
type Consumer struct {
	ready chan bool
}

// in main()
consumer := Consumer{
	ready: make(chan bool),
}
```

With the consumer we can now define a wait group which will consume messages as they come in:

```
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
	defer wg.Done()
	for {
		// `Consume` should be called inside an infinite loop; when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), &consumer); err != nil {
			fmt.Printf("Error from consumer: %v", err)
			panic(err)
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			return
		}
		consumer.ready = make(chan bool)
	}
}()

<-consumer.ready // Await till the consumer has been set up
fmt.Println("Sarama consumer up and running!...")
```

In case we need to exit, let's handle exit signals:

```
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
	fmt.Println("terminating: context cancelled")
case <-sigterm:
	fmt.Println("terminating: via signal")
}
cancel()
wg.Wait()

if err = client.Close(); err != nil {
	fmt.Printf("Error closing client: %v", err)
	panic(err)
}
```

In addition to the struct we created, the Sarama library needs us to implement certain functions:

```
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
```

And finally a `ConsumeClaim()` to handle the messages coming in:

```
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for msg := range claim.Messages() {
		fmt.Printf("Partition:\t%d\n", msg.Partition)
		fmt.Printf("Offset:\t%d\n", msg.Offset)
		fmt.Printf("Key:\t%s\n", string(msg.Key))
		fmt.Printf("Value:\t%s\n", string(msg.Value))
		fmt.Printf("Topic:\t%s\n", msg.Topic)
		fmt.Println()
		session.MarkMessage(msg, "")
	}
	return nil
}
```

That's it for the code. We can build it using Docker Compose.
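Our `ConsumeClaim` above just prints the raw bytes. If the payload is JSON, we could decode it into a struct instead. Below is a minimal sketch assuming a hypothetical `Order` shape with `id` and `data` fields (it also needs `encoding/json` added to the imports). Note that the shell producer examples further down emit single-quoted pseudo-JSON, which `encoding/json` would reject, so real payloads would need double quotes:

```
// Order is a hypothetical shape for the payloads our producer emits.
type Order struct {
	ID   string `json:"id"`
	Data string `json:"data"`
}

// Inside ConsumeClaim, each message value could be decoded like this:
for msg := range claim.Messages() {
	var order Order
	if err := json.Unmarshal(msg.Value, &order); err != nil {
		fmt.Printf("skipping malformed message at offset %d: %v\n", msg.Offset, err)
	} else {
		fmt.Printf("order %s updated with data %s\n", order.ID, order.Data)
	}
	session.MarkMessage(msg, "")
}
```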
Let's define it as a new service in our `docker-compose.yaml` file:

```
kafka-consumer-go:
  container_name: kafka-consumer-go
  image: aimvector/kafka-consumer-go:1.0.0
  build:
    context: ./applications/consumer
  environment:
  - "KAFKA_PEERS=kafka-1:9092,kafka-2:9092,kafka-3:9092"
  - "KAFKA_TOPIC=Orders"
  - "KAFKA_VERSION=2.7.0"
  - "KAFKA_GROUP=orders"
  networks:
  - kafka
```

To build our consumer, we'll need a `dockerfile`:

```
FROM golang:1.16-alpine as dev-env

RUN apk add --no-cache git gcc musl-dev
WORKDIR /app
```

Let's build our container so we can compile our `go` code:

```
cd .\messaging\kafka\applications\consumer\
docker build . -t consumer
docker run -it -v ${PWD}:/app consumer sh

ls -l
go mod init consumer
go mod tidy
go build
```

Now that we have our `go.mod` and `go.sum` files, and our code is compiling, we can proceed to create a container that runs our app. Let's extend the `dockerfile`:

```
FROM dev-env as build-env
COPY go.mod go.sum /app/
RUN go mod download

COPY . /app/
RUN CGO_ENABLED=0 go build -o /consumer

FROM alpine:3.10 as runtime
COPY --from=build-env /consumer /usr/local/bin/consumer
RUN chmod +x /usr/local/bin/consumer
ENTRYPOINT ["consumer"]
```

Now we can proceed to build it:

```
cd .\messaging\kafka\
docker compose build kafka-consumer-go
```

## Starting our Kafka Consumer

Now with our Kafka environment and topic ready, our consumer can start and subscribe to the Orders topic:

```
cd messaging/kafka
docker compose up kafka-consumer-go
```

## Consume Messages in Random Order

Let's start our producer container:

```
cd .\messaging\kafka\
docker compose up -d kafka-producer
docker exec -it kafka-producer bash
```

Produce messages in a loop:

```
upperlim=10
for ((i=0; i<=upperlim; i++)); do
  echo "{ 'id' : 'order-$i', 'data' : 'random-data'}" | \
  /kafka/bin/kafka-console-producer.sh \
  --broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 \
  --topic Orders > /dev/null
done
```

Notice that messages go into partitions in a round-robin fashion. Partitions distribute messages so that they don't all take up space on a single broker, but are spread more evenly across the cluster.
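For reference, the same round-robin spread can be reproduced from Go: Sarama lets us pick the partitioning strategy on the producer side. This is a minimal, self-contained sketch of our own (not part of this repo), reusing the broker addresses and topic from this guide:

```
package main

import (
	"fmt"
	"log"

	"gopkg.in/Shopify/sarama.v1"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	// without message keys, spread messages across partitions round-robin
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

	brokers := []string{"kafka-1:9092", "kafka-2:9092", "kafka-3:9092"}
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("failed to start producer: %v", err)
	}
	defer producer.Close()

	for i := 0; i <= 10; i++ {
		msg := &sarama.ProducerMessage{
			Topic: "Orders",
			Value: sarama.StringEncoder(fmt.Sprintf(`{"id": "order-%d", "data": "random-data"}`, i)),
		}
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			log.Fatalf("failed to send message: %v", err)
		}
		fmt.Printf("order-%d -> partition %d, offset %d\n", i, partition, offset)
	}
}
```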

Notice that when we stop and restart our consumer, it does not re-consume messages from the beginning.

This is because the brokers remember the offset the consumer has read up to; they can do this for us because we use consumer groups.
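In our Go consumer this behaviour comes from the `session.MarkMessage` call in `ConsumeClaim` together with Sarama's offset auto-commit, which is enabled by default. The relevant settings are sketched below, assuming `time` is imported; the interval shown is Sarama's default, stated here as an illustration rather than a recommendation:

```
config := sarama.NewConfig()

// marked offsets are periodically committed to the brokers (on by default)
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

// Initial only applies when the group has no committed offset yet,
// e.g. on the very first start: OffsetOldest replays from the beginning,
// OffsetNewest starts at the tail.
config.Consumer.Offsets.Initial = sarama.OffsetOldest
```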

Reading messages will occur in random order because they exist in different partitions:

```
cd .\messaging\kafka\
docker compose up -d kafka-consumer
docker exec -it kafka-consumer bash

/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 \
--topic Orders --from-beginning
```

Notice that we receive messages out of order, and that we are also able to read messages from the beginning.

## Consume Messages in Order

To consume messages in a given order, we need to use message keys. When using keys, Kafka will place all messages with a given key in the same partition. This is useful for event-based transactional systems where order is important.

Let's say we have `order-11` and we want to update it multiple times. Order here is important, so we'll structure our message with a key:

```
upperlim=10
for ((i=0; i<=upperlim; i++)); do
  echo "order-11: { 'id' : 'order-11', 'data' : '$i'}" | \
  /kafka/bin/kafka-console-producer.sh \
  --broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 \
  --topic Orders \
  --property "parse.key=true" --property "key.separator=:" > /dev/null
done
```
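The equivalent from Go: with Sarama's default hash partitioner, every message that shares a key is routed to the same partition, so the updates for `order-11` stay in order. A minimal sketch of our own, under the same imports and broker assumptions as the round-robin producer example above:

```
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// the default partitioner hashes the key, so every message keyed
// "order-11" lands in the same partition and keeps its order

producer, err := sarama.NewSyncProducer([]string{"kafka-1:9092", "kafka-2:9092", "kafka-3:9092"}, config)
if err != nil {
	log.Fatalf("failed to start producer: %v", err)
}
defer producer.Close()

for i := 0; i <= 10; i++ {
	msg := &sarama.ProducerMessage{
		Topic: "Orders",
		Key:   sarama.StringEncoder("order-11"),
		Value: sarama.StringEncoder(fmt.Sprintf(`{"id": "order-11", "data": "%d"}`, i)),
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Fatalf("failed to send update %d: %v", i, err)
	}
}
```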