RabbitMQ Tutorials for LFE

Receiving

Our receiver gets messages pushed to it from RabbitMQ, so unlike the sender which publishes a single message and then closes down, we'll keep the receiver running to listen for in-coming messages and print them out.

As before, we will need to connect to RabbitMQ server, and the code for that is the same as in the "send" section above. We will also define a queue and send the queue.declare message, to ensure that is exists (this way, regardless of whether the sender or the receiver starts first, the queue will exist for either).

You can see the list of queues that exist on your host by issueing the following command (as the same user who started up RabbitMQ):

$ /opt/rabbitmq/3.3.5/sbin/rabbitmqctl list_queues

This should give you output like the following:

Listing queues ...
hello    0
...done.

The next thing we'll need to do is set up the receive function process as a subscriber, consuming messages from the queue we defined. This is done by calling (kanin-chan:subscribe ...). To ensure that the subscription succeeded, we'll listen for a basic.consume_ok response, and then set up our message-receiving loop by calling (loop channel).

Our loop function will listen for data that matches a tuple of two records: a basic.deliver record and an amqp_msg record. Whenever a message matches, we'll display output to the terminal, and then start the loop listening again.

The complete code for the module that encapsulates this logic is below:

(defmodule kt-receiving
  (export all))

(include-lib "kanin/include/amqp-client.lfe")

(defun receive ()
  (let* ((net-opts (make-amqp_params_network host "localhost"))
         ;; create the connection and channel we'll use for sending
         (`#(ok ,connection) (kanin-conn:start net-opts))
         (`#(ok ,channel) (kanin-conn:open-channel connection))
         ;; declare the names we will use
         (queue-name "hello")
         ;; define the data we will send
         (payload "Hello, world!")
         ;; then create the needed records
         (queue (make-queue.declare
                  queue (list_to_binary queue-name)))
         (consumer (make-basic.consume
                     queue (list_to_binary queue-name)
                     no_ack 'true))
         (subscriber (self)))
    ;; set up the queue
    (kanin-chan:call channel queue)
    (io:format "[*] Waiting for messages. To exit press CTRL+C~n")
    ;; subscribe the receive funtion to the queue
    (kanin-chan:subscribe channel consumer subscriber)
    ;; verify that the 'receive' function gets a successful result from
    ;; the previous (kanin-chan:subscribe ...) call
    (receive
      ((match-basic.consume_ok)
        'ok))
    ;; start listening for messages
    (loop channel)))

(defun loop (channel)
  ;; listen for a message that is a 2-tuple of two records: a basic.deliver
  ;; one, and an amqp_msg one
  (receive
    ((tuple (match-basic.deliver) (match-amqp_msg payload body))
      (io:format "[x] Received: ~p~n" `(,body))
      ;; restart the loop to listen for the next message
      (loop channel))))