21 package org.matsim.core.events;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.BrokenBarrierException;
28 import java.util.concurrent.CyclicBarrier;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.atomic.AtomicLong;
31 import java.util.concurrent.atomic.AtomicReference;
33 import jakarta.inject.Inject;
35 import org.apache.logging.log4j.LogManager;
36 import org.apache.logging.log4j.Logger;
51 class SimStepParallelEventsManagerImpl
implements EventsManager {
53 private final static Logger log = LogManager.getLogger(SimStepParallelEventsManagerImpl.class);
55 private final int numOfThreads;
56 private CyclicBarrier simStepEndBarrier;
57 private CyclicBarrier iterationEndBarrier;
58 private ProcessEventsRunnable[] runnables;
59 private EventsManagerImpl[] eventsManagers;
60 private EventsManagerImpl delegate;
61 private ProcessedEventsChecker processedEventsChecker;
63 private boolean parallelMode =
false;
64 private int handlerCount = 0;
66 private AtomicLong counter;
67 private AtomicReference<Throwable> hadException =
new AtomicReference<>();
70 SimStepParallelEventsManagerImpl(EventsManagerConfigGroup config) {
71 this(config.getNumberOfThreads() != null ? config.getNumberOfThreads() : 1);
74 public SimStepParallelEventsManagerImpl() {
78 public SimStepParallelEventsManagerImpl(
int numOfThreads) {
79 this.numOfThreads = numOfThreads;
80 log.info(
"number of threads=" + numOfThreads );
85 this.counter =
new AtomicLong(0);
87 this.simStepEndBarrier =
new CyclicBarrier(this.numOfThreads + 1);
88 this.iterationEndBarrier =
new CyclicBarrier(this.numOfThreads + 1);
90 this.delegate =
new EventsManagerImpl();
92 this.eventsManagers =
new EventsManagerImpl[this.numOfThreads];
93 for (
int i = 0; i < numOfThreads; i++) this.eventsManagers[i] =
new EventsManagerImpl();
97 public void processEvent(
final Event event) {
98 this.counter.incrementAndGet();
102 runnables[0].processEvent(event);
104 delegate.processEvent(event);
109 public void addHandler(
final EventHandler handler) {
110 delegate.addHandler(handler);
112 eventsManagers[handlerCount % numOfThreads].addHandler(handler);
117 public void removeHandler(
final EventHandler handler) {
118 delegate.removeHandler(handler);
120 for (EventsManager eventsManager : eventsManagers) eventsManager.removeHandler(handler);
124 public void resetHandlers(
int iteration) {
125 delegate.resetHandlers(iteration);
130 public void initProcessing() {
131 delegate.initProcessing();
132 for (EventsManager eventsManager : this.eventsManagers) eventsManager.initProcessing();
134 Queue<Event>[] eventsQueuesArray =
new Queue[this.numOfThreads];
135 List<Queue<Event>> eventsQueues =
new ArrayList<Queue<Event>>();
136 for (
int i = 0; i < numOfThreads; i++) {
137 Queue<Event> eventsQueue =
new LinkedBlockingQueue<>();
138 eventsQueues.add(eventsQueue);
139 eventsQueuesArray[i] = eventsQueue;
145 eventsQueues.add(null);
151 processedEventsChecker =
new ProcessedEventsChecker(
this, eventsQueuesArray);
156 CyclicBarrier waitForEmptyQueuesBarrier =
new CyclicBarrier(this.numOfThreads, processedEventsChecker);
158 hadException =
new AtomicReference<>();
159 ExceptionHandler uncaughtExceptionHandler =
new ExceptionHandler(hadException, waitForEmptyQueuesBarrier,
160 simStepEndBarrier, iterationEndBarrier);
162 runnables =
new ProcessEventsRunnable[numOfThreads];
163 for (
int i = 0; i < numOfThreads; i++) {
164 ProcessEventsRunnable processEventsRunnable =
new ProcessEventsRunnable(eventsManagers[i], processedEventsChecker,
165 waitForEmptyQueuesBarrier, simStepEndBarrier, iterationEndBarrier, eventsQueues.get(i), eventsQueues.get(i + 1));
166 runnables[i] = processEventsRunnable;
167 Thread thread =
new Thread(processEventsRunnable);
168 thread.setDaemon(
true);
169 thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
170 thread.setName(ProcessEventsRunnable.class.toString() + i);
178 this.parallelMode =
true;
188 public synchronized void finishProcessing() {
196 if (hadException.get() == null) {
198 this.processEvent(
new LastEventOfIteration(Double.POSITIVE_INFINITY));
199 iterationEndBarrier.await();
200 }
catch (InterruptedException | BrokenBarrierException e) {
202 if (hadException.get() == null) {
205 log.error(
"Exception caught while finishing event processing at the iteration end.", e);
206 this.hadException.set(e);
211 delegate.finishProcessing();
212 for (EventsManager eventsManager : this.eventsManagers) eventsManager.finishProcessing();
218 this.parallelMode =
false;
220 if (hadException.get() != null) {
221 throw new RuntimeException(
"Exception while processing events. Cannot guarantee that all events have been fully processed.", hadException.get());
226 public void afterSimStep(
double time) {
233 if (hadException.get() != null) {
238 Gbl.assertNotNull( this.processedEventsChecker );
239 this.processedEventsChecker.setTime(time);
240 this.processEvent(
new LastEventOfSimStep(time));
241 simStepEndBarrier.await();
242 }
catch (InterruptedException | BrokenBarrierException e) {
243 if (hadException.get() != null) {
262 CyclicBarrier waitForEmptyQueuesBarrier,CyclicBarrier simStepEndBarrier,
263 CyclicBarrier iterationEndBarrier, Queue<Event> eventsQueue, Queue<Event> nextEventsQueue) {
286 Event event = ((LinkedBlockingQueue<Event>) eventsQueue).take();
292 throw new RuntimeException(
"Events in the queue are not ordered chronologically. " +
293 "This should never happen. Is the SimTimeStepParallelEventsManager registered " +
294 "as a MobsimAfterSimStepListener? LastEventTime = " + this.lastEventTime +
295 " currentEvent.time = " + event.getTime() +
" currentEvent.type = " +
event.getEventType() +
296 " full event: " +
event.toString());
298 this.lastEventTime =
event.getTime();
305 if (nextEventsQueue != null) {
306 nextEventsQueue.add(event);
315 waitForEmptyQueuesBarrier.await();
322 simStepEndBarrier.await();
333 if (nextEventsQueue != null) {
334 nextEventsQueue.add(event);
348 iterationEndBarrier.await();
349 }
catch (InterruptedException | BrokenBarrierException e) {
356 this.eventsQueue.add(event);
369 this.evenentsManger = evenentsManger;
370 this.eventQueues = eventQueues;
372 this.allEventsProcessed =
true;
380 return this.allEventsProcessed;
385 for (Queue<Event> eventsQueue : eventQueues) {
402 if (eventsQueue.size() > 0) {
403 allEventsProcessed =
false;
410 allEventsProcessed =
true;
425 public ExceptionHandler(
final AtomicReference<Throwable> hadException, CyclicBarrier waitForEmptyQueuesBarrier,
426 CyclicBarrier simStepEndBarrier, CyclicBarrier iterationEndBarrier) {
427 this.hadException = hadException;
435 if (hadException.get() == null) {
437 this.hadException.set(e);
439 if (!(e instanceof BrokenBarrierException)) {
442 log.error(
"Thread " + t.getName() +
" died with exception while handling events.", e);
448 this.simStepEndBarrier.reset();
449 this.iterationEndBarrier.reset();
450 this.waitForEmptyQueuesBarrier.reset();
boolean allEventsProcessed()
final Queue< Event > [] eventQueues
void uncaughtException(Thread t, Throwable e)
void setTime(double time)
ProcessedEventsChecker(EventsManager evenentsManger, Queue< Event >[] eventQueues)
ExceptionHandler(final AtomicReference< Throwable > hadException, CyclicBarrier waitForEmptyQueuesBarrier, CyclicBarrier simStepEndBarrier, CyclicBarrier iterationEndBarrier)
final CyclicBarrier iterationEndBarrier
final ProcessedEventsChecker processedEventsChecker
void processEvent(final Event event)
final EventsManager eventsManager
ProcessEventsRunnable(EventsManager eventsManager, ProcessedEventsChecker processedEventsChecker, CyclicBarrier waitForEmptyQueuesBarrier, CyclicBarrier simStepEndBarrier, CyclicBarrier iterationEndBarrier, Queue< Event > eventsQueue, Queue< Event > nextEventsQueue)
final EventsManager evenentsManger
final CyclicBarrier iterationEndBarrier
final CyclicBarrier simStepEndBarrier
final CyclicBarrier simStepEndBarrier
final CyclicBarrier waitForEmptyQueuesBarrier
final Queue< Event > nextEventsQueue
final AtomicReference< Throwable > hadException
void processEvent(Event event)
boolean allEventsProcessed
final Queue< Event > eventsQueue
final CyclicBarrier waitForEmptyQueuesBarrier
static final void printCurrentThreadCpuTime()