consumer wip

marcel-dempers 2021-06-14 15:16:56 +10:00
parent f15521318d
commit c75fe6c76c
3 changed files with 361 additions and 59 deletions

View File

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"os/signal"
@@ -12,81 +13,103 @@ import (
 var kafkaBrokers = os.Getenv("KAFKA_PEERS")
 var kafkaTopic = os.Getenv("KAFKA_TOPIC")
-var globalProducer sarama.SyncProducer
+var kafkaVersion = os.Getenv("KAFKA_VERSION")
+var kafkaGroup = os.Getenv("KAFKA_GROUP")
 
 func main() {
+	version, err := sarama.ParseKafkaVersion(kafkaVersion)
+
 	config := sarama.NewConfig()
-	config.Producer.RequiredAcks = sarama.WaitForAll
-	config.Producer.Return.Successes = true
-	config.Producer.Partitioner = sarama.NewRandomPartitioner
+	config.Version = version
+	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest
 
-	consumer, err := sarama.NewConsumer(strings.Split(kafkaBrokers, ","), config)
+	ctx, cancel := context.WithCancel(context.Background())
+	client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config)
 	if err != nil {
-		fmt.Printf("Failed to open Kafka consumer: %s", err)
+		fmt.Printf("Failed to init Kafka consumer group: %s", err)
 		panic(err)
 	}
 
-	partitionList, err := consumer.Partitions(kafkaTopic)
-	if err != nil {
-		fmt.Printf("Failed to get the list of partitions: %s", err)
-		panic(err)
-	}
+	consumer := Consumer{
+		ready: make(chan bool),
+	}
 
-	var bufferSize = 256
-	var (
-		messages = make(chan *sarama.ConsumerMessage, bufferSize)
-		closing  = make(chan struct{})
-		wg       sync.WaitGroup
-	)
-
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
 	go func() {
-		signals := make(chan os.Signal, 1)
-		signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
-		<-signals
-		fmt.Println("Initiating shutdown of consumer...")
-		close(closing)
-	}()
-
-	for _, partition := range partitionList {
-		pc, err := consumer.ConsumePartition(kafkaTopic, partition, sarama.OffsetOldest)
-		if err != nil {
-			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
-			panic(err)
-		}
-
-		go func(pc sarama.PartitionConsumer) {
-			<-closing
-			pc.AsyncClose()
-		}(pc)
-
-		wg.Add(1)
-		go func(pc sarama.PartitionConsumer) {
-			defer wg.Done()
-			for message := range pc.Messages() {
-				messages <- message
-			}
-		}(pc)
-	}
-
-	go func() {
-		for msg := range messages {
-			fmt.Printf("Partition:\t%d\n", msg.Partition)
-			fmt.Printf("Offset:\t%d\n", msg.Offset)
-			fmt.Printf("Key:\t%s\n", string(msg.Key))
-			fmt.Printf("Value:\t%s\n", string(msg.Value))
-			fmt.Println()
-		}
-	}()
-
-	wg.Wait()
-	fmt.Println("Done consuming topic", kafkaTopic)
-	close(messages)
-
-	if err := consumer.Close(); err != nil {
-		fmt.Printf("Failed to close consumer: %s", err)
+		defer wg.Done()
+		for {
+			// `Consume` should be called inside an infinite loop, when a
+			// server-side rebalance happens, the consumer session will need to be
+			// recreated to get the new claims
+			if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), &consumer); err != nil {
+				fmt.Printf("Error from consumer: %v", err)
+				panic(err)
+			}
+			// check if context was cancelled, signaling that the consumer should stop
+			if ctx.Err() != nil {
+				return
+			}
+			consumer.ready = make(chan bool)
+		}
+	}()
+
+	<-consumer.ready // Await till the consumer has been set up
+	fmt.Println("Sarama consumer up and running!...")
+
+	sigterm := make(chan os.Signal, 1)
+	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
+	select {
+	case <-ctx.Done():
+		fmt.Println("terminating: context cancelled")
+	case <-sigterm:
+		fmt.Println("terminating: via signal")
+	}
+	cancel()
+	wg.Wait()
+	if err = client.Close(); err != nil {
+		fmt.Printf("Error closing client: %v", err)
 		panic(err)
 	}
 }
+
+// Consumer represents a Sarama consumer group consumer
+type Consumer struct {
+	ready chan bool
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
+	// Mark the consumer as ready
+	close(consumer.ready)
+	return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
+func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	// NOTE:
+	// Do not move the code below to a goroutine.
+	// The `ConsumeClaim` itself is called within a goroutine, see:
+	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
+	for msg := range claim.Messages() {
+		fmt.Printf("Partition:\t%d\n", msg.Partition)
+		fmt.Printf("Offset:\t%d\n", msg.Offset)
+		fmt.Printf("Key:\t%s\n", string(msg.Key))
+		fmt.Printf("Value:\t%s\n", string(msg.Value))
+		fmt.Printf("Topic:\t%s\n", msg.Topic)
+		fmt.Println()
+		session.MarkMessage(msg, "")
+	}
+	return nil
+}

messaging/kafka/consumer.md Normal file
View File

@ -0,0 +1,251 @@
# Consuming Data in Kafka
* [Start a Kafka environment](#Start-a-Kafka-environment)
* [Building a consumer in Go](#Building-a-consumer-in-Go)
* [Create a Kafka Topic](#Create-a-Kafka-Topic)
* [Starting our Kafka Consumer](#Starting-our-Kafka-Consumer)
## Start a Kafka environment
Let's start our Kafka components:
```
cd messaging/kafka
docker compose up -d zookeeper-1 kafka-1 kafka-2 kafka-3
#ensure it's running!
docker ps
```
Now that we have our Kafka infrastructure, let's create our consumer.
## Building a consumer in Go
Example code: [Sarama Library](https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go)
We start with `main.go`:
```
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"gopkg.in/Shopify/sarama.v1"
)
var kafkaBrokers = os.Getenv("KAFKA_PEERS")
var kafkaTopic = os.Getenv("KAFKA_TOPIC")
var kafkaVersion = os.Getenv("KAFKA_VERSION")
var kafkaGroup = os.Getenv("KAFKA_GROUP")
func main() {
}
```
Above we define our dependencies and the connection details for Kafka. <br/>
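These values are read straight from the environment, so they will be empty when the variables are unset. For local runs outside of docker compose, a fallback helper can be handy; the `getenv` function below is a hypothetical sketch, not part of this commit:
```
// getenv is a hypothetical helper (not in this commit): it returns the
// environment variable `key`, or `fallback` when the variable is unset.
func getenv(key string, fallback string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return fallback
}
```
Usage would then look like `var kafkaBrokers = getenv("KAFKA_PEERS", "localhost:9092")`. <br/>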
Let's define our client config:
```
version, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
	fmt.Printf("Error parsing Kafka version: %v", err)
	panic(err)
}
config := sarama.NewConfig()
config.Version = version
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config)
if err != nil {
fmt.Printf("Failed to init Kafka consumer group: %s", err)
panic(err)
}
```
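Since a new consumer group has no committed offsets yet, `OffsetOldest` makes it replay the topic from the earliest retained message. For illustration, a few alternative Sarama settings (this commit uses round robin and oldest):
```
// only consume messages produced after the consumer starts
config.Consumer.Offsets.Initial = sarama.OffsetNewest

// other built-in partition assignment strategies
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
```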
Now that we have a client and consumer group connection with Kafka,
we can proceed to define our consumer:
```
// in main.go
type Consumer struct {
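	// ready is closed by Setup to signal that the consumer session is set up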
ready chan bool
}
//in main()
consumer := Consumer{
ready: make(chan bool),
}
```
With the consumer defined, we can now start a goroutine, tracked by a wait group, that consumes messages
as they come in:
```
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), &consumer); err != nil {
fmt.Printf("Error from consumer: %v", err)
panic(err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
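			// recreate the ready channel: the next session's Setup will close it again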
consumer.ready = make(chan bool)
}
}()
<-consumer.ready // Await till the consumer has been set up
fmt.Println("Sarama consumer up and running!...")
```
In case we need to exit, let's handle exit signals. Cancelling the context makes `Consume` return, and `wg.Wait()` blocks until the consumer goroutine has exited:
```
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
fmt.Println("terminating: context cancelled")
case <-sigterm:
fmt.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
fmt.Printf("Error closing client: %v", err)
panic(err)
}
```
In addition to the struct we created, the Sarama library needs us to implement a few methods, which together satisfy its `ConsumerGroupHandler` interface:
```
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
```
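These methods, together with `ConsumeClaim` below, satisfy Sarama's `ConsumerGroupHandler` interface, which is what `client.Consume` expects. For reference, the interface as defined in the Sarama library:
```
type ConsumerGroupHandler interface {
	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
```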
And finally a `ConsumeClaim()` to handle the messages coming in. Note that `session.MarkMessage` only marks a message as consumed; Sarama commits the marked offsets to Kafka in the background (auto-commit is enabled by default):
```
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for msg := range claim.Messages() {
fmt.Printf("Partition:\t%d\n", msg.Partition)
fmt.Printf("Offset:\t%d\n", msg.Offset)
fmt.Printf("Key:\t%s\n", string(msg.Key))
fmt.Printf("Value:\t%s\n", string(msg.Value))
fmt.Printf("Topic:\t%s\n", msg.Topic)
fmt.Println()
session.MarkMessage(msg, "")
}
return nil
}
```
That's it for the code. We can build it using Docker Compose. <br/>
Let's define it as a new service in our `docker-compose.yaml` file:
```
kafka-consumer-go:
container_name: kafka-consumer-go
image: aimvector/kafka-consumer-go:1.0.0
build:
context: ./applications/consumer
environment:
- "KAFKA_PEERS=kafka-1:9092,kafka-2:9092,kafka-3:9092"
- "KAFKA_TOPIC=Orders"
- "KAFKA_VERSION=2.7.0"
- "KAFKA_GROUP=orders"
networks:
- kafka
```
Now we can proceed to build it; `docker compose build` will use the `Dockerfile` in the `./applications/consumer` build context:
```
cd messaging/kafka
docker compose build kafka-consumer-go
```
Now before we start it, we want to create a Kafka Topic.
## Create a Kafka Topic
Let's split the terminal and create the topic from the producer container. <br/>
You can create the topic from any of the Kafka containers.
```
docker compose up -d kafka-producer
docker exec -it kafka-producer bash
```
Create the Topic for Orders:
```
/kafka/bin/kafka-topics.sh \
--create \
--zookeeper zookeeper-1:2181 \
--replication-factor 1 \
--partitions 3 \
--topic Orders
```
## Starting our Kafka Consumer
Now with the Topic ready, our consumer can start and subscribe to the orders topic:
```
cd messaging/kafka
docker compose up -d kafka-consumer-go

#follow the consumer logs
docker logs kafka-consumer-go -f
```

View File

@@ -7,6 +7,9 @@ services:
       context: ./zookeeper
     volumes:
     - ./config/zookeeper-1/zookeeper.properties:/kafka/config/zookeeper.properties
+    - ./data/zookeeper-1/:/tmp/zookeeper/
+    networks:
+    - kafka
   kafka-1:
     container_name: kafka-1
     image: aimvector/kafka:2.7.0
@@ -15,6 +18,8 @@ services:
     volumes:
     - ./config/kafka-1/server.properties:/kafka/config/server.properties
     - ./data/kafka-1/:/tmp/kafka-logs/
+    networks:
+    - kafka
   kafka-2:
     container_name: kafka-2
     image: aimvector/kafka:2.7.0
@@ -23,6 +28,8 @@ services:
     volumes:
     - ./config/kafka-2/server.properties:/kafka/config/server.properties
     - ./data/kafka-2/:/tmp/kafka-logs/
+    networks:
+    - kafka
   kafka-3:
     container_name: kafka-3
     image: aimvector/kafka:2.7.0
@@ -31,6 +38,8 @@ services:
     volumes:
     - ./config/kafka-3/server.properties:/kafka/config/server.properties
     - ./data/kafka-3/:/tmp/kafka-logs/
+    networks:
+    - kafka
   kafka-producer:
     container_name: kafka-producer
     image: aimvector/kafka:2.7.0
@@ -40,6 +49,8 @@ services:
     entrypoint: /bin/bash
     stdin_open: true
     tty: true
+    networks:
+    - kafka
   kafka-consumer:
     container_name: kafka-consumer
     image: aimvector/kafka:2.7.0
@@ -48,4 +59,21 @@ services:
     working_dir: /kafka
     entrypoint: /bin/bash
     stdin_open: true
     tty: true
+    networks:
+    - kafka
+  kafka-consumer-go:
+    container_name: kafka-consumer-go
+    image: aimvector/kafka-consumer-go:1.0.0
+    build:
+      context: ./applications/consumer
+    environment:
+    - "KAFKA_PEERS=kafka-1:9092,kafka-2:9092,kafka-3:9092"
+    - "KAFKA_TOPIC=Orders"
+    - "KAFKA_VERSION=2.7.0"
+    - "KAFKA_GROUP=orders"
+    networks:
+    - kafka
+networks:
+  kafka:
+    name: kafka