MATSIM
SimStepParallelEventsManagerImpl.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * SimStepParallelEventsManagerImpl.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2012 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.events;
22 
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.BrokenBarrierException;
28 import java.util.concurrent.CyclicBarrier;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.atomic.AtomicLong;
31 import java.util.concurrent.atomic.AtomicReference;
32 
33 import jakarta.inject.Inject;
34 
35 import org.apache.logging.log4j.LogManager;
36 import org.apache.logging.log4j.Logger;
41 import org.matsim.core.gbl.Gbl;
42 
51 class SimStepParallelEventsManagerImpl implements EventsManager {
52 
53  private final static Logger log = LogManager.getLogger(SimStepParallelEventsManagerImpl.class);
54 
55  private final int numOfThreads;
56  private CyclicBarrier simStepEndBarrier;
57  private CyclicBarrier iterationEndBarrier;
58  private ProcessEventsRunnable[] runnables;
59  private EventsManagerImpl[] eventsManagers;
60  private EventsManagerImpl delegate;
61  private ProcessedEventsChecker processedEventsChecker;
62 
63  private boolean parallelMode = false;
64  private int handlerCount = 0;
65 
66  private AtomicLong counter;
67  private AtomicReference<Throwable> hadException = new AtomicReference<>();
68 
/**
 * Injection constructor. Reads the number of events-processing threads from the
 * given config group and uses a single thread when that value is {@code null}.
 *
 * @param config configuration group providing {@code getNumberOfThreads()}
 */
@Inject
SimStepParallelEventsManagerImpl(EventsManagerConfigGroup config) {
    this(config.getNumberOfThreads() == null ? 1 : config.getNumberOfThreads());
}
73 
/**
 * Creates an events manager that uses exactly one events-processing thread.
 */
public SimStepParallelEventsManagerImpl() {
    this(1);
}
77 
/**
 * Creates an events manager that processes events with the given number of
 * events-processing threads.
 *
 * <p>The thread count is validated up front: a value below 1 would otherwise
 * only surface later as a division by zero in {@code addHandler}, as a
 * negative array size in {@code init()}, or as an
 * {@link IllegalArgumentException} from the {@code CyclicBarrier} created in
 * {@code initProcessing()}.
 *
 * @param numOfThreads number of events-processing threads, at least 1
 * @throws IllegalArgumentException if {@code numOfThreads} is smaller than 1
 */
public SimStepParallelEventsManagerImpl(int numOfThreads) {
    if (numOfThreads < 1) {
        throw new IllegalArgumentException("numOfThreads must be at least 1, but was " + numOfThreads);
    }
    this.numOfThreads = numOfThreads;
    log.info("number of threads=" + numOfThreads );
    init();
}
83 
84  private void init() {
85  this.counter = new AtomicLong(0);
86 
87  this.simStepEndBarrier = new CyclicBarrier(this.numOfThreads + 1);
88  this.iterationEndBarrier = new CyclicBarrier(this.numOfThreads + 1);
89 
90  this.delegate = new EventsManagerImpl();
91 
92  this.eventsManagers = new EventsManagerImpl[this.numOfThreads];
93  for (int i = 0; i < numOfThreads; i++) this.eventsManagers[i] = new EventsManagerImpl();
94  }
95 
96  @Override
97  public void processEvent(final Event event) {
98  this.counter.incrementAndGet();
99 
100  if (parallelMode) {
101  // pass it to the event queue of the first event processing thread, it will pass it further
102  runnables[0].processEvent(event);
103  } else {
104  delegate.processEvent(event);
105  }
106  }
107 
108  @Override
109  public void addHandler(final EventHandler handler) {
110  delegate.addHandler(handler);
111 
112  eventsManagers[handlerCount % numOfThreads].addHandler(handler);
113  handlerCount++;
114  }
115 
116  @Override
117  public void removeHandler(final EventHandler handler) {
118  delegate.removeHandler(handler);
119 
120  for (EventsManager eventsManager : eventsManagers) eventsManager.removeHandler(handler);
121  }
122 
123  @Override
124  public void resetHandlers(int iteration) {
125  delegate.resetHandlers(iteration);
126  counter.set(0);
127  }
128 
129  @Override
130  public void initProcessing() {
131  delegate.initProcessing();
132  for (EventsManager eventsManager : this.eventsManagers) eventsManager.initProcessing();
133 
134  Queue<Event>[] eventsQueuesArray = new Queue[this.numOfThreads];
135  List<Queue<Event>> eventsQueues = new ArrayList<Queue<Event>>();
136  for (int i = 0; i < numOfThreads; i++) {
137  Queue<Event> eventsQueue = new LinkedBlockingQueue<>();
138  eventsQueues.add(eventsQueue);
139  eventsQueuesArray[i] = eventsQueue;
140  }
141 
142  /*
143  * Add a null entry to the list which will be set as nextEventsQueue in the last ProcessEventsThread.
144  */
145  eventsQueues.add(null);
146 
147  /*
148  * Create a ProcessedEventsChecker that checks whether all Events of
149  * a time step have been processed.
150  */
151  processedEventsChecker = new ProcessedEventsChecker(this, eventsQueuesArray);
152 
153  /*
154  * Create a Barrier that the threads use to synchronize.
155  */
156  CyclicBarrier waitForEmptyQueuesBarrier = new CyclicBarrier(this.numOfThreads, processedEventsChecker);
157 
158  hadException = new AtomicReference<>();
159  ExceptionHandler uncaughtExceptionHandler = new ExceptionHandler(hadException, waitForEmptyQueuesBarrier,
160  simStepEndBarrier, iterationEndBarrier);
161 
162  runnables = new ProcessEventsRunnable[numOfThreads];
163  for (int i = 0; i < numOfThreads; i++) {
164  ProcessEventsRunnable processEventsRunnable = new ProcessEventsRunnable(eventsManagers[i], processedEventsChecker,
165  waitForEmptyQueuesBarrier, simStepEndBarrier, iterationEndBarrier, eventsQueues.get(i), eventsQueues.get(i + 1));
166  runnables[i] = processEventsRunnable;
167  Thread thread = new Thread(processEventsRunnable);
168  thread.setDaemon(true);
169  thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
170  thread.setName(ProcessEventsRunnable.class.toString() + i);
171  thread.start();
172  }
173 
174  /*
175  * During the simulation Events are processed in
176  * the EventsProcessingThreads.
177  */
178  this.parallelMode = true;
179  }
180 
181  /*
182  * In some cases Events are created after this method has been called.
183  * To ensure that they are processed in real time, we process them not
184  * in the parallel thread. To do so, we replace the parallel events manager
185  * with its EventsManager instance.
186  */
187  @Override
188  public synchronized void finishProcessing() {
189 
190  /*
191  * If an exception occurred, at least one of the events processing threads
192  * has crashed. As a result, also all other events processing threads have
193  * been stopped.
194  * If not, it is waited until all threads have ended processing events.
195  */
196  if (hadException.get() == null) {
197  try {
198  this.processEvent(new LastEventOfIteration(Double.POSITIVE_INFINITY));
199  iterationEndBarrier.await();
200  } catch (InterruptedException | BrokenBarrierException e) {
201  // prefer storing the original exception and not the follow-up BrokenCyclicBarrier exceptions
202  if (hadException.get() == null) {
203  // let's log it as it may be superseded by exceptions thrown during finishProcessing() by
204  // the delegate or one of the eventManagers
205  log.error("Exception caught while finishing event processing at the iteration end.", e);
206  this.hadException.set(e);
207  }
208  }
209  }
210 
211  delegate.finishProcessing();
212  for (EventsManager eventsManager : this.eventsManagers) eventsManager.finishProcessing();
213 
214  /*
215  * After the simulation Events are processed in
216  * the Main Thread.
217  */
218  this.parallelMode = false;
219 
220  if (hadException.get() != null) {
221  throw new RuntimeException("Exception while processing events. Cannot guarantee that all events have been fully processed.", hadException.get());
222  }
223  }
224 
225  @Override
226  public void afterSimStep(double time) {
227 
228  /*
229  * If an exception occurred, at least one of the events processing threads
230  * has crashed. Therefore the remaining threads would get stuck at the
231  * CyclicBarrier.
232  */
233  if (hadException.get() != null) {
234  throw new RuntimeException(hadException.get());
235  }
236 
237  try {
238  Gbl.assertNotNull( this.processedEventsChecker );
239  this.processedEventsChecker.setTime(time);
240  this.processEvent(new LastEventOfSimStep(time));
241  simStepEndBarrier.await();
242  } catch (InterruptedException | BrokenBarrierException e) {
243  if (hadException.get() != null) {
244  throw new RuntimeException(hadException.get());
245  }
246  throw new RuntimeException(e);
247  }
248  }
249 
250  private static class ProcessEventsRunnable implements Runnable {
251 
254  private final CyclicBarrier waitForEmptyQueuesBarrier;
255  private final CyclicBarrier simStepEndBarrier;
256  private final CyclicBarrier iterationEndBarrier;
257  private final Queue<Event> eventsQueue;
258  private final Queue<Event> nextEventsQueue;
259  private double lastEventTime = 0.0;
260 
/**
 * Stores the collaborators of this runnable.
 *
 * @param eventsManager events manager that handles the events taken from the queue
 * @param processedEventsChecker checker queried after the empty-queues barrier
 * @param waitForEmptyQueuesBarrier barrier awaited when a {@code LastEventOfSimStep} is taken
 * @param simStepEndBarrier barrier awaited once all events of a time step are processed
 * @param iterationEndBarrier barrier awaited after the processing loop has ended
 * @param eventsQueue queue this runnable takes its events from
 * @param nextEventsQueue queue events are forwarded to, or {@code null} if there is none
 */
public ProcessEventsRunnable(EventsManager eventsManager, ProcessedEventsChecker processedEventsChecker,
        CyclicBarrier waitForEmptyQueuesBarrier, CyclicBarrier simStepEndBarrier,
        CyclicBarrier iterationEndBarrier, Queue<Event> eventsQueue, Queue<Event> nextEventsQueue) {
    // queues
    this.eventsQueue = eventsQueue;
    this.nextEventsQueue = nextEventsQueue;
    // event handling
    this.eventsManager = eventsManager;
    this.processedEventsChecker = processedEventsChecker;
    // synchronization
    this.waitForEmptyQueuesBarrier = waitForEmptyQueuesBarrier;
    this.simStepEndBarrier = simStepEndBarrier;
    this.iterationEndBarrier = iterationEndBarrier;
}
272 
273  @Override
274  public void run() {
275  try {
276  /*
277  * If the Simulation has ended we may still have some
278  * Events left to process. So we continue until the
279  * eventsQueue is empty.
280  *
281  * The loop is ended by a break command when a LastEventOfIteration
282  * event is found.
283  */
284  lastEventTime = 0.0;
285  while (true) {
286  Event event = ((LinkedBlockingQueue<Event>) eventsQueue).take();
287 
288  /*
289  * Check whether the events are ordered chronologically.
290  */
291  if (event.getTime() < this.lastEventTime) {
292  throw new RuntimeException("Events in the queue are not ordered chronologically. " +
293  "This should never happen. Is the SimTimeStepParallelEventsManager registered " +
294  "as a MobsimAfterSimStepListener? LastEventTime = " + this.lastEventTime +
295  " currentEvent.time = " + event.getTime() + " currentEvent.type = " + event.getEventType() +
296  " full event: " + event.toString());
297  } else {
298  this.lastEventTime = event.getTime();
299  }
300 
301  if (event instanceof LastEventOfSimStep) {
302  /*
303  * Send the event to the next events processing thread, if this is not the last thread.
304  */
305  if (nextEventsQueue != null) {
306  nextEventsQueue.add(event);
307  }
308 
309  /*
310  * At the moment, this thread's queue is empty. However, one of the other threads
311  * could create additional events for this time step. Therefore we have to wait
312  * until all threads reach this barrier. Afterwards we can check whether still
313  * all queues are empty. If this is true, the threads reach the sim step end barrier.
314  */
315  waitForEmptyQueuesBarrier.await();
316  if (!processedEventsChecker.allEventsProcessed()) continue;
317 
318  /*
319  * All event queues are empty, therefore finish current time step by
320  * reaching the sim step end barrier.
321  */
322  simStepEndBarrier.await();
323  continue;
324  }
325  else {
326  /*
327  * Handing the event over to the next events processing thread.
328  *
329  * This could be added to the next EventHandlers queue here or in
330  * the processEvent(...) method. Doing it here might be a bit faster
331  * since it is not done in the main thread.
332  */
333  if (nextEventsQueue != null) {
334  nextEventsQueue.add(event);
335  }
336 
337  /*
338  * If it is the last Event of the iteration, break the while loop
339  * and end the parallel events processing.
340  * TODO: Check whether still some events could be left in the queues...
341  */
342  if (event instanceof LastEventOfIteration) {
343  break;
344  }
345  }
346  eventsManager.processEvent(event);
347  }
348  iterationEndBarrier.await();
349  } catch (InterruptedException | BrokenBarrierException e) {
350  throw new RuntimeException(e);
351  }
353  }
354 
355  public void processEvent(Event event) {
356  this.eventsQueue.add(event);
357  }
358 
359  } // ProcessEventsRunnable
360 
361  private static class ProcessedEventsChecker implements Runnable {
362 
364  private final Queue<Event>[] eventQueues;
365  private boolean allEventsProcessed;
366  private double time;
367 
/**
 * Creates a checker that starts in the "all events processed" state.
 *
 * @param evenentsManger events manager that receives the additional
 *        {@code LastEventOfSimStep} when a queue is found to be non-empty
 * @param eventQueues the queues that are inspected in {@code run()}
 */
public ProcessedEventsChecker(EventsManager evenentsManger, Queue<Event>[] eventQueues) {
    this.allEventsProcessed = true;

    this.eventQueues = eventQueues;
    this.evenentsManger = evenentsManger;
}
374 
375  public void setTime(double time) {
376  this.time = time;
377  }
378 
379  public boolean allEventsProcessed() {
380  return this.allEventsProcessed;
381  }
382 
383  @Override
384  public void run() {
385  for (Queue<Event> eventsQueue : eventQueues) {
386  /*
387  * Some EventHandlers might have created additional Events [1] which
388  * could be located in the list AFTER the LastEventOfSimStep, meaning
389  * that they would be processed while the simulation is already processing
390  * the next time step. Therefore we check whether the eventsQueues are really
391  * empty.
392  * If at least one of the queues is not empty, this time steps
393  * events processing has to go on. This is triggered by setting
394  * allEventsProcessed to false. Additionally a last event of sim
395  * step event is created. When all events processing threads
396  * process that event, it is again checked whether there are
397  * more events left.
398  *
399  * [1] ... Such a behavior is NOT part of MATSim's default EventHandlers but it
400  * still might occur.
401  */
402  if (eventsQueue.size() > 0) {
403  allEventsProcessed = false;
404  evenentsManger.processEvent(new LastEventOfSimStep(time));
405  return;
406  }
407  }
408 
409  // otherwise
410  allEventsProcessed = true;
411  }
412 
413  } // ProcessedEventsChecker
414 
418  private static class ExceptionHandler implements UncaughtExceptionHandler {
419 
420  private final AtomicReference<Throwable> hadException;
421  private final CyclicBarrier simStepEndBarrier;
422  private final CyclicBarrier iterationEndBarrier;
423  private final CyclicBarrier waitForEmptyQueuesBarrier;
424 
425  public ExceptionHandler(final AtomicReference<Throwable> hadException, CyclicBarrier waitForEmptyQueuesBarrier,
426  CyclicBarrier simStepEndBarrier, CyclicBarrier iterationEndBarrier) {
427  this.hadException = hadException;
428  this.waitForEmptyQueuesBarrier = waitForEmptyQueuesBarrier;
429  this.simStepEndBarrier = simStepEndBarrier;
430  this.iterationEndBarrier = iterationEndBarrier;
431  }
432 
433  @Override
434  public void uncaughtException(Thread t, Throwable e) {
435  if (hadException.get() == null) {
436  // store the original exception and not the follow-up BrokenCyclicBarrier exceptions
437  this.hadException.set(e);
438  }
439  if (!(e instanceof BrokenBarrierException)) {
440  // do not log BrokenBarrierException -- they are triggered by resetting the barrier due to
441  // another (original) exception
442  log.error("Thread " + t.getName() + " died with exception while handling events.", e);
443  }
444  /*
445  * By reseting the barriers, they will throw a BrokenBarrierException
446  * which again will stop the events processing threads.
447  */
448  this.simStepEndBarrier.reset();
449  this.iterationEndBarrier.reset();
450  this.waitForEmptyQueuesBarrier.reset();
451  }
452 
453  }
454 
455 }
ExceptionHandler(final AtomicReference< Throwable > hadException, CyclicBarrier waitForEmptyQueuesBarrier, CyclicBarrier simStepEndBarrier, CyclicBarrier iterationEndBarrier)
ProcessEventsRunnable(EventsManager eventsManager, ProcessedEventsChecker processedEventsChecker, CyclicBarrier waitForEmptyQueuesBarrier, CyclicBarrier simStepEndBarrier, CyclicBarrier iterationEndBarrier, Queue< Event > eventsQueue, Queue< Event > nextEventsQueue)
static final void printCurrentThreadCpuTime()
Definition: Gbl.java:203