LocalPoolExecutor.java

/*
 * Copyright 2013 University of Glasgow.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package broadwick.concurrent;

import broadwick.BroadwickException;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * This Executor controls the execution of several threads using a fixed thread pool. Several instances of the callable
 * object are added to the execution pool which monitors their execution, if any fails, i.e. any of the jobs throws an
 * exception, then all non-running jobs are canceled and currently running jobs are terminated.
 */
@Slf4j
public class LocalPoolExecutor implements Executor {

    /**
     * Create the Callable executor that will run a supplied number of instances of a supplied callable, canceling the execution
     * if a thread throws an exception.
     * @param threadPoolSize    the number of concurrent threads to be run (the size of the thread pool)
     * @param numInstancesToRun the number of threads to be run.
     * @param callable          the actual Callable instance that is to be executed in the pool.
     */
    public LocalPoolExecutor(final int threadPoolSize, final int numInstancesToRun, final Callable<?> callable) {
        poolSize = threadPoolSize;
        numRuns = numInstancesToRun;
        job = callable;
        service = Executors.newFixedThreadPool(poolSize);
    }

    /**
     * Create the Callable executor that will run each callable in a collection, canceling the execution
     * if a thread throws an exception.
     * @param threadPoolSize    the number of concurrent threads to be run (the size of the thread pool)
     * @param jobs          a collection of [Callable] jobs to be run.
     */
    public LocalPoolExecutor(final int threadPoolSize, final List<Callable<?>> jobs) {
        poolSize = threadPoolSize;
        numRuns = jobs.size();
        job = null;
        service = Executors.newFixedThreadPool(poolSize);
    }

    @Override
    public final Status getStatus() {
        return status;
    }

    @Override
    public final void run() {

        final List<Future<?>> tasks = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(numRuns);

        status = Executor.Status.RUNNING;

        // Submit all the Callable objects to the executor, counting down the latch
        // when each submitted job is finished. If any of the jobs throws an exception,
        // it will be wrapped in an ExecutionException by the Future and we will check for it later.
        for (int i = 0; i < numRuns; i++) {
            final int taskId = i;
            log.trace("Adding task {} to the executor service.", i);
            tasks.add(service.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        job.call();
                    } catch (Exception e) {
                        ok = false;
                        throw new BroadwickException(e);
                    } finally {
                        log.debug("Completed task {}", taskId);
                        latch.countDown();
                    }
                }
            }));
        }

        // We now have all the jobs submitted to the executor service. Now we iteratively check the status of each 
        // job (sleeping for 100ms between checks) to determine if each job has been completed. If any job
        // fails then we catch the resultant ExecutionException and cancel all remaining jobs.
        int completedTasks = 0;
        while (completedTasks < tasks.size()) {
            try {
                final Iterator<Future<?>> iterator = tasks.iterator();
                while (iterator.hasNext()) {
                    final Future<?> task = iterator.next();
                    if (task.isDone()) {
                        completedTasks++;
                        task.get();
                        iterator.remove();
                    } else if (task.isCancelled()) {
                        stopAllTasks(tasks);
                    }
                }
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                status = Executor.Status.TERMINATED;
                ok = false;
                break;
            } catch (ExecutionException e) {
                // One of the submitted jobs threw an exception, we need to stop all the currently running threads.
                log.error("LocalPoolExecutor detected an error: {}", Throwables.getStackTraceAsString(e));
                stopAllTasks(tasks);
                service.shutdown();
                status = Executor.Status.TERMINATED;
                ok = false;
                break;
            }
        }

        try {
            if (status != Executor.Status.TERMINATED) {
                latch.await();
            }
        } catch (InterruptedException e) {
            log.error("Caught exception closing failed tasks");
            status = Executor.Status.TERMINATED;

        }

        shutdown();
    }

    /**
     * Send a message to all the futures to cancel their execution. On completion the status of the executor is set to
     * Executor.Status.TERMINATED.
     * @param tasks a collection of future object that are to be stopped.
     */
    private void stopAllTasks(final List<Future<?>> tasks) {
        for (Future<?> task : tasks) {
            task.cancel(true);
        }
        status = Executor.Status.TERMINATED;
    }

    /**
     * Shutdown the ExecutionService, blocking until ALL the threads have finished before returning.
     */
    private void shutdown() {
        log.trace("Shutting down the LocalPoolExecutor");
        try {
            service.shutdown();
            service.awaitTermination(1000, TimeUnit.MILLISECONDS);

            log.trace("LocalPoolExecutor succesfully shutdown");
            status = Executor.Status.COMPLETED;

        } catch (InterruptedException e) {
            // Nothing to do - we are shuting done the service
            Thread.currentThread().interrupt();
            status = Executor.Status.TERMINATED;
        } catch (Exception e) {
            // Nothing to do - we are shuting done the service
            status = Executor.Status.TERMINATED;
        }
    }
    private final int poolSize;
    private final int numRuns;
    private final Callable<?> job;
    private final ExecutorService service;
    private Status status = Executor.Status.NOT_STARTED;
    @Getter
    @SuppressWarnings("PMD.UnusedPrivateField")
    private boolean ok = true; /// true if the underlying task completed without error, false otherwise
}