From 81476f540d93c9d53a790477b44ac2f2954b278c Mon Sep 17 00:00:00 2001 From: marcel-dempers Date: Tue, 15 Jun 2021 20:27:23 +1000 Subject: [PATCH] consumer wip --- messaging/kafka/README.md | 2 +- messaging/kafka/applications/consumer/go.mod | 10 +- messaging/kafka/applications/consumer/go.sum | 25 +-- .../consumer/{consumer.go => main.go} | 17 +- messaging/kafka/consumer.md | 167 +++++++++++++++--- 5 files changed, 169 insertions(+), 52 deletions(-) rename messaging/kafka/applications/consumer/{consumer.go => main.go} (91%) diff --git a/messaging/kafka/README.md b/messaging/kafka/README.md index d3dba19..d5d8017 100644 --- a/messaging/kafka/README.md +++ b/messaging/kafka/README.md @@ -194,7 +194,7 @@ cat /tmp/kafka-logs/Orders-2/*.log So far we've taken a look at staring up Kafka and Zookeeper instances with docker commands.
-We've explored the kafka configuration and how to produce and consume message.
+We've explored the kafka configuration and how to produce and consume messages.
Let's put it all together in a docker compose file.
With compose we'd like to be able to build our containers, pointing to a diff --git a/messaging/kafka/applications/consumer/go.mod b/messaging/kafka/applications/consumer/go.mod index 77b4e70..4ff9833 100644 --- a/messaging/kafka/applications/consumer/go.mod +++ b/messaging/kafka/applications/consumer/go.mod @@ -1,17 +1,17 @@ -module producer +module consumer go 1.16 require ( github.com/DataDog/zstd v1.4.8 // indirect + github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/frankban/quicktest v1.13.0 // indirect github.com/golang/snappy v0.0.3 // indirect - github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/pierrec/lz4 v2.6.0+incompatible // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/sirupsen/logrus v1.8.1 // indirect - gopkg.in/Shopify/sarama.v1 v1.20.1 // indirect + gopkg.in/Shopify/sarama.v1 v1.20.1 ) diff --git a/messaging/kafka/applications/consumer/go.sum b/messaging/kafka/applications/consumer/go.sum index 8adf821..bf18cbf 100644 --- a/messaging/kafka/applications/consumer/go.sum +++ b/messaging/kafka/applications/consumer/go.sum @@ -1,5 +1,7 @@ github.com/DataDog/zstd v1.4.8 h1:Rpmta4xZ/MgZnriKNd24iZMhGpP5dvUcs/uqfBapKZY= github.com/DataDog/zstd v1.4.8/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= @@ -8,19 +10,22 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= +github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= -github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= -github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= -github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE= gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc= diff --git a/messaging/kafka/applications/consumer/consumer.go b/messaging/kafka/applications/consumer/main.go similarity index 91% rename from messaging/kafka/applications/consumer/consumer.go rename to messaging/kafka/applications/consumer/main.go index e534bab..e765771 100644 --- a/messaging/kafka/applications/consumer/consumer.go +++ b/messaging/kafka/applications/consumer/main.go @@ -16,13 +16,15 @@ var kafkaTopic = os.Getenv("KAFKA_TOPIC") var kafkaVersion = os.Getenv("KAFKA_VERSION") var kafkaGroup = os.Getenv("KAFKA_GROUP") + func main() { + version, err := sarama.ParseKafkaVersion(kafkaVersion) config := sarama.NewConfig() config.Version = version config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin config.Consumer.Offsets.Initial = sarama.OffsetOldest - + ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config) @@ -30,7 +32,7 @@ func main() { fmt.Printf("Failed to init Kafka consumer group: %s", err) panic(err) } - + consumer := Consumer{ ready: make(chan bool), } @@ -54,9 +56,8 @@ func main() { consumer.ready = make(chan bool) } }() - - <-consumer.ready // Await till the consumer has been set up - fmt.Println("Sarama consumer up and running!...") + <-consumer.ready // Await till the consumer has been set up + fmt.Println("Sarama consumer up and running!...") sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) @@ -68,6 +69,7 @@ func main() { } cancel() wg.Wait() + if err = client.Close(); err != nil { fmt.Printf("Error closing client: %v", err) panic(err) @@ -75,11 +77,11 @@ func main() { } -// Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool } + // Setup is run at the beginning of a new session, before ConsumeClaim func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { // Mark the consumer as ready @@ -111,5 +113,4 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai } return nil -} - +} \ No newline at end of file diff --git a/messaging/kafka/consumer.md b/messaging/kafka/consumer.md index 2f97f56..5ff00da 100644 --- a/messaging/kafka/consumer.md +++ b/messaging/kafka/consumer.md @@ -2,8 +2,9 @@ * [Start a Kafka environment](#Start-a-Kafka-environment) * [Building a consumer in Go](#Building-a-consumer-in-Go) -* [Create a Kafka Topic](#Create-a-Kafka-Topic) * [Starting our Kafka Consumer](#Starting-our-Kafka-Consumer) +* [Consume Messages in Random Order](#Consume-Messages-in-Random-Order) +* [Consume Messages in Order](#Consume-Messages-in-Order) ## Start a Kafka environment @@ -12,24 +13,36 @@ Let's start our Kafka components: ``` cd messaging/kafka +#only start the kafka containers, not everything! docker compose up -d zookeeper-1 kafka-1 kafka-2 kafka-3 #ensure its running! docker ps ``` +### Create a Topic: Orders + +To create a topic, we can `exec` into any container on our kafka network and create it.
We'll need a Topic for our orders: ``` -docker compose up -d kafka-producer -docker exec -it kafka-producer bash +docker exec -it zookeeper-1 bash +# create /kafka/bin/kafka-topics.sh \ --create \ --zookeeper zookeeper-1:2181 \ --replication-factor 1 \ --partitions 3 \ --topic Orders + +# describe +/kafka/bin/kafka-topics.sh \ +--describe \ +--zookeeper zookeeper-1:2181 \ +--topic Orders + +exit ``` Now that we have our Kafka infrastructure, let's create our consumer. @@ -209,6 +222,50 @@ kafka-consumer-go: ``` +To build our consumer, we'll need a `dockerfile`: + +``` +FROM golang:1.16-alpine as dev-env + +RUN apk add --no-cache git gcc musl-dev + +WORKDIR /app + +``` + +Let's build our container so we can compile our `go` code: + +``` +cd .\messaging\kafka\applications\consumer\ +docker build . -t consumer +docker run -it -v ${PWD}:/app consumer sh +ls -l + +go mod init consumer +go mod tidy + +go build +``` + +Now that we have our `go.mod` and `go.sum` files, and code is comiling, we can proceed to create a container that runs our app. Let's extend the `dockerfile` + +``` +FROM dev-env as build-env +COPY go.mod /go.sum /app/ +RUN go mod download + +COPY . /app/ + +RUN CGO_ENABLED=0 go build -o /consumer + +FROM alpine:3.10 as runtime + +COPY --from=build-env /consumer /usr/local/bin/consumer +RUN chmod +x /usr/local/bin/consumer + +ENTRYPOINT ["consumer"] +``` + Now we can proceed to build it: ``` @@ -216,36 +273,90 @@ cd .\messaging\kafka\ docker compose build kafka-consumer-go ``` -Now before we start it, we want to create a Kafka Topic. - -## Create a Kafka Topic - -Let's split the terminal and we'll create the topic from the producer.
-You can create the topic from any container. - -``` -docker compose up -d kafka-producer -docker exec -it kafka-producer bash - -``` - -Create the Topic for Orders: - -``` -/kafka/bin/kafka-topics.sh \ ---create \ ---zookeeper zookeeper-1:2181 \ ---replication-factor 1 \ ---partitions 3 \ ---topic Orders -``` - ## Starting our Kafka Consumer -Now with the Topic ready, our consumer can start and subscribe to the orders topic: +Now with our Kafka environment and topic ready, our consumer can start and subscribe to the orders topic: ``` cd messaging/kafka +docker compose up kafka-consumer-go +``` + +## Consume Messages in Random Order + +Let's produce messages in a loop + +``` +cd .\messaging\kafka\ +docker compose up -d kafka-producer +docker exec -it kafka-producer bash +``` + +Produce messages in a loop: + +``` +upperlim=10 +for ((i=0; i<=upperlim; i++)); do + echo "{ 'id' : 'order-$i', 'data' : 'random-data'}" | \ + /kafka/bin/kafka-console-producer.sh \ + --broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 \ + --topic Orders > /dev/null +done +``` + +

+Notice messages go into partition in round robin. +Partition provides distribution of messages so all our messages +don't take up all the space on a single broker and distributed +more evenly +

+ +

+Notice when we stop and restart out consumer, it does not +re-consume message from the beginning. +

+ +

+This is because the brokers remember the offset where the consumer +has read up to in our case as we use consumer groups. +

+ +Reading messages will occur in random order because they exist in different partitions: + +``` +cd .\messaging\kafka\ +docker compose up -d kafka-consumer +docker exec -it kafka-consumer bash + +/kafka/bin/kafka-console-consumer.sh \ +--bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 \ +--topic Orders --from-beginning + + +``` + +Notice we receive messages out of order and also able to read +messages from the beginning too. + +## Consume Messages in Order + +To consume messages in given order, we need to use message keys. +When using keys, Kafka will place all messages with a given key in the same +partition. +This is useful for event based transactional systems where order is important. + +Let's say we have `order-11` and we want to update it 10 times. +Order here is important so we'l structure our message with a key. + +``` +upperlim=10 +for ((i=0; i<=upperlim; i++)); do + echo "order-11: { 'id' : 'order-11', 'data' : '$i'}" | \ + /kafka/bin/kafka-console-producer.sh \ + --broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 \ + --topic Orders > /dev/null \ + --property "parse.key=true" --property "key.separator=:" +done ``` \ No newline at end of file