Subscribe to RabbitMQ - Sample Code
Every queue within Rabbit MQ for customers requires a subscription over the customer's end to monitor the messages that get notified over the queue. Here is a simple Java-based Rabbit MQ subscription source code showing how simple it is to subscribe over a certain queue. Please refer Rabbit MQ Publish/Subscribe for additional details.
Java Rabbit MQ Subscription
Here is a sample code that when executed, pulls the available messages in the queue. Comments are added for code explanation.
package com.essdocs.sample;
// Rabbit MQ Client Java based class
import com.rabbitmq.client.*;
import java.io.IOException;
public class AmqReceiver {
// Setting up primary Rabbit MQ based details including Host Name, Virtual Host, Username, Password, Port and Queue name
private final static String HOST_NAME = "${HOST_NAME}";
private final static String VIRTUAL_HOST = "${VIRTUAL_HOST_UUID}";
private final static String AMQ_USERNAME = "${USERNAME}";
private final static String AMQ_PASSWORD = "${PASSWORD}";
private final static int AMQ_PORT = ${HOST_NAME};//Usually 443 if SLL is used
private final static String QUEUE_NAME = "${QUEUE_NAME}";
public static void main(String[] args) {
// Calling Recieve Message method with queue name as an argument
receiveMessage(QUEUE_NAME);
}
public static void receiveMessage(String queueName) {
// Creating ConnectionFactory and setting up Queue parameters within it
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setPort(AMQ_PORT);
factory.setUsername(AMQ_USERNAME);
factory.setPassword(AMQ_PASSWORD);
try {
// Indicates factory to be using SSL for secure data transmission
factory.useSslProtocol();
}
catch (Exception e) {
System.out.println("AWS MQ SSL connection error " + e.getMessage());
}
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Setting up queue object to the Channel Factory
channel.queueDeclare(queueName, false, false, false, null);
// AutoAck if set to false, will note remove the message from the queue after viewing
boolean autoAck = false;
// Retrieve MQ messages
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
long deliveryTag = envelope.getDeliveryTag();
// Printing queue message over the console
System.out.println("AWS MQ Received Message Body: " + new String(body));
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
channel.close();
connection.close();
}
catch (Exception e) {
System.out.println("Error when receiving the message : " + e.getMessage());
}
}
}
Successful execution of the above code snippet fetches the available messages from the queue and will display the messages in the queue. Here is what the output looks like.
Additional Samples
Please refer https://www.rabbitmq.com/tutorials/tutorial-three-python.html for additional sample codes to subscribe to messages from the designated Rabbit MQs.
Updated over 1 year ago