In Message Bus, you send messages to destinations. A destination is a named logical (not physical) location. Sender classes send messages to destinations, while listener classes wait to receive messages at the destinations. This way, the sender and recipient don’t need to know each other; they’re loosely coupled. This tutorial covers destination configuration, creating a destination, and messaging event listeners.
It’s time to configure a destination.
Destination Configuration
Each destination has a name and type and can have several other attributes. The destination type determines whether there’s a message queue, the kinds of threads involved with a destination, and the message delivery behavior to expect at the destination.
Here are the primary destination types:
- Parallel Destination
  - Messages sent here are queued.
  - Multiple worker threads from a thread pool deliver each message to a registered message listener. There’s one worker thread per message per message listener.
- Serial Destination
  - Messages sent here are queued.
  - Worker threads from a thread pool deliver the messages to each registered message listener, one worker thread per message.
- Synchronous Destination
  - Messages sent here are directly delivered to message listeners.
  - The thread sending the message here also delivers the message to all message listeners.
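To make the three delivery behaviors concrete, here is a minimal sketch that models them with plain java.util.concurrent classes. It is not Liferay's implementation: SketchDestination, Synchronous, Serial, and Parallel are hypothetical names invented for this illustration, the pool size of 2 is arbitrary, and message listeners are reduced to Consumer<String> callbacks.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DestinationTypesSketch {

    /** Hypothetical stand-in for a destination; not a Liferay type. */
    public abstract static class SketchDestination {
        protected final List<Consumer<String>> listeners =
            new CopyOnWriteArrayList<>();

        public void register(Consumer<String> listener) {
            listeners.add(listener);
        }

        public abstract void send(String message);

        /** Waits for queued deliveries to finish; nothing to do by default. */
        public void close() {
        }
    }

    /** Synchronous: the sending thread delivers to every listener itself. */
    public static class Synchronous extends SketchDestination {
        @Override
        public void send(String message) {
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    /** Shared plumbing for the two queued types (assumed pool size: 2). */
    abstract static class Queued extends SketchDestination {
        protected final ExecutorService pool = Executors.newFixedThreadPool(2);

        @Override
        public void close() {
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            }
            catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Serial: one worker task per message visits each listener in turn. */
    public static class Serial extends Queued {
        @Override
        public void send(String message) {
            pool.execute(() -> {
                for (Consumer<String> listener : listeners) {
                    listener.accept(message);
                }
            });
        }
    }

    /** Parallel: one worker task per message per listener. */
    public static class Parallel extends Queued {
        @Override
        public void send(String message) {
            for (Consumer<String> listener : listeners) {
                pool.execute(() -> listener.accept(message));
            }
        }
    }

    public static void main(String[] args) {
        SketchDestination destination = new Parallel();
        destination.register(message -> System.out.println(
            Thread.currentThread().getName() + " (listener 1): " + message));
        destination.register(message -> System.out.println(
            Thread.currentThread().getName() + " (listener 2): " + message));
        destination.send("hello");
        destination.close();
    }
}
```

Swapping Parallel for Synchronous in main makes both listeners run on the main thread, which is the practical difference to keep in mind: a slow listener at a synchronous destination blocks the sender.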
Liferay has preconfigured destinations for various purposes. The DestinationNames class defines String constants for each of them. For example, DestinationNames.HOT_DEPLOY (value is "liferay/hot_deploy") is for deployment event messages. Since destinations are tuned for specific purposes, don’t modify them.
Destinations are based on DestinationConfiguration instances. The configuration specifies the destination type, name, and these destination-related attributes:
- Maximum Queue Size: limits the number of queued messages for the destination.
- Rejected Execution Handler: a com.liferay.portal.kernel.concurrent.RejectedExecutionHandler instance can take action (e.g., log warnings) regarding rejected messages when the destination queue is full.
- Workers Core Size: initial number of worker threads for processing messages.
- Workers Max Size: limits the number of worker threads for processing messages.
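These four attributes resemble the settings of a standard thread pool. As an analogy only, the sketch below uses the JDK's java.util.concurrent.ThreadPoolExecutor rather than Liferay's own classes, so the exact order in which Liferay fills its queue and adds workers is not something this code demonstrates. QueueAttributesSketch and countRejected are hypothetical names, and the sizes passed in main are made-up values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueAttributesSketch {

    /**
     * Submits taskCount tasks that all block until submission is finished,
     * then returns how many of them the pool rejected.
     */
    public static int countRejected(
        int coreSize, int maxSize, int maxQueueSize, int taskCount) {

        AtomicInteger rejected = new AtomicInteger();

        // Analogous to a rejected execution handler: here it only counts.
        RejectedExecutionHandler handler =
            (task, executor) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            coreSize,                                // workers core size
            maxSize,                                 // workers max size
            1, TimeUnit.SECONDS,                     // idle worker timeout
            new ArrayBlockingQueue<>(maxQueueSize),  // maximum queue size
            handler);

        // Keep every accepted task busy so the queue cannot drain early.
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                }
                catch (InterruptedException interruptedException) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        release.countDown();
        pool.shutdown();

        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("Rejected: " + countRejected(1, 2, 3, 7));
    }
}
```

With a core size of 1, a max size of 2, and a queue of 3, the JDK pool can hold five blocked tasks (two running, three queued), so submitting seven leaves two for the handler. A handler that logs a warning, or one that runs the task on the caller's thread, would plug in at the same place.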
The DestinationConfiguration class provides these static methods for creating the various types of configurations:
- createParallelDestinationConfiguration(String destinationName)
- createSerialDestinationConfiguration(String destinationName)
- createSynchronousDestinationConfiguration(String destinationName)
You can also use the DestinationConfiguration constructor to create a configuration for any destination type, even your own.
Creating a Destination
Message Bus destinations are based on destination configurations and registered as OSGi services. Message Bus detects the destination services and manages their associated destinations.
Here are the general steps for creating a destination. The example configurator class that follows demonstrates these steps.
- Create a destination configuration using one of DestinationConfiguration’s static create* methods or its constructor. Set any attributes that apply to the destinations you’ll create with it.
- Create a destination by invoking the DestinationFactory method createDestination(DestinationConfiguration), passing in the destination configuration you created in the previous step.
- Register the destination as an OSGi service by invoking the BundleContext method registerService, passing in the following parameters:
  - Destination class Destination.class
  - Your Destination object
  - A Dictionary of properties defining the destination, including the destination.name
- Manage the destination object and service registration resources using a collection, such as a Map<String, ServiceRegistration<Destination>>. Keeping references to these resources is helpful when you’re ready to unregister and destroy them. The deactivate method in the example below demonstrates this.
Here’s an example messaging configurator component that creates and registers a parallel destination and manages its resources:
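import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationConfiguration;
import com.liferay.portal.kernel.messaging.DestinationFactory;
import com.liferay.portal.kernel.util.HashMapDictionary;

import java.util.Dictionary;
import java.util.HashMap;
import java.util.Map;

import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;

@Component(
    immediate = true,
    service = MyMessagingConfigurator.class
)
public class MyMessagingConfigurator {

    @Activate
    protected void activate(BundleContext bundleContext) {
        _bundleContext = bundleContext;

        // Create a DestinationConfiguration for parallel destinations.
        DestinationConfiguration destinationConfiguration =
            new DestinationConfiguration(
                DestinationConfiguration.DESTINATION_TYPE_PARALLEL,
                "myDestinationName");

        // Set the DestinationConfiguration's max queue size and
        // rejected execution handler.
        destinationConfiguration.setMaximumQueueSize(_MAXIMUM_QUEUE_SIZE);

        RejectedExecutionHandler rejectedExecutionHandler =
            new CallerRunsPolicy() {

                @Override
                public void rejectedExecution(
                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

                    if (_log.isWarnEnabled()) {
                        _log.warn(
                            "The current thread will handle the request " +
                                "because the destination's task queue is at " +
                                    "its maximum capacity");
                    }

                    super.rejectedExecution(runnable, threadPoolExecutor);
                }

            };

        destinationConfiguration.setRejectedExecutionHandler(
            rejectedExecutionHandler);

        // Create the destination from the configuration above.
        Destination destination = _destinationFactory.createDestination(
            destinationConfiguration);

        // Add the destination to the OSGi service registry.
        Dictionary<String, Object> properties = new HashMapDictionary<>();

        properties.put("destination.name", destination.getName());

        ServiceRegistration<Destination> serviceRegistration =
            _bundleContext.registerService(
                Destination.class, destination, properties);

        // Track references to the destination service registrations.
        _serviceRegistrations.put(
            destination.getName(), serviceRegistration);
    }

    @Deactivate
    protected void deactivate() {

        // Unregister and destroy the destinations this component registered.
        for (ServiceRegistration<Destination> serviceRegistration :
                _serviceRegistrations.values()) {

            Destination destination = _bundleContext.getService(
                serviceRegistration.getReference());

            serviceRegistration.unregister();

            destination.destroy();
        }

        _serviceRegistrations.clear();
    }

    // Illustrative value (an assumption); tune it for your workload.
    private static final int _MAXIMUM_QUEUE_SIZE = 200;

    private static final Log _log = LogFactoryUtil.getLog(
        MyMessagingConfigurator.class);

    private BundleContext _bundleContext;

    @Reference
    private DestinationFactory _destinationFactory;

    private final Map<String, ServiceRegistration<Destination>>
        _serviceRegistrations = new HashMap<>();

}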
On activation, the example configurator above does these things:
- Creates a DestinationConfiguration for parallel destinations.
- Sets the DestinationConfiguration’s max queue size and a rejected execution handler.
- Uses the DestinationFactory (the one bound to the _destinationFactory field) to create the destination.
- Adds the destination to the OSGi service registry.
- Adds the destination service registration to a map for managing them.
Once the destination is registered, Message Bus detects its service and manages the destination. On the example configurator’s deactivation, its deactivate method unregisters the destination services and destroys the destinations.
As an added bonus to creating destinations, you can create classes that listen for new destinations and new message listeners. You might create such listeners to log the deployment of new Message Bus endpoints, for example.
Messaging Event Listeners
There are Message Bus framework interfaces that let you listen for new destinations and message listeners.
Listening for New Destinations
The Message Bus notifies Message Bus Event Listeners when destinations are added and removed. To register these listeners, publish a MessageBusEventListener instance to the OSGi service registry (e.g., via an @Component annotation).
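import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.MessageBusEventListener;

import org.osgi.service.component.annotations.Component;

@Component(
    immediate = true,
    service = MessageBusEventListener.class
)
public class MyMessageBusEventListener implements MessageBusEventListener {

    // Called when a destination is added to the Message Bus.
    @Override
    public void destinationAdded(Destination destination) {
        ...
    }

    // Called when a destination is removed from the Message Bus.
    @Override
    public void destinationRemoved(Destination destination) {
        ...
    }

}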
Listening for new message listeners is easy too.
Listening for New Message Listeners
The Message Bus notifies DestinationEventListener instances when message listeners are registered with or unregistered from destinations. To register a listener for a destination, publish a DestinationEventListener service to the OSGi service registry, making sure to specify the destination’s destination.name property.
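import com.liferay.portal.kernel.messaging.DestinationEventListener;
import com.liferay.portal.kernel.messaging.MessageListener;

import org.osgi.service.component.annotations.Component;

@Component(
    immediate = true,
    property = {"destination.name=myCustom/Destination"},
    service = DestinationEventListener.class
)
public class MyDestinationEventListener implements DestinationEventListener {

    // Called when a message listener is registered to the destination.
    @Override
    public void messageListenerRegistered(
        String destinationName, MessageListener messageListener) {
        ...
    }

    // Called when a message listener is unregistered from the destination.
    @Override
    public void messageListenerUnregistered(
        String destinationName, MessageListener messageListener) {
        ...
    }

}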
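To trace the order of these callbacks without a running portal, the following toy model may help. It is a plain-Java sketch and not Message Bus itself: ToyBus, BusEvents, DestinationEvents, and runDemo are hypothetical names, message listeners are represented by arbitrary objects, and the only behavior modeled is that bus-level listeners hear about every destination while destination-level listeners hear only about the destination name they were registered for.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EventListenerSketch {

    /** Hypothetical callback for destination lifecycle events. */
    public interface BusEvents {
        void onDestinationAdded(String destinationName);
        void onDestinationRemoved(String destinationName);
    }

    /** Hypothetical callback for listener changes at one destination. */
    public interface DestinationEvents {
        void onListenerRegistered(String destinationName, Object listener);
        void onListenerUnregistered(String destinationName, Object listener);
    }

    /** Toy bus that stores nothing but event listeners and fires callbacks. */
    public static class ToyBus {
        private final List<BusEvents> busEvents = new ArrayList<>();
        private final Map<String, List<DestinationEvents>> destinationEvents =
            new HashMap<>();

        public void addBusEvents(BusEvents events) {
            busEvents.add(events);
        }

        // Scoped by name, much like the destination.name property above.
        public void addDestinationEvents(String name, DestinationEvents events) {
            destinationEvents.computeIfAbsent(
                name, key -> new ArrayList<>()).add(events);
        }

        public void addDestination(String name) {
            for (BusEvents events : busEvents) {
                events.onDestinationAdded(name);
            }
        }

        public void removeDestination(String name) {
            for (BusEvents events : busEvents) {
                events.onDestinationRemoved(name);
            }
        }

        public void registerMessageListener(String name, Object listener) {
            for (DestinationEvents events : eventsFor(name)) {
                events.onListenerRegistered(name, listener);
            }
        }

        public void unregisterMessageListener(String name, Object listener) {
            for (DestinationEvents events : eventsFor(name)) {
                events.onListenerUnregistered(name, listener);
            }
        }

        private List<DestinationEvents> eventsFor(String name) {
            return destinationEvents.getOrDefault(
                name, Collections.emptyList());
        }
    }

    /** Runs a short scenario and returns the callbacks in firing order. */
    public static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        ToyBus bus = new ToyBus();

        bus.addBusEvents(new BusEvents() {
            public void onDestinationAdded(String name) {
                log.add("added " + name);
            }
            public void onDestinationRemoved(String name) {
                log.add("removed " + name);
            }
        });
        bus.addDestinationEvents("myCustom/Destination", new DestinationEvents() {
            public void onListenerRegistered(String name, Object listener) {
                log.add("registered " + listener);
            }
            public void onListenerUnregistered(String name, Object listener) {
                log.add("unregistered " + listener);
            }
        });

        bus.addDestination("myCustom/Destination");
        bus.registerMessageListener("myCustom/Destination", "listenerA");
        // No callback fires here: nothing listens on this destination name.
        bus.registerMessageListener("other/Destination", "listenerB");
        bus.unregisterMessageListener("myCustom/Destination", "listenerA");
        bus.removeDestination("myCustom/Destination");

        return log;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```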
And that’s how you listen for new destinations and message listeners.
Now you understand the different destination types, how to create and register destinations, and how to manage destination resources. Once you deploy your destination, registered message listeners receive messages sent to it.