The psl library (pub/sub library) provides support for publish/subscribe connectors between components. The library supports both synchronous and asynchronous subscriptions, has a useful set of features and provides an easy programming model. It was mainly designed as a way to make the standard mechanism of java listeners easier to implement and safer to use.
Event publishing and subscribing is known by a myriad of different names. The following list contains some definitions and some of the most common names used for the term.
Besides providing a much simpler API to use than implementing subscribers with the plain java API, psl makes it easier to implement pub/sub by handling:
From a broad perspective, psl requires defining a contract that subscribers must implement. Publishers provide a dispatcher that subscribers register with. When events are published to the dispatcher, subscribers are invoked. The contract is generally a java interface, although any java type can theoretically be used.
Subscribers can subscribe synchronously or asynchronously. If subscribers subscribe synchronously, they are invoked event before the publisher completes the event dispatch operation. If subscribers subscribe asynchronously, subscribers are invoked later in a separate thread.
psl guarantees event ordering, even in the case of reentrancy, that is, even if an event is fired during dispatch of another event, all subscribers with receive both events in order.
The publisher will provide access to a Dispatcher that is associated with a specific contract, e.g., a subscription interface. Locally, it will create a LocalDispatcher that it uses to dispatch events. An event dispatch is an instance of a DispatcherOp.
For example, consider the following subscription interface
public interface Watcher { void it_happened(); }
A publisher could contain the following code, where the something method actually fires the it_happened event and the dispatcher method is used by subscribers to access the dispatcher and register.
public class Publisher { private LocalDispatcher<Watcher> m_dispatcher = new LocalDispatcher<>(); public Dispatcher<Watcher> dispatcher() { return m_dispatcher; } public void something() { m_dispatcher.dispatch(new DispatcherOp<Watcher>() { public void dispatch(Watcher w) { w.it_happened(); } }); } }
Because the Dispatcher class is thread-safe, no synchronization is necessary even in a multi-threaded application.
Additionally, if the subscription contract is a java interface, then instead of a LocalDispatcher, a ProxyLocalDispatcher can be used which makes code even simpler:
public class Publisher { private ProxyLocalDispatcher<Watcher> m_dispatcher = new ProxyLocalDispatcher<>(Watcher.class); public Dispatcher<Watcher> dispatcher() { return m_dispatcher; } public void something() { m_dispatcher.proxy().it_happened(); } }
Subscribers will request the Dispatcher from a publisher and subscribe using either the add_sync method or the add_async method to subscribe either synchronously or asynchronously, respectively.
Subscribers can unsubscribe from a dispatcher by using the remove method of the Dispatcher object.
Synchronous subscribers will be invoked in the same thread that dispatch operation is being performed and will be notified during publishing. Asynchronous subscribers will be notified in a different thread, see below.
Asynchronous notification requires a threading system that is capable of firing events asynchronously. psl provides the AsyncDispatcher for this purpose.
In general terms, the asynchronous dispatcher is independent from the publish and subscribe mechanism in the sense that it is a multi-threaded task dispatcher. However, it has some features which are useful for the psl library.
While multiple asynchronous dispatchers may exist, a global one exists and is maintained by the GlobalDispatcher class. This class, an utility class of which no instances may exist, maintains a static global asynchronous dispatcher. It is this dispatcher that is used to notify asynchronous subscribers.
The asynchronous dispatcher provides a single dispatching method, the submit method that receives a Runnable object. The object will be queued for dispatch and run in a thread as soon as possible.
Exceptions fired during asynchronous dispatch are handled by an ExceptionHandler. The default handler, used if none is provided explicitly, simply prints the stack trace to the stderr.
The asynchronous dispatcher keeps track of the number of tasks executed, their type and the time they took and provides that information through a jmx interface, as described below. It also keeps track of the stack trace where the tasks were submitted to execution. For example, if an exception of type ExampleException is fired in a method X.foo, the stack trace of the exception looks something like:
ExampleException: some description at X.foo(X.java:10) ...
However, if this exception has been fired in a task which was submitted during execution of method Y.bar, then the stack trace will look something like:
ExampleException: some description at X.foo(X.java:10) ... Suppressed: psl.AsynchronousDispatchLocation: (dispatched from here) at psl.AsyncDispatchRunnable.<init>(AsyncDispatchRunnable.java:49) at psl.AsyncSubscription.dispatch(AsyncSubscription.java:95) at psl.LocalDispatcher.dispatch(LocalDispatcher.java:79) at Y.bar (Y.java:20)
And if Y.bar was run, in turn, run from inside a task submitted in method Z.xpto, then the stack trace will look something like:
ExampleException: some description at X.foo(X.java:10) ... Suppressed: psl.AsynchronousDispatchLocation: (dispatched from here) at psl.AsyncDispatchRunnable.<init>(AsyncDispatchRunnable.java:49) at psl.AsyncSubscription.dispatch(AsyncSubscription.java:95) at psl.LocalDispatcher.dispatch(LocalDispatcher.java:79) at Y.bar (Y.java:20) ... Caused by: psl.AsynchronousDispatchLocation: (dispatched from here) at psl.AsyncDispatchRunnable.<init>(AsyncDispatchRunnable.java:49) at psl.AsyncSubscription.dispatch(AsyncSubscription.java:95) at psl.LocalDispatcher.dispatch(LocalDispatcher.java:79) at Z.xpto (Z.java:30) ...
This allows keeping track of which events fired which events in case multiple events are fired in a chain making debugging easier. Even if the chain spanws multiple threads.
psl automatically publishes to jmx information about all asynchronous dispatchers, and the number of events they fired, organized by type. It also provides some timing information on the events.
In the figure above, we can see that there is an asynchronous dispatcher named foo that has one thread, foo-1. This thread has processed events of type psl.BB_MXTest$1MyRunnable and, in the attributes, we can see that it has processed only one event and its time was 6 ms.