Java 并发编程
Executor 并行执行任务(带返回值)
Using Executor and CompletableFuture to concurrent execute the Many Queries. Below is commit changes.
Java 并发编程案例:任务失败快速失败取消其他任务
package com.whalefall541.cases.concurrentqry.jobversion;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@SuppressWarnings("all")
@Slf4j
public class JobFailFastAsyncExecutor implements AutoCloseable {
private final ExecutorService executor;
public JobFailFastAsyncExecutor(int threadCount, String jobName) {
this.executor = Executors.newFixedThreadPool(threadCount, r -> {
Thread thread = new Thread(r);
thread.setName(String.format("%s-%s", jobName, thread.getName()));
return thread;
});
}
/**
* 执行一组异步任务,一旦其中任一任务失败,立即取消所有任务,并传播异常 <br/>
* 真正严格意义的“fail-fast + 最少日志
*
* @param inputs 输入参数列表
* @param taskFunction 任务处理函数,输入 P 返回 R
* @return 一个异步 CompletableFuture,成功返回结果列表,失败抛出第一个异常
*/
public <P, R> CompletableFuture<List<R>> executeFailFast(List<P> inputs, Function<P, R> taskFunction) {
List<CompletableFuture<R>> futures = inputs.stream()
.map(input -> CompletableFuture.supplyAsync(
// 异步线程(来自 thread pool)执行下面逻辑
() -> taskFunction.apply(input), executor))
.collect(Collectors.toList());
CompletableFuture<List<R>> resultFuture = new CompletableFuture<>();
CommonTaskSupport.registerFailFastHandlers(futures, resultFuture);
CommonTaskSupport.collectAllResults(futures, resultFuture);
return resultFuture;
}
static class CommonTaskSupport {
private CommonTaskSupport() {
}
public static <R> void registerFailFastHandlers(List<CompletableFuture<R>> futures,
CompletableFuture<List<R>> resultFuture) {
AtomicBoolean failFastTriggered = new AtomicBoolean(false);
futures.forEach(future -> future.whenComplete(
// 下面都是的异步线程完成后触发
(r, ex) -> {
if (ex != null && failFastTriggered.compareAndSet(false, true)) {
Throwable actual = unwrap(ex);
logIfNeeded(actual);
resultFuture.completeExceptionally(actual);
futures.forEach(f -> {
boolean cancelled = f.cancel(true);
log.debug("尝试取消任务{}: {}", f, cancelled ? "成功" : "失败");
});
}
}));
}
private static Throwable unwrap(Throwable ex) {
if (ex instanceof CompletionException || ex instanceof ExecutionException) {
return ex.getCause();
}
return ex;
}
/**
* 优雅关闭线程池
*
* @param executor 线程池
*/
public static void shutdownGracefully(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
private static void logIfNeeded(Throwable actual) {
if (!(actual instanceof CancellationException)) {
log.warn("任务失败,开始 fail-fast 取消其他任务:{} - [{}]",
actual != null ? actual.getMessage() : "null",
actual != null ? actual.getClass().getSimpleName() : "null");
}
}
public static <R> void collectAllResults(List<CompletableFuture<R>> futures,
CompletableFuture<List<R>> resultFuture) {
CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, ex) -> {
if (!resultFuture.isDone()) {
try {
List<R> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
resultFuture.complete(results);
} catch (CompletionException e) {
resultFuture.completeExceptionally(e.getCause());
}
}
});
}
}
@Override
public void close() {
CommonTaskSupport.shutdownGracefully(executor);
}
}
并行任务执行器(不带返回值)
Java 并行任务执行器(不带返回值)
package com.whalefall541.cases.concurrenttskvoid;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 并行任务执行器
*
* <p>用于同时执行两个 Runnable 任务,支持超时控制。
* 实现 AutoCloseable,支持 try-with-resources 自动释放线程池。
*
* <p>使用示例:
* <pre>{@code
* try (ParallelTaskExecutor executor = new ParallelTaskExecutor()) {
* executor.execute(task1, task2, 28, TimeUnit.SECONDS);
* }
* }</pre>
*
*/
public class ParallelTaskExecutor implements AutoCloseable {
private final ExecutorService executor;
private final boolean owned;
/**
* 使用默认线程池(2线程)创建执行器
*
* <p>线程池将由本类管理,close() 时会自动关闭
*/
public ParallelTaskExecutor() {
this.executor = Executors.newFixedThreadPool(2, new NamedThreadFactory("parallel-task"));
this.owned = true;
}
/**
* 使用外部线程池创建执行器
*
* @param executor 外部 ExecutorService,本类不会关闭它
*/
public ParallelTaskExecutor(ExecutorService executor) {
this.executor = executor;
this.owned = false;
}
/**
* 并行执行两个任务,默认超时 28 秒
*
* @param step1 任务1
* @param step2 任务2
* @throws ParallelExecutionException 任一任务超时、失败或中断
*/
public void execute(Runnable step1, Runnable step2) {
execute(step1, step2, 28, TimeUnit.SECONDS);
}
/**
* 并行执行两个任务,支持自定义超时
*
* @param step1 任务1
* @param step2 任务2
* @param timeout 超时时间
* @param unit 时间单位
* @throws ParallelExecutionException 任一任务超时、失败或中断
*/
public void execute(Runnable step1, Runnable step2, long timeout, TimeUnit unit) {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(step1, executor);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(step2, executor);
try {
CompletableFuture.allOf(f1, f2).get(timeout, unit);
} catch (TimeoutException e) {
throw new ParallelExecutionException("parallel task timeout", e);
} catch (ExecutionException e) {
throw new ParallelExecutionException("parallel task failed", unwrap(e));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ParallelExecutionException("parallel task interrupted", e);
} finally {
f1.cancel(true);
f2.cancel(true);
}
}
private Throwable unwrap(Throwable ex) {
while ((ex instanceof ExecutionException || ex instanceof CompletionException)
&& ex.getCause() != null) {
ex = ex.getCause();
}
return ex;
}
@Override
public void close() {
if (owned) {
shutdown();
}
}
private void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 并行任务执行异常
*
* <p>包装 TimeoutException、ExecutionException、InterruptedException
*/
public static class ParallelExecutionException extends RuntimeException {
public ParallelExecutionException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* 命名线程工厂
*
* <p>创建的线程格式为: {prefix}-{序号}
*/
public static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger counter = new AtomicInteger(1);
public NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
t.setDaemon(false);
return t;
}
}
}
协议
本作品代码部分采用Apache 2.0协议 进行许可。遵循许可的前提下,你可以自由地对代码进行修改,再发布,可以将代码用作商业用途。但要求你:
- 署名:在原有代码和衍生代码中,保留原作者署名及代码来源信息。
- 保留许可证:在原有代码和衍生代码中,保留Apache 2.0协议文件。
- 署名:应在使用本文档的全部或部分内容时候,注明原作者及来源信息。
- 非商业性使用:不得用于商业出版或其他任何带有商业性质的行为。如需商业使用,请联系作者。
- 相同方式共享的条件:在本文档基础上演绎、修改的作品,应当继续以知识共享署名 4.0国际许可协议进行许可。