21 package org.matsim.withinday.trafficmonitoring;
23 import org.apache.log4j.Logger;
49 import javax.inject.Inject;
50 import javax.inject.Singleton;
52 import java.util.concurrent.BrokenBarrierException;
53 import java.util.concurrent.ConcurrentHashMap;
54 import java.util.concurrent.CyclicBarrier;
104 private double now = Double.NEGATIVE_INFINITY ;
108 this(scenario, null);
119 this.numOfThreads = scenario.
getConfig().
global().getNumberOfThreads();
121 if (analyzedModes == null || analyzedModes.size() == 0) {
122 this.filterModes =
false;
123 this.analyzedModes = null;
133 this.regularActiveTrips =
new HashMap<>();
134 this.travelTimeInfos =
new ConcurrentHashMap<>();
135 this.changedLinks =
new TreeMap<>();
136 this.vehiclesToFilter =
new HashSet<>();
140 TravelTimeInfo travelTimeInfo =
new TravelTimeInfo();
141 this.travelTimeInfos.put(link.getId(), travelTimeInfo);
170 if (networkChangeEvents != null) {
172 ChangeValue freespeedChange = networkChangeEvent.getFreespeedChange();
173 if (freespeedChange != null) {
174 double startTime = networkChangeEvent.getStartTime();
175 Map<Link,Double> links = changedLinks.computeIfAbsent(startTime, k ->
new HashMap<>());
176 for (
Link link : networkChangeEvent.getLinks() ) {
179 switch ( freespeedChange.
getType() ) {
180 case ABSOLUTE_IN_SI_UNITS:
181 newSpeed = freespeedChange.
getValue() ;
184 newSpeed = link.getFreespeed() * freespeedChange.
getValue() ;
186 case OFFSET_IN_SI_UNITS:
187 newSpeed = link.getFreespeed() + freespeedChange.
getValue() ;
192 if ( startTime > 0. ) {
193 log.debug(
"registering a change event for time=" + startTime
194 +
"; linkId=" + link.getId() ) ;
196 links.put( link, newSpeed ) ;
209 Map<Link,Double> links = changedLinks.computeIfAbsent(this.now, k ->
new HashMap<>());
210 links.put( link, speed ) ;
215 final double travelTime = this.travelTimeInfoProvider.
getTravelTimeInfo(link).travelTime;
216 if ( Id.createLinkId(51825).equals(link.getId())) {
217 log.debug(
"link=" + link.getId() +
";\ttime=" + time +
";\tttime=" + travelTime ) ;
228 throw new RuntimeException(
"using TravelTimeCollector, but mobsim notifications not called between two resets. "
229 +
"Did you really add this as a mobsim listener?") ;
240 if (filterModes && vehiclesToFilter.contains(event.
getVehicleId()))
return;
242 Id<Vehicle> vehicleId =
event.getVehicleId();
243 double time =
event.getTime();
246 tripBin.enterTime = time;
248 this.regularActiveTrips.put(vehicleId, tripBin);
253 Id<Link> linkId =
event.getLinkId();
254 Id<Vehicle> vehicleId =
event.getVehicleId();
255 double time =
event.getTime();
257 TripBin tripBin = this.regularActiveTrips.remove(vehicleId);
258 if (tripBin != null) {
259 tripBin.leaveTime = time;
261 double tripTime = tripBin.leaveTime - tripBin.enterTime;
263 TravelTimeInfo travelTimeInfo = this.travelTimeInfoProvider.
getTravelTimeInfo(linkId);
264 travelTimeInfo.tripBins.add(tripBin);
265 travelTimeInfo.addedTravelTimes += tripTime;
266 travelTimeInfo.addedTrips++;
268 travelTimeInfo.checkActiveState();
269 travelTimeInfo.checkBinSize(tripTime);
289 Id<Vehicle> vehicleId =
event.getVehicleId();
291 this.regularActiveTrips.remove(vehicleId);
294 if (filterModes) this.vehiclesToFilter.remove(event.
getVehicleId());
314 if (e.getQueueSimulation() instanceof
QSim) {
315 double simStartTime = ((
QSim) e.getQueueSimulation()).getSimTimer().getSimStartTime();
321 this.nextInfoTime = (int)(Math.floor(simStartTime /
this.infoTimeStep) * this.
infoTimeStep);
326 double freeSpeedTravelTime = link.getLength() / link.getFreespeed(
Time.
UNDEFINED_TIME);
328 TravelTimeInfo travelTimeInfo = this.travelTimeInfoProvider.
getTravelTimeInfo(link);
329 travelTimeInfo.travelTime = freeSpeedTravelTime;
330 travelTimeInfo.init(freeSpeedTravelTime);
350 while( !changedLinks.isEmpty() && changedLinks.firstKey() <= e.getSimulationTime() ) {
351 Map<Link, Double> map = changedLinks.pollFirstEntry().getValue();
352 for ( Map.Entry<
Link,Double> link2speed : map.entrySet() ) {
353 Link link = link2speed.getKey() ;
354 double freeSpeedTravelTime = link.
getLength() / link2speed.getValue() ;
355 if ( e.getSimulationTime() > ((
QSim)e.getQueueSimulation()).getSimTimer().getSimStartTime() ) {
357 log.debug(
"time=" + e.getSimulationTime() +
358 "; network change event for link=" + link.getId() +
359 "; new ttime="+ freeSpeedTravelTime );
361 TravelTimeInfo travelTimeInfo = this.travelTimeInfoProvider.
getTravelTimeInfo(link);
362 travelTimeInfo.init(freeSpeedTravelTime);
363 travelTimeInfo.checkActiveState();
387 now = e.getSimulationTime() ;
390 this.
run(e.getSimulationTime());
414 this.startBarrier.await();
415 }
catch (InterruptedException | BrokenBarrierException ex) {
421 if (time >= this.nextInfoTime) {
424 activeLinks += runnable.getActiveLinksCount();
427 log.info(
"TravelTimeCollector at " +
Time.
writeTime(time) +
" #links=" + activeLinks);
438 static class TravelTimeInfo {
441 List<TripBin> tripBins =
new ArrayList<>();
443 boolean isActive =
false;
446 double addedTravelTimes = 0.0;
447 double sumTravelTimes = 0.0;
449 double freeSpeedTravelTime = Double.MAX_VALUE;
450 double travelTime = Double.MAX_VALUE;
452 double dynamicBinSize = 0.0;
454 static Counter enlarge =
new Counter(
"TravelTimeCollector: enlarged time bin size: ");
455 static Counter shrink =
new Counter(
"TravelTimeCollector: shrunk time bin size: ");
457 void init(
double freeSpeedTravelTime) {
458 this.freeSpeedTravelTime = freeSpeedTravelTime;
459 this.dynamicBinSize = freeSpeedTravelTime * 2.5;
462 void checkActiveState() {
464 this.isActive =
true;
469 void checkBinSize(
double tripTime) {
470 if (tripTime > dynamicBinSize) {
471 dynamicBinSize = tripTime * 2;
473 }
else if (tripTime * 3 < dynamicBinSize) {
474 dynamicBinSize = tripTime * 3;
494 private void run(
double time) {
499 updateMeanTravelTimesRunnable.setTime(time);
502 this.startBarrier.await();
504 this.endBarrier.await();
505 }
catch (InterruptedException | BrokenBarrierException e) {
512 this.startBarrier =
new CyclicBarrier(numOfThreads + 1);
513 this.endBarrier =
new CyclicBarrier(numOfThreads + 1);
522 updateMeanTravelTimesRunnable.
setEndBarrier(this.endBarrier);
523 updateMeanTravelTimesRunnables[i] = updateMeanTravelTimesRunnable;
525 Thread thread =
new Thread(updateMeanTravelTimesRunnable);
526 thread.setName(
"UpdateMeanTravelTimes" + i);
527 thread.setDaemon(
true);
537 for (TravelTimeInfo travelTimeInfo : this.travelTimeInfos.values()) {
538 travelTimeInfo.runnable = updateMeanTravelTimesRunnables[roundRobin %
numOfThreads];
548 this.endBarrier.await();
549 }
catch (InterruptedException | BrokenBarrierException e) {
561 private CyclicBarrier startBarrier = null;
562 private CyclicBarrier endBarrier = null;
568 activeTravelTimeInfos =
new ArrayList<>();
572 this.startBarrier = cyclicBarrier;
576 this.endBarrier = cyclicBarrier;
584 this.activeTravelTimeInfos.add(travelTimeInfo);
588 return this.activeTravelTimeInfos.size();
592 this.simulationRunning =
false;
611 startBarrier.await();
617 if (!simulationRunning) {
622 Iterator<TravelTimeInfo> iter = activeTravelTimeInfos.iterator();
623 while (iter.hasNext()) {
624 TravelTimeInfo travelTimeInfo = iter.next();
632 if (travelTimeInfo.tripBins.size() == 0) {
633 travelTimeInfo.isActive =
false;
634 travelTimeInfo.travelTime = travelTimeInfo.freeSpeedTravelTime;
639 }
catch (InterruptedException | BrokenBarrierException e) {
646 double removedTravelTimes = 0.0;
648 List<TripBin> tripBins = travelTimeInfo.tripBins;
651 Iterator<TripBin> iter = tripBins.iterator();
652 while (iter.hasNext()) {
654 if (tripBin.leaveTime + travelTimeInfo.dynamicBinSize < time) {
655 double travelTime = tripBin.leaveTime - tripBin.enterTime;
656 removedTravelTimes += travelTime;
666 if (removedTravelTimes == 0.0 && travelTimeInfo.addedTravelTimes == 0.0)
return;
668 travelTimeInfo.sumTravelTimes = travelTimeInfo.sumTravelTimes - removedTravelTimes + travelTimeInfo.addedTravelTimes;
670 travelTimeInfo.addedTravelTimes = 0.0;
676 double meanTravelTime = travelTimeInfo.freeSpeedTravelTime;
677 if (!tripBins.isEmpty()) meanTravelTime = travelTimeInfo.sumTravelTimes / tripBins.size();
679 if (meanTravelTime < travelTimeInfo.freeSpeedTravelTime) {
680 log.warn(
"Mean TravelTime to short?");
681 travelTimeInfo.travelTime = travelTimeInfo.freeSpeedTravelTime;
682 }
else travelTimeInfo.travelTime = meanTravelTime;
static final double UNDEFINED_TIME
void notifyMobsimInitialized(MobsimInitializedEvent e)
TravelTimeInfo getTravelTimeInfo(final Id< Link > linkId)
void notifyMobsimBeforeSimStep(MobsimBeforeSimStepEvent e)
Collection< TravelTimeInfo > activeTravelTimeInfos
volatile boolean simulationRunning
final Set< String > analyzedModes
Map< Id< Vehicle >, TripBin > regularActiveTrips
CyclicBarrier startBarrier
void initParallelThreads()
void notifyMobsimBeforeCleanup(MobsimBeforeCleanupEvent e)
TravelTimeCollector(Scenario scenario, Set< String > analyzedModes)
double getLinkTravelTime(Link link, double time, Person person, Vehicle vehicle)
void handleEvent(PersonStuckEvent event)
static Queue< NetworkChangeEvent > getNetworkChangeEvents(Network network)
Id< Vehicle > getVehicleId()
TreeMap< Double, Map< Link, Double > > changedLinks
void notifyMobsimAfterSimStep(MobsimAfterSimStepEvent e)
void setStartBarrier(CyclicBarrier cyclicBarrier)
void handleEvent(LinkEnterEvent event)
Id< Vehicle > getVehicleId()
final boolean filterModes
void reset(int iteration)
void handleEvent(VehicleEntersTrafficEvent event)
void setTime(final double t)
int getActiveLinksCount()
void handleEvent(VehicleLeavesTrafficEvent event)
static final String writeTime(final double seconds, final String timeformat)
Set< Id< Vehicle > > vehiclesToFilter
void addTravelTimeInfo(TravelTimeInfo travelTimeInfo)
Map< Id< Link >,?extends Link > getLinks()
void changeSpeedMetersPerSecond(Link link, double speed)
UpdateMeanTravelTimesRunnable[] updateMeanTravelTimesRunnables
Map< Id< Link >, TravelTimeInfo > travelTimeInfos
void calcBinTravelTime(double time, TravelTimeInfo travelTimeInfo)
void handleEvent(LinkLeaveEvent event)
void setEndBarrier(CyclicBarrier cyclicBarrier)
Id< Vehicle > getVehicleId()
TravelTimeInfoProvider travelTimeInfoProvider
UpdateMeanTravelTimesRunnable()
void printInfo(double time)
static final void printCurrentThreadCpuTime()