馃И Mensajer铆a entre aplicaciones con RabbitMQ – Parte II.

En el art铆culo anterior se describi贸 la relevancia del paradigma de la mensajer铆a en el desarrollo de sistemas de software y el rol que un broker de mensajes, como RabbitMQ, desempe帽a.

Es momento de pasar a la implementaci贸n, para ello observaremos el c贸digo que se encuentra disponible en este repositorio de Github. Este utiliza la librer铆a amiquip que implementa AMQP 0.9.1.

Antes de comenzar

En el repositorio pueden encontrar un archivo docker-compose.yml que crear谩 un servicio de rabbitmq usando Docker, permiti茅ndo conectarse mediante amqp://localhost:15672. Tambi茅n pueden optar por crear una cuenta en CloudAMQP y hacer uso del servicio gratuito.

Una vez teniendo la direcci贸n de RabbitMQ pueden a帽adirla al proyecto mediante la variable de configuraci贸n amqp_url en el archivo Settings.toml que se guarda en el directorio principal del proyecto.

# Settings.toml
# Ejemplo de URL en cloudamqp
amqp_url = "amqps://usuario:contrase帽a@host.rmq.cloudamqp.com/imeplrkn"

Publicando y recibiendo el primer mensaje

El primer ejemplo [productor][consumidor] hace uso del exchange creado por defecto en RabbitMQ. Como se mencion贸 en el art铆culo anterior, a este exchange se conectan todas las colas de mensajes despu茅s de ser creadas y utiliza el nombre de la cola como clave de enrutamiento, lo cual a veces crea confusi贸n ya que se podr铆a pensar que no se necesita un exchange para enviar mensajes.

Primero debemos crear la cola para que el mensaje pueda llegar al consumidor. El c贸digo hace uso del canal de conexi贸n para crear la cola de mensajes queue.app.anon.

// default_exchange_consumer.rs

[...]
let queue_name = "queue.app.anon";

let queue = channel.queue_declare(queue_name, QueueDeclareOptions::default())?;

    let consumer = queue.consume(ConsumerOptions {
        no_ack: true,
        ..ConsumerOptions::default()
    })?;

    println!("Waiting for messages with routing key '{queue_name}'. Press Ctrl + C to quit");

    for (i, message) in consumer.receiver().iter().enumerate() {
        match message {
            ConsumerMessage::Delivery(message) => {
                let text = String::from_utf8_lossy(&message.body);
                println!("Got message {} - {:?}", i, text);
            },
            _ => {
                println!("Got a close connection message over the wire...");
                break;
            }
        }
    }
[...]

Para ejecutarlo desde la terminal utilizamos

$ cargo run --bin default_exchange_consumer

Para enviar los mensajes tambi茅n haremos uso directo de un canal de conexi贸n.

// default_exchange_producer.rs

[...]
let routing_key = "queue.app.anon";
channel.basic_publish(
        "",
        Publish::with_properties(
            b"Hello, world!",
            routing_key,
            AmqpProperties::default().with_delivery_mode(2),
        ),
    )?;
[...]
# en una nueva terminal

$ cargo run --bin default_exchange_producer

Al producir el mensaje podemos observar la informaci贸n correspondiente en la aplicaci贸n consumidor.

Un aspecto muy importante de este ejemplo es que el mensaje es publicado con delivery_mode 2, el cual le indica al broker que el mensaje es persistente, es decir, que debe ser guardado en disco una vez que ha llegado a la cola de mensajes.

Colas vol谩tiles (auto_delete)

La creaci贸n de colas de mensajes sigue las propiedades definidas mediante la estructura QueueDeclareOptions que por defecto indica que la cola de mensajes permanecer谩 en el broker a pesar de no tener consumidores, permitiendo as铆 almacenar mensajes hasta que estos sean descargados cuando un escucha se conecte (o cuando hagamos flush en la interfaz o cuando el servidor se reinicie o llegue a su tope de memoria).

Para crear una cola que sea desechada con el 煤ltimo consumidor debemos declararla como auto_delete=true [productor][consumidor].

// volatile_queue_consumer.rs

[...]
let queue = channel
  .queue_declare(
    queue_name,
    QueueDeclareOptions { auto_delete: true, ..QueueDeclareOptions::default()}
  )?;
[...]

Para observar el comportamiento ejecutamos el ejemplo volatile_queue_consumer.

Declarando un exchange directo

Ahora observaremos el funcionamiento de los exchanges, comenzando con uno de tipo directo [productor][consumidor]. Un exchange directo es aquel que entrega mensajes a las colas de mensajes utilizando la clave de enrutamiento con el que se publica el mensaje.

// direct_consumer.rs

[...]

let exchange_options = ExchangeDeclareOptions { durable: true, ..ExchangeDeclareOptions::default() };

let exchange = channel.exchange_declare(
        amiquip::ExchangeType::Direct,
        exchange_name,
        exchange_options,
    )?;
let queue = channel.queue_declare(queue_name, QueueDeclareOptions::default())?;
queue.bind(&exchange, routing_key, FieldTable::new())?;

[...]

En el productor declararemos el mismo exchange de forma pasiva, indicando as铆 que queremos que nuestor mensaje sea entregado s贸lo si hay alguien del lado del consumidor, con una cola de mensajes ya declarada.

// direct_producer.rs

[...]
let exchange = channel.exchange_declare_passive(exchange_name)?;

println!("Publishing message");
exchange.publish(Publish::new("Hello!".as_bytes(), routing_key))?;
[...]

El uso de exchanges directos permite completar tareas enviadas a un consumidor en concreto que ser谩 determinado por la clave de enrutamiento que escucha. Si m谩s de un consumidor se conecta a un exchange directo con la misma cola los mensajes ser谩n divididos entre cada uno de los consumidores utilizando una estrategia round-robin.

Declarado un exchange fanout

Un exchange fanout entrega los mensajes recibidos a todas las colas que se encuentren conectadas, sin importar la clave de enrutamiento [productor][consumidor].

Declararlo con amiquip es bastante sencillo.

// fanout_consumer.rs

[...]
let exchange = channel.exchange_declare(
        ExchangeType::Fanout,
        exchange_name,
        ExchangeDeclareOptions::default(),
    )?;
let queue = channel.queue_declare(queue_name, QueueDeclareOptions::default())?;
[...]

He incluido tambi茅n un ejemplo donde se declara una cola de mensajes an贸nima, es decir, que el broker decide su nombre. Si ejecutamos este ejemplo podremos observar que esta cola tambi茅n recibe los mismos mensajes que la cola en el ejemplo anterior.

Declarando un exchange de t贸picos

Finalmente observamos un exchange de t贸picos, que entrega mensajes dependiendo de expresiones o patrones en las colas de enrutamiento [productor][consumidor de todo los mensajes][consumidor de mensajes de usuario].

En los ejemplos hay dos consumidores, el primero se encarga de escuchar a todos los mensajes relacionados con el funcionamiento de la aplicaci贸n, usando la expresi贸n app.#. El otro consumidor est谩 interesado solamente en los mensajes de la aplicaci贸n que tienen que ver con el usuario, utilizando al expresi贸n app.user.*.

El operador # sustituye cero o m谩s palabras, mientras que el operador * sustituye exactamente a una palabra.

// topic_consumer_allapp.rs

[...]
let topic = "app.#";
let exchange = channel.exchange_declare(
        ExchangeType::Topic,
        exchange_name,
        amiquip::ExchangeDeclareOptions::default(),
    )?;

let queue = channel.queue_declare("", amiquip::QueueDeclareOptions::default())?;
queue.bind(&exchange, topic, FieldTable::default())?;
[...]


// topic_consumer.rs
[...]
let topic = "app.user.*";
queue.bind(&exchange, topic, FieldTable::default())?;
[...]

El productor env铆a todo tipo de mensajes

// topic_producer.rs

[...]
println!("Publish a delete message");
let message = r#"{"message": "User deleted"}"#.as_bytes();
exchange.publish(Publish::new(message, "app.user.deleted"))?;

println!("Publish a create message");
let message = r#"{"message": "User create"}"#.as_bytes();
exchange.publish(Publish::new(message, "app.user.created"))?;

// For all app events Q. See: topic_consumer_allapp
println!("Publish a content created message");
let message = r#"{"content": "Hello, world!", "user": "a"}"#.as_bytes();
exchange.publish(Publish::new(message, "app.content.created"))?;

Al observar los resultados de la ejecuci贸n de los consumidores tenemos el siguiente comportamiento:

$ cargo run --bin topic_consumer_allapp

Listening to messages on Q amq.gen-aLLieTiv5_b4QAOICiOgWw for topic app.#
Received message 0: {"message": "User deleted"}
Received message 1: {"message": "User create"}
Received message 2: {"content": "Hello, world!", "user": "a"}
$ cargo run --bin topic_consumer

Listening to messages on Q amq.gen-s_Y4oS6nrX22iRycRqOlWg for topic app.user.*
Received message 0: {"message": "User deleted"}
Received message 1: {"message": "User create"}

En estos ejemplos hemos observado el comportamiento b谩sico de exchanges y colas de mensajes en RabbitMQ.

En art铆culos futuros utilizaremos estos conceptos para implementar patrones de implementaci贸n de sistemas basados en eventos.

Write a comment