Sending Messages

Message Bus lets you send messages to destinations that have any number of listening classes. As a message sender you don’t need to know the message recipients. Instead, you focus on creating message content (payload) and sending messages to destinations.

You can also send messages in a synchronous or asynchronous manner. The synchronous option waits for a response that the message was received or that it timed out. The asynchronous option gives you the “fire and forget” behavior; send the message and continue processing without waiting for a response.

Here are the message sending topics:

Start by creating a message.

Creating a Message

Here’s how to create a message:

  1. Call the Message constructor.

    Message message = new Message();

  2. Populate the message with a String or Object payload

    • String payload: message.setPayload("Message Bus is great!")

    • Object payload: message.put("firstName", "Joe")

  3. To receive responses at a particular location, set both of these attributes

    • Response destination name: setResponseDestinationName(String)

    • Response ID: setResponseId(String)

Your new message is ready to send.

Sending a Message

Here are the ways to send a message:

First, let’s consider using Message Bus directly.

Directly Using the Message Bus

This method involves obtaining a MessageBus instance and invoking it to send messages. Here’s an example of directly using Message Bus to send a message.

@Component(
    immediate = true,
    service = SomeServiceImpl.class
)
public class SomeServiceImpl {
    ...

    public void sendSomeMessage() {

        Message message = new Message();
        message.put("myId", 12345);
        message.put("someAttribute", "abcdef");
        _messageBus.sendMessage("myDestinationName", message);
    }

    @Reference
    private MessageBus _messageBus;
}

To send messages asynchronously, consider using SingleDestinationMessageSender.

Using SingleDestinationMessageSender

The SingleDestinationMessageSender class wraps the Message Bus to send messages asynchronously. This class demonstrates using a SingleDestinationMessageSender:

@Component(
    immediate = true,
    service = SomeServiceImpl.class
)
public class SomeServiceImpl {
    ...

    public void sendSomeMessage() {

        Message message = new Message();
        message.put("myId", 12345);
        message.put("someValue", "abcdef");

        SingleDestinationMessageSender messageSender = 
           _messageSenderFactory.createSingleDestinationMessageSender("myDestinationName");

        messageSender.send(message);
    }

    @Reference
    private SingleDestinationMessageSenderFactory _messageSenderFactory;
}

The _messageSenderFactory field’s @Reference wires it to a SingleDestinationMessageSenderFactory instance. The method sendSomeMessage creates a message, uses the _messageSenderFactory to create a SingleDestinationMessageSender for the specified destination, and sends the message through the sender.

Using a SynchronousMessageSender

A SynchronousMessageSender instance sends a message to the Message Bus and blocks until receiving a response or the response times out. A SynchronousMessageSender has these operating modes:

  • DEFAULT: Delivers the message in a separate thread and also provides timeouts, in case the message is not delivered properly.

  • DIRECT: Delivers the message in the same thread of execution and blocks until it receives a response.

Here’s an example of using SynchronousMessageSender in DEFAULT mode.

@Component(
    immediate = true,
    service = SomeServiceImpl.class
)
public class SomeServiceImpl {
    ...

    public void sendSomeMessage() {

        Message message = new Message();
        message.put("myId", 12345);
        message.put("someAttribute", "abcdef");

        SingleDestinationSynchronousMessageSender messageSender = 
            _messageSenderFactory.createSingleDestinationSynchronousMessageSender(
                "myDestinationName", SynchronousMessageSender.Mode.DEFAULT);

        messageSender.send(message);

    }

    @Reference
    private SingleDestinationMessageSenderFactory _messageSenderFactory;
}

And those are the ways to send messages. Next, if you’re in a cluster and want messages sent to a destination across all nodes, you must register a bridge message listener to that destination.

Sending Messages Across the Cluster

To ensure a message sent to a destination is received by all cluster nodes, you must register a ClusterBridgeMessageListener at that destination. This bridges the local destination to the cluster.

Here’s a message listener registrator that bridges a destination for distributing messages to all the cluster nodes.

@Component(
    immediate = true,
    service = MyMessageListenerRegistrator.class
)
public class MyMessageListenerRegistrator {
    ...

    @Activate
    protected void activate() {

        _clusterBridgeMessageListener = new ClusterBridgeMessageListener();
        _clusterBridgeMessageListener.setPriority(Priority.LEVEL_5)
        _destination.register(_clusterBridgeMessageListener);
    }

    @Deactivate
    protected void deactivate() {

        _destination.unregister(_clusterBridgeMessageListener );
    }

    @Reference(target = "(destination.name=liferay/live_users)")
    private Destination _destination;

    private MessageListener _clusterBridgeMessageListener;
}

The destination named "liferay/live_users" is bound to the _destination field. The activate method creates a ClusterBridgeMessageListener, sets its priority queue, and registers it to the destination. Messages sent to the destination are distributed across the cluster’s JVMs.

The com.liferay.portal.kernel.cluster.Priority class has ten levels (Level_1 through Level_10, with Level 10 being the most important). Each level is a priority queue for sending messages through the cluster. This is similar in concept to thread priorities: Thread.MIN_PRIORITY, Thread.MAX_PRIORITY, and Thread.NORM_PRIORITY.

That concludes the tour on sending messages. You’ve learned how to create messages, send messages synchronously and asynchronously, and send messages to a destination in a clustered environment.

Messaging Destinations

Message Listeners

« メッセージリスナー
この記事は役に立ちましたか?
0人中0人がこの記事が役に立ったと言っています