21 package org.matsim.core.events;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.ArrayBlockingQueue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.TimeUnit;
30 import jakarta.inject.Inject;
31 import org.apache.logging.log4j.LogManager;
32 import org.apache.logging.log4j.Logger;
53 private boolean init =
false;
79 ParallelEventsManager(
final boolean syncOnTimeSteps,
final boolean oneThreadPerHandler,
final int numOfThreads,
final int eventsQueueSize) {
83 this.eventsHandlers =
new ArrayList<>();
84 this.eventsArraySize = syncOnTimeSteps ? 512 : 32768;
91 int numHandlers = oneThreadPerHandler ? this.eventsHandlers.size() : Math.min(this.numOfThreads, this.eventsHandlers.size());
92 this.distributor =
new Distributor(
new ArrayList<ProcessEventsRunnable>(), eventQueue);
93 this.eventsManagers =
new ArrayList<>(numHandlers);
96 if (this.oneThreadPerHandler) {
97 for (
int i = 0; i < this.eventsHandlers.size(); i++) {
105 for (
int i = 0; i < this.eventsHandlers.size(); i++) {
106 this.eventsManagers.get(i % numOfThreads).addHandler(this.eventsHandlers.get(i));
111 for (
int i = 0; i < this.eventsManagers.size(); i++) {
114 distributor.
runnables.add(processEventsRunnable);
115 processEventsRunnable.setDaemon(
true);
116 processEventsRunnable.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
117 if (oneThreadPerHandler) {
118 processEventsRunnable.setName(
"SingleHandlerEventsManager: " + ((
SingleHandlerEventsManager) eventsManager).getEventHandlerClassName());
123 processEventsRunnable.start();
127 this.distributor.setDaemon(
true);
128 this.distributor.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
129 this.distributor.setName(
"EventsDistributor");
130 this.distributor.start();
140 distributor.interrupt();
142 }
catch (InterruptedException e) {
151 if (!init)
throw new IllegalStateException(
".initProcessing() has to be called before processing events!");
156 this.eventQueue.put(array);
157 }
catch (InterruptedException e) {
164 if (!init)
throw new IllegalStateException(
".initProcessing() has to be called before processing events!");
167 this.eventQueue.put(events);
168 }
catch (InterruptedException e) {
176 throw new IllegalStateException(
"Handlers can not be added after .initProcessing() was called!");
179 this.eventsHandlers.add(handler);
185 this.eventsHandlers.remove(handler);
191 eventsManager.resetHandlers(iteration);
201 eventsManager.initProcessing();
217 eventsManager.finishProcessing();
223 throw new RuntimeException(
"Exception while processing events. Cannot guarantee that all events have been fully processed.", this.uncaughtExceptionHandler.
exception);
231 if (this.syncOnTimeSteps) {
236 throw new RuntimeException(
"Exception while processing events. Cannot guarantee that all events have been fully processed.", this.uncaughtExceptionHandler.
exception);
243 this.distributor.
flush();
244 }
catch (InterruptedException e) {
245 throw new RuntimeException(
"Exception while waiting on flush... " + e.getMessage(), this.uncaughtExceptionHandler.
exception);
251 private final ArrayList<ProcessEventsRunnable>
runnables;
259 public Distributor(ArrayList<ProcessEventsRunnable> runnables, BlockingQueue<EventArray> eventQueue) {
264 public void flush() throws InterruptedException {
265 synchronized (
this) {
267 while (shouldFlush && this.isAlive()) {
275 runnable.eventsQueue.add(events);
284 EventArray earray = this.eventQueue.poll(50, TimeUnit.MICROSECONDS);
285 if (earray == null) {
286 synchronized (
this) {
290 if (events.
size() > 0) {
300 boolean stillFlushing =
true;
301 while (stillFlushing) {
304 stillFlushing =
false;
306 stillFlushing |= runnable.isFlushing();
310 if (eventQueue.isEmpty()) {
322 if (events.
size() > 0) {
331 for (
int i = 0; i < earray.
size(); i++) {
342 }
catch (InterruptedException e) {
356 this.eventsManager = eventsManager;
357 this.eventsQueue =
new LinkedBlockingQueue<>();
366 return flush && this.isAlive();
373 EventArray events = this.eventsQueue.poll(50, TimeUnit.MICROSECONDS);
375 if (events != null) {
376 for (
int i = 0; i < events.
size(); i++) {
382 if (flush && this.eventsQueue.isEmpty()) {
383 synchronized (
this) {
387 this.distributor.notify();
391 }
catch (InterruptedException e) {
402 private volatile boolean hadException =
false;
408 this.hadException =
true;
410 log.error(
"Thread " + t.getName() +
" died with exception while handling events.", e);
414 return this.hadException;
418 return this.exception;
void removeHandler(final EventHandler handler)
final boolean oneThreadPerHandler
final List< EventHandler > eventsHandlers
ParallelEventsManager(final boolean syncOnTimeSteps)
volatile boolean shouldFlush
final BlockingQueue< EventArray > eventsQueue
final boolean syncOnTimeSteps
EventsManagerConfigGroup eventsManager()
void processEvents(final EventArray events)
ProcessEventsRunnable(EventsManager eventsManager, Distributor distributor)
Boolean getSynchronizeOnSimSteps()
void processEvent(final Event event)
volatile Throwable exception
synchronized void flush()
final EventsManager eventsManager
void distribute(EventArray events)
void resetHandlers(int iteration)
final Distributor distributor
void processEvent(final Event event)
final int eventsQueueSize
void uncaughtException(Thread t, Throwable e)
final ExceptionHandler uncaughtExceptionHandler
Distributor(ArrayList< ProcessEventsRunnable > runnables, BlockingQueue< EventArray > eventQueue)
void afterSimStep(double time)
final ArrayList< ProcessEventsRunnable > runnables
final BlockingQueue< EventArray > eventQueue
ArrayList< EventsManager > eventsManagers
final int eventsArraySize
ParallelEventsManager(final boolean syncOnTimeSteps, int eventsQueueSize)
synchronized void finishProcessing()
void addHandler(final EventHandler handler)
ParallelEventsManager(final boolean syncOnTimeSteps, final int numOfThreads, int eventsQueueSize)
volatile boolean hadException
final BlockingQueue< EventArray > eventQueue