Java Concurrent Programming
Executor Concurrent Task Execution
Using Executor and CompletableFuture to concurrent execute the Many Queries. Below is commit changes.
Java Concurrent Programming Case: Task Failure Fail-Fast Cancels Other Tasks
package com.whalefall541.cases.concurrentqry.jobversion;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@SuppressWarnings("all")
@Slf4j
public class JobFailFastAsyncExecutor implements AutoCloseable {
private final ExecutorService executor;
public JobFailFastAsyncExecutor(int threadCount, String jobName) {
this.executor = Executors.newFixedThreadPool(threadCount, r -> {
Thread thread = new Thread(r);
thread.setName(String.format("%s-%s", jobName, thread.getName()));
return thread;
});
}
/**
* Execute a group of asynchronous tasks, once any task fails, immediately cancel all tasks and propagate exception <br/>
* Truly strict "fail-fast + minimal log"
*
* @param inputs Input parameter list
* @param taskFunction Task processing function, input P return R
* @return An asynchronous CompletableFuture, returns result list on success, throws first exception on failure
*/
public <P, R> CompletableFuture<List<R>> executeFailFast(List<P> inputs, Function<P, R> taskFunction) {
List<CompletableFuture<R>> futures = inputs.stream()
.map(input -> CompletableFuture.supplyAsync(
// Asynchronous thread (from thread pool) executes logic below
() -> taskFunction.apply(input), executor))
.collect(Collectors.toList());
CompletableFuture<List<R>> resultFuture = new CompletableFuture<>();
CommonTaskSupport.registerFailFastHandlers(futures, resultFuture);
CommonTaskSupport.collectAllResults(futures, resultFuture);
return resultFuture;
}
static class CommonTaskSupport {
private CommonTaskSupport() {
}
public static <R> void registerFailFastHandlers(List<CompletableFuture<R>> futures,
CompletableFuture<List<R>> resultFuture) {
AtomicBoolean failFastTriggered = new AtomicBoolean(false);
futures.forEach(future -> future.whenComplete(
// Triggered after asynchronous threads below complete
(r, ex) -> {
if (ex != null && failFastTriggered.compareAndSet(false, true)) {
Throwable actual = unwrap(ex);
logIfNeeded(actual);
resultFuture.completeExceptionally(actual);
futures.forEach(f -> {
boolean cancelled = f.cancel(true);
log.debug("Try to cancel task {}: {}", f, cancelled ? "Success" : "Failed");
});
}
}));
}
private static Throwable unwrap(Throwable ex) {
if (ex instanceof CompletionException || ex instanceof ExecutionException) {
return ex.getCause();
}
return ex;
}
/**
* Gracefully shutdown thread pool
*
* @param executor Thread pool
*/
public static void shutdownGracefully(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
private static void logIfNeeded(Throwable actual) {
if (!(actual instanceof CancellationException)) {
log.warn("Task failed, start fail-fast to cancel other tasks: {} - [{}]",
actual != null ? actual.getMessage() : "null",
actual != null ? actual.getClass().getSimpleName() : "null");
}
}
public static <R> void collectAllResults(List<CompletableFuture<R>> futures,
CompletableFuture<List<R>> resultFuture) {
CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, ex) -> {
if (!resultFuture.isDone()) {
try {
List<R> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
resultFuture.complete(results);
} catch (CompletionException e) {
resultFuture.completeExceptionally(e.getCause());
}
}
});
}
}
@Override
public void close() {
CommonTaskSupport.shutdownGracefully(executor);
}
}
Agreement
The code part of this work is licensed under Apache License 2.0 . You may freely modify and redistribute the code, and use it for commercial purposes, provided that you comply with the license. However, you are required to:
- Attribution: Retain the original author's signature and code source information in the original and derivative code.
- Preserve License: Retain the Apache 2.0 license file in the original and derivative code.
- Attribution: Give appropriate credit, provide a link to the license, and indicate if changes were made.
- NonCommercial: You may not use the material for commercial purposes. For commercial use, please contact the author.
- ShareAlike: If you remix, transform, or build upon the material, you must distribute your contributions under the same license as the original.