En esta página aprenderemos cómo instalar RabbitMQ y cómo usarlo para comunicación entre procesos.
¿Qué es RabbitMQ?
RabbitMQ es un "Servidor de cola de mensajes", que permite comunicarse entre sí a dos o más procesos, que pueden estar en la misma máquina o distinta. Está escrito en el lenguaje de programación Erlang, el cual fue diseñado para operaciones multitarea.
Cómo instalar RabbitMQ
Para tener RabbitMQ andando, se necesitan los siguientes pasos. Partimos de un CentOS 7 básico y a continuación hacemos lo siguiente:
Dependencias de RabbitMQ
Primero de todo tenemos que instalar las dependencias:
- epel-release
- Repositorios erlang
- Erlang
Para eso aplicar los siguientes comandos, como usuario root:
yum install wget yum install epel-release wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm yum install erlang
Instalación de RabbitMQ
Resuelto lo de las dependencias, pasamos a instalar el RabbitMQ:
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
Con eso importamos las claves que se necesitan para validar la integridad de los paquetes de software. Ahora tenemos que agregar un repositorio yum para bajar los paquetes.
En el directorio /etc/yum.repos.d, hay que agregar un archivo llamado bintray-rabbitmq-server.repo con el siguiente contenido:
[bintray-rabbitmq-server] name=bintray-rabbitmq-rpm baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/ gpgcheck=0 repo_gpgcheck=0 enabled=1
Luego instalar rabbitmq:
yum install rabbitmq-server
Ahora, activar RabbitMQ para que se inicie junto con el sistema:
chkconfig rabbitmq on
Bien, a partir de aquí tenemos instalado el RabbitMQ.
Un ejemplo
Vamos a hacer un ejemplo de RabbitMQ en Java, para comunicar dos procesos de manera asíncrona mediante el patrón "Publish-subscribe". Así, vamos a establecer un emisor de mensajes, y un consumidor. Ambos procesos se van a comunicar de manera asíncrona, el mensaje va a ser emitido por el emisor, y se llamará a un método en el Consumidor cuando haya un mensaje para que este consuma.
package rabbitmqdemo; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationUtils; import org.junit.Test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.AMQP.BasicProperties; public class TestAsyncQueue { @Test public void testAsyncMessage() { AsyncMessageConsumer consumer; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { consumer=new AsyncMessageConsumer(channel); channel.queueDeclare("AsyncMQ", false, false, false, null); MyMessage m=new MyMessage(); m.setMessage("HelloAsync"); channel.basicConsume("AsyncMQ", false,"ConsumerTag",consumer); channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m)); System.out.println(" [x] Sent '" + m.getMessage() + "'"); m.setMessage("AnotherHelloAsync"); channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m)); System.out.println(" [y] Sent '"+m.getMessage()+"'"); Thread.sleep(3L * 1000L); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
Veamos qué hace este código:
AsyncMessageConsumer consumer; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
Aquí obtenemos acceso al servidor RabbitMQ instalado en “localhost”. Si el servidor se encuentra en otra máquina, ahí es el lugar para cambiarlo.
AsyncMessageConsumer es una clase que ustedes crean, que tiene que extender la clase DefaultConsumer, que representa un consumidor de mensajes. Más abajo vamos a poner el código de esta función.
El concepto central detrás de RabbitMQ es el Channel: los procesos emisor y consumidor se tienen que suscribir ambos a un canal. Dentro de ese canal, tenemos que declarar una cola de mensajes. Eso es lo que hacemos a continuación:
consumer=new AsyncMessageConsumer(channel); channel.queueDeclare("AsyncMQ", false, false, false, null);
La declaración de una cola es “idempotente”, o sea que si la cola ya existe, esa llamada no hace nada.
Ahora, creo un mensaje cualquiera y le asigno un contenido. Luego, indico al RabbitMQ quién va a ser el consumidor del mensaje, pasándole el objeto y un ConsumerTag, que va a usar para diferenciar entre distintos consumidores. Una cola de mensajes puede ser usada por distintos consumidores, los cuales consumirán mensajes solamente dirigidos a ellos, lo cual se especifica con distintos Tags.
MyMessage m=new MyMessage(); m.setMessage("HelloAsync"); channel.basicConsume("AsyncMQ", false,"ConsumerTag",consumer);
Bien, Ahora enviamos 2 mensajes. El sistema se ocupa del resto:
channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m)); System.out.println(" [x] Sent '" + m.getMessage() + "'"); m.setMessage("AnotherHelloAsync"); channel.basicPublish("", "AsyncMQ", new BasicProperties.Builder().type("MyMessage").build(), SerializationUtils.serialize(m)); System.out.println(" [y] Sent '"+m.getMessage()+"'"); Thread.sleep(3L * 1000L);
Noten lo siguiente: El RabbitMQ manda “Streams”, o sea, flujos de bytes, y nosotros tenemos que proveer una forma de “Serializar” el objeto y convertirlo en un array de Bytes para que sea enviado. Para eso, uso SerializationUtils, que es parte de Apache Commons. Esa dependencia se especifica en el pom.xml
Ahora vamos a poner el contenido de la clase AsyncMessageConsumer:
package rabbitmqdemo; import java.io.IOException; import org.apache.commons.lang3.SerializationUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class AsyncMessageConsumer extends DefaultConsumer { private Channel channel; public AsyncMessageConsumer(Channel channel) { super(channel); this.channel=channel; } public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) String type=properties.getType(); if(type.equals("MyMessage")) { MyMessage message=(MyMessage)SerializationUtils.deserialize(body); System.out.println("Message received asynchronously: "+message.getMessage()); } channel.basicAck(deliveryTag, false); } }
Cuando el emisor del mensaje invoca el método basicPublish del canal, el RabbitMQ invoca el método handleDelivery del Consumidor. El consumidor recibe el mensaje y hace la operación que necesita con él, ante todo, debe des-serializarlo y volverlo a convertir en un objeto. Ahora, una vez recibido el mensaje, y procesado, el Consumidor debe indicarle al RabbitMQ que el mensaje ha sido consumido apropiadamente. Esto indicará al RabbitMQ el éxito de la operación, que será informado al Emisor. Se podrán definir políticas de reenvío de mensajes, en caso de que el mensaje no haya sido aceptado.
Aquí ponemos el pom.xml que usé para el proyecto:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>rabbitmq-test</groupId> <artifactId>rabbitmq-test</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <version>2.10</version> <configuration> <downloadSources>true</downloadSources> <downloadJavadocs>true</downloadJavadocs> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <exclusions> <exclusion> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> </exclusion> <exclusion> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> </exclusion> <exclusion> <groupId>com.sun.jdmk</groupId> <artifactId>jmxtools</artifactId> </exclusion> <exclusion> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency> </dependencies> </project>
Ejemplo de ejecución del código
Esto es lo que se muestra en pantalla cuando ejecuto mi código:
[x] Sent 'HelloAsync' [y] Sent 'AnotherHelloAsync' Message received asynchronously: HelloAsync Message received asynchronously: AnotherHelloAsync
Se puede observar, que el basicConsume se realizó una vez sola; una vez suscripto el consumidor al canal, este puede recibir tantos mensajes como se quiera. En este caso, envié 2, uno tras otro, y el método handleDelivery del consumidor fue invocado 2 veces.