🧪 Mensajería entre aplicaciones con RabbitMQ - Parte II.

En el artículo anterior se describió la relevancia del paradigma de la mensajería en el desarrollo de sistemas de software y el rol que un broker de mensajes, como RabbitMQ, desempeña.

Es momento de pasar a la implementación, para ello observaremos el código que se encuentra disponible en este repositorio de Github. Este utiliza la librería amiquip que implementa AMQP 0.9.1.

Antes de comenzar

En el repositorio pueden encontrar un archivo docker-compose.yml que creará un servicio de rabbitmq usando Docker, permitiéndo conectarse mediante amqp://localhost:15672. También pueden optar por crear una cuenta en CloudAMQP y hacer uso del servicio gratuito.

Una vez teniendo la dirección de RabbitMQ pueden añadirla al proyecto mediante la variable de configuración amqp_url en el archivo Settings.toml que se guarda en el directorio principal del proyecto.

# Settings.toml
# Ejemplo de URL en cloudamqp
amqp_url = "amqps://usuario:contraseña@host.rmq.cloudamqp.com/imeplrkn"

Publicando y recibiendo el primer mensaje

El primer ejemplo [productor][consumidor] hace uso del exchange creado por defecto en RabbitMQ. Como se mencionó en el artículo anterior, a este exchange se conectan todas las colas de mensajes después de ser creadas y utiliza el nombre de la cola como clave de enrutamiento, lo cual a veces crea confusión ya que se podría pensar que no se necesita un exchange para enviar mensajes.

Primero debemos crear la cola para que el mensaje pueda llegar al consumidor. El código hace uso del canal de conexión para crear la cola de mensajes__queue.app.anon.

// default_exchange_consumer.rs

[...]
let queue_name = "queue.app.anon";

let queue = channel.queue_declare(queue_name, QueueDeclareOptions::default())?;

    let consumer = queue.consume(ConsumerOptions {
        no_ack: true,
        ..ConsumerOptions::default()
    })?;

    println!("Waiting for messages with routing key '{queue_name}'. Press Ctrl + C to quit");

    for (i, message) in consumer.receiver().iter().enumerate() {
        match message {
            ConsumerMessage::Delivery(message) => {
                let text = String::from_utf8_lossy(&message.body);
                println!("Got message {} - {:?}", i, text);
            },
            _ => {
                println!("Got a close connection message over the wire...");
                break;
            }
        }
    }
[...]

Para ejecutarlo desde la terminal utilizamos

$ cargo run --bin default_exchange_consumer 

Para enviar los mensajes también haremos uso directo de un canal de conexión.

// default_exchange_producer.rs

[...]
let routing_key = "queue.app.anon";
channel.basic_publish(
        "",
        Publish::with_properties(
            b"Hello, world!",
            routing_key,
            AmqpProperties::default().with_delivery_mode(2),
        ),
    )?;
[...]
# en una nueva terminal
$ cargo run --bin default_exchange_producer

Al producir el mensaje podemos observar la información correspondiente en la aplicación consumidor.

Un aspecto muy importante de este ejemplo es que el mensaje es publicado con delivery_mode 2, el cual le indica al broker que el mensaje es persistente, es decir, que debe ser guardado en disco una vez que ha llegado a la cola de mensajes.

Colas volátiles (auto_delete)

La creación de colas de mensajes sigue las propiedades definidas mediante la estructura QueueDeclareOptions que por defecto indica que la cola de mensajes permanecerá en el broker a pesar de no tener consumidores, permitiendo así almacenar mensajes hasta que estos sean descargados cuando un escucha se conecte (o cuando hagamos flush en la interfaz o cuando el servidor se reinicie o llegue a su tope de memoria).

Para crear una cola que sea desechada con el último consumidor debemos declararla como auto_delete=true [productor][consumidor].

// volatile_queue_consumer.rs

[...]
let queue = channel
  .queue_declare(
    queue_name,
    QueueDeclareOptions { auto_delete: true, ..QueueDeclareOptions::default()}
  )?;
[...]

Para observar el comportamiento ejecutamos el ejemplo volatile_queue_consumer.

Declarando un exchange directo

Ahora observaremos el funcionamiento de los exchanges, comenzando con uno de tipo directo [productor][consumidor]. Un exchange__directo es aquel que entrega mensajes a las colas de mensajes utilizando la clave de enrutamiento con el que se publica el mensaje.

// direct_consumer.rs

[...]

let exchange_options = ExchangeDeclareOptions { durable: true, ..ExchangeDeclareOptions::default() };

let exchange = channel.exchange_declare(
        amiquip::ExchangeType::Direct,
        exchange_name,
        exchange_options,
    )?;
let queue = channel.queue_declare(queue_name, QueueDeclareOptions::default())?;
queue.bind(&exchange, routing_key, FieldTable::new())?;

[...]

En el productor declararemos el mismo exchange de forma pasiva, indicando así que queremos que nuestor mensaje sea entregado sólo si hay alguien del lado del consumidor, con una cola de mensajes ya declarada.

// direct_producer.rs

[...]
let exchange = channel.exchange_declare_passive(exchange_name)?;

println!("Publishing message");
exchange.publish(Publish::new("Hello!".as_bytes(), routing_key))?;
[...]

El uso de exchanges directos permite completar tareas enviadas a un consumidor en concreto que será determinado por la clave de enrutamiento que escucha. Si más de un consumidor se conecta a un exchange directo con la misma cola los mensajes serán divididos entre cada uno de los consumidores utilizando una estrategia round-robin.

Declarado un exchange fanout

Un exchange fanout entrega los mensajes recibidos a todas las colas que se encuentren conectadas, sin importar la clave de enrutamiento [productor][consumidor].

Declararlo con amiquip es bastante sencillo.

// fanout_consumer.rs

[...]
let exchange = channel.exchange_declare(
        ExchangeType::Fanout,
        exchange_name,
        ExchangeDeclareOptions::default(),
    )?;
let queue = channel.queue_declare(queue_name, QueueDeclareOptions::default())?;
[...]

He incluido también un ejemplo donde se declara una cola de mensajes anónima, es decir, que el broker decide su nombre. Si ejecutamos este ejemplo podremos observar que esta cola también recibe los mismos mensajes que la cola en el ejemplo anterior.

Declarando un exchange de tópicos

Finalmente observamos un exchange de tópicos, que entrega mensajes dependiendo de expresiones o patrones en las colas de enrutamiento [productor][consumidor de todo los mensajes][consumidor de mensajes de usuario].

En los ejemplos hay dos consumidores, el primero se encarga de escuchar a todos los mensajes relacionados con el funcionamiento de la aplicación, usando la expresión app.#. El otro consumidor está interesado solamente en los mensajes de la aplicación que tienen que ver con el usuario, utilizando al expresión app.user.*.

El operador # sustituye cero o más palabras, mientras que el operador * sustituye exactamente a una palabra.

// topic_consumer_allapp.rs

[...]
let topic = "app.#";
let exchange = channel.exchange_declare(
        ExchangeType::Topic,
        exchange_name,
        amiquip::ExchangeDeclareOptions::default(),
    )?;

let queue = channel.queue_declare("", amiquip::QueueDeclareOptions::default())?;
queue.bind(&exchange, topic, FieldTable::default())?;
[...]


// topic_consumer.rs
[...]
let topic = "app.user.*";
queue.bind(&exchange, topic, FieldTable::default())?;
[...]

El productor envía todo tipo de mensajes

// topic_producer.rs

[...]
println!("Publish a delete message");
let message = r#"{"message": "User deleted"}"#.as_bytes();
exchange.publish(Publish::new(message, "app.user.deleted"))?;

println!("Publish a create message");
let message = r#"{"message": "User create"}"#.as_bytes();
exchange.publish(Publish::new(message, "app.user.created"))?;

// For all app events Q. See: topic_consumer_allapp
println!("Publish a content created message");
let message = r#"{"content": "Hello, world!", "user": "a"}"#.as_bytes();
exchange.publish(Publish::new(message, "app.content.created"))?;

Al observar los resultados de la ejecución de los consumidores tenemos el siguiente comportamiento:

$ cargo run --bin topic_consumer_allapp

Listening to messages on Q amq.gen-aLLieTiv5_b4QAOICiOgWw for topic app.#
Received message 0: {"message": "User deleted"}
Received message 1: {"message": "User create"}
Received message 2: {"content": "Hello, world!", "user": "a"}
$ cargo run --bin topic_consumer

Listening to messages on Q amq.gen-s_Y4oS6nrX22iRycRqOlWg for topic app.user.*
Received message 0: {"message": "User deleted"}
Received message 1: {"message": "User create"}

En estos ejemplos hemos observado el comportamiento básico de exchanges y colas de mensajes en RabbitMQ.

En artículos futuros utilizaremos estos conceptos para implementar patrones de implementación de sistemas basados en eventos.