The pub/sub library (psl)

The psl library (pub/sub library) provides support for publish/subscribe connectors between components. The library supports both synchronous and asynchronous subscriptions, has a useful set of features and provides an easy programming model. It was mainly designed as a way to make the standard mechanism of java listeners easier to implement and safer to use.

Event publishing and subscribing is known by a myriad of different names. The following list contains some definitions and some of the most common names used for the term.

Publisher
An object that sends information about things that happen. It knows what it informs but does not know who it informs. An alternative name is subject from the observer pattern. A signal used in the signal/slots libraries might also be considered a publisher.
Subscriber
An object that is informed when something happens in a publisher. It knows what it receives but does not know when it is informed. Alternative names are: listener from the common java APIs, observer from the observer pattern, slot and callback.
Event
The information that is sent from the publisher to the subscriber. An alternative name is a notification used in the observer pattern.
Event dispatch
The operation of sending an event from a publisher to a subscriber. When something relevant happens in the publisher, the publisher dispatches an event to the subscriber.

Besides providing a much simpler API to use than implementing subscribers with the plain java API, psl makes it easier to implement pub/sub by handling:

  • Thread-safety. psl is thread safe and implementation of subscribers is thread-safe. Whether the subscribers themselves are thread-safe or not depends on their implementation, naturally.
  • Reentrancy. Subscribers may subscribe and unsubscribe freely at any time, even during event notification. Also, and probably more importantly, if a subscriber modifies the publisher during event dispatch, other listeners will still receive the events in the order they were sent, something that does not happen using simple loops for dispatch, and something that the majority of the implementations does not handle correctly.
  • Asynchronous subscriptions. Subscribers may subscribe synchronously, e.g. they are invoked by the thread that has has changed the publisher, or asynchronously, e.g., they are invoked by a different thread allowing the thread that modified the publisher to continue. The choice is made by the subscriber and is transparent to the publisher.
  • Ordering. All subscribers receive events in the order they were sent, even if they subscribe asynchronously.

Usage

General mechanism

From a broad perspective, psl requires defining a contract that subscribers must implement. Publishers provide a dispatcher that subscribers register with. When events are published to the dispatcher, subscribers are invoked. The contract is generally a java interface, although any java type can theoretically be used.

Subscribers can subscribe synchronously or asynchronously. If subscribers subscribe synchronously, they are invoked event before the publisher completes the event dispatch operation. If subscribers subscribe asynchronously, subscribers are invoked later in a separate thread.

psl guarantees event ordering, even in the case of reentrancy, that is, even if an event is fired during dispatch of another event, all subscribers with receive both events in order.

Publisher use

The publisher will provide access to a Dispatcher that is associated with a specific contract, e.g., a subscription interface. Locally, it will create a LocalDispatcher that it uses to dispatch events. An event dispatch is an instance of a DispatcherOp.

For example, consider the following subscription interface

public interface Watcher {
        void it_happened();
}

A publisher could contain the following code, where the something method actually fires the it_happened event and the dispatcher method is used by subscribers to access the dispatcher and register.

public class Publisher {
        private LocalDispatcher<Watcher> m_dispatcher = new LocalDispatcher<>();
        
        public Dispatcher<Watcher> dispatcher() {
                return m_dispatcher;
        }
        
        public void something() {
                m_dispatcher.dispatch(new DispatcherOp<Watcher>() {
                        public void dispatch(Watcher w) {
                                w.it_happened();
                        }
                });
        }
}

Because the Dispatcher class is thread-safe, no synchronization is necessary even in a multi-threaded application.

Additionally, if the subscription contract is a java interface, then instead of a LocalDispatcher, a ProxyLocalDispatcher can be used which makes code even simpler:

public class Publisher {
        private ProxyLocalDispatcher<Watcher> m_dispatcher = new ProxyLocalDispatcher<>(Watcher.class);
        
        public Dispatcher<Watcher> dispatcher() {
                return m_dispatcher;
        }
        
        public void something() {
                m_dispatcher.proxy().it_happened();
        }
}

Subscriber use

Subscribers will request the Dispatcher from a publisher and subscribe using either the add_sync method or the add_async method to subscribe either synchronously or asynchronously, respectively.

Subscribers can unsubscribe from a dispatcher by using the remove method of the Dispatcher object.

Synchronous subscribers will be invoked in the same thread that dispatch operation is being performed and will be notified during publishing. Asynchronous subscribers will be notified in a different thread, see below.

Asynchronous notification and threads

Asynchronous notification requires a threading system that is capable of firing events asynchronously. psl provides the AsyncDispatcher for this purpose.

In general terms, the asynchronous dispatcher is independent from the publish and subscribe mechanism in the sense that it is a multi-threaded task dispatcher. However, it has some features which are useful for the psl library.

While multiple asynchronous dispatchers may exist, a global one exists and is maintained by the GlobalDispatcher class. This class, an utility class of which no instances may exist, maintains a static global asynchronous dispatcher. It is this dispatcher that is used to notify asynchronous subscribers.

The asynchronous dispatcher provides a single dispatching method, the submit method that receives a Runnable object. The object will be queued for dispatch and run in a thread as soon as possible.

Exceptions fired during asynchronous dispatch are handled by an ExceptionHandler. The default handler, used if none is provided explicitly, simply prints the stack trace to the stderr.

The asynchronous dispatcher keeps track of the number of tasks executed, their type and the time they took and provides that information through a jmx interface, as described below. It also keeps track of the stack trace where the tasks were submitted to execution. For example, if an exception of type ExampleException is fired in a method X.foo, the stack trace of the exception looks something like:

ExampleException: some description
        at X.foo(X.java:10)
        ...

However, if this exception has been fired in a task which was submitted during execution of method Y.bar, then the stack trace will look something like:

ExampleException: some description
        at X.foo(X.java:10)
        ...
        Suppressed: psl.AsynchronousDispatchLocation: (dispatched from here)
                at psl.AsyncDispatchRunnable.<init>(AsyncDispatchRunnable.java:49)
                at psl.AsyncSubscription.dispatch(AsyncSubscription.java:95)
                at psl.LocalDispatcher.dispatch(LocalDispatcher.java:79)
                at Y.bar (Y.java:20)

And if Y.bar was run, in turn, run from inside a task submitted in method Z.xpto, then the stack trace will look something like:

ExampleException: some description
        at X.foo(X.java:10)
        ...
        Suppressed: psl.AsynchronousDispatchLocation: (dispatched from here)
                at psl.AsyncDispatchRunnable.<init>(AsyncDispatchRunnable.java:49)
                at psl.AsyncSubscription.dispatch(AsyncSubscription.java:95)
                at psl.LocalDispatcher.dispatch(LocalDispatcher.java:79)
                at Y.bar (Y.java:20)
                ...
        Caused by: psl.AsynchronousDispatchLocation: (dispatched from here)
                at psl.AsyncDispatchRunnable.<init>(AsyncDispatchRunnable.java:49)
                at psl.AsyncSubscription.dispatch(AsyncSubscription.java:95)
                at psl.LocalDispatcher.dispatch(LocalDispatcher.java:79)
                at Z.xpto (Z.java:30)
                ...

This allows keeping track of which events fired which events in case multiple events are fired in a chain making debugging easier. Even if the chain spanws multiple threads.

JMX support

psl automatically publishes to jmx information about all asynchronous dispatchers, and the number of events they fired, organized by type. It also provides some timing information on the events.

An example of <i>jconsole</i> showing information on a disptcher.

In the figure above, we can see that there is an asynchronous dispatcher named foo that has one thread, foo-1. This thread has processed events of type psl.BB_MXTest$1MyRunnable and, in the attributes, we can see that it has processed only one event and its time was 6 ms.