19 package org.matsim.core.events;
21 import java.lang.Thread.UncaughtExceptionHandler;
22 import java.util.concurrent.atomic.AtomicReference;
24 import jakarta.inject.Inject;
26 import org.apache.logging.log4j.LogManager;
27 import org.apache.logging.log4j.Logger;
/**
 * Holds a {@link Throwable} recorded while events were being handled; the
 * reference starts out empty ({@code null}).
 * <p>
 * Elsewhere in this file a reference of this type is filled via
 * {@code hadException.set(e)} right after logging that a thread died, and a
 * non-null value is later treated as "an exception occurred while processing
 * events".
 * NOTE(review): presumably this very instance is handed to the
 * ExceptionHandler installed on the event threads -- confirm against the
 * handler's construction, which is not visible here.
 */
69 private final AtomicReference<Throwable>
hadException =
new AtomicReference<>();
96 init(numberOfThreads);
107 preInputBufferMaxLength = (int) (estimatedNumberOfEvents / 10 );
108 init(numberOfThreads);
114 for (
int i = 0; i < eventsProcessThread.length; i++) {
115 eventsProcessThread[i].processEvent(event);
118 for (
int i = 0; i < eventsProcessThread.length; i++) {
119 eventsProcessThread[i].getEvents().processEvent(event);
126 synchronized (
this) {
129 numberOfAddedEventsHandler = (numberOfAddedEventsHandler + 1) % numberOfThreads;
135 synchronized (
this) {
136 for (
int i = 0; i < events.length; i++) {
144 synchronized (
this) {
145 for (
int i = 0; i < events.length; i++) {
152 synchronized (
this) {
153 for (
int i = 0; i < events.length; i++) {
154 log.info(
"registered event handlers for thread " + i +
":");
160 private void init(
int numberOfThreads) {
179 for (
int i = 0; i < eventsProcessThread.length; i++) {
180 eventsProcessThread[i].close();
184 for (Thread t : this.threads) {
187 }
catch (InterruptedException e) {
201 parallelMode =
false;
203 if (this.hadException.get() != null) {
205 "Exception while processing events. Cannot guarantee that all events have been fully processed.",
216 this.eventsProcessThread[i] =
new ProcessEventThread(events[i], preInputBufferMaxLength);
217 this.threads[i] =
new Thread(eventsProcessThread[i],
"Events-" + i);
218 this.threads[i].setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
219 this.threads[i].start();
223 this.parallelMode =
true;
239 log.error(
"Thread " + t.getName() +
" died with exception while handling events.", e);
240 this.hadException.set(e);
int preInputBufferMaxLength
void removeHandler(final EventHandler handler)
void afterSimStep(double time)
Long getEstimatedNumberOfEvents()
EventsManagerImpl [] events
void removeHandler(final EventHandler handler)
void addHandler(final EventHandler handler)
void printEventHandlers()
EventsManagerConfigGroup eventsManager()
Integer getNumberOfThreads()
ExceptionHandler(final AtomicReference< Throwable > hadException)
int numberOfAddedEventsHandler
void resetHandlers(final int iteration)
void printEventHandlers()
void addHandler(final EventHandler handler)
void resetHandlers(final int iteration)
ParallelEventsManagerImpl(int numberOfThreads, long estimatedNumberOfEvents)
final AtomicReference< Throwable > hadException
final AtomicReference< Throwable > hadException
void uncaughtException(Thread t, Throwable e)
ProcessEventThread [] eventsProcessThread
ParallelEventsManagerImpl(int numberOfThreads)
final ExceptionHandler uncaughtExceptionHandler
void init(int numberOfThreads)
void processEvent(final Event event)