MATSIM
ParallelPopulationReaderMatsimV4.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * ParallelPlansReaderMatsimV4.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2012 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.population.io;
22 
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.Logger;
25 import org.matsim.api.core.v01.Id;
26 import org.matsim.api.core.v01.Scenario;
31 import org.matsim.core.config.Config;
36 import org.matsim.lanes.Lanes;
39 import org.xml.sax.Attributes;
40 import org.xml.sax.helpers.AttributesImpl;
41 
42 import java.util.ArrayList;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Stack;
46 import java.util.concurrent.BlockingQueue;
47 import java.util.concurrent.LinkedBlockingQueue;
48 
59 /* deliberately package */ class ParallelPopulationReaderMatsimV4 extends PopulationReaderMatsimV4 {
60 
61  static final Logger log = LogManager.getLogger(ParallelPopulationReaderMatsimV4.class);
62 
63  private final boolean isPopulationStreaming;
64  private final int numThreads;
65  private final BlockingQueue<List<Tag>> queue;
66  private final CollectorScenario collectorScenario;
67  private final CollectorPopulation collectorPopulation;
68 
69  private Thread[] threads;
70  private List<Tag> currentPersonXmlData;
71 
72  private final CoordinateTransformation coordinateTransformation;
73  private Throwable exception = null;
74 
75  public ParallelPopulationReaderMatsimV4(
76  final Scenario scenario ) {
77  this( new IdentityTransformation() , scenario );
78  }
79 
80  public ParallelPopulationReaderMatsimV4(
81  final CoordinateTransformation coordinateTransformation,
82  final Scenario scenario) {
83  super( coordinateTransformation , scenario );
84  this.coordinateTransformation = coordinateTransformation;
85 
86  /*
87  * Check whether population streaming is activated
88  */
89 // if (scenario.getPopulation() instanceof Population && ((Population)scenario.getPopulation()).isStreaming()) {
90  if ( scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation ) {
91  log.warn("Population streaming is activated - cannot use " + ParallelPopulationReaderMatsimV4.class.getName() + "!");
92 
93  this.isPopulationStreaming = true;
94  this.numThreads = 1;
95  this.queue = null;
96  this.collectorPopulation = null;
97  this.collectorScenario = null;
98  } else {
99  isPopulationStreaming = false;
100 
101  if (scenario.getConfig().global().getNumberOfThreads() > 0) {
102  this.numThreads = scenario.getConfig().global().getNumberOfThreads();
103  } else this.numThreads = 1;
104 
105  this.queue = new LinkedBlockingQueue<>();
106  this.collectorPopulation = new CollectorPopulation(this.plans);
107  this.collectorScenario = new CollectorScenario(scenario, collectorPopulation);
108  }
109  }
110 
111  private void initThreads() {
112  threads = new Thread[numThreads];
113  for (int i = 0; i < numThreads; i++) {
114 
115  ParallelPopulationReaderMatsimV4Runner runner =
116  new ParallelPopulationReaderMatsimV4Runner(
117  this.coordinateTransformation,
118  this.collectorScenario,
119  this.queue);
120 
121  Thread thread = new Thread(runner);
122  thread.setDaemon(true);
123  thread.setName(ParallelPopulationReaderMatsimV4Runner.class.toString() + i);
124  thread.setUncaughtExceptionHandler(this::catchReaderException);
125  threads[i] = thread;
126  thread.start();
127  }
128  }
129 
130  private void stopThreads() {
131  // signal the threads that they should end parsing
132  for (int i = 0; i < this.numThreads; i++) {
133  List<Tag> list = new ArrayList<>();
134  list.add(new EndProcessingTag());
135  this.queue.add(list);
136  }
137 
138  // wait for the threads to finish
139  try {
140  for (Thread thread : threads) {
141  thread.join();
142  }
143  } catch (InterruptedException e) {
144  throw new RuntimeException(e);
145  }
146 
147  if (this.exception != null) {
148  throw new RuntimeException(this.exception);
149  }
150  }
151 
152  private void catchReaderException(Thread thread, Throwable throwable) {
153  log.error("Error parsing XML", throwable);
154  this.exception = throwable;
155  }
156 
157  @Override
158  public void startTag(String name, Attributes atts, Stack<String> context) {
159 
160  // if population streaming is activated, use non-parallel reader
161  if (isPopulationStreaming) {
162  super.startTag(name, atts, context);
163  return;
164  }
165 
166  if (PLANS.equals(name)) {
167  log.info("Start parallel population reading...");
168  initThreads();
169  startPlans(atts);
170  }
171  else {
172  // If it is an new person, create a new person and a list for its attributes.
173  if (PERSON.equals(name)) {
174  Person person = this.plans.getFactory().createPerson(Id.create(atts.getValue("id"), Person.class));
175  currentPersonXmlData = new ArrayList<>();
176  PersonTag personTag = new PersonTag();
177  personTag.person = person;
178  currentPersonXmlData.add(personTag);
179  this.plans.addPerson(person);
180  }
181 
182  // Create a new start tag and add it to the person data.
183  StartTag tag = new StartTag();
184  tag.name = name;
185  tag.atts = new AttributesImpl(atts); // We have to create copies of the attributes because the object is re-used by the parser!
186  currentPersonXmlData.add(tag);
187  }
188  }
189 
190  @Override
191  public void endTag(String name, String content, Stack<String> context) {
192 
193  // if population streaming is activated, use non-parallel reader
194  if (isPopulationStreaming) {
195  super.endTag(name, content, context);
196  return;
197  }
198 
199  if (PLANS.equals(name)) {
200  this.stopThreads();
201  super.endTag(name, content, context);
202  log.info("Finished parallel population reading...");
203  } else {
204  // Create a new end tag and add it to the person data.
205  EndTag tag = new EndTag();
206  tag.name = name;
207  tag.content = content;
208  tag.context = context;
209  currentPersonXmlData.add(tag);
210 
211  // if its a person end tag, add the persons xml data to the queue.
212  if (PERSON.equals(name)) queue.add(currentPersonXmlData);
213  }
214  }
215 
216  private static class CollectorScenario implements Scenario {
217  // yyyy Why is this necessary at all? Could you please explain your design decisions? The same instance is passed to all threads, so
218  // what is the difference to using the underlying population directly?
219 
220  private final Scenario delegate;
222 
223  public CollectorScenario(Scenario scenario, CollectorPopulation population) {
224  this.delegate = scenario;
225  this.population = population;
226  }
227 
228  @Override
229  public Network getNetwork() {
230  return this.delegate.getNetwork();
231  }
232 
233  @Override
235  return this.population; // return collector population
236  }
237 
238  @Override
240  return this.delegate.getActivityFacilities();
241  }
242 
243  @Override
245  return this.delegate.getTransitSchedule();
246  }
247 
248  @Override
249  public Config getConfig() {
250  return this.delegate.getConfig();
251  }
252 
253  @Override
254  public void addScenarioElement(String name, Object o) {
255  this.delegate.addScenarioElement(name, o);
256  }
257 
258  @Override
259  public Object getScenarioElement(String name) {
260  return this.delegate.getScenarioElement(name);
261  }
262 
263  @Override
265  return this.delegate.getTransitVehicles();
266  }
267 
268  @Override
270  return this.delegate.getHouseholds();
271  }
272 
273  @Override
274  public Lanes getLanes() {
275  return this.delegate.getLanes();
276  }
277 
278  @Override
280  return this.delegate.getVehicles() ;
281  }
282  }
283 
284  private static class CollectorPopulation implements Population {
285 
286  private final Population population;
287 
288  public CollectorPopulation(Population population) {
289  this.population = population;
290  }
291 
292  @Override
294  return population.getFactory();
295  }
296 
297  @Override
298  public String getName() {
299  throw new RuntimeException("Calls to this method are not expected to happen...");
300  }
301 
302  @Override
303  public void setName(String name) {
304  throw new RuntimeException("Calls to this method are not expected to happen...");
305  }
306 
307  @Override
308  public Map<Id<Person>, ? extends Person> getPersons() {
309  throw new RuntimeException("Calls to this method are not expected to happen...");
310  }
311 
312  @Override
313  public void addPerson(Person p) {
314  throw new RuntimeException("Calls to this method are not expected to happen...");
315  }
316 
317  @Override
318  public Person removePerson(Id<Person> personId) {
319  throw new RuntimeException("not implemented") ;
320  }
321 
322  @Override
324  throw new RuntimeException("Calls to this method are not expected to happen...");
325  }
326  }
327 
328  public abstract static class Tag {
329  String name;
330  Stack<String> context = null; // not used by the PopulationReader
331  }
332 
333  public static final class StartTag extends Tag {
334  Attributes atts;
335  }
336 
337  public static final class PersonTag extends Tag {
338  Person person;
339  }
340 
341  public static final class EndTag extends Tag {
342  String content;
343  }
344 
345  /*
346  * Marker Tag to inform the threads that no further data has to be parsed.
347  */
348  public static final class EndProcessingTag extends Tag {
349  }
350 }
abstract void startTag(String name, Attributes atts, Stack< String > context)
void addScenarioElement(String name, Object o)
Object getScenarioElement(String name)
ActivityFacilities getActivityFacilities()
TransitSchedule getTransitSchedule()