In the first tutorial we sent a message containing the string "Hello World!".
Now we'll be sending strings that stand for complex tasks. We don't have a real-
world task, like images to be resized or pdf files to be rendered, so let's fake
it by just pretending we're busy - by adding a timeout section to the
(receive ...)
part of our (loop ...)
function. We'll take the number of
dots in the string as its complexity; every dot will account for one second of
"work". For example, a fake task described by Hello...
will take three
seconds.
We will slightly modify the kt-sending.lfe
code from our previous example,
to allow arbitrary messages to be sent from the command line. This program will
schedule tasks to our work queue, so let's name it kt-new-task.lfe
. Also,
note that we'll use the kanin-uri
module to parse a connection string
this time, instead of creating a record directly.
(defmodule kt-new-task
(export all))
(include-lib "kanin/include/amqp-client.lfe")
(defun send ()
(send '()))
(defun send (data)
(let* ((net-opts (kanin-uri:parse "amqp://localhost"))
(`#(ok ,connection) (kanin-conn:start net-opts))
(`#(ok ,channel) (kanin-conn:open-channel connection))
(queue-name "task-queue")
(routing-key "task-queue")
(exchange-name "")
(payload (make-message data))
(queue (make-queue.declare
queue (list_to_binary queue-name)
durable 'true))
(method (make-basic.publish
exchange (list_to_binary exchange-name)
routing_key (list_to_binary routing-key)))
(message (make-amqp_msg
props (make-P_basic delivery_mode 2)
payload (list_to_binary payload))))
(kanin-chan:call channel queue)
(kanin-chan:cast channel method message)
(io:format "[x] Sent message '~p'~n" `(,payload))
(kanin-chan:close channel)
(kanin-conn:close connection)))
(defun make-message
(('())
"Hello, world!")
((data)
data))
Our old kt-receiving.lfe
module also requires some changes: it needs to
fake a second of work for every dot in the message body. It will pop messages
from the queue and perform the task, so let's call it kt-worker.lfe
:
(defmodule kt-worker
(export all))
(include-lib "kanin/include/amqp-client.lfe")
(defun receive ()
(let* ((net-opts (kanin-uri:parse "amqp://localhost"))
(`#(ok ,connection) (kanin-conn:start net-opts))
(`#(ok ,channel) (kanin-conn:open-channel connection))
(queue-name "task-queue")
(queue (make-queue.declare
queue (list_to_binary queue-name)
durable 'true))
(qos (make-basic.qos prefetch_count 1))
(consumer (make-basic.consume
queue (list_to_binary queue-name)))
(subscriber (self)))
(kanin-chan:call channel queue)
(io:format "[*] Waiting for messages. To exit press CTRL+C~n")
(kanin-chan:call channel qos)
(kanin-chan:subscribe channel consumer subscriber)
(receive
((match-basic.consume_ok)
'ok))
(loop channel)))
(defun loop (channel)
(receive
((tuple (match-basic.deliver delivery_tag tag)
(match-amqp_msg payload body))
(io:format "[x] Received: ~p~n" `(,body))
(do-work body)
(io:format "[x] Done.~n")
(kanin-chan:cast channel (make-basic.ack delivery_tag tag))
(loop channel))))
(defun get-dot-count (data)
(length
(list-comp
((<- char (binary_to_list data)) (== char #\.))
char)))
(defun do-work (body)
(let ((dots (get-dot-count body)))
(receive
(after (* dots 1000)
'ok))))