更优雅的多线程调用工具
java中的多线程
ts中实现异步的方法
async function hello(): Promise<string> {
return "hello1";
}
hello().then(value => {
console.log(value);
}).catch(err => {
console.error(err);
});
console.log('hello2');
-
ts中有一个极其重要的对象用于实现异步操作,也就是Promise
-
只要实现了Promise/A+规范,那他就是一个Promise
-
ts中Promise的使用非常的简洁,易于开发,那么我们能不能在java中也实现这样的一个类呢
-
答案是肯定的
-
配置异步线程池
-
SpringBoot中配置异步线程池并获取bean
-
@Bean(name = "threadPoolTaskExecutor") public TaskExecutor threadPoolTaskExecutor() { log.info("start taskExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数 executor.setCorePoolSize(2); // 最大线程数 executor.setMaxPoolSize(10); // 任务队列容量 executor.setQueueCapacity(100); // 设置线程的最大空闲时间 executor.setKeepAliveSeconds(60); // 线程名前缀 executor.setThreadNamePrefix("async-"); // 设置拒绝策略:当线程池达到最大线程数时,如何处理新任务 // CALLER_RUNS:在添加到线程池失败时会由主线程自己来执行这个任务 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化 executor.initialize(); return executor; }
-
我们可以使用@Bean声明一个bean,并为其取名为threadPoolTaskExecutor
- 同时在bean初始化方法中,我们可以使用spring提供的ThreadPoolTaskExecutor类,来快速声明一个线程池
-
使用
-
@Autowired @Qualifier("threadPoolTaskExecutor") private Executor threadPoolTaskExecutor;
- 我们可以在
由spring管理
的类中,使用@Autowired注解和@Qualifier来加载指定名称的bean。
- 我们可以在
-
-
非spring环境创建线程池
-
非spring环境下,创建线程池
-
先看看开发手册是怎么说的
- 所以我们需要手动创建线程池
-
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
-
-
如下:
-
new ThreadPoolExecutor(50, 100, 60, SECONDS, new LinkedBlockingQueue<>(127), new ThreadPoolExecutor.CallerRunsPolicy());
-
-
-
-
Promise异步线程池配置
-
spring环境中,仅需修改配置文件(已提供默认配置)
-
-
promise: executor: core-pool-size: 16 max-pool-size: 32 queue-capacity: 1000 keep-alive-seconds: 60 thread-name-prefix: promise
-
-
非spring环境中,需要手动初始化线程池
-
PromiseExecutor.initExecutor(ThreadsUtils.createExecutor(50));
-
-
Promise使用
-
PromisedTask接口
-
/** * 执行任务 * @param status 状态 * @return 结果 * @throws Exception 异常 */ T execute(Promise.NextStatus status) throws Exception;
-
promise中运行的任务需要实现这个接口,传入一个参数,返回一个参数
-
NextStatus status:下一状态,可以调用其中的方法来指定Promise的下一步操作
-
T:返回类型,也是Promise的泛型
-
-
创建一个Promise
-
Promise<Integer> resolve = Promise.resolve((status) -> { System.out.println("start promise...."); Thread.sleep(1000); return 123; });
- 此时这个Promise还未执行,处于待执行状态
-
指定回调方法
-
Promise<Integer> resolve = Promise.resolve((status) -> { System.out.println("start promise...."); Thread.sleep(1000); return 123; }).onSucceed((res) -> System.out.println("返回结果为" + res + ",进入成功回调")) .onFail((res) -> System.out.println("返回结果为" + res + ",进入失败回调")) .onException((e) -> System.out.println("返回结果为" + e + ",进入异常回调")) .onFinally((res) -> System.out.println("返回结果为" + res + ",进入finally回调"));
-
有四种回调:
- succeed回调:任务执行成功时进行回调,传入参数为任务的执行结果
- fail回调:任务执行失败时进行回调,传入参数为任务执行结果
- exception回调:在任务出现异常时进行回调,传入参数为异常Exception类
- finally回调:直接到最后进行回调,传入参数为执行结果
-
-
-
开始执行一个Promise
-
有两种执行方式
-
交由线程池进行运行
resolve.startAsync();
-
由当前线程直接执行任务
resolve.startSync();
-
-
-
手动确定下一步运行状态
-
在任务中,我们传入了status作为下一状态信息,我们可以调用其中的方法,直接进行状态转移
-
status.accept(); status.reject(); status.terminated();
-
accept():
- 默认状态,在该任务直接完成后进入成功回调。
-
reject():
- 拒绝状态,在完成任务之后进入失败回调,进行结果处理。
-
terminated():
- 终止状态,在完成任务后直接进入结束回调,跳过成功或失败回调。
-
你也可以借由抛出异常来进入异常回调
-
-
在新版中,异常回调必须有返回值,作为这次任务的返回结果
-
-
-
-
等待Promise执行结束并获取返回值
public T startSync()
- 由当前线程直接执行该任务,并获取任务的返回值
public void startAsync()
- 开始异步执行任务,此时结果还未产出,所以返回值为void
public T await()
- 等待异步任务执行完毕,并获取返回值
public void waitFinish()
- 等待异步任务执行完毕,忽略返回值
-
批量获取Promise返回值
-
Tasks类
-
提供了三个方法,用于等待执行结束
public static <T> List<T> awaitAll(List<Promise<T>> promises)
- 等待同一种返回类型的Promise执行结束,并批量获取他们的返回值
- 返回结果的顺序和传入Promise的顺序是一致的
public static List<Object> awaitAll(Promise<?>... promises)
- 等待不同种类型的Promise执行结束,并批量获取他们的返回值
- 因为返回类型的不同,所以统一返回Object数组作为结果
- 顺序也与传入顺序一致
public static void awaitAllVoid(Promise<?>... promises)
- 只是等待执行结束并不关心返回值
- 也就是返回值已经在成功回调中处理完成,后续不需要关心返回值的情况
-
-
-
重启Promise
-
出于安全考虑,一个Promise对象只能被执行一次,但是可以多次获取返回值,如果需要多次运行一个任务,这里也提供了两个方法
public Promise<T> reBuild()
- 在任务执行完成后,重构一个完全相同的任务。
- 如果当前任务还未被执行,则返回自身。
public Promise<T> changeTask(PromisedTask<T> task)
- 修改任务,但不修改回调函数,返回一个新的任务。
-
使用示例
-
@Test public void testThreads() throws Exception { PromiseExecutor.initExecutor(ThreadsUtils.createExecutor(50)); Promise<String> stringPromise = Promise.resolve((status) -> { System.out.println("start wait....."); Thread.sleep(5000); // 拒绝状态,进入失败回调 status.reject(); return "123"; }) .onSucceed((res) -> System.out.println("返回结果为" + res + ",进入成功回调")) .onFail((res) -> System.out.println("返回结果为" + res + ",进入失败回调")) .onFinally((res) -> System.out.println("返回结果为" + res + ",进入finally回调")); stringPromise.startAsync(); System.out.println("主线程已完成,等待3秒钟"); Thread.sleep(3000); System.out.println("尝试获取Promise结果"); String await = stringPromise.await(); System.out.println("Promise结果为:" + await); System.out.println("尝试重启Promise"); stringPromise.reBuild().startAsync(); stringPromise.changeTask((status) -> { System.out.println("start wait....."); Thread.sleep(5000); // 接受状态,进入成功回调 status.accept(); return "456"; }).startAsync(); Thread.sleep(10000); }
-
18:07:28.724 [main] DEBUG org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 主线程已完成,等待3秒钟 start wait..... 尝试获取Promise结果 18:07:33.739 [1709028448721-exec-1] ERROR online.zust.qcqcqc.utils.threads.Promise - Promise rejected by user 返回结果为123,进入失败回调 返回结果为123,进入finally回调 Promise结果为:123 尝试重启Promise start wait..... start wait..... 18:07:38.744 [1709028448721-exec-2] ERROR online.zust.qcqcqc.utils.threads.Promise - Promise rejected by user 返回结果为123,进入失败回调 返回结果为123,进入finally回调 返回结果为456,进入成功回调 返回结果为456,进入finally回调
-
@Test public void testException() { PromiseExecutor.initExecutor(ThreadsUtils.createExecutor(50)); Promise<Integer> resolve = Promise.resolve((status) -> { System.out.println("start promise...."); Thread.sleep(1000); throwException(); return 123; }).onSucceed((res) -> System.out.println("返回结果为" + res + ",进入成功回调")) .onFail((res) -> System.out.println("返回结果为" + res + ",进入失败回调")) .onException((e) -> { System.out.println("返回结果为" + e + ",进入异常回调"); return 456; }) .onFinally((res) -> System.out.println("返回结果为" + res + ",进入finally回调")); Integer i = resolve.startSync(); System.out.println("Promise结果为:" + i); } public void throwException() { throw new RuntimeException("运行出错啦"); }
-
18:17:44.345 [main] DEBUG org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService start promise.... 返回结果为java.lang.RuntimeException: 运行出错啦,进入异常回调 返回结果为456,进入finally回调 Promise结果为:456