MATSIM
AbstractMultithreadedModule.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * MultithreadedModuleA.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2007 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.replanning.modules;
22 
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.Logger;
31 
32 import java.lang.Thread.UncaughtExceptionHandler;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.concurrent.atomic.AtomicReference;
36 
61 abstract public class AbstractMultithreadedModule implements PlanStrategyModule {
62  private final int numOfThreads;
63 
64  private PlanAlgoThread[] algothreads = null;
65  private Thread[] threads = null;
66  private PlanAlgorithm directAlgo = null;
67  private String name = null;
68 
69  private int count = 0;
70 
71  private final AtomicReference<Throwable> hadException = new AtomicReference<>(null);
72  private final ExceptionHandler exceptionHandler = new ExceptionHandler(this.hadException);
73 
75 
76  static final private Logger log = LogManager.getLogger(AbstractMultithreadedModule.class);
77 
84  abstract public PlanAlgorithm getPlanAlgoInstance();
85 
86  public AbstractMultithreadedModule(GlobalConfigGroup globalConfigGroup) {
87  this.numOfThreads = globalConfigGroup.getNumberOfThreads();
88  }
89 
90  public AbstractMultithreadedModule(final int numOfThreads) {
91  this.numOfThreads = numOfThreads;
92  }
93 
94  // left empty for inheritance
95  }
96 
97  // left empty for inheritance
98  }
99 
100  @Override
101  public final void prepareReplanning(ReplanningContext replanningContextTmp) {
102  this.beforePrepareReplanningHook(replanningContextTmp);
103  this.replanningContext = replanningContextTmp;
104  if (this.numOfThreads == 0) {
105  // it seems, no threads are desired :(
107  } else {
108  initThreads();
109  }
110  this.afterPrepareReplanningHook(replanningContextTmp);
111  }
112 
114  return replanningContext;
115  }
116 
117  @Override
118  public final void handlePlan(final Plan plan) {
119  if (this.directAlgo == null) {
120  this.algothreads[this.count % this.numOfThreads].addPlanToThread(plan);
121  this.count++;
122  } else {
123  this.directAlgo.run(plan);
124  }
125  }
126 
127  protected void beforeFinishReplanningHook() {
128  // left empty for inheritance
129  }
130  protected void afterFinishReplanningHook() {
131  // left empty for inheritance
132  }
133 
134  @Override
135  public final void finishReplanning() {
137 
138  if (this.directAlgo == null) {
139  // only try to start threads if we did not directly work on all the plans
140  log.info("[" + this.name + "] starting " + this.threads.length + " threads, handling " + this.count + " plans");
141 
142  // start threads
143  for (Thread thread : this.threads) {
144  thread.start();
145  }
146 
147  // wait until each thread is finished
148  try {
149  for (Thread thread : this.threads) {
150  thread.join();
151  }
152  } catch (InterruptedException e) {
153  throw new RuntimeException(e);
154  }
155  log.info("[" + this.name + "] all " + this.threads.length + " threads finished.");
156  Throwable throwable = this.hadException.get();
157  if (throwable != null) {
158  throw new RuntimeException("Some threads crashed, thus not all plans may have been handled.", throwable);
159  }
160  }
161  // reset
162  this.algothreads = null;
163  this.threads = null;
164  this.replanningContext = null;
165  this.count = 0;
166 
168  }
169 
170  private void initThreads() {
171  if (this.threads != null) {
172  throw new RuntimeException("threads are already initialized");
173  }
174 
175  this.hadException.set(null);
176  this.threads = new Thread[this.numOfThreads];
177  this.algothreads = new PlanAlgoThread[this.numOfThreads];
178 
179  Counter counter = null;
180  // setup threads
181  for (int i = 0; i < this.numOfThreads; i++) {
183  if (i == 0) {
184  this.name = algo.getClass().getSimpleName();
185  counter = new Counter("[" + this.name + "] handled plan # ");
186  }
187  PlanAlgoThread algothread = new PlanAlgoThread(algo, counter);
188  Thread thread = new Thread(algothread, this.name + "." + i);
189  thread.setUncaughtExceptionHandler(this.exceptionHandler);
190  this.threads[i] = thread;
191  this.algothreads[i] = algothread;
192  }
193  }
194 
195  /* package (for a test) */ final int getNumOfThreads() {
196  return numOfThreads;
197  }
198 
199  private final static class ExceptionHandler implements UncaughtExceptionHandler {
200 
201  private final AtomicReference<Throwable> hadException;
202 
203  public ExceptionHandler(final AtomicReference<Throwable> hadException) {
204  this.hadException = hadException;
205  }
206 
207  @Override
208  public void uncaughtException(Thread t, Throwable e) {
209  log.error("Thread " + t.getName() + " died with exception. Will stop after all threads finished.", e);
210  this.hadException.set(e);
211  }
212 
213  }
214 
215  private final static class PlanAlgoThread implements Runnable {
216 
217  private final PlanAlgorithm planAlgo;
218  private final List<Plan> plans = new LinkedList<>();
219  private final Counter counter;
220 
221  public PlanAlgoThread(final PlanAlgorithm algo, final Counter counter) {
222  this.planAlgo = algo;
223  this.counter = counter;
224  }
225 
226  public void addPlanToThread(final Plan plan) {
227  this.plans.add(plan);
228  }
229 
230  @Override
231  public void run() {
232  for (Plan plan : this.plans) {
233  this.planAlgo.run(plan);
234  this.counter.incCounter();
235  }
236  }
237  }
238 }
PlanAlgoThread(final PlanAlgorithm algo, final Counter counter)
ExceptionHandler(final AtomicReference< Throwable > hadException)
void prepareReplanning(ReplanningContext replanningContext)