MATSIM
QNetsimEngine.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * QNetsimEngine.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2009 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.mobsim.qsim.qnetsimengine;
22 
23 import org.apache.log4j.Logger;
24 import org.matsim.api.core.v01.Id;
25 import org.matsim.api.core.v01.Scenario;
30 import org.matsim.core.config.Config;
36 import org.matsim.core.gbl.Gbl;
41 import org.matsim.core.mobsim.qsim.QSim;
46 import org.matsim.core.utils.misc.Time;
47 import org.matsim.vehicles.Vehicle;
49 
50 import javax.inject.Inject;
51 import java.util.ArrayList;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.HashSet;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Set;
58 import java.util.concurrent.ExecutionException;
59 import java.util.concurrent.ExecutorService;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.Future;
62 import java.util.concurrent.Phaser;
63 import java.util.concurrent.ThreadFactory;
64 
72 public class QNetsimEngine implements MobsimEngine, NetsimEngine {
73 
74  public interface NetsimInternalInterface {
77  void letVehicleArrive(QVehicle veh);
78  }
79 
81  @Override public QNetwork getNetsimNetwork() {
82  return network ;
83  }
84  @Override public void arrangeNextAgentState(MobsimAgent driver) {
86  }
87  @Override public void letVehicleArrive(QVehicle veh) {
88  QNetsimEngine.this.letVehicleArrive( veh ) ;
89  }
90  } ;
91 
92  private static final Logger log = Logger.getLogger(QNetsimEngine.class);
93 
94  private static final int INFO_PERIOD = 3600;
95 
96  private QNetwork network;
97 
98  private final Map<Id<Vehicle>, QVehicle> vehicles = new HashMap<>();
99 
100  private final QSim qsim;
101 
102  private final VehicularDepartureHandler dpHandler;
103 
104  private double infoTime = 0;
105 
106  private final int numOfThreads;
107 
108  private List<QNetsimEngineRunner> engines;
109 
110  private Phaser startBarrier;
111  private Phaser endBarrier;
112 
113  private final Set<QLinkI> linksToActivateInitially = new HashSet<>();
114 
116 
117  private int numOfRunners;
118 
119  private ExecutorService pool;
120 
121  private final boolean usingThreadpool;
122 
123  // for detailed run time analysis - used in combination with QSim.analyzeRunTimes
124  public static int numObservedTimeSteps = 24*3600;
125  public static boolean printRunTimesPerTimeStep = false;
126 
127  @Override
128  public void setInternalInterface( InternalInterface internalInterface) {
129  this.internalInterface = internalInterface;
130  }
131 
132  public QNetsimEngine(final QSim sim) {
133  this(sim, null);
134  }
135 
136  @Inject
137  public QNetsimEngine(final QSim sim, QNetworkFactory netsimNetworkFactory) {
138  this.qsim = sim;
139 
140  final Config config = sim.getScenario().getConfig();
141  final QSimConfigGroup qsimConfigGroup = config.qsim();
142  this.usingThreadpool = qsimConfigGroup.isUsingThreadpool();
143 
144 
145  // configuring the car departure hander (including the vehicle behavior)
146  QSimConfigGroup qSimConfigGroup = this.qsim.getScenario().getConfig().qsim();
147 
148  VehicleBehavior vehicleBehavior = qSimConfigGroup.getVehicleBehavior();
149  switch(vehicleBehavior) {
150  case exception:
151  case teleport:
152  case wait:
153  break;
154  default:
155  throw new RuntimeException("Unknown vehicle behavior option.");
156  }
157  dpHandler = new VehicularDepartureHandler(this, vehicleBehavior, qSimConfigGroup);
158 
159  if(qSimConfigGroup.getLinkDynamics().equals(LinkDynamics.SeepageQ)) {
160  log.info("Seepage is allowed. Seep mode(s) is(are) " + qSimConfigGroup.getSeepModes() + ".");
161  if(qSimConfigGroup.isSeepModeStorageFree()) {
162  log.warn("Seep mode(s) " + qSimConfigGroup.getSeepModes() + " does not take storage space thus only considered for flow capacities.");
163  }
164  }
165 
166  if (netsimNetworkFactory != null){
167  network = new QNetwork( sim.getScenario().getNetwork(), netsimNetworkFactory ) ;
168  } else {
169  Scenario scenario = sim.getScenario();
170  EventsManager events = sim.getEventsManager() ;
171  final DefaultQNetworkFactory netsimNetworkFactory2 = new DefaultQNetworkFactory( events, scenario );
172  MobsimTimer mobsimTimer = sim.getSimTimer() ;
173  AgentCounter agentCounter = sim.getAgentCounter() ;
174  netsimNetworkFactory2.initializeFactory(agentCounter, mobsimTimer, ii );
175  network = new QNetwork(sim.getScenario().getNetwork(), netsimNetworkFactory2 );
176  }
177  network.initialize(this, sim.getAgentCounter(), sim.getSimTimer() );
178 
179  this.numOfThreads = sim.getScenario().getConfig().qsim().getNumberOfThreads();
180  }
181 
182  private static int wrnCnt = 0;
183  public void addParkedVehicle(MobsimVehicle veh, Id<Link> startLinkId) {
184  if (this.vehicles.put(veh.getId(), (QVehicle) veh) != null) {
185  if (wrnCnt < 1) {
186  wrnCnt++ ;
187  log.warn("existing vehicle in mobsim was just overwritten by other vehicle with same ID. Not clear what this means. Continuing anyways ...") ;
188  log.warn(Gbl.ONLYONCE);
189  }
190  }
191  QLinkI qlink = network.getNetsimLinks().get(startLinkId);
192  if (qlink == null) {
193  throw new RuntimeException("requested link with id=" + startLinkId + " does not exist in network. Possible vehicles "
194  + "or activities or facilities are registered to a different network.") ;
195  }
196  qlink.addParkedVehicle(veh);
197  }
198 
199  static AbstractAgentSnapshotInfoBuilder createAgentSnapshotInfoBuilder(Scenario scenario, SnapshotLinkWidthCalculator linkWidthCalculator) {
200  final SnapshotStyle snapshotStyle = scenario.getConfig().qsim().getSnapshotStyle();
201  switch(snapshotStyle) {
202  case queue:
203  return new QueueAgentSnapshotInfoBuilder(scenario, linkWidthCalculator);
204  case withHoles:
205  case withHolesAndShowHoles:
206  // the difference is not in the spacing, thus cannot be differentiated by using different classes. kai, sep'14
207  // ??? kai, nov'15
208  return new QueueAgentSnapshotInfoBuilder(scenario, linkWidthCalculator);
209  case kinematicWaves:
210  log.warn("The snapshotStyle \"" + snapshotStyle + "\" is not explicitly supported. Using \""+SnapshotStyle.withHoles+ "\" instead.");
211  return new QueueAgentSnapshotInfoBuilder(scenario, linkWidthCalculator);
212  case equiDist:
213  return new EquiDistAgentSnapshotInfoBuilder(scenario, linkWidthCalculator);
214  default:
215  log.warn("The snapshotStyle \"" + snapshotStyle + "\" is not supported. Using equiDist");
216  return new EquiDistAgentSnapshotInfoBuilder(scenario, linkWidthCalculator);
217  }
218  }
219 
220  @Override
221  public void onPrepareSim() {
222  this.infoTime =
223  Math.floor(internalInterface.getMobsim().getSimTimer().getSimStartTime() / INFO_PERIOD) * INFO_PERIOD;
224  /*
225  * infoTime may be < simStartTime, this ensures to print out the
226  * info at the very first timestep already
227  */
228 
230  }
231 
232  @Override
233  public void afterSim() {
234 
235  /*
236  * Calling the afterSim Method of the QSimEngineThreads
237  * will set their simulationRunning flag to false.
238  */
239  for (QNetsimEngineRunner engine : this.engines) {
240  engine.afterSim();
241  }
242 
243  if (this.usingThreadpool) {
244  this.pool.shutdown();
245  } else {
246  /*
247  * Triggering the startBarrier of the QSimEngineThreads.
248  * They will check whether the Simulation is still running.
249  * It is not, so the Threads will stop running.
250  */
251  this.startBarrier.arriveAndAwaitAdvance();
252  }
253 
254  /* Reset vehicles on ALL links. We cannot iterate only over the active links
255  * (this.simLinksArray), because there may be links that have vehicles only
256  * in the buffer (such links are *not* active, as the buffer gets emptied
257  * when handling the nodes.
258  */
259  for (QLinkI link : network.getNetsimLinks().values()) {
260  link.clearVehicles();
261  }
262  }
263 
268  @Override
269  public void doSimStep(final double time) {
270  run(time);
271 
272  this.printSimLog(time);
273  }
274 
275  /*
276  * The Threads are waiting at the startBarrier.
277  * We trigger them by reaching this Barrier. Now the
278  * Threads will start moving the Nodes and Links. We wait
279  * until all of them reach the endBarrier to move
280  * on. We should not have any Problems with Race Conditions
281  * because even if the Threads would be faster than this
282  * Thread, means the reach the endBarrier before
283  * this Method does, it should work anyway.
284  */
285  private void run(double time) {
286  // yy Acceleration options to try out (kai, jan'15):
287 
288  // (a) Try to do without barriers. With our
289  // message-based experiments a decade ago, it was better to let each runner decide locally when to proceed. For intuition, imagine that
290  // one runner is slowest on the links, and some other runner slowest on the nodes. With the barriers, this cannot overlap.
291  // With message passing, this was achieved by waiting for all necessary messages. Here, it could (for example) be achieved with runner-local
292  // clocks:
293  // for ( all runners that own incoming links to my nodes ) { // (*)
294  // wait until runner.getTime() == myTime ;
295  // }
296  // processLocalNodes() ;
297  // mytime += 0.5 ;
298  // for ( all runners that own toNodes of my links ) { // (**)
299  // wait until runner.getTime() == myTime ;
300  // }
301  // processLocalLinks() ;
302  // myTime += 0.5 ;
303 
304  // (b) Do deliberate domain decomposition rather than round robin (fewer runners to wait for at (*) and (**)).
305 
306  // (c) One thread that is much faster than all others is much more efficient than one thread that is much slower than all others.
307  // So make sure that no thread sticks out in terms of slowness. Difficult to achieve, though. A decade back, we used a "typical" run
308  // as input for the domain decomposition under (b).
309 
310  // set current Time
311  for (QNetsimEngineRunner engine : this.engines) {
312  engine.setTime(time);
313  }
314 
315  if (this.usingThreadpool) {
316  try {
317  for (QNetsimEngineRunner engine : this.engines) {
318  engine.setMovingNodes(true);
319  }
320  for (Future<Boolean> future : pool.invokeAll(this.engines)) {
321  future.get();
322  }
323  for (QNetsimEngineRunner engine : this.engines) {
324  engine.setMovingNodes(false);
325  }
326  for (Future<Boolean> future : pool.invokeAll(this.engines)) {
327  future.get();
328  }
329  } catch (InterruptedException e) {
330  throw new RuntimeException(e) ;
331  } catch (ExecutionException e) {
332  throw new RuntimeException(e.getCause());
333  }
334  } else {
335  this.startBarrier.arriveAndAwaitAdvance();
336  this.endBarrier.arriveAndAwaitAdvance();
337  }
338  }
339 
340 
341  /*package*/ void printSimLog(double time) {
342  if (time >= this.infoTime) {
343  this.infoTime += INFO_PERIOD;
344  int nofActiveLinks = this.getNumberOfSimulatedLinks();
345  int nofActiveNodes = this.getNumberOfSimulatedNodes();
346  log.info("SIMULATION (QNetsimEngine) AT " + Time.writeTime(time)
347  + " : #links=" + nofActiveLinks
348  + " #nodes=" + nofActiveNodes);
349  }
350  }
351 
353 
354  int numLinks = 0;
355 
356  for (QNetsimEngineRunner engine : this.engines) {
357  numLinks = numLinks + engine.getNumberOfSimulatedLinks();
358  }
359 
360  return numLinks;
361  }
362 
364 
365  int numNodes = 0;
366 
367  for (QNetsimEngineRunner engine : this.engines) {
368  numNodes = numNodes + engine.getNumberOfSimulatedNodes();
369  }
370 
371  return numNodes;
372  }
373 
374 // QSim getMobsim() {
375 // return this.qsim;
376 // }
377  // do not hand out back pointers! kai, mar'16
378 
380  return this.network;
381  }
382 
383  public VehicularDepartureHandler getDepartureHandler() {
384  return dpHandler;
385  }
386 
387  public final Map<Id<Vehicle>, QVehicle> getVehicles() {
388  return Collections.unmodifiableMap(this.vehicles);
389  }
390 
391  public final void registerAdditionalAgentOnLink(final MobsimAgent planAgent) {
392  Id<Link> linkId = planAgent.getCurrentLinkId();
393  if (linkId != null) { // may be bushwacking
394  QLinkI qLink = this.network.getNetsimLink(linkId);
395  if ( qLink==null ) {
396  throw new RuntimeException("netsim link lookup failed; agentId=" + planAgent.getId() + "; linkId=" + linkId ) ;
397  }
398  qLink.registerAdditionalAgentOnLink(planAgent);
399  }
400  }
401 
403  if (linkId == null) { // seems that this can happen in tests; not sure if it can happen in regular code. kai, jun'15
404  return null;
405  }
406  QLinkI qLink = this.network.getNetsimLink(linkId);
407  return qLink.unregisterAdditionalAgentOnLink(agentId);
408  }
409 
410  private void letVehicleArrive(QVehicle veh) {
411  double now = this.qsim.getSimTimer().getTimeOfDay();
412  MobsimDriverAgent driver = veh.getDriver();
413  this.qsim.getEventsManager().processEvent(new PersonLeavesVehicleEvent(now, driver.getId(), veh.getId()));
414  // reset vehicles driver
415  veh.setDriver(null);
416  driver.endLegAndComputeNextState(now);
417  this.internalInterface.arrangeNextAgentState(driver);
418  }
419 
420  private void initQSimEngineThreads() {
421 
422  this.engines = new ArrayList<>();
423 
424  this.startBarrier = new Phaser(this.numOfThreads + 1);
425  Phaser separationBarrier = new Phaser(this.numOfThreads);
426  this.endBarrier = new Phaser(this.numOfThreads + 1);
427 
428  numOfRunners = this.numOfThreads;
429  if (this.usingThreadpool) {
430  // The number of runners should be larger than the number of threads, yes,
431  // but see MATSIM-404 - Simulation result still depends on the number of runners.
432 // numOfRunners *= 10 ;
433  this.pool = Executors.newFixedThreadPool(
434  this.numOfThreads,
435  new NamedThreadFactory());
436  }
437 
438  // setup threads
439  for (int i = 0; i < numOfRunners; i++) {
440  QNetsimEngineRunner engine ;
441  if (this.usingThreadpool) {
442  engine = new QNetsimEngineRunner();
443  } else {
444  engine = new QNetsimEngineRunner(this.startBarrier, separationBarrier, endBarrier);
445  Thread thread = new Thread(engine);
446  thread.setName("QNetsimEngineRunner_" + i);
447  thread.setDaemon(true); // make the Thread Daemons so they will terminate automatically
448  thread.start();
449  }
450  this.engines.add(engine);
451  }
452 
453  /*
454  * Assign every Link and Node to an Activator. By doing so, the
455  * activateNode(...) and activateLink(...) methods in this class
456  * should become obsolete.
457  */
459  }
460 
461  /*
462  * Within the MoveThreads Links are only activated when a Vehicle is moved
463  * over a Node which is processed by that Thread. So we can assign each QLink
464  * to the Thread that handles its InNode.
465  */
466  private void assignNetElementActivators() {
467 
468  // only for statistics
469  int nodes[] = new int[numOfRunners];
470  int links[] = new int[numOfRunners];
471 
472  int roundRobin = 0;
473  for (QNodeI node : network.getNetsimNodes().values()) {
474  int i = roundRobin % this.numOfRunners;
475  if( node instanceof AbstractQNode){
476  ((AbstractQNode) node).setNetElementActivationRegistry(this.engines.get(i));
477  }
478  nodes[i]++;
479 
480  // set activator for out links
481  for (Link outLink : node.getNode().getOutLinks().values()) {
482  AbstractQLink qLink = (AbstractQLink) network.getNetsimLink(outLink.getId());
483  // (must be of this type to work. kai, feb'12)
484 
485  // removing qsim as "person in the middle". not fully sure if this is the same in the parallel impl. kai, oct'10
486  qLink.setNetElementActivationRegistry(this.engines.get(i));
487 
488  /*
489  * If the QLink contains agents that end their activity in the first time
490  * step, the link should be activated.
491  */
492  if (linksToActivateInitially.remove(qLink)
494  this.engines.get(i).registerLinkAsActive(qLink);
495  }
496 
497  links[i]++;
498 
499  }
500 
501  roundRobin++;
502  }
503 
504  // print some statistics
505  for (int i = 0; i < this.engines.size(); i++) {
506  log.info("Assigned " + nodes[i] + " nodes and " + links[i] + " links to QSimEngineRunner #" + i);
507  }
508 
509  this.linksToActivateInitially.clear();
510  }
511 
512  public void printEngineRunTimes() {
513  if (!QSim.analyzeRunTimes) return;
514 
515  if (printRunTimesPerTimeStep) log.info("detailed QNetsimEngineRunner run times per time step:");
516  {
517  StringBuffer sb = new StringBuffer();
518  sb.append("\t");
519  sb.append("time");
520  for (int i = 0; i < this.engines.size(); i++) {
521  sb.append("\t");
522  sb.append("thread_");
523  sb.append(Integer.toString(i));
524  }
525  sb.append("\t");
526  sb.append("min");
527  sb.append("\t");
528  sb.append("max");
529  if (printRunTimesPerTimeStep) log.info(sb.toString());
530  }
531  long sum = 0;
532  long sumMin = 0;
533  long sumMax = 0;
534  for (int i = 0; i < numObservedTimeSteps; i++) {
535  StringBuffer sb = new StringBuffer();
536  sb.append("\t" + i);
537  long min = Long.MAX_VALUE;
538  long max = Long.MIN_VALUE;
539  for (QNetsimEngineRunner runner : this.engines) {
540  long runTime = runner.runTimes[i];
541  sum += runTime;
542  if (runTime < min) min = runTime;
543  if (runTime > max) max = runTime;
544  sb.append("\t");
545  sb.append(Long.toString(runTime));
546  }
547  sb.append("\t");
548  sb.append(Long.toString(min));
549  sb.append("\t");
550  sb.append(Long.toString(max));
551  if (printRunTimesPerTimeStep) log.info(sb.toString());
552  sumMin += min;
553  sumMax += max;
554  }
555  log.info("sum min run times: " + sumMin);
556  log.info("sum max run times: " + sumMax);
557  log.info("sum all run times / num threads: " + sum / this.numOfThreads);
558  }
559 
560  private static class NamedThreadFactory implements ThreadFactory {
561  private int count = 0;
562 
563  @Override
565  return new Thread( r , "QNetsimEngine_PooledThread_" + count++);
566  }
567  }
568 
569  private final void arrangeNextAgentState(MobsimAgent pp) {
570  internalInterface.arrangeNextAgentState(pp);
571  }
572 }
void initialize(QNetsimEngine simEngine, AgentCounter agentCounter, MobsimTimer simTimer)
Definition: QNetwork.java:63
final Map< Id< Vehicle >, QVehicle > getVehicles()
static final String ONLYONCE
Definition: Gbl.java:41
static boolean analyzeRunTimes
Definition: QSim.java:143
void initializeFactory(AgentCounter agentCounter, MobsimTimer mobsimTimer, NetsimInternalInterface netsimEngine1)
Map< Id< Link >, QLinkI > getNetsimLinks()
Definition: QNetwork.java:84
void arrangeNextAgentState(MobsimAgent agent)
Map< Id< Node >, QNodeI > getNetsimNodes()
Definition: QNetwork.java:94
final void registerAdditionalAgentOnLink(final MobsimAgent planAgent)
StarttimeInterpretation getSimStarttimeInterpretation()
QNetsimEngine(final QSim sim, QNetworkFactory netsimNetworkFactory)
void endLegAndComputeNextState(final double now)
QSimConfigGroup qsim()
Definition: Config.java:495
static final String writeTime(final double seconds, final String timeformat)
Definition: Time.java:95
MobsimAgent unregisterAdditionalAgentOnLink(Id< Person > agentId, Id< Link > linkId)
EventsManager getEventsManager()
Definition: QSim.java:569
AgentCounter getAgentCounter()
Definition: QSim.java:633
void setInternalInterface(InternalInterface internalInterface)
void addParkedVehicle(MobsimVehicle veh, Id< Link > startLinkId)
final Map< Id< Vehicle >, QVehicle > vehicles