Merge pull request #88 from marcel-dempers/kafka-consumer

Kafka consumer
This commit is contained in:
Marcel Dempers 2021-08-09 21:49:08 +10:00 committed by GitHub
commit 7d7404a8ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 529 additions and 112 deletions

View File

@ -194,7 +194,7 @@ cat /tmp/kafka-logs/Orders-2/*.log
So far we've taken a look at starting up Kafka and Zookeeper instances with
docker commands. </br>
We've explored the kafka configuration and how to produce and consume message. </br>
We've explored the kafka configuration and how to produce and consume messages. </br>
Let's put it all together in a docker compose file. </br>
With compose we'd like to be able to build our containers, pointing to a

View File

@ -1,92 +0,0 @@
package main

import (
	"fmt"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"gopkg.in/Shopify/sarama.v1"
)

// Kafka connection details come from the environment so the same binary
// can be pointed at any cluster.
var kafkaBrokers = os.Getenv("KAFKA_PEERS")
var kafkaTopic = os.Getenv("KAFKA_TOPIC")

// main connects to Kafka, consumes every partition of kafkaTopic from the
// oldest offset, and prints each message until a SIGTERM/interrupt arrives.
func main() {
	// NOTE(review): the previous version set producer-only options
	// (RequiredAcks, Return.Successes, Partitioner) and declared an unused
	// SyncProducer — leftovers from the producer example with no effect on
	// a consumer, so they are removed here.
	config := sarama.NewConfig()

	consumer, err := sarama.NewConsumer(strings.Split(kafkaBrokers, ","), config)
	if err != nil {
		fmt.Printf("Failed to open Kafka consumer: %s", err)
		panic(err)
	}

	partitionList, err := consumer.Partitions(kafkaTopic)
	if err != nil {
		fmt.Printf("Failed to get the list of partitions: %s", err)
		panic(err)
	}

	var bufferSize = 256
	var (
		messages = make(chan *sarama.ConsumerMessage, bufferSize)
		closing  = make(chan struct{})
		wg       sync.WaitGroup
	)

	// Close `closing` on SIGTERM/interrupt to tell every partition
	// consumer to shut down.
	go func() {
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
		<-signals
		fmt.Println("Initiating shutdown of consumer...")
		close(closing)
	}()

	// One consumer (plus a shutdown watcher) per partition; all messages
	// are funneled into the shared `messages` channel.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(kafkaTopic, partition, sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			panic(err)
		}

		go func(pc sarama.PartitionConsumer) {
			<-closing
			pc.AsyncClose()
		}(pc)

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for message := range pc.Messages() {
				messages <- message
			}
		}(pc)
	}

	// Printer goroutine; `printed` is closed once it has drained
	// everything, so main does not exit with messages still buffered.
	printed := make(chan struct{})
	go func() {
		defer close(printed)
		for msg := range messages {
			fmt.Printf("Partition:\t%d\n", msg.Partition)
			fmt.Printf("Offset:\t%d\n", msg.Offset)
			fmt.Printf("Key:\t%s\n", string(msg.Key))
			fmt.Printf("Value:\t%s\n", string(msg.Value))
			fmt.Println()
		}
	}()

	wg.Wait()
	fmt.Println("Done consuming topic", kafkaTopic)
	close(messages)
	// BUG FIX: previously main could return before the printer goroutine
	// finished; wait for it to drain the buffered messages.
	<-printed

	if err := consumer.Close(); err != nil {
		fmt.Printf("Failed to close consumer: %s", err)
		panic(err)
	}
}

View File

@ -1,17 +1,17 @@
module producer
module consumer
go 1.16
require (
github.com/DataDog/zstd v1.4.8 // indirect
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/frankban/quicktest v1.13.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
gopkg.in/Shopify/sarama.v1 v1.20.1 // indirect
gopkg.in/Shopify/sarama.v1 v1.20.1
)

View File

@ -1,5 +1,7 @@
github.com/DataDog/zstd v1.4.8 h1:Rpmta4xZ/MgZnriKNd24iZMhGpP5dvUcs/uqfBapKZY=
github.com/DataDog/zstd v1.4.8/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
@ -8,19 +10,22 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE=
gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc=

View File

@ -0,0 +1,114 @@
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"gopkg.in/Shopify/sarama.v1"
)

// Kafka connection details come from the environment so the same binary
// can be pointed at any cluster.
var kafkaBrokers = os.Getenv("KAFKA_PEERS")
var kafkaTopic = os.Getenv("KAFKA_TOPIC")
var kafkaVersion = os.Getenv("KAFKA_VERSION")
var kafkaGroup = os.Getenv("KAFKA_GROUP")

// Consumer implements sarama.ConsumerGroupHandler and prints every
// message claimed by its consumer-group session.
type Consumer struct {
	// ready is closed in Setup once the session is established.
	ready chan bool
}

// main joins the kafkaGroup consumer group, consumes kafkaTopic from the
// oldest committed offset, and runs until SIGINT/SIGTERM or context
// cancellation.
func main() {
	version, err := sarama.ParseKafkaVersion(kafkaVersion)
	// BUG FIX: this error was previously ignored — an invalid or empty
	// KAFKA_VERSION silently produced a zero version and failed later in
	// a confusing way. Fail fast with a clear message instead.
	if err != nil {
		fmt.Printf("Failed to parse KAFKA_VERSION %q: %s", kafkaVersion, err)
		panic(err)
	}

	config := sarama.NewConfig()
	config.Version = version
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	ctx, cancel := context.WithCancel(context.Background())

	client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config)
	if err != nil {
		fmt.Printf("Failed to init Kafka consumer group: %s", err)
		panic(err)
	}

	consumer := Consumer{
		ready: make(chan bool),
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop; when a
			// server-side rebalance happens, the consumer session will
			// need to be recreated to get the new claims.
			if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), &consumer); err != nil {
				fmt.Printf("Error from consumer: %v", err)
				panic(err)
			}
			// Check if context was cancelled, signaling that the consumer
			// should stop.
			if ctx.Err() != nil {
				return
			}
			// A new session is about to start; Setup will close this again.
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	fmt.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		fmt.Println("terminating: context cancelled")
	case <-sigterm:
		fmt.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()

	if err = client.Close(); err != nil {
		fmt.Printf("Error closing client: %v", err)
		panic(err)
	}
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready.
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim
// goroutines have exited.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's
// Messages(); it prints each message and marks it as consumed.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for msg := range claim.Messages() {
		fmt.Printf("Partition:\t%d\n", msg.Partition)
		fmt.Printf("Offset:\t%d\n", msg.Offset)
		fmt.Printf("Key:\t%s\n", string(msg.Key))
		fmt.Printf("Value:\t%s\n", string(msg.Value))
		fmt.Printf("Topic:\t%s\n", msg.Topic)
		fmt.Println()
		session.MarkMessage(msg, "")
	}
	return nil
}

362
messaging/kafka/consumer.md Normal file
View File

@ -0,0 +1,362 @@
# Consuming Data in Kafka
* [Start a Kafka environment](#Start-a-Kafka-environment)
* [Building a consumer in Go](#Building-a-consumer-in-Go)
* [Starting our Kafka Consumer](#Starting-our-Kafka-Consumer)
* [Consume Messages in Random Order](#Consume-Messages-in-Random-Order)
* [Consume Messages in Order](#Consume-Messages-in-Order)
## Start a Kafka environment
Let's start our Kafka components:
```
cd messaging/kafka
#only start the kafka containers, not everything!
docker compose up zookeeper-1 kafka-1 kafka-2 kafka-3
#ensure its running!
docker ps
```
### Create a Topic: Orders
To create a topic, we can `exec` into any container on our kafka network and create it. </br>
We'll need a Topic for our orders:
```
docker exec -it zookeeper-1 bash
# create
/kafka/bin/kafka-topics.sh \
--create \
--zookeeper zookeeper-1:2181 \
--replication-factor 1 \
--partitions 3 \
--topic Orders
# describe
/kafka/bin/kafka-topics.sh \
--describe \
--zookeeper zookeeper-1:2181 \
--topic Orders
exit
```
Now that we have our Kafka infrastructure, let's create our consumer.
## Building a consumer in Go
Example code: [Sarama Library](https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go)
We start with `main.go`:
```
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"gopkg.in/Shopify/sarama.v1"
)
var kafkaBrokers = os.Getenv("KAFKA_PEERS")
var kafkaTopic = os.Getenv("KAFKA_TOPIC")
var kafkaVersion = os.Getenv("KAFKA_VERSION")
var kafkaGroup = os.Getenv("KAFKA_GROUP")
func main() {
}
```
Above we define our dependencies, and connection details for kafka. <br/>
Let's define our client config:
```
version, err := sarama.ParseKafkaVersion(kafkaVersion)
config := sarama.NewConfig()
config.Version = version
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config)
if err != nil {
fmt.Printf("Failed to init Kafka consumer group: %s", err)
panic(err)
}
```
Now that we have a client and consumer group connection with Kafka,
we can proceed to define our consumer:
```
// in main.go
type Consumer struct {
ready chan bool
}
//in main()
consumer := Consumer{
ready: make(chan bool),
}
```
With the consumer we can now define a wait group which will consume messages
as they come in:
```
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), &consumer); err != nil {
fmt.Printf("Error from consumer: %v", err)
panic(err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
<-consumer.ready // Await till the consumer has been set up
fmt.Println("Sarama consumer up and running!...")
```
In case we need to exit, let's handle exit signals
```
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
fmt.Println("terminating: context cancelled")
case <-sigterm:
fmt.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
fmt.Printf("Error closing client: %v", err)
panic(err)
}
```
In addition to the struct we created, the Sarama library requires us to implement certain handler functions:
```
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
```
And finally a ConsumeClaim() to handle the messages coming in:
```
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for msg := range claim.Messages() {
fmt.Printf("Partition:\t%d\n", msg.Partition)
fmt.Printf("Offset:\t%d\n", msg.Offset)
fmt.Printf("Key:\t%s\n", string(msg.Key))
fmt.Printf("Value:\t%s\n", string(msg.Value))
fmt.Printf("Topic:\t%s\n", msg.Topic)
fmt.Println()
session.MarkMessage(msg, "")
}
return nil
}
```
That's it for the code, we can build it by using Docker compose <br/>
Let's define it as a new service in our `docker-compose.yaml` file
```
kafka-consumer-go:
container_name: kafka-consumer-go
image: aimvector/kafka-consumer-go:1.0.0
build:
context: ./applications/consumer
environment:
- "KAFKA_PEERS=kafka-1:9092,kafka-2:9092,kafka-3:9092"
- "KAFKA_TOPIC=Orders"
- "KAFKA_VERSION=2.7.0"
- "KAFKA_GROUP=orders"
networks:
- kafka
```
To build our consumer, we'll need a `dockerfile`:
```
FROM golang:1.16-alpine as dev-env
RUN apk add --no-cache git gcc musl-dev
WORKDIR /app
```
Let's build our container so we can compile our `go` code:
```
cd .\messaging\kafka\applications\consumer\
docker build . -t consumer
docker run -it -v ${PWD}:/app consumer sh
ls -l
go mod init consumer
go mod tidy
go build
```
Now that we have our `go.mod` and `go.sum` files, and the code is compiling, we can proceed to create a container that runs our app. Let's extend the `dockerfile`
```
FROM dev-env as build-env
COPY go.mod /go.sum /app/
RUN go mod download
COPY . /app/
RUN CGO_ENABLED=0 go build -o /consumer
FROM alpine:3.10 as runtime
COPY --from=build-env /consumer /usr/local/bin/consumer
RUN chmod +x /usr/local/bin/consumer
ENTRYPOINT ["consumer"]
```
Now we can proceed to build it:
```
cd .\messaging\kafka\
docker compose build kafka-consumer-go
```
## Starting our Kafka Consumer
Now with our Kafka environment and topic ready, our consumer can start and subscribe to the orders topic:
```
cd messaging/kafka
docker compose up kafka-consumer-go
```
## Consume Messages in Random Order
Let's produce messages in a loop
```
cd .\messaging\kafka\
docker compose up -d kafka-producer
docker exec -it kafka-producer bash
```
Produce messages in a loop:
```
upperlim=10
for ((i=0; i<=upperlim; i++)); do
echo "{ 'id' : 'order-$i', 'data' : 'random-data'}" | \
/kafka/bin/kafka-console-producer.sh \
--broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 \
--topic Orders > /dev/null
done
```
<p>
Notice messages go into partitions in round-robin order.
Partitions distribute messages so that they don't all take up
space on a single broker and are spread more evenly
across the cluster.
</p>
<p>
Notice when we stop and restart our consumer, it does not
re-consume messages from the beginning.
</p>
<p>
This is because the brokers remember the offset where the consumer
has read up to in our case as we use consumer groups.
</p>
Reading messages will occur in random order because they exist in different partitions:
```
cd .\messaging\kafka\
docker compose up -d kafka-consumer
docker exec -it kafka-consumer bash
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 \
--topic Orders --from-beginning
```
Notice we receive messages out of order and also able to read
messages from the beginning too.
## Consume Messages in Order
To consume messages in given order, we need to use message keys.
When using keys, Kafka will place all messages with a given key in the same
partition.
This is useful for event based transactional systems where order is important.
Let's say we have `order-11` and we want to update it 10 times.
Order here is important, so we'll structure our message with a key.
```
upperlim=10
for ((i=0; i<=upperlim; i++)); do
echo "order-11: { 'id' : 'order-11', 'data' : '$i'}" | \
/kafka/bin/kafka-console-producer.sh \
--broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 \
--topic Orders > /dev/null \
--property "parse.key=true" --property "key.separator=:"
done
```

View File

@ -7,6 +7,9 @@ services:
context: ./zookeeper
volumes:
- ./config/zookeeper-1/zookeeper.properties:/kafka/config/zookeeper.properties
- ./data/zookeeper-1/:/tmp/zookeeper/
networks:
- kafka
kafka-1:
container_name: kafka-1
image: aimvector/kafka:2.7.0
@ -15,6 +18,8 @@ services:
volumes:
- ./config/kafka-1/server.properties:/kafka/config/server.properties
- ./data/kafka-1/:/tmp/kafka-logs/
networks:
- kafka
kafka-2:
container_name: kafka-2
image: aimvector/kafka:2.7.0
@ -23,6 +28,8 @@ services:
volumes:
- ./config/kafka-2/server.properties:/kafka/config/server.properties
- ./data/kafka-2/:/tmp/kafka-logs/
networks:
- kafka
kafka-3:
container_name: kafka-3
image: aimvector/kafka:2.7.0
@ -31,6 +38,8 @@ services:
volumes:
- ./config/kafka-3/server.properties:/kafka/config/server.properties
- ./data/kafka-3/:/tmp/kafka-logs/
networks:
- kafka
kafka-producer:
container_name: kafka-producer
image: aimvector/kafka:2.7.0
@ -40,6 +49,8 @@ services:
entrypoint: /bin/bash
stdin_open: true
tty: true
networks:
- kafka
kafka-consumer:
container_name: kafka-consumer
image: aimvector/kafka:2.7.0
@ -48,4 +59,21 @@ services:
working_dir: /kafka
entrypoint: /bin/bash
stdin_open: true
tty: true
tty: true
networks:
- kafka
kafka-consumer-go:
container_name: kafka-consumer-go
image: aimvector/kafka-consumer-go:1.0.0
build:
context: ./applications/consumer
environment:
- "KAFKA_PEERS=kafka-1:9092,kafka-2:9092,kafka-3:9092"
- "KAFKA_TOPIC=Orders"
- "KAFKA_VERSION=2.7.0"
- "KAFKA_GROUP=orders"
networks:
- kafka
networks:
kafka:
name: kafka