1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package broadwick.montecarlo;
17
18 import broadwick.BroadwickException;
19 import broadwick.rng.RNG;
20 import broadwick.statistics.Samples;
21 import broadwick.utils.CloneUtils;
22 import com.google.common.base.Throwables;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import java.util.concurrent.ArrayBlockingQueue;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import lombok.extern.slf4j.Slf4j;
30 import org.apache.commons.lang3.time.StopWatch;
31
32
33
34
35
36
37 @Slf4j
38 public class MonteCarlo {
39
40
41
42
43
44
45 public MonteCarlo(final MonteCarloScenario simulation, final int numSimulations) {
46 this.simulation = simulation;
47 this.numSimulations = numSimulations;
48 this.resultsConsumer = new MonteCarloDefaultResults();
49 }
50
51
52
53
54
55
56
57 public final void run() {
58 final ArrayBlockingQueue<MonteCarloResults> queue = new ArrayBlockingQueue<>(numSimulations + 1);
59 try {
60
61
62 final Thread producer = new Thread(new Producer(queue, simulation, numSimulations));
63 final Thread consumer = new Thread(new Consumer(queue, resultsConsumer));
64 producer.start();
65 consumer.start();
66
67 producer.join();
68 consumer.join();
69 } catch (Exception e) {
70 log.error("Error joining Monte Carlo results {}", Throwables.getStackTraceAsString(e));
71 throw new BroadwickException("Failed to run Monte Carlo simulation. " + Throwables.getStackTraceAsString(e));
72 }
73
74 queue.clear();
75 }
76
77
78
79
80
81 public final MonteCarloResults getResults() {
82 return CloneUtils.deepClone(resultsConsumer);
83 }
84
85
86
87
88
89
90 public final void setResultsConsumer(final MonteCarloResults consumer) {
91 resultsConsumer = consumer;
92 resultsConsumer.reset();
93 }
94
95 @Override
96 public void finalize() throws Throwable {
97 super.finalize();
98 resultsConsumer = null;
99 }
100
101 private final MonteCarloScenario simulation;
102 private MonteCarloResults resultsConsumer;
103 private final int numSimulations;
104 }
105
106
107
108
109
110 @Slf4j
111 class Consumer implements Runnable {
112
113
114
115
116
117
118 public Consumer(final ArrayBlockingQueue<MonteCarloResults> queue, final MonteCarloResults joinedResults) {
119 this.queue = queue;
120 this.joinedResults = joinedResults;
121 joinedResults.reset();
122 }
123
124 @Override
125 public void run() {
126 log.trace("Starting Monte Carlo results consumer thread");
127
128
129
130 joinedResults.reset();
131 final StopWatch sw = new StopWatch();
132 sw.start();
133
134 try {
135 MonteCarloResults results = null;
136 while (!(results instanceof Poison)) {
137 results = queue.take();
138
139 if (!(results instanceof Poison)) {
140 numSimulationsFound++;
141
142
143 joinedResults.join(results);
144
145 if (log.isTraceEnabled()) {
146 log.trace("Monte Carlo consumer: consumed {}", results.getSamples().getSummary());
147 log.trace("Monte Carlo consumer: consumed {} results, expected value={}",
148 numSimulationsFound, joinedResults.getExpectedValue());
149 }
150
151
152 results = null;
153 }
154 }
155 } catch (java.lang.InterruptedException e) {
156 log.error("Error consuming Monte Carlo Results {}", Throwables.getStackTraceAsString(e));
157 }
158 sw.stop();
159 log.debug("Analysed {} simulation results in {}.", numSimulationsFound, sw);
160 }
161
162 @Override
163 public void finalize() throws Throwable {
164 super.finalize();
165 joinedResults = null;
166 }
167
168 private final ArrayBlockingQueue<MonteCarloResults> queue;
169 private MonteCarloResults joinedResults;
170 private int numSimulationsFound = 0;
171 }
172
173
174
175
176
177 @Slf4j
178 class Producer implements Runnable {
179
180
181
182
183
184
185
186 public Producer(final ArrayBlockingQueue<MonteCarloResults> queue, final MonteCarloScenario simulation,
187 final int numSimulations) {
188 this.queue = queue;
189 this.simulation = simulation.copyOf();
190 this.numSimulations = numSimulations;
191 }
192
193 @Override
194 public void run() {
195 log.trace("Starting Monte Carlo results producer thread");
196 try {
197 final int poolSize = Runtime.getRuntime().availableProcessors();
198 final ThreadFactory threadFactory = new ThreadFactoryBuilder()
199 .setNameFormat("MCScenarioProducer-%d")
200 .setDaemon(true)
201 .build();
202 final ExecutorService es = Executors.newFixedThreadPool(poolSize, threadFactory);
203 final RNG generator = new RNG(RNG.Generator.Well44497b);
204
205 final StopWatch sw = new StopWatch();
206 sw.start();
207 for (int i = 0; i < numSimulations; i++) {
208 es.submit(new Runnable() {
209 @Override
210 public void run() {
211 try {
212 log.trace("Monte Carlo producer: creating scenario object");
213 final MonteCarloScenario scenario = simulation.copyOf();
214 final MonteCarloResults results = scenario.run(generator.getInteger(0, Integer.MAX_VALUE - 1));
215 log.trace("Monte Carlo producer: generated results {}", results.getExpectedValue());
216 queue.put(results);
217 } catch (Exception e) {
218 log.error("Error running Monte Carlo simulation {}", Throwables.getStackTraceAsString(e));
219 }
220 }
221 });
222 }
223 es.shutdown();
224 while (!es.isTerminated()) {
225 es.awaitTermination(1, TimeUnit.SECONDS);
226 }
227 queue.put(new Poison());
228
229 sw.stop();
230 log.info("Finished {} simulations in {}.", numSimulations, sw);
231 } catch (Exception ex) {
232 log.error("Monte Carlo simulation error: {}", Throwables.getStackTraceAsString(ex));
233 }
234 }
235
236 @Override
237 public void finalize() throws Throwable {
238 super.finalize();
239 simulation = null;
240 }
241
242 private final ArrayBlockingQueue<MonteCarloResults> queue;
243 private MonteCarloScenario simulation;
244 private final int numSimulations;
245 }
246
247
248
249
250 class Poison implements MonteCarloResults {
251
252 @Override
253 public double getExpectedValue() {
254 return Double.NEGATIVE_INFINITY;
255 }
256
257 @Override
258 public Samples getSamples() {
259 return new Samples();
260 }
261
262 @Override
263 public String toCsv() {
264 return "POISON";
265 }
266
267 @Override
268 public final MonteCarloResults join(final MonteCarloResults results) {
269 return results;
270 }
271
272 @Override
273 public void reset() {
274
275 }
276 }