From fd94ebd34173e693b744acb23962c70962288e65 Mon Sep 17 00:00:00 2001 From: marcel-dempers Date: Sun, 11 Jul 2021 18:54:06 +1000 Subject: [PATCH] consumer guide updates --- messaging/kafka/applications/consumer/go.mod | 2 +- messaging/kafka/applications/consumer/go.sum | 4 ++-- messaging/kafka/applications/consumer/main.go | 18 ++++++++---------- messaging/kafka/consumer.md | 2 +- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/messaging/kafka/applications/consumer/go.mod b/messaging/kafka/applications/consumer/go.mod index 4ff9833..a3014f5 100644 --- a/messaging/kafka/applications/consumer/go.mod +++ b/messaging/kafka/applications/consumer/go.mod @@ -10,7 +10,7 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/frankban/quicktest v1.13.0 // indirect - github.com/golang/snappy v0.0.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect gopkg.in/Shopify/sarama.v1 v1.20.1 diff --git a/messaging/kafka/applications/consumer/go.sum b/messaging/kafka/applications/consumer/go.sum index bf18cbf..3cd2fb7 100644 --- a/messaging/kafka/applications/consumer/go.sum +++ b/messaging/kafka/applications/consumer/go.sum @@ -12,8 +12,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= diff --git a/messaging/kafka/applications/consumer/main.go b/messaging/kafka/applications/consumer/main.go index e765771..81fd14c 100644 --- a/messaging/kafka/applications/consumer/main.go +++ b/messaging/kafka/applications/consumer/main.go @@ -16,23 +16,26 @@ var kafkaTopic = os.Getenv("KAFKA_TOPIC") var kafkaVersion = os.Getenv("KAFKA_VERSION") var kafkaGroup = os.Getenv("KAFKA_GROUP") +type Consumer struct { + ready chan bool +} func main() { - version, err := sarama.ParseKafkaVersion(kafkaVersion) + version, err := sarama.ParseKafkaVersion(kafkaVersion) config := sarama.NewConfig() config.Version = version config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin config.Consumer.Offsets.Initial = sarama.OffsetOldest - - ctx, cancel := context.WithCancel(context.Background()) + + ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config) - if err != nil { + if err != nil { fmt.Printf("Failed to init Kafka consumer group: %s", err) panic(err) } - + consumer := Consumer{ ready: make(chan bool), } @@ -77,11 +80,6 @@ func main() { } -type Consumer struct { - ready chan bool -} - - // Setup is run at the beginning of a new session, before ConsumeClaim func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { // Mark the consumer as ready diff --git a/messaging/kafka/consumer.md b/messaging/kafka/consumer.md index 5ff00da..13cfde6 100644 --- a/messaging/kafka/consumer.md +++ b/messaging/kafka/consumer.md @@ -14,7 +14,7 @@ Let's start our Kafka components: cd messaging/kafka #only start the kafka containers, not everything! -docker compose up -d zookeeper-1 kafka-1 kafka-2 kafka-3 +docker compose up zookeeper-1 kafka-1 kafka-2 kafka-3 #ensure its running! docker ps