View Javadoc
1   package psl;
2   
3   import java.util.concurrent.LinkedBlockingQueue;
4   import java.util.concurrent.ThreadPoolExecutor;
5   import java.util.concurrent.TimeUnit;
6   
7   import ensure.Ensure;
8   
9   /**
10   * Dispatcher used to run asynchronous requests. A dispatcher runs several
11   * threads that execute tasks submitted. It is commonly used to register
12   * {@link AsyncDispatchRunnable} tasks but any runnable task may be registered.
13   */
14  public class AsyncDispatcher {
15  	/**
16  	 * Time to wait between polls to check whether threads have terminated
17  	 * or not.
18  	 */
19  	private static final long TERMINATION_POLLING_WAIT_TIME_MS = 25;
20  	
21  	/**
22  	 * The executor, <code>null</code> if not alive.
23  	 */
24  	private ThreadPoolExecutor m_executor;
25  	
26  	/**
27  	 * The thread factory.
28  	 */
29  	private DispatcherThreadFactory m_factory;
30  	
31  	/**
32  	 * Number of tasks waiting to be executed.
33  	 */
34  	private long m_waiting;
35  	
36  	/**
37  	 * Number of tasks currently executing.
38  	 */
39  	private long m_executing;
40  	
41  	/**
42  	 * Creates a new dispatcher.
43  	 * @param name the dispatcher name
44  	 * @param handler the exception handler; may be <code>null</code> in which
45  	 * case the default handler will be used 
46  	 * @param threads number of threads to use in the dispatcher
47  	 */
48  	public AsyncDispatcher(String name, ExceptionHandler handler, int threads) {
49  		Ensure.not_null(name, "name == null");
50  		Ensure.greater_equal(threads, 1, "threads < 1");
51  		
52  		final ExceptionHandler h;
53  		if (handler == null) {
54  			h = new DefaultExceptionHandler();
55  		} else {
56  			h = handler;
57  		}
58  		
59  		m_factory = DispatcherThreadFactory.make(name, h);
60  		m_executor = new ThreadPoolExecutor(threads, threads, 0,
61  				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
62  				m_factory) {
63  			@Override
64  			protected void afterExecute(Runnable r, Throwable t) {
65  				super.afterExecute(r, t);
66  				if (t != null) {
67  					h.thrown(r, t, Thread.currentThread());
68  				}
69  			}
70  		};
71  		
72  		m_executing = 0;
73  		m_waiting = 0;
74  	}
75  	
76  	/**
77  	 * Checks whether the dispatcher is still alive.
78  	 * @return is the dispatcher alive?
79  	 */
80  	public synchronized boolean alive() {
81  		return m_executor != null;
82  	}
83  	
84  	/**
85  	 * Shuts down the dispatcher.
86  	 */
87  	public void shutdown() {
88  		ThreadPoolExecutor executor;
89  		
90  		synchronized (this) {
91  			if (m_executor == null) {
92  				return;
93  			}
94  			
95  			executor = m_executor;
96  			m_executor = null;
97  		}
98  		
99  		executor.shutdownNow();
100 		
101 		while (true) {
102 			boolean shutdown = executor.isTerminated();
103 			if (shutdown) {
104 				m_factory.shutdown();
105 				return;
106 			}
107 			
108 			try {
109 				Thread.sleep(TERMINATION_POLLING_WAIT_TIME_MS);
110 			} catch (InterruptedException e) {
111 				/*
112 				 * Ignored.
113 				 */
114 			}
115 		}
116 	}
117 	
118 	/**
119 	 * Submits a task for execution.
120 	 * @param r the task, cannot be <code>null</code>
121 	 */
122 	public synchronized void submit(final Runnable r) {
123 		Ensure.not_null(r, "r == null");
124 		Ensure.not_null(m_executor, "m_executor == null");
125 		
126 		m_waiting++;
127 		m_executor.submit(new Runnable() {
128 			@Override
129 			public void run() {
130 				synchronized (AsyncDispatcher.this) {
131 					Ensure.greater(m_waiting, 0, "m_waiting <= 0");
132 					m_waiting--;
133 					m_executing++;
134 				}
135 				
136 				try {
137 					new ExecutorRunnable(r).run();
138 				} finally {
139 					synchronized (AsyncDispatcher.this) {
140 						Ensure.greater(m_executing, 0, "m_executing <= 0");
141 						m_executing--;
142 						AsyncDispatcher.this.notifyAll();
143 					}
144 				}
145 			}
146 		});
147 	}
148 	
149 	/**
150 	 * Waits until there are no more tasks to process. Because this method is
151 	 * unsafe as tasks can be submitted right after the method has returned,
152 	 * it is usually only useful in tightly-controlled environments such as
153 	 * unit tests. 
154 	 */
155 	public synchronized void wait_clear() {
156 		while (m_waiting > 0 || m_executing > 0) {
157 			try {
158 				wait();
159 			} catch (InterruptedException e) {
160 				/*
161 				 * Just cycle again.
162 				 */
163 			}
164 		}
165 	}
166 }