21 package org.matsim.core.population.io;
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.Logger;
39 import org.xml.sax.Attributes;
40 import org.xml.sax.helpers.AttributesImpl;
42 import java.util.ArrayList;
43 import java.util.List;
45 import java.util.Stack;
46 import java.util.concurrent.BlockingQueue;
47 import java.util.concurrent.LinkedBlockingQueue;
59 class ParallelPopulationReaderMatsimV4
extends PopulationReaderMatsimV4 {
// Logger for this reader class.
61 static final Logger log = LogManager.getLogger(ParallelPopulationReaderMatsimV4.class);
// Set once in the constructor: true when the scenario's population is a
// StreamingPopulationReader.StreamingPopulation. startTag/endTag forward to the
// superclass when this flag is set.
63 private final boolean isPopulationStreaming;
// Worker-thread count; the constructor reads it from the global config's
// number of threads when that value is > 0 and otherwise uses 1.
64 private final int numThreads;
// Hand-over queue: endTag adds the Tag list of a finished person, and
// stopThreads adds one list holding an EndProcessingTag per worker thread.
65 private final BlockingQueue<List<Tag>> queue;
// Collector wrappers built in the constructor; both are assigned null when
// population streaming is detected.
66 private final CollectorScenario collectorScenario;
67 private final CollectorPopulation collectorPopulation;
// Worker threads; the array is allocated with numThreads entries.
69 private Thread[] threads;
// Tags recorded for the person currently being parsed; replaced by a fresh
// list on every PERSON start tag.
70 private List<Tag> currentPersonXmlData;
// Transformation passed to the superclass constructor and to each runner.
72 private final CoordinateTransformation coordinateTransformation;
// Last Throwable reported through catchReaderException (installed as the
// workers' uncaught-exception handler); checked for non-null in stopThreads.
// NOTE(review): the field is not volatile - confirm the read in stopThreads is
// ordered after the worker threads have terminated.
73 private Throwable exception = null;
/**
 * Convenience constructor: delegates to the two-argument constructor with a
 * new {@code IdentityTransformation} as the coordinate transformation.
 *
 * @param scenario scenario handed on unchanged to the two-argument constructor
 */
75 public ParallelPopulationReaderMatsimV4(
76 final Scenario scenario ) {
77 this(
new IdentityTransformation() , scenario );
/**
 * Creates the reader and decides between streaming and parallel mode.
 *
 * NOTE(review): several lines of this constructor (including the else keyword
 * and the closing braces) are not visible in this excerpt; the branch comments
 * below describe only the statements that are shown.
 *
 * @param coordinateTransformation passed to the superclass and kept for the runners
 * @param scenario scenario whose population type and global config are inspected
 */
80 public ParallelPopulationReaderMatsimV4(
81 final CoordinateTransformation coordinateTransformation,
82 final Scenario scenario) {
83 super( coordinateTransformation , scenario );
84 this.coordinateTransformation = coordinateTransformation;
// Streaming populations are detected by type; a warning is logged and the
// collectors are left null.
90 if ( scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation ) {
91 log.warn(
"Population streaming is activated - cannot use " + ParallelPopulationReaderMatsimV4.class.getName() +
"!");
93 this.isPopulationStreaming =
true;
96 this.collectorPopulation = null;
97 this.collectorScenario = null;
// Presumably the start of the non-streaming (else) branch - TODO confirm
// against the full file.
99 isPopulationStreaming =
false;
// Use the configured global thread count when it is positive, otherwise 1.
101 if (scenario.getConfig().global().getNumberOfThreads() > 0) {
102 this.numThreads = scenario.getConfig().global().getNumberOfThreads();
103 }
else this.numThreads = 1;
// Queue and collector wrappers used by the parallel path; the population
// collector wraps this.plans and the scenario collector wraps both the
// scenario and that population collector.
105 this.queue =
new LinkedBlockingQueue<>();
106 this.collectorPopulation =
new CollectorPopulation(this.plans);
107 this.collectorScenario =
new CollectorScenario(scenario, collectorPopulation);
// NOTE(review): the header of the method enclosing this fragment is not visible
// in this excerpt, and neither are the remaining runner-constructor arguments,
// the storing of each thread into the array, or any start() call.
// Allocates one Thread slot per configured worker.
112 threads =
new Thread[numThreads];
113 for (
int i = 0; i < numThreads; i++) {
// Each worker gets its own runner, built from the coordinate transformation
// and the collector scenario (further arguments not shown).
115 ParallelPopulationReaderMatsimV4Runner runner =
116 new ParallelPopulationReaderMatsimV4Runner(
117 this.coordinateTransformation,
118 this.collectorScenario,
121 Thread thread =
new Thread(runner);
// Marked as daemon thread.
122 thread.setDaemon(
true);
// Name is Class.toString() of the runner class with the worker index appended.
123 thread.setName(ParallelPopulationReaderMatsimV4Runner.class.toString() + i);
// Uncaught throwables from the worker are routed to catchReaderException.
124 thread.setUncaughtExceptionHandler(this::catchReaderException);
/**
 * Signals the worker threads to finish.
 *
 * NOTE(review): parts of this method are missing from the excerpt (the body of
 * the second loop, the try keyword, and what happens when an exception was
 * recorded); the comments below cover only the visible statements.
 */
130 private void stopThreads() {
// Enqueue one list containing a single EndProcessingTag per worker thread.
132 for (
int i = 0; i < this.numThreads; i++) {
133 List<Tag> list =
new ArrayList<>();
134 list.add(
new EndProcessingTag());
135 this.queue.add(list);
// Iterates over all worker threads; the loop body is not visible here -
// presumably it waits for each thread, given the InterruptedException
// handler below. TODO confirm.
140 for (Thread thread : threads) {
143 }
catch (InterruptedException e) {
// A Throwable stored by catchReaderException is checked here; the handling
// itself is not visible in this excerpt.
147 if (this.exception != null) {
/**
 * Uncaught-exception callback installed on the worker threads: logs the
 * throwable as an error and stores it in the exception field. A later call
 * overwrites an earlier stored throwable.
 *
 * @param thread    the thread that raised the throwable (not used in the visible lines)
 * @param throwable the uncaught throwable
 */
152 private void catchReaderException(Thread thread, Throwable throwable) {
153 log.error(
"Error parsing XML", throwable);
154 this.exception = throwable;
/**
 * Start-tag callback. In streaming mode the call is forwarded to the
 * superclass; otherwise the tag is recorded for later processing.
 *
 * NOTE(review): returns, else keywords and closing braces are not visible in
 * this excerpt, so the exact branch nesting cannot be confirmed from here.
 *
 * @param name    element name
 * @param atts    element attributes
 * @param context stack of enclosing element names
 */
158 public void startTag(String name, Attributes atts, Stack<String> context) {
// Streaming mode: hand the event to the superclass implementation.
161 if (isPopulationStreaming) {
162 super.startTag(name, atts, context);
// Opening PLANS element: only logged in the visible lines.
166 if (PLANS.equals(name)) {
167 log.info(
"Start parallel population reading...");
// Opening PERSON element: the Person is created from the "id" attribute and
// added to this.plans right here, and a fresh Tag list is started whose first
// entry is a PersonTag carrying that Person.
173 if (PERSON.equals(name)) {
174 Person person = this.plans.getFactory().createPerson(Id.create(atts.getValue(
"id"), Person.class));
175 currentPersonXmlData =
new ArrayList<>();
176 PersonTag personTag =
new PersonTag();
177 personTag.person = person;
178 currentPersonXmlData.add(personTag);
179 this.plans.addPerson(person);
// Record a StartTag for the current person. The attributes are copied into an
// AttributesImpl, so the recorded tag does not hold the callback's own
// Attributes object.
183 StartTag tag =
new StartTag();
185 tag.atts =
new AttributesImpl(atts);
186 currentPersonXmlData.add(tag);
/**
 * End-tag callback. In streaming mode the call is forwarded to the
 * superclass; otherwise the tag is recorded, and a completed person's Tag
 * list is placed on the queue.
 *
 * NOTE(review): returns, else keywords and closing braces are not visible in
 * this excerpt, so the exact branch nesting cannot be confirmed from here.
 *
 * @param name    element name
 * @param content text content of the element
 * @param context stack of enclosing element names
 */
191 public void endTag(String name, String content, Stack<String> context) {
// Streaming mode: hand the event to the superclass implementation.
194 if (isPopulationStreaming) {
195 super.endTag(name, content, context);
// Closing PLANS element: forwarded to the superclass and logged.
199 if (PLANS.equals(name)) {
201 super.endTag(name, content, context);
202 log.info(
"Finished parallel population reading...");
// Record an EndTag for the current person.
// NOTE(review): context is stored by reference, not copied - confirm the
// consumer does not rely on the stack contents at the time of this call.
205 EndTag tag =
new EndTag();
207 tag.content = content;
208 tag.context = context;
209 currentPersonXmlData.add(tag);
// A closing PERSON element completes the list; it is put on the queue.
212 if (PERSON.equals(name)) queue.add(currentPersonXmlData);
224 this.delegate = scenario;
299 throw new RuntimeException(
"Calls to this method are not expected to happen...");
304 throw new RuntimeException(
"Calls to this method are not expected to happen...");
309 throw new RuntimeException(
"Calls to this method are not expected to happen...");
314 throw new RuntimeException(
"Calls to this method are not expected to happen...");
324 throw new RuntimeException(
"Calls to this method are not expected to happen...");
328 public abstract static class Tag {
330 Stack<String> context = null;
PopulationFactory getFactory()
Vehicles getTransitVehicles()
Vehicles getTransitVehicles()
final Population population
ActivityFacilities getActivityFacilities()
abstract void startTag(String name, Attributes atts, Stack< String > context)
void addScenarioElement(String name, Object o)
Households getHouseholds()
Population getPopulation()
Object getScenarioElement(String name)
Object getScenarioElement(String name)
CollectorScenario(Scenario scenario, CollectorPopulation population)
final CollectorPopulation population
TransitSchedule getTransitSchedule()
Map< Id< Person >, ? extends Person > getPersons()
Households getHouseholds()
void addScenarioElement(String name, Object o)
CollectorPopulation(Population population)
Person removePerson(Id< Person > personId)
org.matsim.utils.objectattributes.attributable.Attributes getAttributes()
void setName(String name)
ActivityFacilities getActivityFacilities()
PopulationFactory getFactory()
TransitSchedule getTransitSchedule()