MATSIM
ParallelEventsManager.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * ParallelEventsManager.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2012 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.events;
22 
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.ArrayBlockingQueue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.TimeUnit;
30 import jakarta.inject.Inject;
31 import org.apache.logging.log4j.LogManager;
32 import org.apache.logging.log4j.Logger;
35 import org.matsim.core.config.Config;
37 
41 public final class ParallelEventsManager implements EventsManager {
42 
43  private final static Logger log = LogManager.getLogger(ParallelEventsManager.class);
44 
46  private ArrayList<EventsManager> eventsManagers = new ArrayList<>();
47  private final List<EventHandler> eventsHandlers;
48  private final boolean oneThreadPerHandler;
49  private final boolean syncOnTimeSteps;
50  private final int numOfThreads;
52  private int iteration = 0;
53  private boolean init = false;
54  private final BlockingQueue<EventArray> eventQueue;
55 
56  private final int eventsQueueSize;
57  //private final int eventsQueueSize = 1048576 * 32;
58  private final int eventsArraySize;
59 
60  @Inject
63 
64  }
65 
/**
 * Creates a manager with the default event queue capacity of 65536 entries.
 *
 * @param syncOnTimeSteps whether {@link #afterSimStep(double)} should flush all pending events
 */
public ParallelEventsManager(final boolean syncOnTimeSteps) {
    this(syncOnTimeSteps, 65_536);
}
69 
/**
 * Creates a manager that runs one worker thread per registered handler.
 *
 * @param syncOnTimeSteps whether {@link #afterSimStep(double)} should flush all pending events
 * @param eventsQueueSize capacity of the queue between the simulation and the distributor thread
 */
public ParallelEventsManager(final boolean syncOnTimeSteps, int eventsQueueSize) {
    // The thread count is only read in thread-pool mode, so -1 serves as a placeholder here.
    this(syncOnTimeSteps, true, -1, eventsQueueSize);
}
74 
/**
 * Creates a manager that spreads the registered handlers over a fixed number of worker threads.
 *
 * @param syncOnTimeSteps whether {@link #afterSimStep(double)} should flush all pending events
 * @param numOfThreads    number of worker threads the handlers are distributed over
 * @param eventsQueueSize capacity of the queue between the simulation and the distributor thread
 */
public ParallelEventsManager(final boolean syncOnTimeSteps, final int numOfThreads, int eventsQueueSize) {
    // Delegate to the full constructor; an explicit thread count implies thread-pool mode
    // (oneThreadPerHandler = false), otherwise numOfThreads would never be read.
    this(syncOnTimeSteps, false, numOfThreads, eventsQueueSize);
}
78 
/**
 * Full constructor that all other constructors visible in this file delegate to.
 *
 * @param syncOnTimeSteps     whether {@link #afterSimStep(double)} should flush all pending events
 * @param oneThreadPerHandler if true each handler gets its own worker thread, otherwise
 *                            {@code numOfThreads} workers share the handlers
 * @param numOfThreads        number of worker threads; only read when {@code oneThreadPerHandler} is false
 * @param eventsQueueSize     capacity of the queue between the simulation and the distributor thread
 */
/*package*/ ParallelEventsManager(final boolean syncOnTimeSteps, final boolean oneThreadPerHandler, final int numOfThreads,final int eventsQueueSize) {
    this.uncaughtExceptionHandler = new ExceptionHandler();
    this.eventsHandlers = new ArrayList<>();

    this.oneThreadPerHandler = oneThreadPerHandler;
    this.numOfThreads = numOfThreads;
    this.syncOnTimeSteps = syncOnTimeSteps;

    // Smaller batches are used when events are synchronized on every time step.
    if (syncOnTimeSteps) {
        this.eventsArraySize = 512;
    } else {
        this.eventsArraySize = 32768;
    }

    this.eventsQueueSize = eventsQueueSize;
    this.eventQueue = new ArrayBlockingQueue<>(eventsQueueSize);
}
89 
90  private void initialize() {
91  int numHandlers = oneThreadPerHandler ? this.eventsHandlers.size() : Math.min(this.numOfThreads, this.eventsHandlers.size());
92  this.distributor = new Distributor(new ArrayList<ProcessEventsRunnable>(), eventQueue);
93  this.eventsManagers = new ArrayList<>(numHandlers);
94 
95  // create event managers
96  if (this.oneThreadPerHandler) {
97  for (int i = 0; i < this.eventsHandlers.size(); i++) {
98  this.eventsManagers.add(new SingleHandlerEventsManager(this.eventsHandlers.get(i)));
99  }
100  } else {
101  // TODO - check if this slow path is correct
102  for (int i = 0; i < this.numOfThreads; i++) {
103  this.eventsManagers.add(new EventsManagerImpl());
104  }
105  for (int i = 0; i < this.eventsHandlers.size(); i++) {
106  this.eventsManagers.get(i % numOfThreads).addHandler(this.eventsHandlers.get(i));
107  }
108  }
109 
110  // initialize runnables (threads that will execute the event managers)
111  for (int i = 0; i < this.eventsManagers.size(); i++) {
112  EventsManager eventsManager = this.eventsManagers.get(i);
113  ProcessEventsRunnable processEventsRunnable = new ProcessEventsRunnable(eventsManager, distributor);
114  distributor.runnables.add(processEventsRunnable);
115  processEventsRunnable.setDaemon(true);
116  processEventsRunnable.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
117  if (oneThreadPerHandler) {
118  processEventsRunnable.setName("SingleHandlerEventsManager: " + ((SingleHandlerEventsManager) eventsManager).getEventHandlerClassName());
119  }
120  else {
121  processEventsRunnable.setName(ProcessEventsRunnable.class.toString() + i);
122  }
123  processEventsRunnable.start();
124  }
125 
126  // initialize the distributor
127  this.distributor.setDaemon(true);
128  this.distributor.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
129  this.distributor.setName("EventsDistributor");
130  this.distributor.start();
131  this.init = true;
132  }
133 
134  private void teardown() {
135  try {
136  for (ProcessEventsRunnable per : distributor.runnables) {
137  per.interrupt();
138  per.join();
139  }
140  distributor.interrupt();
141  distributor.join();
142  } catch (InterruptedException e) {
143  throw new RuntimeException("Exception while waiting on join...", e);
144  }
145  this.init = false;
146 
147  }
148 
149  @Override
150  public void processEvent(final Event event) {
151  if (!init) throw new IllegalStateException(".initProcessing() has to be called before processing events!");
152 
153  EventArray array = new EventArray(1);
154  array.add(event);
155  try {
156  this.eventQueue.put(array);
157  } catch (InterruptedException e) {
158  throw new RuntimeException("Exception while adding event.", e);
159  }
160  }
161 
162  @Override
163  public void processEvents(final EventArray events) {
164  if (!init) throw new IllegalStateException(".initProcessing() has to be called before processing events!");
165 
166  try {
167  this.eventQueue.put(events);
168  } catch (InterruptedException e) {
169  throw new RuntimeException("Exception while adding event.", e);
170  }
171  }
172 
173  @Override
174  public void addHandler(final EventHandler handler) {
175  if (init)
176  throw new IllegalStateException("Handlers can not be added after .initProcessing() was called!");
177 
178  // this will be used the next time we start an iteration
179  this.eventsHandlers.add(handler);
180  }
181 
182  @Override
183  public void removeHandler(final EventHandler handler) {
184  // this will be used the next time we start an iteration
185  this.eventsHandlers.remove(handler);
186  }
187 
188  @Override
189  public void resetHandlers(int iteration) {
190  for (EventsManager eventsManager : this.eventsManagers) {
191  eventsManager.resetHandlers(iteration);
192  }
193  }
194 
195  @Override
196  public void initProcessing() {
197 
198  initialize();
199 
200  for (EventsManager eventsManager : this.eventsManagers) {
201  eventsManager.initProcessing();
202  }
203 
204  resetHandlers(iteration);
205  }
206 
207  /*
208  * In some chases Events are created after this method has been called. To ensure that they are processed in
209  * real time, we process them not in the parallel thread. To do so, we replace the parallel events manager
210  * with its EventsManager instance.
211  */
212  @Override
213  public synchronized void finishProcessing() {
214  flush();
215 
216  for (EventsManager eventsManager : this.eventsManagers) {
217  eventsManager.finishProcessing();
218  }
219 
220  teardown();
221 
222  if (this.uncaughtExceptionHandler.hadException()) {
223  throw new RuntimeException("Exception while processing events. Cannot guarantee that all events have been fully processed.", this.uncaughtExceptionHandler.exception);
224  }
225 
226  iteration += 1;
227  }
228 
229  @Override
230  public void afterSimStep(double time) {
231  if (this.syncOnTimeSteps) {
232  flush();
233  }
234 
235  if (this.uncaughtExceptionHandler.hadException()) {
236  throw new RuntimeException("Exception while processing events. Cannot guarantee that all events have been fully processed.", this.uncaughtExceptionHandler.exception);
237  }
238 
239  }
240 
241  public void flush() {
242  try {
243  this.distributor.flush();
244  } catch (InterruptedException e) {
245  throw new RuntimeException("Exception while waiting on flush... " + e.getMessage(), this.uncaughtExceptionHandler.exception);
246  }
247  }
248 
/**
 * Thread that takes event batches from the shared input queue and forwards them to the queue
 * of every registered {@code ProcessEventsRunnable}. Small batches are merged into a buffer of
 * {@code eventsArraySize} events before they are forwarded. It also drives the flush
 * protocol used to wait until all events have been processed.
 */
private class Distributor extends Thread {

    // Worker threads that receive every distributed batch; filled by the enclosing class before start().
    private final ArrayList<ProcessEventsRunnable> runnables;
    // Input queue shared with the enclosing ParallelEventsManager.
    private final BlockingQueue<EventArray> eventQueue;

    // When set to true, the distributor will process all events until all events in the event manager are processed.
    // This is used when the simulation needs to sync with event processing and make sure there are no unprocessed
    // events in the system.
    private volatile boolean shouldFlush = false;

    /**
     * @param runnables  list of worker threads that batches are forwarded to
     * @param eventQueue queue from which incoming batches are taken
     */
    public Distributor(ArrayList<ProcessEventsRunnable> runnables, BlockingQueue<EventArray> eventQueue) {
        this.runnables = runnables;
        this.eventQueue = eventQueue;
    }

    /**
     * Requests a flush and blocks the caller until the distributor thread clears the request
     * or is no longer alive.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void flush() throws InterruptedException {
        synchronized (this) {
            shouldFlush = true;
            // Timed wait (1 ms) so the loop re-checks the flag and thread liveness even without a notify.
            while (shouldFlush && this.isAlive()) {
                this.wait(1);
            }
        }
    }

    /**
     * Adds the given batch to the queue of every worker; all workers receive the same
     * {@code EventArray} instance.
     */
    private void distribute(EventArray events) {
        for (ProcessEventsRunnable runnable : this.runnables) {
            runnable.eventsQueue.add(events);
        }
    }

    /**
     * Main loop: polls the input queue, buffers or forwards the received events and completes
     * pending flush requests whenever the input queue yields nothing. Ends when the thread is
     * interrupted while polling or waiting.
     */
    @Override
    public final void run() {
        try {
            // Buffer that collects events from small incoming batches.
            EventArray events = new EventArray(eventsArraySize);
            while (true) {
                // Short timeout so that flush requests are noticed while the input queue is idle.
                EventArray earray = this.eventQueue.poll(50, TimeUnit.MICROSECONDS);
                if (earray == null) {
                    synchronized (this) {
                        // check if we can finish the flush
                        if (shouldFlush) {
                            // distribute missing events
                            if (events.size() > 0) {
                                distribute(events);
                                events = new EventArray(eventsArraySize);
                            }

                            // We'll ask the ProcessEventsRunnables to flush
                            for (ProcessEventsRunnable runnable : this.runnables)
                                runnable.flush();

                            // Wait until all the ProcessEventsRunnables have finished flushing
                            boolean stillFlushing = true;
                            while (stillFlushing) {
                                // Releases the monitor for up to 1 ms; workers notify this object when done.
                                this.wait(1);

                                stillFlushing = false;
                                for (ProcessEventsRunnable runnable : this.runnables)
                                    stillFlushing |= runnable.isFlushing();
                            }

                            // termination criteria for the flush
                            // If new batches arrived meanwhile, the request stays set and is retried on a later pass.
                            if (eventQueue.isEmpty()) {
                                shouldFlush = false;
                                this.notify();
                            }
                        }
                    }
                    continue;
                }

                // this is an optimization, if we receive a large buffer, avoid copying it and send it directly.
                if (earray.size() >= eventsArraySize) {
                    // make sure we don't miss events already buffered
                    if (events.size() > 0) {
                        distribute(events);
                        events = new EventArray(eventsArraySize);
                    }
                    // send newly received events
                    distribute(earray);
                }
                // this is the non-optimized path, where we receive small number of events at a time
                else {
                    for (int i = 0; i < earray.size(); i++) {
                        Event event = earray.get(i);
                        events.add(event);
                        // if the buffer is full or if we need to flush
                        if (events.size() == eventsArraySize || shouldFlush) {
                            distribute(events);
                            events = new EventArray(eventsArraySize);
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            // Interruption is the shutdown signal for this thread.
            return;
        }
    }
}
347 
348  private class ProcessEventsRunnable extends Thread {
349 
350  private final Distributor distributor;
352  private final BlockingQueue<EventArray> eventsQueue;
353  private boolean flush = false;
354 
355  public ProcessEventsRunnable(EventsManager eventsManager, Distributor distributor) {
356  this.eventsManager = eventsManager;
357  this.eventsQueue = new LinkedBlockingQueue<>();
358  this.distributor = distributor;
359  }
360 
361  public synchronized void flush() {
362  flush = true;
363  }
364 
365  public boolean isFlushing() {
366  return flush && this.isAlive();
367  }
368 
369  @Override
370  public void run() {
371  try {
372  while (true) {
373  EventArray events = this.eventsQueue.poll(50, TimeUnit.MICROSECONDS);
374 
375  if (events != null) {
376  for (int i = 0; i < events.size(); i++) {
377  this.eventsManager.processEvent(events.get(i));
378  }
379  }
380 
381  // If flush is over, then try to wake up distributor
382  if (flush && this.eventsQueue.isEmpty()) {
383  synchronized (this) {
384  flush = false;
385  }
386  synchronized (this.distributor) {
387  this.distributor.notify();
388  }
389  }
390  }
391  } catch (InterruptedException e) {
392  return;
393  }
394  }
395  }
396 
400  private static class ExceptionHandler implements UncaughtExceptionHandler {
401 
402  private volatile boolean hadException = false;
403  private volatile Throwable exception;
404 
405 
406  @Override
407  public void uncaughtException(Thread t, Throwable e) {
408  this.hadException = true;
409  this.exception = e;
410  log.error("Thread " + t.getName() + " died with exception while handling events.", e);
411  }
412 
413  public boolean hadException() {
414  return this.hadException;
415  }
416 
417  public Throwable exception() {
418  return this.exception;
419  }
420 
421  }
422 }
ParallelEventsManager(final boolean syncOnTimeSteps)
EventsManagerConfigGroup eventsManager()
Definition: Config.java:467
ProcessEventsRunnable(EventsManager eventsManager, Distributor distributor)
Distributor(ArrayList< ProcessEventsRunnable > runnables, BlockingQueue< EventArray > eventQueue)
final BlockingQueue< EventArray > eventQueue
ParallelEventsManager(final boolean syncOnTimeSteps, int eventsQueueSize)
ParallelEventsManager(final boolean syncOnTimeSteps, final int numOfThreads, int eventsQueueSize)