MonteCarlo.java

/*
 * Copyright 2014 University of Glasgow.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package broadwick.montecarlo;

import broadwick.BroadwickException;
import broadwick.rng.RNG;
import broadwick.statistics.Samples;
import broadwick.utils.CloneUtils;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;

/**
 * An implementation of the Monte Carlo method, it runs many MonteCarloSimulations and gathers the statistics of their
 * outputs. Note, each simulation object is cloned and each of these clones are run independently
 * <code>numSimulations</code> times.
 */
@Slf4j
public class MonteCarlo {

    /**
     * Create a Monte Carlo object that is capable of running <code>numSimulation</code> MonteCarloScenarios.
     * @param simulation     the simulation or scenario to be run.
     * @param numSimulations the number of times the simulation should be run.
     */
    public MonteCarlo(final MonteCarloScenario simulation, final int numSimulations) {
        this.simulation = simulation;
        this.numSimulations = numSimulations;
        this.resultsConsumer = new MonteCarloDefaultResults();
    }

    /**
     * Run the Monte Carlo simulations. Two threads (a producer and consumer) are created to asynchronously run the
     * simulations (the producer) and to handle the results from each simulation as they are calculated (the consumer).
     * The producer thread uses an execution pool to manage running each simulation and places the results on a queue
     * which is monitored by a consumer thread to calculate the posterior distributions for the Monte Carlo run.
     */
    public final void run() {
        final ArrayBlockingQueue<MonteCarloResults> queue = new ArrayBlockingQueue<>(numSimulations + 1);
        try {

            //Creating Producer and Consumer Thread
            final Thread producer = new Thread(new Producer(queue, simulation, numSimulations));
            final Thread consumer = new Thread(new Consumer(queue, resultsConsumer));
            producer.start();
            consumer.start();

            producer.join();
            consumer.join();
        } catch (Exception e) {
            log.error("Error joining Monte Carlo results {}", Throwables.getStackTraceAsString(e));
            throw new BroadwickException("Failed to run Monte Carlo simulation. " + Throwables.getStackTraceAsString(e));
        }

        queue.clear();
    }

    /**
     * Get a copy of the results from the simulation. This method MUST return a copy of the results NOT a reference.
     * @return the MonteCarloResults object that contains the results of all the simulations.
     */
    public final MonteCarloResults getResults() {
        return CloneUtils.deepClone(resultsConsumer);
    }

    /**
     * Set the consumer object for this Monte Carlo simulation. A consumer is simply a MonteCarloResults class that
     * 'joins' each result that is consumes so that statistics can be calculated at the end of the simulation.
     * @param consumer the results object to use as a consumer.
     */
    public final void setResultsConsumer(final MonteCarloResults consumer) {
        resultsConsumer = consumer;
        resultsConsumer.reset();
    }

    @Override
    public void finalize() throws Throwable {
        super.finalize();
        resultsConsumer = null;
    }

    private final MonteCarloScenario simulation;
    private MonteCarloResults resultsConsumer;
    private final int numSimulations;
}

/**
 * This class takes (or consumes) the results of the individual simulations from a queue and joins them into a single
 * results object so that statistics can be generated from the Monte Carlo simulation. It stops when it takes a Poison
 * object from the queue or when its thread is interrupted.
 */
@Slf4j
class Consumer implements Runnable {

    /**
     * Create the consumer object. The joined results object is reset on creation.
     * @param queue         the queue that the consumer should check for values to consume.
     * @param joinedResults the MonteCarloResults object to which the consumed results will be added.
     */
    public Consumer(final ArrayBlockingQueue<MonteCarloResults> queue, final MonteCarloResults joinedResults) {
        this.queue = queue;
        this.joinedResults = joinedResults;
        joinedResults.reset();
    }

    /**
     * Take results from the queue, joining each one to the joined results object, until a Poison object is taken.
     * If the thread is interrupted while waiting on the queue the error is logged, the interrupt status of the thread
     * is restored and the method returns with whatever has been joined so far.
     */
    @Override
    public void run() {
        log.trace("Starting Monte Carlo results consumer thread");

        // The constructor has already reset the results object; resetting again here guarantees a clean start
        // even if the object was modified between construction and the thread being started.
        joinedResults.reset();
        final StopWatch sw = new StopWatch();
        sw.start();

        try {
            MonteCarloResults results = queue.take();
            while (!(results instanceof Poison)) {
                numSimulationsFound++;

                // add the results taken from the queue to the accumulated statistics.
                joinedResults.join(results);

                if (log.isTraceEnabled()) {
                    log.trace("Monte Carlo consumer: consumed {}", results.getSamples().getSummary());
                    log.trace("Monte Carlo consumer: consumed {} results, expected value={}",
                              numSimulationsFound, joinedResults.getExpectedValue());
                }

                // overwriting the reference lets the consumed results be freed while we block on the queue.
                results = queue.take();
            }
        } catch (java.lang.InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it again so that whoever owns this
            // thread can still detect that it was asked to stop.
            Thread.currentThread().interrupt();
            log.error("Error consuming Monte Carlo Results {}", Throwables.getStackTraceAsString(e));
        }
        sw.stop();
        log.debug("Analysed {} simulation results in {}.", numSimulationsFound, sw);
    }

    // NOTE(review): finalizers are deprecated in current JDKs; this one only drops a reference and is retained
    // solely so that the public members of the class are unchanged.
    @Override
    public void finalize() throws Throwable {
        super.finalize();
        joinedResults = null;
    }

    private final ArrayBlockingQueue<MonteCarloResults> queue;
    private MonteCarloResults joinedResults;
    private int numSimulationsFound = 0;
}

/**
 * This class runs a single simulation of the model to 'produce' results (hence the name). This runnable class is run
 * many times to create a Monte Carlo simulation.
 */
@Slf4j
class Producer implements Runnable {

    /**
     * Create the producer object.
     * @param queue          the queue on which the producers results are added.
     * @param simulation     the model that will be run to produce results.
     * @param numSimulations the number of simulations that are to be run and placed in the quque.
     */
    public Producer(final ArrayBlockingQueue<MonteCarloResults> queue, final MonteCarloScenario simulation,
                    final int numSimulations) {
        this.queue = queue;
        this.simulation = simulation.copyOf();
        this.numSimulations = numSimulations;
    }

    @Override
    public void run() {
        log.trace("Starting Monte Carlo results producer thread");
        try {
            final int poolSize = Runtime.getRuntime().availableProcessors();
            final ThreadFactory threadFactory = new ThreadFactoryBuilder()
                    .setNameFormat("MCScenarioProducer-%d")
                    .setDaemon(true)
                    .build();
            final ExecutorService es = Executors.newFixedThreadPool(poolSize, threadFactory);
            final RNG generator = new RNG(RNG.Generator.Well44497b);

            final StopWatch sw = new StopWatch();
            sw.start();
            for (int i = 0; i < numSimulations; i++) {
                es.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            log.trace("Monte Carlo producer: creating scenario object");
                            final MonteCarloScenario scenario = simulation.copyOf();
                            final MonteCarloResults results = scenario.run(generator.getInteger(0, Integer.MAX_VALUE - 1));
                            log.trace("Monte Carlo producer: generated results {}", results.getExpectedValue());
                            queue.put(results);
                        } catch (Exception e) {
                            log.error("Error running Monte Carlo simulation {}", Throwables.getStackTraceAsString(e));
                        }
                    }
                });
            }
            es.shutdown();
            while (!es.isTerminated()) {
                es.awaitTermination(1, TimeUnit.SECONDS);
            }
            queue.put(new Poison());

            sw.stop();
            log.info("Finished {} simulations in {}.", numSimulations, sw);
        } catch (Exception ex) {
            log.error("Monte Carlo simulation error: {}", Throwables.getStackTraceAsString(ex));
        }
    }

    @Override
    public void finalize() throws Throwable {
        super.finalize();
        simulation = null;
    }

    private final ArrayBlockingQueue<MonteCarloResults> queue;
    private MonteCarloScenario simulation;
    private final int numSimulations;
}

/**
 * A sentinel ("poison pill") results object. It carries no simulation results; the consumer of the Monte Carlo
 * results queue checks for an instance of this class and stops taking objects from the queue when it finds one.
 */
class Poison implements MonteCarloResults {

    /** The text returned as the CSV representation of the poison object. */
    private static final String CSV_MARKER = "POISON";

    /**
     * Nothing is stored in a poison object so there is nothing to reset.
     */
    @Override
    public void reset() {
        // intentionally empty - a poison pill has no state.
    }

    /**
     * Joining to a poison object does not accumulate anything.
     * @param results the results that were to be joined.
     * @return the <code>results</code> argument, unchanged.
     */
    @Override
    public final MonteCarloResults join(final MonteCarloResults results) {
        return results;
    }

    /**
     * @return a newly created Samples object on every call.
     */
    @Override
    public Samples getSamples() {
        final Samples samples = new Samples();
        return samples;
    }

    /**
     * @return negative infinity, always.
     */
    @Override
    public double getExpectedValue() {
        return Double.NEGATIVE_INFINITY;
    }

    /**
     * @return the fixed marker text for a poison object.
     */
    @Override
    public String toCsv() {
        return CSV_MARKER;
    }
}