Using Message Bus

The Message Bus is a service level API used to exchange messages within Liferay. The Message Bus is a mechanism for sending message payloads to different components in Liferay, providing loose coupling between message producers and consumers to prevent class loading issues. It’s located in the global class loader, making it accessible to every deployed web application. Remote messaging isn’t supported, but messages are sent across a cluster when ClusterLink is enabled.

Message Bus has several common uses, including sending search index write events, sending subscription emails, handling messages at scheduler endpoints, and running asynchronous processes.

You can leverage Message Bus to send messages between and within your plugins.

As we show you Message Bus we’ll talk about things like synchronous and asynchronous messaging , serial vs. in-parallel message dispatching, and Java and JSON style messages formats.

Before we get into those topics, let’s first try to understand Message Bus System’s architecture.

The Message Bus System

The Message Bus system contains the following components:

  • Message Bus: Manages transfer of messages from message senders to message listeners.
  • Destinations: Addresses or endpoints to which listeners register to receive messages.
  • Listeners: Consume messages received at destinations. They receive all messages sent to their registered destinations.
  • Senders: Invoke the Message Bus to send messages to destinations.

Your services can send messages to one or more destinations, and can listen to one or more destinations. The figure below depicts this. An individual service can be both a message sender and a message listener. For example, in the figure below both Plugin 2 - Service 3 and Plugin 5 - Service 7 send and listen for messages.

Figure 11.4: Example, Message Bus system

Figure 11.4: Example, Message Bus system

The Message Bus supports synchronous and asynchronous messaging:

  • Synchronous messaging: After it sends a message, the sender blocks waiting for a response from a recipient.
  • Asynchronous messaging: After it sends a message, the sender is free to continue processing. The sender can be configured to receive a call-back or can simply send and forget. We’ll show you how to implement both synchronous and asynchronous messaging in this section.
    • Call-back: The sender can include a call-back destination key as the response destination for the message. The recipient (listener) can then send a response message back to the sender via this response destination.
    • Send-and-Forget: The sender includes no call-back information in the message sent and continues with processing.

Configuration of Message Bus is done using the following files:

  • WEB-INF/src/META-INF/messaging-spring.xml: Specifies your destinations, listeners, and their mappings to each other.
  • WEB-INF/web.xml: Holds a listing of deployment descriptors for your plugin. Make sure you add messaging-spring.xml to your list of Spring configurations in this file.

You can control your Message Types by using either the Message or JSONObject class. Liferay core services are typically serialized and deserialized in JSON. In our examples we’ll demonstrate both types of message classes.

So far we’ve introduced the Message Bus System, including message types, destinations, senders, listeners, and approaches to sending messages. Next we’ll show you how easy it is to create your destinations, register listeners, and send your messages. To demonstrate, we’ll implement a business use case.

Example Use Case–Procurement process

Our use case will consider Jungle Gyms R-Us and its distribution of playground equipment, buying the equipment from manufacturers and selling the equipment to retailers. We’ll focus on the company’s process for procuring new jungle gym equipment. Let’s lay out this process now.

Jungle Gyms R-Us employs the following departments in their procurement process:

  • Procurement Department: Scouts out the latest equipment deals of manufacturers.
  • Finance Department: Determines whether the equipment can be purchased based on budget.
  • Legal Department: Determines whether the equipment’s safety ratings are acceptable.
  • Warehouse Department: Recieves the equipment, stores it, and prepares it for shipping.
  • Sales Department: Builds relationships with prospective customers to sell them products.

The departments currently use email to exchange comments about new equipment purchases, but someone always seems to be left out of the loop. One time, Sales was gung-ho about getting their hands on the latest and greatest spring rider animals from Boingo-Boingo Industries, but they didn’t consider the failing safety reviews discovered by the Legal department, because the Legal department forgot to copy the Sales department in their email to Procurement. Tempers flew, feelings were hurt, and everybody avoided hanging out in the company breakroom for the next couple of weeks.

Jungle Gyms R-Us could use Liferay’s Workflow with Kaleo to resolve the communication breakdown, but we’ll resolve the Jungle Gym’s communication woes using Message Bus, to show you how it works. Here are the inter-department message exchanges we’ll accommodate:

MessageSenderListenerResponseResponse Listeners
Request permission to proceed with purchaseProcurementFinancerequiredProcurement
Request permission to proceed with purchaseProcurementLegalrequiredProcurement
Notify and solicit feedback on new purchaseProcurementWarehouseoptionalProcurement, Sales
Notify and solicit feedback on new purchaseProcurementSalesoptionalProcurement, Warehouse
Broadcast equipment newsProcurementEmployeesnonenone

Let’s implement Procurement’s request to Finance first.

Synchronous messaging

In our example, equipment purchases can’t proceed without approval from Finance and Legal departments. Since special offers from the manufacturers often only last for a couple hours, Procurement makes it their top priority to get approval as soon as possible. Implementing their exchange using synchronous messaging makes the most sense.

Figure 11.5: Synchronous messaging

Figure 11.5: Synchronous messaging

The following table describes how we’ll set things up:

Destination KeyTypeSenderReceivers
jungle/finance/purchasesynchronousProcurementFinance
jungle/finance/purchase/responsesynchronousFinanceProcurement
jungle/legal/purchasesynchronousProcurementLegal
jungle/legal/purchase/responsesynchronousLegalProcurement

We’ve set it up so Finance sends its response messages to a destination on which Procurement will listen. That way a full-bodied response message is sent back to Procurement in addition to the response object returned from sending the message.

The Procurement Department sends a purchase approval request:

Message message = new Message();
message.put("department", "Procurement");
message.put("partName", part.getName(Locale.US));

message.setResponseId("1111");
message.setResponseDestinationName("jungle/finance/purchase/response");

try {
    String financeResponse = (String) MessageBusUtil.sendSynchronousMessage(
        "jungle/finance/purchase", message, 10000);

    System.out.println(
        "Procurement received Finance sync response to purchase approval for " +
        part.getName(Locale.US) + ": " + financeResponse);

    message.setResponseId("2222");
    message.setResponseDestinationName("jungle/legal/purchase/response");

    String legalResponse = (String) MessageBusUtil.sendSynchronousMessage(
        "jungle/legal/purchase", message, 10000);

    System.out.println(
        "Procurement received Legal sync response to purchase approval for " +
        part.getName(Locale.US) + ": " + legalResponse);

    if (financeResponse.contains("yes") && legalResponse.contains("yes")) {
        sendPurchaseNotification(part, userId);
    }
}
catch (MessageBusException e) {
    e.printStackTrace();
}

This sender takes the following steps:

  1. Creates the message using Liferay’s Message class.

  2. Stuffs the message with key/value pairs.

  3. Sets a response ID and response destination for listeners to use in replying back.

  4. Sends the message to the destination with a timeout value of 10,000 milliseconds.

  5. Blocks waiting for the response.

Finance Department listens for purchase approval requests and replies back:

public class FinanceMessagingImpl implements MessageListener {

    public void receive(Message message) {
        try {
            doReceive(message);
        }
        catch (Exception e) {
            _log.error("Unable to process message " + message, e);
        }
    }

    protected void doReceive(Message message)
        throws Exception {

        String department = (String) message.get("department");
        String partName = (String) message.get("partName");

        System.out.println("Finance received purchase request for " +
            partName + " from " + department);

        Message responseMessage = MessageBusUtil.createResponseMessage(
            message);

        responseMessage.put("department", "Finance");
        responseMessage.put("partName", partName);
        responseMessage.setPayload("yes");

        MessageBusUtil.sendMessage(
            responseMessage.getDestinationName(), responseMessage);
    }

    private static Log _log =
        LogFactoryUtil.getLog(FinanceMessagingImpl.class);
}

This listener executes the following steps:

  1. Implements the receive(Message message) method of the com.liferay.portal.kernel.messaging.MessageListener interface.

  2. Extracts values from the Message parameter by getting values associated with known keys.

  3. Creates a Message based on the message received via the MessageBusUtil.createResponseMessage(message) method, which accesses the response destination name from the message variable and sets the destination of the response message.

  4. Sets the response message’s payload.

  5. Sends the response Message to the response destination.

You can implement the listener for the Legal Department similarly. Next we’ll account for Legal Department-related classes in our configuration.

Message Bus Configuration for the purchase approval request process:

For Message Bus to direct messages from destinations to listeners, we must register the listeners by configuring the appropriate mappings in our plugin’s WEB-INF/src/META-INF/messaging-spring.xml file (create this file if it’s not already in your plugin). Here is the configuration:

<?xml version="1.0"?>

<beans
    default-destroy-method="destroy"
    default-init-method="afterPropertiesSet"
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
>

    <!-- Listeners -->

    <bean id="messageListener.finance_listener" class="com.liferay.training.parts.messaging.impl.FinanceMessagingImpl" />
    <bean id="messageListener.legal_listener" class="com.liferay.training.parts.messaging.impl.LegalMessagingImpl" />
    <bean id="messageListener.procurement_listener" class="com.liferay.training.parts.messaging.impl.ProcurementMessagingImpl" />

    <!-- Destinations -->

    <bean id="destination.finance.purchase" class="com.liferay.portal.kernel.messaging.SynchronousDestination">
        <property name="name" value="jungle/finance/purchase" />
    </bean>

    <bean id="destination.finance.purchase.response" class="com.liferay.portal.kernel.messaging.SynchronousDestination">
        <property name="name" value="jungle/finance/purchase/response" />
    </bean>

    <bean id="destination.legal.purchase" class="com.liferay.portal.kernel.messaging.SynchronousDestination">
        <property name="name" value="jungle/legal/purchase" />
    </bean>

    <bean id="destination.legal.purchase.response" class="com.liferay.portal.kernel.messaging.SynchronousDestination">
        <property name="name" value="jungle/legal/purchase/response" />
    </bean>

    <!-- Configurator -->

    <bean id="messagingConfigurator" class="com.liferay.portal.kernel.messaging.config.PluginMessagingConfigurator">
        <property name="messageListeners">
            <map key-type="java.lang.String" value-type="java.util.List">
                <entry key="jungle/finance/purchase">
                    <list value-type="com.liferay.portal.kernel.messaging.MessageListener">
                        <ref bean="messageListener.finance_listener" />
                    </list>
                </entry>
                <entry key="jungle/finance/purchase/response">
                    <list value-type="com.liferay.portal.kernel.messaging.MessageListener">
                        <ref bean="messageListener.procurement_listener" />
                    </list>
                </entry>
                <entry key="jungle/legal/purchase">
                    <list value-type="com.liferay.portal.kernel.messaging.MessageListener">
                        <ref bean="messageListener.legal_listener" />
                    </list>
                </entry>
                <entry key="jungle/legal/purchase/response">
                    <list value-type="com.liferay.portal.kernel.messaging.MessageListener">
                        <ref bean="messageListener.procurement_listener" />
                    </list>
                </entry>
            </map>
        </property>
        <property name="destinations">
            <list>
                <ref bean="destination.finance.purchase"/>
                <ref bean="destination.finance.purchase.response"/>
                <ref bean="destination.legal.purchase"/>
                <ref bean="destination.legal.purchase.response"/>
            </list>
        </property>
    </bean>
</beans>

The configuration above specifies the following beans:

  • Listener beans: Specify classes to handle messages.
  • Destination beans: Specify the class type and key names of the destinations.
  • Configurator bean: Maps listeners to their destinations.

When Finance sends its purchase approval request message for a new three-story spiral slide, the console reports Finance’s receipt of the message, Procurement’s receipt of the callback response from Finance, and Procurement’s receipt of the synchronous response returned from sending the message. Here’s what the console message looks like:

Finance received purchase request for three-story spiral slide from Procurement
Procurement received Finance callback response to purchase approval for three-
story spiral slide: yes
Procurement received Finance sync response to purchase approval for three-story 
spiral slide: yes
Legal received purchase request for three-story spiral slide from Procurement
Procurement received Legal callback response to purchase approval for three-
story spiral slide: yes
Procurement received Legal sync response to purchase approval for three-story 
spiral slide: yes

Do you know what all those yes messages mean? Success! Jungle Gyms R-Us has the cash to purchase this cool new slide, and the Legal Department has no gripes about the slide’s safety ratings!

Next let’s have Procurement notify the Sales and Warehouse departments and solicit their feedback.

Asynchronous messaging with callbacks

Asynchronous messaging consists of sending a message and then continuing with processing without blocking waiting for an immediate response. This allows the sender to continue with other tasks. It’s often necessary, however, that the listener can respond to the sender. This can be done using a call-back.

Jungle Gyms R-Us’s Procurement Department must notify the Sales and Warehouse departments of incoming equipment while simultaneously soliciting their feedback. To assure all three departments are up to speed, any responses from the Sales or Warehouse departments are posted to a shared destination.

The following table describes how we’ll set things up:

Destination KeyTypeSenderReceivers
jungle/purchaseasync serialProcurementSales, Warehouse
jungle/purchase/responsesynchronousSales, WarehouseProcurement

The following image shows asynchronous messaging, with serial dispatching of messages:

Figure 11.6: Asynchronous messaging with serial dispatching

Figure 11.6: Asynchronous messaging with *serial* dispatching

Let’s package the message as a JSONObject and send it to the destination:

JSONObject jsonObject = JSONFactoryUtil.createJSONObject();

jsonObject.put("department", "Procurement");
jsonObject.put("partName", part.getName(Locale.US));
jsonObject.put("responseDestinationName", "jungle/purchase/response");

MessageBusUtil.sendMessage("jungle/purchase", jsonObject.toString());

Here’s how the Warehouse Department listens for and handles messages:

public void receive(Message message) {

    try {
        doReceive(message);
    }
    catch (Exception e)
    {
        _log.error("Unable to process message " + message, e);
    }
}

protected void doReceive(Message message)
    throws Exception {

    String payload = (String)message.getPayload();

    JSONObject jsonObject = JSONFactoryUtil.createJSONObject(payload);

    String department = jsonObject.getString("department");
    String partName = jsonObject.getString("partName");
    String responseDestinationName = jsonObject.getString(
                    "responseDestinationName");

    System.out.println("Warehouse received purchase notification for " +
        partName + " from " + department);

    jsonObject = JSONFactoryUtil.createJSONObject();

    jsonObject.put("department", "Warehouse");
    jsonObject.put("partName", partName);
    jsonObject.put("comment", "Ugh! We're running out of space!!");

    MessageBusUtil.sendMessage(
        responseDestinationName, jsonObject.toString());
}

Here’s how this listener deserializes the JSONObject from the message:

  1. Gets the message payload and casts it to a String.
  2. Creates a JSONObject from the payload string.
  3. Gets values from the JSONObject using its getter methods.

The class also demonstrates how the Warehouse Department packages a response message and sends it back to the Procurement Department, using these steps:

  1. Create a JSONObject.
  2. Stuff it with name-value pairs.
  3. Send the response message to the original message’s response destination.

The Sales department listener can be implemented the same way, substituting Sales as the department value; the comment would likely be different, too.

You just used the JSONObject message type to send an asynchronous response message using a call-back.

Remember, we want the Procurement, Sales, and Warehouse departments to be aware of any message regarding the new playground equipment purchasing process. Let’s leverage our destination keys and department names in handling shared responses.

Here’s how the Warehouse might handle messages it receives:

public void receive(Message message) {

    try {
        if (message.getDestinationName().equals(
                "jungle/purchase"))
        {
            doReceive(message);
        }
        else if (message.getDestinationName().equals(
                "jungle/purchase/response"))
        {
            doReceiveResponse(message);
        }
    }
    catch (Exception e)
    {
        _log.error("Unable to process message " + message, e);
    }
}

protected void doReceiveResponse(Message message)
    throws JSONException {

    String payload = (String)message.getPayload();

    JSONObject jsonObject = JSONFactoryUtil.createJSONObject(payload);

    String department = jsonObject.getString("department");

    if (!department.equals("Warehouse")) {
        System.out.println(
            "Warehouse is in the loop on response from " + department);
    }
}

Let’s look at receive(Message) for a minute. We’ve set it up to handle messages differently depending on their destinations: messages to jungle/purchase are handled as Procurement’s purchase notifications, while messages to jungle/purchase/response are treated as departmental responses to Procurement’s purchase notifications. The doReceiveResponse(Message) method performs an important task, checking that the response comes from a department other than itself, and printing an error if it doesn’t.

Here are the configuration elements we added to the messaging-spring.xml from the previous section:

Listener beans:

    <bean id="messageListener.warehouse_listener" class="com.liferay.training.parts.messaging.impl.WarehouseMessagingImpl" />
    <bean id="messageListener.sales_listener" class="com.liferay.training.parts.messaging.impl.SalesMessagingImpl" />

Destination beans: The purchase notifications will be sent to a serial destination and the responses will be sent to a synchronous destination.

    <bean id="destination.purchase" class="com.liferay.portal.kernel.messaging.SerialDestination">
        <property name="name" value="jungle/purchase" />
    </bean>

    <bean id="destination.purchase.response" class="com.liferay.portal.kernel.messaging.SynchronousDestination">
        <property name="name" value="jungle/purchase/response" />
    </bean>

Configuration bean listener map entry: Warehouse and Sales are registered to listen for the notifications from Procurement. All three departments are registered to listen for inter-departmental responses.

    <entry key="jungle/purchase">
        <list value-type="com.liferay.portal.kernel.messaging.MessageListener">
            <ref bean="messageListener.warehouse_listener" />
            <ref bean="messageListener.sales_listener" />
        </list>
    </entry>
    <entry key="jungle/purchase/response">
        <list value-type="com.liferay.portal.kernel.messaging.MessageListener">
            <ref bean="messageListener.procurement_listener" />
            <ref bean="messageListener.warehouse_listener" />
            <ref bean="messageListener.sales_listener" />
        </list>
    </entry>

Configuration bean destination list references:

    <ref bean="destination.purchase"/>
    <ref bean="destination.purchase.response"/>

Don’t forget to send news of these new products to all Jungle Gyms R-Us employees.

Asynchronous “Send and Forget”

In the send and forget model, the sender sends messages and continues processing. We’ll apply this behavior to Jungle Gym’s company-wide new product notification.

Procurement isn’t expecting response messages from individual employees, so there’s no need for the company-wide listener to package up responses. We do, however, want everyone to get product news at the same time, so instead of dispatching news to employees serially we’ll dispatch in parallel.

Figure 11.7: Asynchronous messaging with parallel dispatching

Figure 11.7: Asynchronous messaging with *parallel* dispatching

We’ll specify a parallel destination type in our messaging-spring.xml:

Destination bean:

    <bean id="destination.employee.news" class="com.liferay.portal.kernel.messaging.ParallelDestination">
            <property name="name" value="jungle/employee/news" />
    </bean>

Listener bean:

    <bean id="messageListener.employee_listener" class="com.liferay.training.parts.messaging.impl.EmployeeMessagingImpl" />

Configuration bean listener map entry:

    <entry key="jungle/employee/news">
        <list value-type="com.liferay.portal.kernel.messaging.MessageListener">
            <ref bean="messageListener.employee_listener" />
        </list>
    </entry>

Configuration bean destination list reference:

    <ref bean="destination.employee.news"/>

Congratulations! You implemented inter-departmental communications for the procurement process at Jungle Gyms R-Us.

Along the way you used Message Bus to implement the following:

  • Sender, listener, and destination components.
  • Synchronous and Asynchronous messaging schemes.
  • Serial and parallel message dispatching.
  • Java and JSON message types.

Next we’ll show you the Device Detection API and its capabilities.

« Service ContextDevice Detection »
这篇文章有帮助吗?
0 人中有 0 人觉得有帮助