View Javadoc

1   /*
2    * Copyright 2014 University of Glasgow.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package broadwick.montecarlo;
17  
18  import broadwick.BroadwickException;
19  import broadwick.rng.RNG;
20  import broadwick.statistics.Samples;
21  import broadwick.utils.CloneUtils;
22  import com.google.common.base.Throwables;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  import java.util.concurrent.ArrayBlockingQueue;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.TimeUnit;
29  import lombok.extern.slf4j.Slf4j;
30  import org.apache.commons.lang3.time.StopWatch;
31  
32  /**
33   * An implementation of the Monte Carlo method, it runs many MonteCarloSimulations and gathers the statistics of their
34   * outputs. Note, each simulation object is cloned and each of these clones are run independently
35   * <code>numSimulations</code> times.
36   */
37  @Slf4j
38  public class MonteCarlo {
39  
40      /**
41       * Create a Monte Carlo object that is capable of running <code>numSimulation</code> MonteCarloScenarios.
42       * @param simulation     the simulation or scenario to be run.
43       * @param numSimulations the number of times the simulation should be run.
44       */
45      public MonteCarlo(final MonteCarloScenario simulation, final int numSimulations) {
46          this.simulation = simulation;
47          this.numSimulations = numSimulations;
48          this.resultsConsumer = new MonteCarloDefaultResults();
49      }
50  
51      /**
52       * Run the Monte Carlo simulations. Two threads (a producer and consumer) are created to asynchronously run the
53       * simulations (the producer) and to handle the results from each simulation as they are calculated (the consumer).
54       * The producer thread uses an execution pool to manage running each simulation and places the results on a queue
55       * which is monitored by a consumer thread to calculate the posterior distributions for the Monte Carlo run.
56       */
57      public final void run() {
58          final ArrayBlockingQueue<MonteCarloResults> queue = new ArrayBlockingQueue<>(numSimulations + 1);
59          try {
60  
61              //Creating Producer and Consumer Thread
62              final Thread producer = new Thread(new Producer(queue, simulation, numSimulations));
63              final Thread consumer = new Thread(new Consumer(queue, resultsConsumer));
64              producer.start();
65              consumer.start();
66  
67              producer.join();
68              consumer.join();
69          } catch (Exception e) {
70              log.error("Error joining Monte Carlo results {}", Throwables.getStackTraceAsString(e));
71              throw new BroadwickException("Failed to run Monte Carlo simulation. " + Throwables.getStackTraceAsString(e));
72          }
73  
74          queue.clear();
75      }
76  
77      /**
78       * Get a copy of the results from the simulation. This method MUST return a copy of the results NOT a reference.
79       * @return the MonteCarloResults object that contains the results of all the simulations.
80       */
81      public final MonteCarloResults getResults() {
82          return CloneUtils.deepClone(resultsConsumer);
83      }
84  
85      /**
86       * Set the consumer object for this Monte Carlo simulation. A consumer is simply a MonteCarloResults class that
87       * 'joins' each result that is consumes so that statistics can be calculated at the end of the simulation.
88       * @param consumer the results object to use as a consumer.
89       */
90      public final void setResultsConsumer(final MonteCarloResults consumer) {
91          resultsConsumer = consumer;
92          resultsConsumer.reset();
93      }
94  
95      @Override
96      public void finalize() throws Throwable {
97          super.finalize();
98          resultsConsumer = null;
99      }
100 
101     private final MonteCarloScenario simulation;
102     private MonteCarloResults resultsConsumer;
103     private final int numSimulations;
104 }
105 
106 /**
107  * This class takes (or consumes) the results of a single simulation of the model and saves the results in a manner to
108  * allow statistics to be generated from the Monte Carlo simulation.
109  */
110 @Slf4j
111 class Consumer implements Runnable {
112 
113     /**
114      * Create the consumer object.
115      * @param queue         the queue that the consumer should check for values to consume.
116      * @param joinedResults the MonteCarloResults object to which the consumed results will be added.
117      */
118     public Consumer(final ArrayBlockingQueue<MonteCarloResults> queue, final MonteCarloResults joinedResults) {
119         this.queue = queue;
120         this.joinedResults = joinedResults;
121         joinedResults.reset();
122     }
123 
124     @Override
125     public void run() {
126         log.trace("Starting Monte Carlo results consumer thread");
127 
128         // it might seem that we should reset (or initialise) the consumer results object but
129         // the consumer object is created for each Monte Carlo step so is really not strictly necessary.
130         joinedResults.reset();
131         final StopWatch sw = new StopWatch();
132         sw.start();
133 
134         try {
135             MonteCarloResults results = null;
136             while (!(results instanceof Poison)) {
137                 results = queue.take();
138 
139                 if (!(results instanceof Poison)) {
140                     numSimulationsFound++;
141 
142                     // get the MonteCarloResults from the q and calculate the statistics on it.
143                     joinedResults.join(results);
144 
145                     if (log.isTraceEnabled()) {
146                         log.trace("Monte Carlo consumer: consumed {}", results.getSamples().getSummary());
147                         log.trace("Monte Carlo consumer: consumed {} results, expected value={}",
148                                   numSimulationsFound, joinedResults.getExpectedValue());
149                     }
150 
151                     // we no longer require the results so allow the memory to be freed.
152                     results = null;
153                 }
154             }
155         } catch (java.lang.InterruptedException e) {
156             log.error("Error consuming Monte Carlo Results {}", Throwables.getStackTraceAsString(e));
157         }
158         sw.stop();
159         log.debug("Analysed {} simulation results in {}.", numSimulationsFound, sw);
160     }
161 
162     @Override
163     public void finalize() throws Throwable {
164         super.finalize();
165         joinedResults = null;
166     }
167 
168     private final ArrayBlockingQueue<MonteCarloResults> queue;
169     private MonteCarloResults joinedResults;
170     private int numSimulationsFound = 0;
171 }
172 
173 /**
174  * This class runs a single simulation of the model to 'produce' results (hence the name). This runnable class is run
175  * many times to create a Monte Carlo simulation.
176  */
177 @Slf4j
178 class Producer implements Runnable {
179 
180     /**
181      * Create the producer object.
182      * @param queue          the queue on which the producers results are added.
183      * @param simulation     the model that will be run to produce results.
184      * @param numSimulations the number of simulations that are to be run and placed in the quque.
185      */
186     public Producer(final ArrayBlockingQueue<MonteCarloResults> queue, final MonteCarloScenario simulation,
187                     final int numSimulations) {
188         this.queue = queue;
189         this.simulation = simulation.copyOf();
190         this.numSimulations = numSimulations;
191     }
192 
193     @Override
194     public void run() {
195         log.trace("Starting Monte Carlo results producer thread");
196         try {
197             final int poolSize = Runtime.getRuntime().availableProcessors();
198             final ThreadFactory threadFactory = new ThreadFactoryBuilder()
199                     .setNameFormat("MCScenarioProducer-%d")
200                     .setDaemon(true)
201                     .build();
202             final ExecutorService es = Executors.newFixedThreadPool(poolSize, threadFactory);
203             final RNG generator = new RNG(RNG.Generator.Well44497b);
204 
205             final StopWatch sw = new StopWatch();
206             sw.start();
207             for (int i = 0; i < numSimulations; i++) {
208                 es.submit(new Runnable() {
209                     @Override
210                     public void run() {
211                         try {
212                             log.trace("Monte Carlo producer: creating scenario object");
213                             final MonteCarloScenario scenario = simulation.copyOf();
214                             final MonteCarloResults results = scenario.run(generator.getInteger(0, Integer.MAX_VALUE - 1));
215                             log.trace("Monte Carlo producer: generated results {}", results.getExpectedValue());
216                             queue.put(results);
217                         } catch (Exception e) {
218                             log.error("Error running Monte Carlo simulation {}", Throwables.getStackTraceAsString(e));
219                         }
220                     }
221                 });
222             }
223             es.shutdown();
224             while (!es.isTerminated()) {
225                 es.awaitTermination(1, TimeUnit.SECONDS);
226             }
227             queue.put(new Poison());
228 
229             sw.stop();
230             log.info("Finished {} simulations in {}.", numSimulations, sw);
231         } catch (Exception ex) {
232             log.error("Monte Carlo simulation error: {}", Throwables.getStackTraceAsString(ex));
233         }
234     }
235 
236     @Override
237     public void finalize() throws Throwable {
238         super.finalize();
239         simulation = null;
240     }
241 
242     private final ArrayBlockingQueue<MonteCarloResults> queue;
243     private MonteCarloScenario simulation;
244     private final int numSimulations;
245 }
246 
247 /**
248  * Poison object that causes the consumer of the MonteCarlo results to stop looking for objects on the queue.
249  */
250 class Poison implements MonteCarloResults {
251 
252     @Override
253     public double getExpectedValue() {
254         return Double.NEGATIVE_INFINITY;
255     }
256 
257     @Override
258     public Samples getSamples() {
259         return new Samples();
260     }
261 
262     @Override
263     public String toCsv() {
264         return "POISON";
265     }
266 
267     @Override
268     public final MonteCarloResults join(final MonteCarloResults results) {
269         return results;
270     }
271 
272     @Override
273     public void reset() {
274         // do nothing - this is a poison pill
275     }
276 }