RabbitMQ Tutorials for LFE

Sending

Our first module, kt-sending.lfe, will send a single message to the queue. The first thing we need to do is establish a TCP connection with the RabbitMQ server. With that in place, we will be able to create an AMQP channel. Remember:

  1. The connection takes care of protocol version negotiation, authentication, and so on for us.
  2. The channel is like a virtual connection inside the TCP connection, and it is over this that AMQP commands are issued.

This will be done with the following two functions:

(kanin-conn:start ...)
(kanin-conn:open-channel ...)

Before sending any messages, we need to make sure that the recipient queue exists. If we send a message to a non-existent location, RabbitMQ will simply discard the message. We will create the queue by sending a queue.declare record on our channel with (kanin-chan:call ...).
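
As a rough sketch of that declaration step on its own: the fragment below assumes it lives inside a module that includes kanin's amqp-client.lfe (as the complete module further down does) and that channel is an already-open channel. The helper name declare-queue is hypothetical and is not part of kanin.

```lfe
;; Hypothetical helper: declare the "hello" queue on an open channel.
;; Assumes (include-lib "kanin/include/amqp-client.lfe") appears in the
;; enclosing module so that make-queue.declare is available.
(defun declare-queue (channel)
  (let ((queue (make-queue.declare
                 queue (list_to_binary "hello"))))
    ;; call is synchronous: it waits for the broker's reply
    (kanin-chan:call channel queue)))
```

Declaring a queue that already exists with the same attributes is harmless, so it is safe to run this step every time before sending.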

To send the message to the queue, we'll need to tell RabbitMQ the following:

  • the exchange we want to use,
  • the routing key to use, and then
  • the actual data

The first two will be used to create the "method", a basic.publish record. The "message", an amqp_msg record, will hold the last bullet item. The method and the message will be sent with (kanin-chan:cast ...).
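
The publish step can be sketched in isolation as follows. This again assumes an enclosing module that includes kanin's amqp-client.lfe and an already-open channel. The helper name publish-hello is hypothetical. The empty exchange name selects RabbitMQ's default exchange, which routes a message to the queue whose name matches the routing key.

```lfe
;; Hypothetical helper: publish one payload string to the "hello" queue
;; through the default exchange (the empty string).
(defun publish-hello (channel payload)
  (let ((method (make-basic.publish
                  exchange (list_to_binary "")
                  routing_key (list_to_binary "hello")))
        (message (make-amqp_msg
                   payload (list_to_binary payload))))
    ;; cast is asynchronous: it does not wait for a reply from the broker
    (kanin-chan:cast channel method message)))
```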

Finally, the channel and connection will be closed down with the following calls:

(kanin-chan:close channel)
(kanin-conn:close connection)

The complete code for the module that encapsulates this logic is below:

(defmodule kt-sending
  (export all))

(include-lib "kanin/include/amqp-client.lfe")

(defun send ()
  (let* ((net-opts (make-amqp_params_network host "localhost"))
         ;; create the connection and channel we'll use for sending
         (`#(ok ,connection) (kanin-conn:start net-opts))
         (`#(ok ,channel) (kanin-conn:open-channel connection))
         ;; declare the names we will use
         (queue-name "hello")
         (routing-key "hello")
         (exchange-name "")
         (payload "Hello, world!")
         ;; then create the needed records
         (queue (make-queue.declare
                  queue (list_to_binary queue-name)))
         (method (make-basic.publish
                   exchange (list_to_binary exchange-name)
                   routing_key (list_to_binary routing-key)))
         (message (make-amqp_msg
                    payload (list_to_binary payload))))
    ;; perform the actual send
    (kanin-chan:call channel queue)
    (kanin-chan:cast channel method message)
    (io:format "[x] Sent message '~p'~n" `(,payload))
    ;; clean up
    (kanin-chan:close channel)
    (kanin-conn:close connection)))

Start up the REPL:

$ make repl-no-deps
Starting an LFE REPL ...
Erlang/OTP 17 [erts-6.2] [source] [64-bit] [smp:4:4] [async-threads:10] ...

LFE Shell V6.2 (abort with ^G)

Now you should be able to call your send function successfully:

> (kt-sending:send)
[x] Sent message '"Hello, world!"'
ok
>

That's it for our sender, but go ahead and keep a terminal window open with this REPL running. We'll use it shortly.