Trabajando con RabbitMQ

En esta página aprenderemos cómo instalar RabbitMQ y cómo usarlo para comunicación entre procesos.

¿Qué es RabbitMQ?

RabbitMQ es un "Servidor de cola de mensajes", que permite comunicarse entre sí a dos o más procesos, que pueden estar en la misma máquina o distinta. Está escrito en el lenguaje de programación Erlang, el cual fue diseñado para operaciones multitarea.

Cómo instalar RabbitMQ

Para tener RabbitMQ andando, se necesitan los siguientes pasos. Partimos de un CentOS 7 básico y a continuación hacemos lo siguiente:

Dependencias de RabbitMQ

Primero de todo tenemos que instalar las dependencias:

  • epel-release
  • Repositorios erlang
  • Erlang

Para eso aplicar los siguientes comandos, como usuario root:

yum install wget
yum install epel-release
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum install erlang

Instalación de RabbitMQ

Resuelto lo de las dependencias, pasamos a instalar el RabbitMQ:

rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc

Con eso importamos las claves que se necesitan para validar la integridad de los paquetes de software. Ahora tenemos que agregar un repositorio yum para bajar los paquetes.

En el directorio /etc/yum.repos.d, hay que agregar un archivo llamado bintray-rabbitmq-server.repo con el siguiente contenido:

[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1

Luego instalar rabbitmq:

yum install rabbitmq-server

Ahora, activar RabbitMQ para que se inicie junto con el sistema:

chkconfig rabbitmq on

Bien, a partir de aquí tenemos instalado el RabbitMQ.

Un ejemplo

Vamos a hacer un ejemplo de RabbitMQ en Java, para comunicar dos procesos de manera asíncrona mediante el patrón "Publish-subscribe". Así, vamos a establecer un emisor de mensajes, y un consumidor. Ambos procesos se van a comunicar de manera asíncrona, el mensaje va a ser emitido por el emisor, y se llamará a un método en el Consumidor cuando haya un mensaje para que este consuma.

package rabbitmqdemo;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;

public class TestAsyncQueue 
{
	@Test
	public void testAsyncMessage()
	{
		AsyncMessageConsumer consumer;
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		try (Connection connection = factory.newConnection(); 
			Channel channel = connection.createChannel()) 
		{
			consumer=new AsyncMessageConsumer(channel);
			channel.queueDeclare("AsyncMQ", false, false, false, null);
			MyMessage m=new MyMessage();
			m.setMessage("HelloAsync");
			channel.basicConsume("AsyncMQ", false,"ConsumerTag",consumer);
			channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m));
			System.out.println(" [x] Sent '" + m.getMessage() + "'");
			m.setMessage("AnotherHelloAsync");
			channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m));
			System.out.println(" [y] Sent '"+m.getMessage()+"'");
			Thread.sleep(3L * 1000L);
		}
	 catch (IOException e) {
		e.printStackTrace();
	} catch (TimeoutException e1) {
		// TODO Auto-generated catch block
		e1.printStackTrace();
	} catch (InterruptedException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
	}
}

Veamos qué hace este código:

		AsyncMessageConsumer consumer;
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		try (Connection connection = factory.newConnection(); 
			Channel channel = connection.createChannel()) 
		{

Aquí obtenemos acceso al servidor RabbitMQ instalado en “localhost”. Si el servidor se encuentra en otra máquina, ahí es el lugar para cambiarlo.

AsyncMessageConsumer es una clase que ustedes crean, que tiene que extender la clase DefaultConsumer, que representa un consumidor de mensajes. Más abajo vamos a poner el código de esta función.

El concepto central detrás de RabbitMQ es el Channel: los procesos emisor y consumidor se tienen que suscribir ambos a un canal. Dentro de ese canal, tenemos que declarar una cola de mensajes. Eso es lo que hacemos a continuación:

			consumer=new AsyncMessageConsumer(channel);
			channel.queueDeclare("AsyncMQ", false, false, false, null);

La declaración de una cola es “idempotente”, o sea que si la cola ya existe, esa llamada no hace nada.

Ahora, creo un mensaje cualquiera y le asigno un contenido. Luego, indico al RabbitMQ quién va a ser el consumidor del mensaje, pasándole el objeto y un ConsumerTag, que va a usar para diferenciar entre distintos consumidores. Una cola de mensajes puede ser usada por distintos consumidores, los cuales consumirán mensajes solamente dirigidos a ellos, lo cual se especifica con distintos Tags.

			MyMessage m=new MyMessage();
			m.setMessage("HelloAsync");
			channel.basicConsume("AsyncMQ", false,"ConsumerTag",consumer);

Bien, Ahora enviamos 2 mensajes. El sistema se ocupa del resto:

			channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m));
			System.out.println(" [x] Sent '" + m.getMessage() + "'");
			m.setMessage("AnotherHelloAsync");
			channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m));
			System.out.println(" [y] Sent '"+m.getMessage()+"'");
			Thread.sleep(3L * 1000L);

Noten lo siguiente: El RabbitMQ manda “Streams”, o sea, flujos de bytes, y nosotros tenemos que proveer una forma de “Serializar” el objeto y convertirlo en un array de Bytes para que sea enviado. Para eso, uso SerializationUtils, que es parte de Apache Commons. Esa dependencia se especifica en el pom.xml

Ahora vamos a poner el contenido de la clase AsyncMessageConsumer:

package rabbitmqdemo;

import java.io.IOException;

import org.apache.commons.lang3.SerializationUtils;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class AsyncMessageConsumer extends DefaultConsumer
{
	private Channel channel;
	public AsyncMessageConsumer(Channel channel) 
	{
		super(channel);
		this.channel=channel;
	}
	public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException
	{
		long deliveryTag = envelope.getDeliveryTag();
		// (process the message components here ...)
		
		String type=properties.getType();
		if(type.equals("MyMessage"))
		{
			MyMessage message=(MyMessage)SerializationUtils.deserialize(body);
			System.out.println("Message received asynchronously: "+message.getMessage());
		}

		channel.basicAck(deliveryTag, false);
	}
}

Cuando el emisor del mensaje invoca el método basicPublish del canal, el RabbitMQ invoca el método handleDelivery del Consumidor. El consumidor recibe el mensaje y hace la operación que necesita con él, ante todo, debe des-serializarlo y volverlo a convertir en un objeto. Ahora, una vez recibido el mensaje, y procesado, el Consumidor debe indicarle al RabbitMQ que el mensaje ha sido consumido apropiadamente. Esto indicará al RabbitMQ el éxito de la operación, que será informado al Emisor. Se podrán definir políticas de reenvío de mensajes, en caso de que el mensaje no haya sido aceptado.

Aquí ponemos el pom.xml que usé para el proyecto:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>rabbitmq-test</groupId>
	<artifactId>rabbitmq-test</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<build>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.0</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-eclipse-plugin</artifactId>
				<version>2.10</version>
				<configuration>
					<downloadSources>true</downloadSources>
					<downloadJavadocs>true</downloadJavadocs>
				</configuration>
			</plugin>

		</plugins>
	</build>
	<dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.7.1</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.1</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
			<exclusions>
				<exclusion>
					<groupId>javax.mail</groupId>
					<artifactId>mail</artifactId>
				</exclusion>
				<exclusion>
					<groupId>javax.jms</groupId>
					<artifactId>jms</artifactId>
				</exclusion>
				<exclusion>
					<groupId>com.sun.jdmk</groupId>
					<artifactId>jmxtools</artifactId>
				</exclusion>
				<exclusion>
					<groupId>com.sun.jmx</groupId>
					<artifactId>jmxri</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.9</version>
		</dependency>
	</dependencies>
</project>

Ejemplo de ejecución del código

Esto es lo que se muestra en pantalla cuando ejecuto mi código:

 [x] Sent 'HelloAsync'
 [y] Sent 'AnotherHelloAsync'
Message received asynchronously: HelloAsync
Message received asynchronously: AnotherHelloAsync

Se puede observar, que el basicConsume se realizó una vez sola; una vez suscripto el consumidor al canal, este puede recibir tantos mensajes como se quiera. En este caso, envié 2, uno tras otro, y el método handleDelivery del consumidor fue invocado 2 veces.