RabbitMQ Tutorials for LFE

Preparation

In the first tutorial we sent a message containing the string "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real- world task, like images to be resized or pdf files to be rendered, so let's fake it by just pretending we're busy - by adding a timeout section to the (receive ...) part of our (loop ...) function. We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.

We will slightly modify the kt-sending.lfe code from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let's name it kt-new-task.lfe. Also, note that we'll use the kanin-uri module to parse a connection string this time, instead of creating a record directly.

(defmodule kt-new-task
  (export all))

(include-lib "kanin/include/amqp-client.lfe")

(defun send ()
  (send '()))

(defun send (data)
  (let* ((net-opts (kanin-uri:parse "amqp://localhost"))
         (`#(ok ,connection) (kanin-conn:start net-opts))
         (`#(ok ,channel) (kanin-conn:open-channel connection))
         (queue-name "task-queue")
         (routing-key "task-queue")
         (exchange-name "")
         (payload (make-message data))
         (queue (make-queue.declare
                  queue (list_to_binary queue-name)
                  durable 'true))
         (method (make-basic.publish
                   exchange (list_to_binary exchange-name)
                   routing_key (list_to_binary routing-key)))
         (message (make-amqp_msg
                    props (make-P_basic delivery_mode 2)
                    payload (list_to_binary payload))))
    (kanin-chan:call channel queue)
    (kanin-chan:cast channel method message)
    (io:format "[x] Sent message '~p'~n" `(,payload))
    (kanin-chan:close channel)
    (kanin-conn:close connection)))

(defun make-message
  (('())
    "Hello, world!")
  ((data)
    data))

Our old kt-receiving.lfe module also requires some changes: it needs to fake a second of work for every dot in the message body. It will pop messages from the queue and perform the task, so let's call it kt-worker.lfe:

(defmodule kt-worker
  (export all))

(include-lib "kanin/include/amqp-client.lfe")

(defun receive ()
  (let* ((net-opts (kanin-uri:parse "amqp://localhost"))
         (`#(ok ,connection) (kanin-conn:start net-opts))
         (`#(ok ,channel) (kanin-conn:open-channel connection))
         (queue-name "task-queue")
         (queue (make-queue.declare
                  queue (list_to_binary queue-name)
                  durable 'true))
         (qos (make-basic.qos prefetch_count 1))
         (consumer (make-basic.consume
                     queue (list_to_binary queue-name)))
         (subscriber (self)))
    (kanin-chan:call channel queue)
    (io:format "[*] Waiting for messages. To exit press CTRL+C~n")
    (kanin-chan:call channel qos)
    (kanin-chan:subscribe channel consumer subscriber)
    (receive
      ((match-basic.consume_ok)
        'ok))
    (loop channel)))

(defun loop (channel)
  (receive
    ((tuple (match-basic.deliver delivery_tag tag)
            (match-amqp_msg payload body))
      (io:format "[x] Received: ~p~n" `(,body))
      (do-work body)
      (io:format "[x] Done.~n")
      (kanin-chan:cast channel (make-basic.ack delivery_tag tag))
      (loop channel))))

(defun get-dot-count (data)
  (length
    (list-comp
      ((<- char (binary_to_list data)) (== char #\.))
      char)))

(defun do-work (body)
  (let ((dots (get-dot-count body)))
        (receive
          (after (* dots 1000)
            'ok))))