diff --git a/messaging/rabbitmq/applications/consumer/consumer.go b/messaging/rabbitmq/applications/consumer/consumer.go new file mode 100644 index 0000000..1273862 --- /dev/null +++ b/messaging/rabbitmq/applications/consumer/consumer.go @@ -0,0 +1,78 @@ +package main + + +import ( + "fmt" + log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" + "os" +) + +var rabbit_host = os.Getenv("RABBIT_HOST") +var rabbit_port = os.Getenv("RABBIT_PORT") +var rabbit_user = os.Getenv("RABBIT_USERNAME") +var rabbit_password = os.Getenv("RABBIT_PASSWORD") + +func main() { + consume() +} + +func consume() { + + conn, err := amqp.Dial("amqp://" + rabbit_user + ":" +rabbit_password + "@" + rabbit_host + ":" + rabbit_port +"/") + + if err != nil { + log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err) + } + + ch, err := conn.Channel() + + if err != nil { + log.Fatalf("%s: %s", "Failed to open a channel", err) + } + + q, err := ch.QueueDeclare( + "publisher", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + log.Fatalf("%s: %s", "Failed to declare a queue", err) + } + + fmt.Println("Channel and Queue established") + + defer conn.Close() + defer ch.Close() + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + if err != nil { + log.Fatalf("%s: %s", "Failed to register consumer", err) + } + + forever := make(chan bool) + + go func() { + for d := range msgs { + log.Printf("Received a message: %s", d.Body) + + d.Ack(false) + } + }() + + fmt.Println("Running...") + <-forever +} \ No newline at end of file diff --git a/messaging/rabbitmq/applications/consumer/dockerfile b/messaging/rabbitmq/applications/consumer/dockerfile new file mode 100644 index 0000000..b63b85d --- /dev/null +++ b/messaging/rabbitmq/applications/consumer/dockerfile @@ -0,0 +1,19 @@ +FROM golang:1.14-alpine as build + +RUN apk add --no-cache git + +WORKDIR /src + +RUN go get github.com/sirupsen/logrus +RUN go get github.com/streadway/amqp + +COPY consumer.go /src + +RUN go build consumer.go + + +FROM alpine as runtime + +COPY --from=build /src/consumer /app/consumer + +CMD [ "/app/consumer" ] \ No newline at end of file diff --git a/messaging/rabbitmq/applications/publisher/deployment.yaml b/messaging/rabbitmq/applications/publisher/deployment.yaml new file mode 100644 index 0000000..aa4a940 --- /dev/null +++ b/messaging/rabbitmq/applications/publisher/deployment.yaml @@ -0,0 +1,62 @@ +apiVersion: v1 +kind: Secret +metadata: + name: rabbitmq-publisher +type: Opaque +data: + RABBIT_USERNAME: Z3Vlc3Q= + RABBIT_PASSWORD: Z3Vlc3Q= +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rabbitmq-publisher + labels: + app: rabbitmq-publisher +spec: + selector: + matchLabels: + app: rabbitmq-publisher + replicas: 1 + template: + metadata: + labels: + app: rabbitmq-publisher + spec: + containers: + - name: rabbitmq-publisher + image: aimvector/rabbitmq-publisher:v1.0.0 + imagePullPolicy: Always + ports: + - containerPort: 80 + env: + - name: RABBIT_HOST + value: "rabbitmq-0.rabbitmq.rabbits.svc.cluster.local" + - name: RABBIT_PORT + value: "5672" + - name: RABBIT_USERNAME + valueFrom: + secretKeyRef: + name: rabbitmq-publisher + key: RABBIT_USERNAME + - name: RABBIT_PASSWORD + valueFrom: + secretKeyRef: + name: rabbitmq-publisher + key: RABBIT_PASSWORD +--- +apiVersion: v1 +kind: Service +metadata: + name: rabbitmq-publisher + labels: + app: rabbitmq-publisher +spec: + type: LoadBalancer + selector: + app: rabbitmq-publisher + ports: + - protocol: TCP + name: http + port: 80 + targetPort: 80 diff --git a/messaging/rabbitmq/applications/publisher/dockerfile b/messaging/rabbitmq/applications/publisher/dockerfile new file mode 100644 index 0000000..6d7a0b6 --- /dev/null +++ b/messaging/rabbitmq/applications/publisher/dockerfile @@ -0,0 +1,19 @@ +FROM golang:1.14-alpine as build + +RUN apk add --no-cache git + +WORKDIR /src + +RUN go get github.com/julienschmidt/httprouter +RUN go get github.com/sirupsen/logrus +RUN go get github.com/streadway/amqp + +COPY publisher.go /src + +RUN go build publisher.go + +FROM alpine as runtime + +COPY --from=build /src/publisher /app/publisher + +CMD [ "/app/publisher" ] \ No newline at end of file diff --git a/messaging/rabbitmq/applications/publisher/publisher.go b/messaging/rabbitmq/applications/publisher/publisher.go new file mode 100644 index 0000000..897fec6 --- /dev/null +++ b/messaging/rabbitmq/applications/publisher/publisher.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "net/http" + "github.com/julienschmidt/httprouter" + log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" + "os" +) + +var rabbit_host = os.Getenv("RABBIT_HOST") +var rabbit_port = os.Getenv("RABBIT_PORT") +var rabbit_user = os.Getenv("RABBIT_USERNAME") +var rabbit_password = os.Getenv("RABBIT_PASSWORD") + +func main() { + + router := httprouter.New() + + router.POST("/publish/:message", func(w http.ResponseWriter, r *http.Request, p httprouter.Params){ + submit(w,r,p) + }) + + fmt.Println("Running...") + log.Fatal(http.ListenAndServe(":80", router)) +} + +func submit(writer http.ResponseWriter, request *http.Request, p httprouter.Params) { + message := p.ByName("message") + + fmt.Println("Received message: " + message) + + conn, err := amqp.Dial("amqp://" + rabbit_user + ":" +rabbit_password + "@" + rabbit_host + ":" + rabbit_port +"/") + + if err != nil { + log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err) + } + + defer conn.Close() + + ch, err := conn.Channel() + + if err != nil { + log.Fatalf("%s: %s", "Failed to open a channel", err) + } + + defer ch.Close() + + q, err := ch.QueueDeclare( + "publisher", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + log.Fatalf("%s: %s", "Failed to declare a queue", err) + } + + err = ch.Publish( + "", // exchange + q.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing { + ContentType: "text/plain", + Body: []byte(message), + }) + + if err != nil { + log.Fatalf("%s: %s", "Failed to publish a message", err) + } + + fmt.Println("publish success!") +} \ No newline at end of file diff --git a/messaging/rabbitmq/config/rabbit-1/rabbitmq.conf b/messaging/rabbitmq/config/rabbit-1/rabbitmq.conf new file mode 100644 index 0000000..960d3ab --- /dev/null +++ b/messaging/rabbitmq/config/rabbit-1/rabbitmq.conf @@ -0,0 +1,7 @@ +loopback_users.guest = false +listeners.tcp.default = 5672 + +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@rabbit-1 +cluster_formation.classic_config.nodes.2 = rabbit@rabbit-2 +cluster_formation.classic_config.nodes.3 = rabbit@rabbit-3 \ No newline at end of file diff --git a/messaging/rabbitmq/config/rabbit-2/rabbitmq.conf b/messaging/rabbitmq/config/rabbit-2/rabbitmq.conf new file mode 100644 index 0000000..960d3ab --- /dev/null +++ b/messaging/rabbitmq/config/rabbit-2/rabbitmq.conf @@ -0,0 +1,7 @@ +loopback_users.guest = false +listeners.tcp.default = 5672 + +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@rabbit-1 +cluster_formation.classic_config.nodes.2 = rabbit@rabbit-2 +cluster_formation.classic_config.nodes.3 = rabbit@rabbit-3 \ No newline at end of file diff --git a/messaging/rabbitmq/config/rabbit-3/rabbitmq.conf b/messaging/rabbitmq/config/rabbit-3/rabbitmq.conf new file mode 100644 index 0000000..960d3ab --- /dev/null +++ b/messaging/rabbitmq/config/rabbit-3/rabbitmq.conf @@ -0,0 +1,7 @@ +loopback_users.guest = false +listeners.tcp.default = 5672 + +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@rabbit-1 +cluster_formation.classic_config.nodes.2 = rabbit@rabbit-2 +cluster_formation.classic_config.nodes.3 = rabbit@rabbit-3 \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-configmap.yaml b/messaging/rabbitmq/kubernetes/rabbit-configmap.yaml new file mode 100644 index 0000000..3352d2d --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-configmap.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: rabbitmq-config +data: + enabled_plugins: | + [rabbitmq_federation,rabbitmq_management,rabbitmq_peer_discovery_k8s]. + rabbitmq.conf: | + loopback_users.guest = false + listeners.tcp.default = 5672 + + cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s + cluster_formation.k8s.host = kubernetes.default.svc.cluster.local + cluster_formation.k8s.address_type = hostname + cluster_formation.node_cleanup.only_log_warning = true + ##cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config + ##cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-0.rabbitmq.rabbits.svc.cluster.local + ##cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-1.rabbitmq.rabbits.svc.cluster.local + ##cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq-2.rabbitmq.rabbits.svc.cluster.local \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-rbac.yaml b/messaging/rabbitmq/kubernetes/rabbit-rbac.yaml new file mode 100644 index 0000000..fda2bf6 --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-rbac.yaml @@ -0,0 +1,32 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: rabbitmq +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: rabbitmq +rules: +- apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list + - watch +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: rabbitmq + namespace: rabbits +subjects: +- kind: ServiceAccount + name: rabbitmq + namespace: rabbits +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: rabbitmq \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-secret.yaml b/messaging/rabbitmq/kubernetes/rabbit-secret.yaml new file mode 100644 index 0000000..0011c73 --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-secret.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: rabbit-secret +type: Opaque +data: + # echo -n "cookie-value" | base64 + RABBITMQ_ERLANG_COOKIE: V0lXVkhDRFRDSVVBV0FOTE1RQVc= \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-statefulset.yaml b/messaging/rabbitmq/kubernetes/rabbit-statefulset.yaml new file mode 100644 index 0000000..988745e --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-statefulset.yaml @@ -0,0 +1,101 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: rabbitmq +spec: + serviceName: rabbitmq + replicas: 4 + selector: + matchLabels: + app: rabbitmq + template: + metadata: + labels: + app: rabbitmq + spec: + serviceAccountName: rabbitmq + initContainers: + - name: config + image: busybox + command: ['/bin/sh', '-c', 'cp /tmp/config/rabbitmq.conf /config/rabbitmq.conf && ls -l /config/ && cp /tmp/config/enabled_plugins /etc/rabbitmq/enabled_plugins'] + volumeMounts: + - name: config + mountPath: /tmp/config/ + readOnly: false + - name: config-file + mountPath: /config/ + - name: plugins-file + mountPath: /etc/rabbitmq/ + containers: + - name: rabbitmq + image: rabbitmq:3.8-management + ports: + - containerPort: 4369 + name: discovery + - containerPort: 5672 + name: amqp + env: + - name: RABBIT_POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: RABBIT_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: RABBITMQ_NODENAME + value: rabbit@$(RABBIT_POD_NAME).rabbitmq.$(RABBIT_POD_NAMESPACE).svc.cluster.local + - name: RABBITMQ_USE_LONGNAME + value: "true" + - name: RABBITMQ_CONFIG_FILE + value: "/config/rabbitmq" + - name: RABBITMQ_ERLANG_COOKIE + valueFrom: + secretKeyRef: + name: rabbit-secret + key: RABBITMQ_ERLANG_COOKIE + - name: K8S_HOSTNAME_SUFFIX + value: .rabbitmq.$(RABBIT_POD_NAMESPACE).svc.cluster.local + volumeMounts: + - name: data + mountPath: /var/lib/rabbitmq + readOnly: false + - name: config-file + mountPath: /config/ + - name: plugins-file + mountPath: /etc/rabbitmq/ + volumes: + - name: config-file + emptyDir: {} + - name: plugins-file + emptyDir: {} + - name: config + configMap: + name: rabbitmq-config + defaultMode: 0755 + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: "standard" + resources: + requests: + storage: 50Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: rabbitmq +spec: + clusterIP: None + ports: + - port: 4369 + targetPort: 4369 + name: discovery + - port: 5672 + targetPort: 5672 + name: amqp + selector: + app: rabbitmq \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/readme.md b/messaging/rabbitmq/kubernetes/readme.md new file mode 100644 index 0000000..5cc5507 --- /dev/null +++ b/messaging/rabbitmq/kubernetes/readme.md @@ -0,0 +1,60 @@ +# RabbitMQ on Kubernetes + +Create a cluster with [kind](https://kind.sigs.k8s.io/docs/user/quick-start/) + +``` +kind create cluster --name rabbit --image kindest/node:v1.18.4 +``` + +## Namespace + +``` +kubectl create ns rabbits +``` + +## Storage Class + +``` +kubectl get storageclass +NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE +standard (default) rancher.io/local-path Delete WaitForFirstConsumer false 84s +``` + +## Deployment + +``` +kubectl apply -n rabbits -f .\kubernetes\rabbit-rbac.yaml +kubectl apply -n rabbits -f .\kubernetes\rabbit-configmap.yaml +kubectl apply -n rabbits -f .\kubernetes\rabbit-secret.yaml +kubectl apply -n rabbits -f .\kubernetes\rabbit-statefulset.yaml +``` + +## Access the UI + +``` +kubectl -n rabbits port-forward rabbitmq-0 8080:15672 +``` +Go to htttp://localhost:8080
+Username: `guest`
+Password: `guest`
+ +# Message Publisher + +``` + +cd messaging\rabbitmq\applications\publisher +docker build . -t aimvector/rabbitmq-publisher:v1.0.0 + +kubectl apply -f rabbits deployment.yaml +``` + +# Automatic Synchronization + +https://www.rabbitmq.com/ha.html#unsynchronised-mirrors + +``` +rabbitmqctl set_policy ha-fed \ + ".*" '{"federation-upstream-set":"all", "ha-sync-mode":"automatic", "ha-mode":"nodes", "ha-params":["rabbit@rabbitmq-0.rabbitmq.rabbits.svc.cluster.local","rabbit@rabbitmq-1.rabbitmq.rabbits.svc.cluster.local","rabbit@rabbitmq-2.rabbitmq.rabbits.svc.cluster.local"]}' \ + --priority 1 \ + --apply-to queues +``` \ No newline at end of file diff --git a/messaging/rabbitmq/readme.md b/messaging/rabbitmq/readme.md new file mode 100644 index 0000000..4d1a081 --- /dev/null +++ b/messaging/rabbitmq/readme.md @@ -0,0 +1,165 @@ +# RabbitMQ + +Docker image over [here](https://hub.docker.com/_/rabbitmq) +``` +# run a standalone instance +docker network create rabbits +docker run -d --rm --net rabbits --hostname rabbit-1 --name rabbit-1 rabbitmq:3.8 + +# how to grab existing erlang cookie +docker exec -it rabbit-1 cat /var/lib/rabbitmq/.erlang.cookie + +# clean up +docker rm -f rabbit-1 +``` + +# Management + +``` +docker run -d --rm --net rabbits -p 8080:15672 -e RABBITMQ_ERLANG_COOKIE=DSHEVCXBBETJJVJWTOWT --hostname rabbit-manager --name rabbit-manager rabbitmq:3.8-management + +#join the manager + +docker exec -it rabbit-manager rabbitmqctl stop_app +docker exec -it rabbit-manager rabbitmqctl reset +docker exec -it rabbit-manager rabbitmqctl join_cluster rabbit@rabbit-1 +docker exec -it rabbit-manager rabbitmqctl start_app +docker exec -it rabbit-manager rabbitmqctl cluster_status +``` + +# Enable Statistics + +docker exec -it rabbit-1 rabbitmq-plugins enable rabbitmq_management +docker exec -it rabbit-2 rabbitmq-plugins enable rabbitmq_management +docker exec -it rabbit-3 rabbitmq-plugins enable rabbitmq_management + +# Message Publisher + +``` + +cd messaging\rabbitmq\applications\publisher +docker build . -t aimvector/rabbitmq-publisher:v1.0.0 + +docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 -e RABBIT_USERNAME=guest -e RABBIT_PASSWORD=guest -p 80:80 aimvector/rabbitmq-publisher:v1.0.0 +``` + +# Message Consumer + +``` + +docker build . -t aimvector/rabbitmq-consumer:v1.0.0 +docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 -e RABBIT_USERNAME=guest -e RABBIT_PASSWORD=guest aimvector/rabbitmq-consumer:v1.0.0 +``` + +# Clustering + +https://www.rabbitmq.com/cluster-formation.html + +## Note + +Remember we will need the Erlang Cookie to allow instances to authenticate with each other. + +# Manual Clustering + +``` + +docker exec -it rabbit-1 rabbitmqctl cluster_status + +#join node 2 + +docker exec -it rabbit-2 rabbitmqctl stop_app +docker exec -it rabbit-2 rabbitmqctl reset +docker exec -it rabbit-2 rabbitmqctl join_cluster rabbit@rabbit-1 +docker exec -it rabbit-2 rabbitmqctl start_app +docker exec -it rabbit-2 rabbitmqctl cluster_status + +#join node 3 +docker exec -it rabbit-3 rabbitmqctl stop_app +docker exec -it rabbit-3 rabbitmqctl reset +docker exec -it rabbit-3 rabbitmqctl join_cluster rabbit@rabbit-1 +docker exec -it rabbit-3 rabbitmqctl start_app +docker exec -it rabbit-3 rabbitmqctl cluster_status + +``` + +# Automated Clustering + +``` +docker run -d --rm --net rabbits ` +-v ${PWD}/config/rabbit-1/:/config/ ` +-e RABBITMQ_CONFIG_FILE=/config/rabbitmq ` +-e RABBITMQ_ERLANG_COOKIE=WIWVHCDTCIUAWANLMQAW ` +--hostname rabbit-1 ` +--name rabbit-1 ` +-p 8081:15672 ` +rabbitmq:3.8-management + +docker run -d --rm --net rabbits ` +-v ${PWD}/config/rabbit-2/:/config/ ` +-e RABBITMQ_CONFIG_FILE=/config/rabbitmq ` +-e RABBITMQ_ERLANG_COOKIE=WIWVHCDTCIUAWANLMQAW ` +--hostname rabbit-2 ` +--name rabbit-2 ` +-p 8082:15672 ` +rabbitmq:3.8-management + +docker run -d --rm --net rabbits ` +-v ${PWD}/config/rabbit-3/:/config/ ` +-e RABBITMQ_CONFIG_FILE=/config/rabbitmq ` +-e RABBITMQ_ERLANG_COOKIE=WIWVHCDTCIUAWANLMQAW ` +--hostname rabbit-3 ` +--name rabbit-3 ` +-p 8083:15672 ` +rabbitmq:3.8-management + +#NODE 1 : MANAGEMENT http://localhost:8081 +#NODE 2 : MANAGEMENT http://localhost:8082 +#NODE 3 : MANAGEMENT http://localhost:8083 + +# enable federation plugin +docker exec -it rabbit-1 rabbitmq-plugins enable rabbitmq_federation +docker exec -it rabbit-2 rabbitmq-plugins enable rabbitmq_federation +docker exec -it rabbit-3 rabbitmq-plugins enable rabbitmq_federation + +``` + +# Basic Queue Mirroring + +``` +docker exec -it rabbit-1 bash + +# https://www.rabbitmq.com/ha.html#mirroring-arguments + +rabbitmqctl set_policy ha-fed \ + ".*" '{"federation-upstream-set":"all", "ha-mode":"nodes", "ha-params":["rabbit@rabbit-1","rabbit@rabbit-2","rabbit@rabbit-3"]}' \ + --priority 1 \ + --apply-to queues +``` + +# Automatic Synchronization + +https://www.rabbitmq.com/ha.html#unsynchronised-mirrors + +``` +rabbitmqctl set_policy ha-fed \ + ".*" '{"federation-upstream-set":"all", "ha-sync-mode":"automatic", "ha-mode":"nodes", "ha-params":["rabbit@rabbit-1","rabbit@rabbit-2","rabbit@rabbit-3"]}' \ + --priority 1 \ + --apply-to queues +``` + +# Further Reading + +https://www.rabbitmq.com/ha.html + + +# Clean up + +``` +docker rm -f rabbit-1 +docker rm -f rabbit-2 +docker rm -f rabbit-3 +``` + +# RabbitMQ on Kubernetes + +Checkout the Kubernetes walkthrough [here](./kubernetes/readme.md) \ No newline at end of file