MATSIM
ParallelEventsManagerImpl.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * *********************************************************************** *
4  * *
5  * copyright : (C) 2007, 2008 by the members listed in the COPYING, *
6  * LICENSE and WARRANTY file. *
7  * email : info at matsim dot org *
8  * *
9  * *********************************************************************** *
10  * *
11  * This program is free software; you can redistribute it and/or modify *
12  * it under the terms of the GNU General Public License as published by *
13  * the Free Software Foundation; either version 2 of the License, or *
14  * (at your option) any later version. *
15  * See also COPYING, LICENSE and WARRANTY file *
16  * *
17  * *********************************************************************** */
18 
19 package org.matsim.core.events;
20 
21 import java.lang.Thread.UncaughtExceptionHandler;
22 import java.util.concurrent.atomic.AtomicReference;
23 
24 import jakarta.inject.Inject;
25 
26 import org.apache.logging.log4j.LogManager;
27 import org.apache.logging.log4j.Logger;
30 import org.matsim.core.config.Config;
32 
61 public final class ParallelEventsManagerImpl implements EventsManager {
62 
63  private boolean parallelMode = true;
64  private int numberOfThreads;
65  private EventsManagerImpl[] events = null;
66  private ProcessEventThread[] eventsProcessThread = null;
67  private Thread[] threads = null;
68  private int numberOfAddedEventsHandler = 0;
69  private final AtomicReference<Throwable> hadException = new AtomicReference<>();
70  private final ExceptionHandler uncaughtExceptionHandler = new ExceptionHandler(hadException);
71 
72  private final static Logger log = LogManager.getLogger(ParallelEventsManagerImpl.class);
73 
74  // this number should be set in the following way:
75  // if the number of events is estimated as x, then this number
76  // could be set to x/10
77  // the higher this parameter, the less locks are used, but
78  // the more the time buffer between the simulation and events handling
79  // for small simulations, the default value is ok and it even works
80  // quite well for larger simulations with 10 million events
81  private int preInputBufferMaxLength = 100000;
82 
83  @Inject
85  if (config.eventsManager().getEstimatedNumberOfEvents() != null) {
86  preInputBufferMaxLength = (int) (config.eventsManager().getEstimatedNumberOfEvents() / 10);
87  }
89  }
90 
95  public ParallelEventsManagerImpl(int numberOfThreads) {
96  init(numberOfThreads);
97  }
98 
106  public ParallelEventsManagerImpl(int numberOfThreads, long estimatedNumberOfEvents) {
107  preInputBufferMaxLength = (int) (estimatedNumberOfEvents / 10 );
108  init(numberOfThreads);
109  }
110 
111  @Override
112  public void processEvent(final Event event) {
113  if (parallelMode) {
114  for (int i = 0; i < eventsProcessThread.length; i++) {
115  eventsProcessThread[i].processEvent(event);
116  }
117  } else {
118  for (int i = 0; i < eventsProcessThread.length; i++) {
119  eventsProcessThread[i].getEvents().processEvent(event);
120  }
121  }
122  }
123 
124  @Override
125  public void addHandler(final EventHandler handler) {
126  synchronized (this) {
127  log.info("adding Event-Handler " + handler.getClass().getName() + " to thread " + numberOfAddedEventsHandler);
128  events[numberOfAddedEventsHandler].addHandler(handler);
129  numberOfAddedEventsHandler = (numberOfAddedEventsHandler + 1) % numberOfThreads;
130  }
131  }
132 
133  @Override
134  public void resetHandlers(final int iteration) {
135  synchronized (this) {
136  for (int i = 0; i < events.length; i++) {
137  events[i].resetHandlers(iteration);
138  }
139  }
140  }
141 
142  @Override
143  public void removeHandler(final EventHandler handler) {
144  synchronized (this) {
145  for (int i = 0; i < events.length; i++) {
146  events[i].removeHandler(handler);
147  }
148  }
149  }
150 
151  private void printEventHandlers() {
152  synchronized (this) {
153  for (int i = 0; i < events.length; i++) {
154  log.info("registered event handlers for thread " + i + ":");
155  events[i].printEventHandlers();
156  }
157  }
158  }
159 
160  private void init(int numberOfThreads) {
161  this.numberOfThreads = numberOfThreads;
162  this.events = new EventsManagerImpl[numberOfThreads];
163  this.eventsProcessThread = new ProcessEventThread[numberOfThreads];
164  this.threads = new Thread[numberOfThreads];
165  // the additional 1 is for the simulation barrier
166  for (int i = 0; i < numberOfThreads; i++) {
167  events[i] = new EventsManagerImpl();
168  }
169  }
170 
171  // When one simulation iteration is finish, it must call this method,
172  // so that it can communicate to the threads, that the simulation is
173  // finished and that it can await the event handler threads.
174 
175  // after call to this method, all event processing is done not in parallel
176  // anymore
177  @Override
178  public void finishProcessing() {
179  for (int i = 0; i < eventsProcessThread.length; i++) {
180  eventsProcessThread[i].close();
181  }
182 
183  try {
184  for (Thread t : this.threads) {
185  t.join();
186  }
187  } catch (InterruptedException e) {
188  e.printStackTrace();
189  }
190 
191  // list which threads had which handlers to debug performance issues
193 
194  /*
195  * introduction of the parallel mode variable was required, because of
196  * the following reason: previously no event handling was possible after
197  * the end of the simulation. e.g. adding money events in the after
198  * mobsim controler listener would not be invoked by parallelEventHandling
199  */
200 
201  parallelMode = false;
202 
203  if (this.hadException.get() != null) {
204  throw new RuntimeException(
205  "Exception while processing events. Cannot guarantee that all events have been fully processed.",
206  uncaughtExceptionHandler.hadException.get());
207  }
208  }
209 
210  // create event handler threads
211  // prepare for next iteration
212  @Override
213  public void initProcessing() {
214  // reset this class, so that it can be reused for the next iteration
215  for (int i = 0; i < numberOfThreads; i++) {
216  this.eventsProcessThread[i] = new ProcessEventThread(events[i], preInputBufferMaxLength);
217  this.threads[i] = new Thread(eventsProcessThread[i], "Events-" + i);
218  this.threads[i].setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
219  this.threads[i].start();
220  }
221 
222  // (re-)activate parallel mode while the mobsim is running
223  this.parallelMode = true;
224  }
225 
229  private static class ExceptionHandler implements UncaughtExceptionHandler {
230 
231  private final AtomicReference<Throwable> hadException;
232 
233  public ExceptionHandler(final AtomicReference<Throwable> hadException) {
234  this.hadException = hadException;
235  }
236 
237  @Override
238  public void uncaughtException(Thread t, Throwable e) {
239  log.error("Thread " + t.getName() + " died with exception while handling events.", e);
240  this.hadException.set(e);
241  }
242 
243  }
244 
245  @Override
246  public void afterSimStep(double time) {
247  // nothing to do in this implementation
248  }
249 
250 }
void removeHandler(final EventHandler handler)
EventsManagerConfigGroup eventsManager()
Definition: Config.java:467
void addHandler(final EventHandler handler)
ParallelEventsManagerImpl(int numberOfThreads, long estimatedNumberOfEvents)