publish+consumer

This commit is contained in:
marcel-dempers 2020-07-24 15:59:33 +10:00
parent 9f53661cfc
commit f62a79be33
4 changed files with 198 additions and 0 deletions

View File

@ -0,0 +1,83 @@
package main
import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"os"
)
var rabbit_host = os.Getenv("RABBIT_HOST")
var rabbit_port = os.Getenv("RABBIT_PORT")
var rabbit_user = os.Getenv("RABBIT_USERNAME")
var rabbit_password = os.Getenv("RABBIT_PASSWORD")
func main() {
consume()
}
func consume() {
conn, err := amqp.Dial("amqp://" + rabbit_user + ":" +rabbit_password + "@" + rabbit_host + ":" + rabbit_port +"/")
if err != nil {
log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
}
ch, err := conn.Channel()
if err != nil {
log.Fatalf("%s: %s", "Failed to open a channel", err)
}
q, err := ch.QueueDeclare(
"publisher", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("%s: %s", "Failed to declare a queue", err)
}
fmt.Println("Channel and Queue established")
defer conn.Close()
defer ch.Close()
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("%s: %s", "Failed to register consumer", err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
d.Ack(false)
}
}()
fmt.Println("Running...")
<-forever
}

View File

@ -0,0 +1,19 @@
FROM golang:1.14-alpine as build
RUN apk add --no-cache git
WORKDIR /src
RUN go get github.com/sirupsen/logrus
RUN go get github.com/streadway/amqp
COPY consumer.go /src
RUN go build consumer.go
FROM alpine as runtime
COPY --from=build /src/consumer /app/consumer
CMD [ "/app/consumer" ]

View File

@ -0,0 +1,20 @@
FROM golang:1.14-alpine as build
RUN apk add --no-cache git
WORKDIR /src
RUN go get github.com/julienschmidt/httprouter
RUN go get github.com/sirupsen/logrus
RUN go get github.com/streadway/amqp
COPY publisher.go /src
RUN go build publisher.go
FROM alpine as runtime
COPY --from=build /src/publisher /app/publisher
CMD [ "/app/publisher" ]

View File

@ -0,0 +1,76 @@
package main
import (
"fmt"
"net/http"
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"os"
)
var rabbit_host = os.Getenv("RABBIT_HOST")
var rabbit_port = os.Getenv("RABBIT_PORT")
var rabbit_user = os.Getenv("RABBIT_USERNAME")
var rabbit_password = os.Getenv("RABBIT_PASSWORD")
func main() {
router := httprouter.New()
router.POST("/publish/:message", func(w http.ResponseWriter, r *http.Request, p httprouter.Params){
submit(w,r,p)
})
fmt.Println("Running...")
log.Fatal(http.ListenAndServe(":80", router))
}
func submit(writer http.ResponseWriter, request *http.Request, p httprouter.Params) {
message := p.ByName("message")
fmt.Println("Received message: " + message)
conn, err := amqp.Dial("amqp://" + rabbit_user + ":" +rabbit_password + "@" + rabbit_host + ":" + rabbit_port +"/")
if err != nil {
log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("%s: %s", "Failed to open a channel", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"publisher", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("%s: %s", "Failed to declare a queue", err)
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(message),
})
if err != nil {
log.Fatalf("%s: %s", "Failed to publish a message", err)
}
fmt.Println("publish success!")
}