MATSIM
ProcessEventThread.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * *********************************************************************** *
4  * *
5  * copyright : (C) 2007, 2008 by the members listed in the COPYING, *
6  * LICENSE and WARRANTY file. *
7  * email : info at matsim dot org *
8  * *
9  * *********************************************************************** *
10  * *
11  * This program is free software; you can redistribute it and/or modify *
12  * it under the terms of the GNU General Public License as published by *
13  * the Free Software Foundation; either version 2 of the License, or *
14  * (at your option) any later version. *
15  * See also COPYING, LICENSE and WARRANTY file *
16  * *
17  * *********************************************************************** */
18 
19 package org.matsim.core.events;
20 
21 import java.lang.InterruptedException;
22 
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 
30 import org.matsim.core.gbl.Gbl;
31 
37 /*package*/ class ProcessEventThread implements Runnable {
38  private final List<Event> preInputBuffer;
39  private final BlockingQueue<Event> eventQueue;
40  private final EventsManager events;
41  private final int preInputBufferMaxLength;
42 
43  public ProcessEventThread(
44  final EventsManager events,
45  final int preInputBufferMaxLength) {
46  this.events = events;
47  this.preInputBufferMaxLength = preInputBufferMaxLength;
48  eventQueue = new LinkedBlockingQueue<Event>();
49  preInputBuffer = new ArrayList<Event>( preInputBufferMaxLength + 1);
50  }
51 
52  public synchronized void processEvent(final Event event) {
53  // first approach (quick on office computer, but not on satawal)
54  // eventQueue.add(event);
55 
56  // second approach, lesser locking => faster on Satawal
57  preInputBuffer.add(event);
58  if (preInputBuffer.size() > preInputBufferMaxLength) {
59  emptyPreBuffer();
60  }
61  }
62 
63  private void emptyPreBuffer() {
64  eventQueue.addAll( preInputBuffer );
65  preInputBuffer.clear();
66  }
67 
68  @Override
69  public void run() {
70  try {
71  // process events, until LastEventOfIteration arrives
72  while (true) {
73  // take waits for an element to exist before returning:
74  // - thread sleeps until there is an event to process
75  // - we do not have to bother checking if the element exists
76  Event nextEvent = eventQueue.take();
77  if (nextEvent instanceof LastEventOfIteration) {
78  Gbl.printCurrentThreadCpuTime();
79 
80  // if there are more events generated after end of simulation
81  // (generated in events handler), process them before stopping events handling.
82  // in order to do this, LastEventOfIteration is moved to the back of the queue.
83  if (eventQueue.size()>0){
84  processEvent(nextEvent);
85  emptyPreBuffer();
86  nextEvent = eventQueue.take();
87  } else {
88  return;
89  }
90  }
91  getEvents().processEvent(nextEvent);
92  }
93  }
94  catch ( InterruptedException e ) {
95  throw new RuntimeException( e );
96  }
97  }
98 
99  // schedule LastEventOfIteration and flush buffered events
100  // the LastEventOfIteration lets the event handler threads know,
101  // that there is no more work, as soon as they have processed this,
102  // they are allowed to go to sleep
103  public synchronized void close() {
104  processEvent(new LastEventOfIteration(0.0));
105  emptyPreBuffer();
106  }
107 
108  public EventsManager getEvents() {
109  return events;
110  }
111 
112 }