1 package psl;
2
3 import java.util.concurrent.LinkedBlockingQueue;
4 import java.util.concurrent.ThreadPoolExecutor;
5 import java.util.concurrent.TimeUnit;
6
7 import ensure.Ensure;
8
9
10
11
12
13
14 public class AsyncDispatcher {
15
16
17
18
19 private static final long TERMINATION_POLLING_WAIT_TIME_MS = 25;
20
21
22
23
24 private ThreadPoolExecutor m_executor;
25
26
27
28
29 private DispatcherThreadFactory m_factory;
30
31
32
33
34 private long m_waiting;
35
36
37
38
39 private long m_executing;
40
41
42
43
44
45
46
47
48 public AsyncDispatcher(String name, ExceptionHandler handler, int threads) {
49 Ensure.not_null(name, "name == null");
50 Ensure.greater_equal(threads, 1, "threads < 1");
51
52 final ExceptionHandler h;
53 if (handler == null) {
54 h = new DefaultExceptionHandler();
55 } else {
56 h = handler;
57 }
58
59 m_factory = DispatcherThreadFactory.make(name, h);
60 m_executor = new ThreadPoolExecutor(threads, threads, 0,
61 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
62 m_factory) {
63 @Override
64 protected void afterExecute(Runnable r, Throwable t) {
65 super.afterExecute(r, t);
66 if (t != null) {
67 h.thrown(r, t, Thread.currentThread());
68 }
69 }
70 };
71
72 m_executing = 0;
73 m_waiting = 0;
74 }
75
76
77
78
79
80 public synchronized boolean alive() {
81 return m_executor != null;
82 }
83
84
85
86
87 public void shutdown() {
88 ThreadPoolExecutor executor;
89
90 synchronized (this) {
91 if (m_executor == null) {
92 return;
93 }
94
95 executor = m_executor;
96 m_executor = null;
97 }
98
99 executor.shutdownNow();
100
101 while (true) {
102 boolean shutdown = executor.isTerminated();
103 if (shutdown) {
104 m_factory.shutdown();
105 return;
106 }
107
108 try {
109 Thread.sleep(TERMINATION_POLLING_WAIT_TIME_MS);
110 } catch (InterruptedException e) {
111
112
113
114 }
115 }
116 }
117
118
119
120
121
122 public synchronized void submit(final Runnable r) {
123 Ensure.not_null(r, "r == null");
124 Ensure.not_null(m_executor, "m_executor == null");
125
126 m_waiting++;
127 m_executor.submit(new Runnable() {
128 @Override
129 public void run() {
130 synchronized (AsyncDispatcher.this) {
131 Ensure.greater(m_waiting, 0, "m_waiting <= 0");
132 m_waiting--;
133 m_executing++;
134 }
135
136 try {
137 new ExecutorRunnable(r).run();
138 } finally {
139 synchronized (AsyncDispatcher.this) {
140 Ensure.greater(m_executing, 0, "m_executing <= 0");
141 m_executing--;
142 AsyncDispatcher.this.notifyAll();
143 }
144 }
145 }
146 });
147 }
148
149
150
151
152
153
154
155 public synchronized void wait_clear() {
156 while (m_waiting > 0 || m_executing > 0) {
157 try {
158 wait();
159 } catch (InterruptedException e) {
160
161
162
163 }
164 }
165 }
166 }