Message Queue Service has become a popular choice for data communication between services. As the name suggests, it uses a queue for messaging. A Message is published by a publisher and it can be consumed by one or more consumers.
There are some advantages of using Message Queue. Data will be stored persistently until being consumed and acknowledged, which reduces the chance of data loss when your system is down. In addition, it allows asynchronus communication between the producer and the consumer because the producer does not need to wait the consumer process the message. Scalability is another reason why we may need this kind of service. When workload increases, we can add some instances of producers or consumers that communicate using the same queue.
If you need to use a cloud message queue service, one of the alternatives is Alibaba Cloud Message Service. In this tutorial, I'm going to show you how to setup Alibaba Message Service and use it in Java application.
1. Register for an Alibaba Cloud Account
The first thing you need to do is registering for an Alibaba Account. Open Alibaba Cloud registration page. First, you need to enter basic account information which includes country, email and a new password. Then a confirmation code will be sent to your email. Check your email and paste the code on the provided box.
Once you've successfully confirmed your email, you'll be asked to enter billing information and add payment method. After that, you can create a new instance. If you get a free trial, you can use it.
2. Create AccessKey
Your application need to get authenticated for accessing Alibaba service. As for authentiation, it requires client id along with valid client secret. To create a new access key, open Access Key Management page of Alibaba Cloud. On that page, click on Create Access Key and a new one will be generated instantly. To save user access key, click on Save AccessKey information.
3. Using Message Service Console
Just like other Alibaba services, this service has a console called Message Service Management Console where you can manage your queues. If you haven't activated it, you'll be asked to activate the service first.
You can create a new queue by selecting the server location first. List of existing queues will be shown and you can modify settings for each queues. The console also allows you to try sending and receiving messages.
4. Code
In this tutorial, I'm going to show you how to send messages to MNS service and how to consume stored messages. Below is the example of how to send a message.
Produce Message
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.QueueMeta;
import java.util.Date;
public class ProducerDemo {
private static final String clientId = "xxx";
private static final String clientSecret = "yyy";
private static final String endpoint = "http://5256031591160030.mns.ap-southeast-5.aliyuncs.com/";
public static void main(String[] args) {
CloudAccount account = new CloudAccount(clientId, clientSecret, endpoint);
MNSClient client = account.getMNSClient();
String queueName = "TestQueue2";
QueueMeta meta = new QueueMeta(); //Set QueueMeta attributes with default values.
meta.setQueueName(queueName); // Set Queue Name.
meta.setPollingWaitSeconds(15);
meta.setMaxMessageSize(2048L);
try {
CloudQueue queue = client.getQueueRef("TestQueue");
Message message = new Message();
message.setMessageBody("An example message created at" + new Date().toString());
Message putMsg = queue.putMessage(message);
System.out.println("Message successfully sent. ID: " + putMsg.getMessageId());
} catch (ClientException ce) {
System.out.println("Something wrong with the network connection between client and MNS service."
+ "Please check your network and DNS availablity.");
ce.printStackTrace();
} catch (ServiceException se) {
se.printStackTrace();
if (se.getErrorCode() != null) {
if (se.getErrorCode().equals("QueueNotExist")) {
System.out.println("Queue does not exist");
} else if (se.getErrorCode().equals("TimeExpired")) {
System.out.println("The request is time expired. Please check your local machine timeclock");
}
}
} catch (Exception e) {
System.out.println("Unknown exception happened!");
e.printStackTrace();
}
client.close();
}
}
You need to set clientId
and clientSecret
obtained from step 2. As for endpoint
, to get the value, click on the Get Endpoint button on the Message Service Management Console and copy Public Endpoint.
Each sent messages will be saved in the queue until deleted manually after being consumed or exceeding retention period.
Consume Message
With some messages stored in the queue, we can start a consumer to read them and delete one by one after being read. Below is the consumer code.
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.model.Message;
import java.util.List;
public class ConsumerDemo {
private static final String clientId = "xxx";
private static final String clientSecret = "yyy";
private static final String endpoint = "http://5256031591160030.mns.ap-southeast-5.aliyuncs.com/";
public static void main(String[] args) {
CloudAccount account = new CloudAccount(clientId, clientSecret, endpoint);
MNSClient client = account.getMNSClient();
try {
CloudQueue queue = client.getQueueRef("TestQueue");
List messages = queue.batchPeekMessage(10);
System.out.println("messages: " + messages.size());
Message popMsg = queue.popMessage();
while (popMsg != null) {
System.out.println("Message ID: " + popMsg.getMessageId());
System.out.println("Message Handle: " + popMsg.getReceiptHandle());
System.out.println("Message Body: " + popMsg.getMessageBodyAsString());
// How many the times message has been dequeued
System.out.println("Message Dequeue Count:" + popMsg.getDequeueCount());
// Delete consumed messages
queue.deleteMessage(popMsg.getReceiptHandle());
popMsg = queue.popMessage();
}
System.out.println("No consumable message.");
} catch (ClientException ce) {
System.out.println("Unable to connect to MNS service");
ce.printStackTrace();
} catch (ServiceException se) {
se.printStackTrace();
if (se.getErrorCode() != null) {
if (se.getErrorCode().equals("QueueNotExist"))
{
System.out.println("Queue is not exist.Please create before use");
} else if (se.getErrorCode().equals("TimeExpired"))
{
System.out.println("The request is time expired. Please check your local machine timeclock");
}
}
} catch (Exception e) {
System.out.println("Unknown exception happened!");
e.printStackTrace();
}
client.close();
}
}
To delete a message, you must explicitly call deleteMessage
by passing message handler as the first parameter. Unfortunately, using the current Alibaba's library, when the message queue is empty, calling peekMessage
, popMessage
, etc. will throw an exception. If you need the consumer to always running, just modify the code above by catching the exception inside a loop.
That's the basic usage of using Alibaba Message Service in Java. We also have a tutorial of using a similar service, Google Pub/Sub in Node.js