diff --git a/messaging/rabbitmq/applications/consumer/consumer.go b/messaging/rabbitmq/applications/consumer/consumer.go index 98cdd7e..1273862 100644 --- a/messaging/rabbitmq/applications/consumer/consumer.go +++ b/messaging/rabbitmq/applications/consumer/consumer.go @@ -1,5 +1,6 @@ package main + import ( "fmt" log "github.com/sirupsen/logrus" @@ -13,13 +14,11 @@ var rabbit_user = os.Getenv("RABBIT_USERNAME") var rabbit_password = os.Getenv("RABBIT_PASSWORD") func main() { - consume() } - func consume() { - + conn, err := amqp.Dial("amqp://" + rabbit_user + ":" +rabbit_password + "@" + rabbit_host + ":" + rabbit_port +"/") if err != nil { @@ -27,11 +26,11 @@ func consume() { } ch, err := conn.Channel() - + if err != nil { log.Fatalf("%s: %s", "Failed to open a channel", err) } - + q, err := ch.QueueDeclare( "publisher", // name false, // durable @@ -59,7 +58,7 @@ func consume() { false, // no-wait nil, // args ) - + if err != nil { log.Fatalf("%s: %s", "Failed to register consumer", err) } @@ -76,8 +75,4 @@ func consume() { fmt.Println("Running...") <-forever - - - - } \ No newline at end of file diff --git a/messaging/rabbitmq/applications/publisher/deployment.yaml b/messaging/rabbitmq/applications/publisher/deployment.yaml new file mode 100644 index 0000000..aa4a940 --- /dev/null +++ b/messaging/rabbitmq/applications/publisher/deployment.yaml @@ -0,0 +1,62 @@ +apiVersion: v1 +kind: Secret +metadata: + name: rabbitmq-publisher +type: Opaque +data: + RABBIT_USERNAME: Z3Vlc3Q= + RABBIT_PASSWORD: Z3Vlc3Q= +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rabbitmq-publisher + labels: + app: rabbitmq-publisher +spec: + selector: + matchLabels: + app: rabbitmq-publisher + replicas: 1 + template: + metadata: + labels: + app: rabbitmq-publisher + spec: + containers: + - name: rabbitmq-publisher + image: aimvector/rabbitmq-publisher:v1.0.0 + imagePullPolicy: Always + ports: + - containerPort: 80 + env: + - name: RABBIT_HOST + value: "rabbitmq-0.rabbitmq.rabbits.svc.cluster.local" + - name: RABBIT_PORT + value: "5672" + - name: RABBIT_USERNAME + valueFrom: + secretKeyRef: + name: rabbitmq-publisher + key: RABBIT_USERNAME + - name: RABBIT_PASSWORD + valueFrom: + secretKeyRef: + name: rabbitmq-publisher + key: RABBIT_PASSWORD +--- +apiVersion: v1 +kind: Service +metadata: + name: rabbitmq-publisher + labels: + app: rabbitmq-publisher +spec: + type: LoadBalancer + selector: + app: rabbitmq-publisher + ports: + - protocol: TCP + name: http + port: 80 + targetPort: 80 diff --git a/messaging/rabbitmq/applications/publisher/dockerfile b/messaging/rabbitmq/applications/publisher/dockerfile index 258fef4..6d7a0b6 100644 --- a/messaging/rabbitmq/applications/publisher/dockerfile +++ b/messaging/rabbitmq/applications/publisher/dockerfile @@ -12,7 +12,6 @@ COPY publisher.go /src RUN go build publisher.go - FROM alpine as runtime COPY --from=build /src/publisher /app/publisher diff --git a/messaging/rabbitmq/applications/publisher/publisher.go b/messaging/rabbitmq/applications/publisher/publisher.go index 514699e..897fec6 100644 --- a/messaging/rabbitmq/applications/publisher/publisher.go +++ b/messaging/rabbitmq/applications/publisher/publisher.go @@ -15,6 +15,7 @@ var rabbit_user = os.Getenv("RABBIT_USERNAME") var rabbit_password = os.Getenv("RABBIT_PASSWORD") func main() { + router := httprouter.New() router.POST("/publish/:message", func(w http.ResponseWriter, r *http.Request, p httprouter.Params){ @@ -25,11 +26,11 @@ func main() { log.Fatal(http.ListenAndServe(":80", router)) } - func submit(writer http.ResponseWriter, request *http.Request, p httprouter.Params) { message := p.ByName("message") - + fmt.Println("Received message: " + message) + conn, err := amqp.Dial("amqp://" + rabbit_user + ":" +rabbit_password + "@" + rabbit_host + ":" + rabbit_port +"/") if err != nil { @@ -37,16 +38,18 @@ func submit(writer http.ResponseWriter, request *http.Request, p httprouter.Para } defer conn.Close() + ch, err := conn.Channel() - + if err != nil { log.Fatalf("%s: %s", "Failed to open a channel", err) } + defer ch.Close() q, err := ch.QueueDeclare( "publisher", // name - false, // durable + true, // durable false, // delete when unused false, // exclusive false, // no-wait @@ -66,11 +69,10 @@ func submit(writer http.ResponseWriter, request *http.Request, p httprouter.Para ContentType: "text/plain", Body: []byte(message), }) - + if err != nil { log.Fatalf("%s: %s", "Failed to publish a message", err) } fmt.Println("publish success!") - } \ No newline at end of file diff --git a/messaging/rabbitmq/config/rabbit-1/rabbitmq.conf b/messaging/rabbitmq/config/rabbit-1/rabbitmq.conf new file mode 100644 index 0000000..960d3ab --- /dev/null +++ b/messaging/rabbitmq/config/rabbit-1/rabbitmq.conf @@ -0,0 +1,7 @@ +loopback_users.guest = false +listeners.tcp.default = 5672 + +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@rabbit-1 +cluster_formation.classic_config.nodes.2 = rabbit@rabbit-2 +cluster_formation.classic_config.nodes.3 = rabbit@rabbit-3 \ No newline at end of file diff --git a/messaging/rabbitmq/config/rabbit-2/rabbitmq.conf b/messaging/rabbitmq/config/rabbit-2/rabbitmq.conf new file mode 100644 index 0000000..960d3ab --- /dev/null +++ b/messaging/rabbitmq/config/rabbit-2/rabbitmq.conf @@ -0,0 +1,7 @@ +loopback_users.guest = false +listeners.tcp.default = 5672 + +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@rabbit-1 +cluster_formation.classic_config.nodes.2 = rabbit@rabbit-2 +cluster_formation.classic_config.nodes.3 = rabbit@rabbit-3 \ No newline at end of file diff --git a/messaging/rabbitmq/config/rabbit-3/rabbitmq.conf b/messaging/rabbitmq/config/rabbit-3/rabbitmq.conf new file mode 100644 index 0000000..960d3ab --- /dev/null +++ b/messaging/rabbitmq/config/rabbit-3/rabbitmq.conf @@ -0,0 +1,7 @@ +loopback_users.guest = false +listeners.tcp.default = 5672 + +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@rabbit-1 +cluster_formation.classic_config.nodes.2 = rabbit@rabbit-2 +cluster_formation.classic_config.nodes.3 = rabbit@rabbit-3 \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-configmap.yaml b/messaging/rabbitmq/kubernetes/rabbit-configmap.yaml new file mode 100644 index 0000000..3352d2d --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-configmap.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: rabbitmq-config +data: + enabled_plugins: | + [rabbitmq_federation,rabbitmq_management,rabbitmq_peer_discovery_k8s]. + rabbitmq.conf: | + loopback_users.guest = false + listeners.tcp.default = 5672 + + cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s + cluster_formation.k8s.host = kubernetes.default.svc.cluster.local + cluster_formation.k8s.address_type = hostname + cluster_formation.node_cleanup.only_log_warning = true + ##cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config + ##cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-0.rabbitmq.rabbits.svc.cluster.local + ##cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-1.rabbitmq.rabbits.svc.cluster.local + ##cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq-2.rabbitmq.rabbits.svc.cluster.local \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-rbac.yaml b/messaging/rabbitmq/kubernetes/rabbit-rbac.yaml new file mode 100644 index 0000000..fda2bf6 --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-rbac.yaml @@ -0,0 +1,32 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: rabbitmq +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: rabbitmq +rules: +- apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list + - watch +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: rabbitmq + namespace: rabbits +subjects: +- kind: ServiceAccount + name: rabbitmq + namespace: rabbits +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: rabbitmq \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-secret.yaml b/messaging/rabbitmq/kubernetes/rabbit-secret.yaml new file mode 100644 index 0000000..0011c73 --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-secret.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: rabbit-secret +type: Opaque +data: + # echo -n "cookie-value" | base64 + RABBITMQ_ERLANG_COOKIE: V0lXVkhDRFRDSVVBV0FOTE1RQVc= \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/rabbit-statefulset.yaml b/messaging/rabbitmq/kubernetes/rabbit-statefulset.yaml new file mode 100644 index 0000000..988745e --- /dev/null +++ b/messaging/rabbitmq/kubernetes/rabbit-statefulset.yaml @@ -0,0 +1,101 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: rabbitmq +spec: + serviceName: rabbitmq + replicas: 4 + selector: + matchLabels: + app: rabbitmq + template: + metadata: + labels: + app: rabbitmq + spec: + serviceAccountName: rabbitmq + initContainers: + - name: config + image: busybox + command: ['/bin/sh', '-c', 'cp /tmp/config/rabbitmq.conf /config/rabbitmq.conf && ls -l /config/ && cp /tmp/config/enabled_plugins /etc/rabbitmq/enabled_plugins'] + volumeMounts: + - name: config + mountPath: /tmp/config/ + readOnly: false + - name: config-file + mountPath: /config/ + - name: plugins-file + mountPath: /etc/rabbitmq/ + containers: + - name: rabbitmq + image: rabbitmq:3.8-management + ports: + - containerPort: 4369 + name: discovery + - containerPort: 5672 + name: amqp + env: + - name: RABBIT_POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: RABBIT_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: RABBITMQ_NODENAME + value: rabbit@$(RABBIT_POD_NAME).rabbitmq.$(RABBIT_POD_NAMESPACE).svc.cluster.local + - name: RABBITMQ_USE_LONGNAME + value: "true" + - name: RABBITMQ_CONFIG_FILE + value: "/config/rabbitmq" + - name: RABBITMQ_ERLANG_COOKIE + valueFrom: + secretKeyRef: + name: rabbit-secret + key: RABBITMQ_ERLANG_COOKIE + - name: K8S_HOSTNAME_SUFFIX + value: .rabbitmq.$(RABBIT_POD_NAMESPACE).svc.cluster.local + volumeMounts: + - name: data + mountPath: /var/lib/rabbitmq + readOnly: false + - name: config-file + mountPath: /config/ + - name: plugins-file + mountPath: /etc/rabbitmq/ + volumes: + - name: config-file + emptyDir: {} + - name: plugins-file + emptyDir: {} + - name: config + configMap: + name: rabbitmq-config + defaultMode: 0755 + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: "standard" + resources: + requests: + storage: 50Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: rabbitmq +spec: + clusterIP: None + ports: + - port: 4369 + targetPort: 4369 + name: discovery + - port: 5672 + targetPort: 5672 + name: amqp + selector: + app: rabbitmq \ No newline at end of file diff --git a/messaging/rabbitmq/kubernetes/readme.md b/messaging/rabbitmq/kubernetes/readme.md new file mode 100644 index 0000000..5cc5507 --- /dev/null +++ b/messaging/rabbitmq/kubernetes/readme.md @@ -0,0 +1,60 @@ +# RabbitMQ on Kubernetes + +Create a cluster with [kind](https://kind.sigs.k8s.io/docs/user/quick-start/) + +``` +kind create cluster --name rabbit --image kindest/node:v1.18.4 +``` + +## Namespace + +``` +kubectl create ns rabbits +``` + +## Storage Class + +``` +kubectl get storageclass +NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE +standard (default) rancher.io/local-path Delete WaitForFirstConsumer false 84s +``` + +## Deployment + +``` +kubectl apply -n rabbits -f .\kubernetes\rabbit-rbac.yaml +kubectl apply -n rabbits -f .\kubernetes\rabbit-configmap.yaml +kubectl apply -n rabbits -f .\kubernetes\rabbit-secret.yaml +kubectl apply -n rabbits -f .\kubernetes\rabbit-statefulset.yaml +``` + +## Access the UI + +``` +kubectl -n rabbits port-forward rabbitmq-0 8080:15672 +``` +Go to htttp://localhost:8080
+Username: `guest`
+Password: `guest`
+ +# Message Publisher + +``` + +cd messaging\rabbitmq\applications\publisher +docker build . -t aimvector/rabbitmq-publisher:v1.0.0 + +kubectl apply -f rabbits deployment.yaml +``` + +# Automatic Synchronization + +https://www.rabbitmq.com/ha.html#unsynchronised-mirrors + +``` +rabbitmqctl set_policy ha-fed \ + ".*" '{"federation-upstream-set":"all", "ha-sync-mode":"automatic", "ha-mode":"nodes", "ha-params":["rabbit@rabbitmq-0.rabbitmq.rabbits.svc.cluster.local","rabbit@rabbitmq-1.rabbitmq.rabbits.svc.cluster.local","rabbit@rabbitmq-2.rabbitmq.rabbits.svc.cluster.local"]}' \ + --priority 1 \ + --apply-to queues +``` \ No newline at end of file diff --git a/messaging/rabbitmq/readme.md b/messaging/rabbitmq/readme.md index bb56e73..4d1a081 100644 --- a/messaging/rabbitmq/readme.md +++ b/messaging/rabbitmq/readme.md @@ -2,44 +2,17 @@ Docker image over [here](https://hub.docker.com/_/rabbitmq) ``` +# run a standalone instance docker network create rabbits -docker run -d --rm --net rabbits -e RABBITMQ_ERLANG_COOKIE=DSHEVCXBBETJJVJWTOWT --hostname rabbit-1 --name rabbit-1 rabbitmq:3.8 -docker run -d --rm --net rabbits -e RABBITMQ_ERLANG_COOKIE=DSHEVCXBBETJJVJWTOWT --hostname rabbit-2 --name rabbit-2 rabbitmq:3.8 -docker run -d --rm --net rabbits -e RABBITMQ_ERLANG_COOKIE=DSHEVCXBBETJJVJWTOWT --hostname rabbit-3 --name rabbit-3 rabbitmq:3.8 +docker run -d --rm --net rabbits --hostname rabbit-1 --name rabbit-1 rabbitmq:3.8 # how to grab existing erlang cookie docker exec -it rabbit-1 cat /var/lib/rabbitmq/.erlang.cookie -DSHEVCXBBETJJVJWTOWT +# clean up docker rm -f rabbit-1 -docker rm -f rabbit-2 -docker rm -f rabbit-3 ``` -``` -docker exec -it rabbit-1 rabbitmqctl cluster_status - -#join node 2 - -docker exec -it rabbit-2 rabbitmqctl stop_app -docker exec -it rabbit-2 rabbitmqctl reset -docker exec -it rabbit-2 rabbitmqctl join_cluster rabbit@rabbit-1 -docker exec -it rabbit-2 rabbitmqctl start_app -docker exec -it rabbit-2 rabbitmqctl cluster_status - - -#join node 3 -docker exec -it rabbit-3 rabbitmqctl stop_app -docker exec -it rabbit-3 rabbitmqctl reset -docker exec -it rabbit-3 rabbitmqctl join_cluster rabbit@rabbit-1 -docker exec -it rabbit-3 rabbitmqctl start_app -docker exec -it rabbit-3 rabbitmqctl cluster_status - -``` - - - - # Management ``` @@ -63,11 +36,130 @@ docker exec -it rabbit-3 rabbitmq-plugins enable rabbitmq_management # Message Publisher ``` -docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 -e RABBIT_USERNAME=guest -e RABBIT_PASSWORD=guest -p 80:80 publisher + +cd messaging\rabbitmq\applications\publisher +docker build . -t aimvector/rabbitmq-publisher:v1.0.0 + +docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 -e RABBIT_USERNAME=guest -e RABBIT_PASSWORD=guest -p 80:80 aimvector/rabbitmq-publisher:v1.0.0 ``` # Message Consumer ``` -docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 -e RABBIT_USERNAME=guest -e RABBIT_PASSWORD=guest consumer + +docker build . -t aimvector/rabbitmq-consumer:v1.0.0 +docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 -e RABBIT_USERNAME=guest -e RABBIT_PASSWORD=guest aimvector/rabbitmq-consumer:v1.0.0 ``` + +# Clustering + +https://www.rabbitmq.com/cluster-formation.html + +## Note + +Remember we will need the Erlang Cookie to allow instances to authenticate with each other. + +# Manual Clustering + +``` + +docker exec -it rabbit-1 rabbitmqctl cluster_status + +#join node 2 + +docker exec -it rabbit-2 rabbitmqctl stop_app +docker exec -it rabbit-2 rabbitmqctl reset +docker exec -it rabbit-2 rabbitmqctl join_cluster rabbit@rabbit-1 +docker exec -it rabbit-2 rabbitmqctl start_app +docker exec -it rabbit-2 rabbitmqctl cluster_status + +#join node 3 +docker exec -it rabbit-3 rabbitmqctl stop_app +docker exec -it rabbit-3 rabbitmqctl reset +docker exec -it rabbit-3 rabbitmqctl join_cluster rabbit@rabbit-1 +docker exec -it rabbit-3 rabbitmqctl start_app +docker exec -it rabbit-3 rabbitmqctl cluster_status + +``` + +# Automated Clustering + +``` +docker run -d --rm --net rabbits ` +-v ${PWD}/config/rabbit-1/:/config/ ` +-e RABBITMQ_CONFIG_FILE=/config/rabbitmq ` +-e RABBITMQ_ERLANG_COOKIE=WIWVHCDTCIUAWANLMQAW ` +--hostname rabbit-1 ` +--name rabbit-1 ` +-p 8081:15672 ` +rabbitmq:3.8-management + +docker run -d --rm --net rabbits ` +-v ${PWD}/config/rabbit-2/:/config/ ` +-e RABBITMQ_CONFIG_FILE=/config/rabbitmq ` +-e RABBITMQ_ERLANG_COOKIE=WIWVHCDTCIUAWANLMQAW ` +--hostname rabbit-2 ` +--name rabbit-2 ` +-p 8082:15672 ` +rabbitmq:3.8-management + +docker run -d --rm --net rabbits ` +-v ${PWD}/config/rabbit-3/:/config/ ` +-e RABBITMQ_CONFIG_FILE=/config/rabbitmq ` +-e RABBITMQ_ERLANG_COOKIE=WIWVHCDTCIUAWANLMQAW ` +--hostname rabbit-3 ` +--name rabbit-3 ` +-p 8083:15672 ` +rabbitmq:3.8-management + +#NODE 1 : MANAGEMENT http://localhost:8081 +#NODE 2 : MANAGEMENT http://localhost:8082 +#NODE 3 : MANAGEMENT http://localhost:8083 + +# enable federation plugin +docker exec -it rabbit-1 rabbitmq-plugins enable rabbitmq_federation +docker exec -it rabbit-2 rabbitmq-plugins enable rabbitmq_federation +docker exec -it rabbit-3 rabbitmq-plugins enable rabbitmq_federation + +``` + +# Basic Queue Mirroring + +``` +docker exec -it rabbit-1 bash + +# https://www.rabbitmq.com/ha.html#mirroring-arguments + +rabbitmqctl set_policy ha-fed \ + ".*" '{"federation-upstream-set":"all", "ha-mode":"nodes", "ha-params":["rabbit@rabbit-1","rabbit@rabbit-2","rabbit@rabbit-3"]}' \ + --priority 1 \ + --apply-to queues +``` + +# Automatic Synchronization + +https://www.rabbitmq.com/ha.html#unsynchronised-mirrors + +``` +rabbitmqctl set_policy ha-fed \ + ".*" '{"federation-upstream-set":"all", "ha-sync-mode":"automatic", "ha-mode":"nodes", "ha-params":["rabbit@rabbit-1","rabbit@rabbit-2","rabbit@rabbit-3"]}' \ + --priority 1 \ + --apply-to queues +``` + +# Further Reading + +https://www.rabbitmq.com/ha.html + + +# Clean up + +``` +docker rm -f rabbit-1 +docker rm -f rabbit-2 +docker rm -f rabbit-3 +``` + +# RabbitMQ on Kubernetes + +Checkout the Kubernetes walkthrough [here](./kubernetes/readme.md) \ No newline at end of file