异步和线程池

Posted by Chenyawei on 2021-09-15
Words 5.9k and Reading Time 27 Minutes
Viewed Times

一、初始化线程的4种方式

1、继承Thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread thread = new Thread01();
thread.start();
}
public static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
}

2、实现Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runable01 runable01 = new Runable01();
runable01.run();
}
public static class Runable01 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
}

3、实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
futureTask.run();
System.out.println(futureTask.get());
}
public static class Callable01 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}
}
}

4、线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadTest {
public static final ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
executor.execute(new Runable01()); // 无返回值
Future<Integer> submit = executor.submit(new Callable01()); // 有返回值
submit.get();
}
public static class Runable01 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
public static class Callable01 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}
}
}

方式 1 和方式 2:主进程无法获取线程的运算结果。方式 3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。业务代码中使用线程池的方式进行并行开发.

二、线程池的使用

1、线程池的七大参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 1、int corePoolSize,
* 2、int maximumPoolSize,
* 3、long keepAliveTime,
* 4、TimeUnit unit,
* 5、BlockingQueue<Runnable> workQueue,
* 6、ThreadFactory threadFactory,
* 7、RejectedExecutionHandler handler
*/
ExecutorService threadPool = new ThreadPoolExecutor(
200,
10,
10L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(10000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
1)、int corePoolSize

– the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set.

池中一直保持的线程的数量,即使线程空闲.除非设置了allowCoreThreadTimeOut

2)、maximumPoolSize

– the maximum number of threads to allow in the pool

池中允许的最大线程数

3)、keepAliveTime

– when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.

当线程数大于核心数时,这是多余的空闲线程在终止前等待新任务的最长时间。最终线程池维持在corePoolSize大小

4)、unit

– the time unit for the keepAliveTime argument

keepAliveTime参数的时间单位

5)、workQueue

– the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.

用于在执行任务之前保存任务的队列。 这个队列将只保存 execute 方法提交的 Runnable 任务。

阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize大小,就会放在这里等待空闲线程执行.

6)、threadFactory

– the factory to use when the executor creates a new thread

创建线程的工厂,比如指定线程名等

7)、handler

– the handler to use when execution is blocked because the thread bounds and queue capacities are reached

由于达到线程边界和队列容量而阻塞执行时要使用的处理程序.

拒绝策略,如果线程慢了,线程池就会使用拒绝策略.

2、运行流程

1)、线程池创建,准备好core数量的核心线程,准备接受任务
2)、新的任务进来,用core线程最好的空闲线程执行
  • 1、core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列取任务执行
  • 2、阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量 (new LinkedBlockingDeque<>(): 默认是Integer的最大值。并发量很大时可能会内存不够,可以指定阻塞队列的数量)
  • 3、max都执行完成,有很多空闲,指定的时间keepAliveTime以后,max减去core的这些线程自动销毁.最终保持到core大小
  • 4、如果线程数开到了max的数量,还有新任务进来,就用Handler指定的拒绝策略进行处理
3)、所有的线程创建都是由指定的factory创建的.

3、常见的4种线程池

  • newCachedThreadPool()

    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 60 秒内未使用的线程将被终止并从缓存中删除。

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  • newFixedThreadPool(5)

    创建一个线程池,该线程池重用固定数量的线程在共享的无界队列中运行。 在任何时候,最多 nThreads 个线程将是活动的处理任务。 如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。 如果任何线程在关闭前的执行过程中因失败而终止,则在需要执行后续任务时,将有一个新线程取而代之。 池中的线程将一直存在,直到它被明确关闭。

    参数:nThreads——池中的线程数

    返回:新创建的线程池

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • newScheduledThreadPool(5)

    创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。

    参数:corePoolSize——要保留在池中的线程数,即使它们是空闲的.

    返回:新创建的调度线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    BlockingQueue<Runnable> workQueue){
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
    }
  • newSingleThreadExecutor

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    返回:新创建的单线程Executor

    1
    2
    3
    4
    5
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
    new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())
    );
    }

    与其他等效的 newFixedThreadPool(1) 不同,返回的执行器保证不可重新配置以使用其他线程。 它保证不能重新配置从而使用其他线程。比如以下代码:

    (ThreadPoolExecutor) es2; 这行代码会报错.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public static void main(String[] args) {
    ExecutorService es1 = Executors.newFixedThreadPool(1);
    ExecutorService es2 = Executors.newSingleThreadExecutor();

    ThreadPoolExecutor tp1 = (ThreadPoolExecutor) es1;
    tp1.setCorePoolSize(2);
    System.out.println(tp1.getCorePoolSize());

    ThreadPoolExecutor tp2 = (ThreadPoolExecutor) es2;
    tp1.setCorePoolSize(2);
    System.out.println(tp1.getCorePoolSize());
    }

    结果:

    1
    2
    3
    2
    Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.Executors$FinalizableDelegatedExecutorService cannot be cast to java.util.concurrent.ThreadPoolExecutor
    at com.atguigu.gulimall.search.thread.PoolTest.main(PoolTest.java:21)

4、开发中为什么使用线程池

  • 降低资源的消耗

    通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗

  • 提高响应速度

    因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行

  • 提高线程的可管理性

    线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

三、CompleTableFuture 异步编排

业务场景:

查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

1
2
3
4
5
6
1、获取sku的基本信息 0.5s
2、获取sku的图片信息 0.5s
3、获取sku的促销信息 1s
4、获取spu的所有销售属性 1s
5、获取规格参数组及组下的规格参数 1.5s
6、spu 详情 1s

假如商品详情页的每个查询,需要如下标注的时间才能完成.那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自己扩展了 Java 的Future接口,提供了addListener等多个扩展方法;Google guava 也提供了通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢?

在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

image-20210914113919587

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作.

1
2
3
4
5
6
7
8
9
10
11
12
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

runAsync 都是没有返回结果的,supplyAsync 都是可以获取返回结果的; 可以传入自定义的线程池executor,否则就用默认的线程池;

实例:

1
2
3
4
5
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("runAsync当前线程:" + Thread.currentThread().getName());
int j = 10 / 2;
System.out.println("runAsync运行结果:" + j);
}, executor);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 方法完成后的感知
*/
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("whenComplete当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("whenComplete运行结果:" + i);
return i;
}, executor).whenComplete((res,exception) -> {
//虽然能得到异常信息,但是没法修改返回数据
System.out.println("whenComplete异步任务成功完成了...结果是:" + res + "异常是:" + exception);
}).exceptionally(throwable -> {
//可以感知异常,同时返回默认值
return 10;
});
System.out.println("whenComplete获取最终结果:"+future1.get());

2、计算完成时回调方法

1
2
3
4
5
6
7
8
9
10
11
12
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
  • whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。

  • whenComplete 和 whenCompleteAsync 的区别:

    whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

    whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

例子: whenComplete方法完成后的感知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadTest {

public static final ExecutorService executor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("whenComplete当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("whenComplete运行结果:" + i);
return i;
}, executor).whenComplete((res,exception) -> {
//虽然能得到异常信息,但是没法修改返回数据
System.out.println("whenComplete异步任务成功完成了...结果是:" + res + "异常是:" + exception);
}).exceptionally(throwable -> {
//可以感知异常,同时返回默认值
return 10;
});
System.out.println("whenComplete获取最终结果:"+future1.get());
}
}

3、handle方法

1
2
3
4
5
6
7
8
9
10
11
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}

和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

例子: 方法执行完成后的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadTest {
public static final ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 方法执行完成后的处理
*/
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("handle当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("handle运行结果:" + i);
return i;
}, executor).handle((result,thr) -> {
if (result != null) {
return result * 2;
}
if (thr != null) {
System.out.println("handle异步任务成功完成了...结果是:" + result + "异常是:" + thr);
return 0;
}
return 0;
});
System.out.println("handle获取最终结果:"+future2.get());
}
}

4、线程串行化方法

1)、thenRun:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun的后续操作带有Async 默认是异步执行的。同之前。以上都要前置任务成功完成

1
2
3
4
5
6
7
8
9
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}

例子:

1
2
3
4
5
6
7
8
CompletableFuture<Void> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("thenRunAsync当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("thenRunAsync运行结果:" + i);
return i;
}, executor).thenRunAsync(() -> {
System.out.println("thenRunAsync任务2启动了...");
}, executor);

2)、thenAcceptAsync:能接受上一步结果,并消费处理,无返回结果。

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}

例子:

1
2
3
4
5
6
7
8
CompletableFuture<Void> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("thenAcceptAsync当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("thenAcceptAsync运行结果:" + i);
return i;
}, executor).thenAcceptAsync(res -> {
System.out.println("thenAcceptAsync任务2启动了..." + res);
}, executor);

3)、thenApplyAsync:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。Function<? super T,? extends U> T:上一个任务返回结果的类型

1
2
3
4
5
6
7
8
9
10
11
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}

例子:

1
2
3
4
5
6
7
8
9
10
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("thenApplyAsync当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("thenApplyAsync运行结果:" + i);
return i;
}, executor).thenApplyAsync(res -> {
System.out.println("thenApplyAsync任务2启动了..." + res);
return "Hello" + res;
}, executor);
System.out.println("thenApplyAsync获取最终结果:"+future3.get());

5、两个任务组合-都要完成,

两个任务必须都完成,触发该任务

1)、runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) {
return biRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) {
return biRunStage(asyncPool, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}

2)、thenAccept: 组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。

1
2
3
4
5
6
7
8
9
10
11
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}

3)、thenCombine: 组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

1
2
3
4
5
6
7
8
9
10
11
 public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
CompletableFuture<Object> future_001 = CompletableFuture.supplyAsync(()->{
System.out.println("任务1线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("任务1结束:" + i);
return i;
},executor);

CompletableFuture<Object> future_002 = CompletableFuture.supplyAsync(()->{
System.out.println("任务2线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
System.out.println("任务2结束:");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello ";
},executor);

/**
* runAfterBothAsync:不能感知到future_001 和 future_002的结果(自己无返回)
*/
future_001.runAfterBothAsync(future_002,()->{
System.out.println("任务3开始");
},executor);
/**
* 可以感知到future_001 和 future_002的结果(自己无返回)
* void accept(T t, U u); f1和f2为两个对应返回的结果
*/
future_001.thenAcceptBothAsync(future_002,(f1,f2)->{
System.out.println("thenAcceptBothAsync任务3返回:f1= " + f1 + " f2= " + f2);
},executor);

/**
* 可以感知到future_001 和 future_002的结果(自己有返回结果)
*/
CompletableFuture<String> stringCompletableFuture = future_001.thenCombineAsync(future_002, (f1, f2) -> {
return f1 + ": " + f2 + "====";
}, executor);
System.out.println("thenCombineAsync返回结果:" + stringCompletableFuture.get());

6、两个任务组合-一个完成

当两个任务中,任意一个 future 任务完成的时候,执行任务。

1)、acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}

2)、runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) {
return orRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) {
return orRunStage(asyncPool, other, action);
}

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}

3)、applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

1
2
3
4
5
6
7
8
9
10
11
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 两个任务有一个执行完了(获取到返回结果),就执行任务3.(自己没有返回值)
*/
future_001.acceptEitherAsync(future_002,(res)->{
System.out.println("任务3执行: "+res);
},executor);
/**
* 两个任务有一个执行完了(不获取其返回结果),就执行任务3(自己没有返回值)
*/
future_001.runAfterEitherAsync(future_002,()->{
System.out.println("任务3执行");
},executor);
/**
* 两个任务有一个执行完了(获取其返回结果),就执行任务3。(自己有返回值)
*/
CompletableFuture<String> stringCompletableFuture2 = future_001.applyToEitherAsync(future_002, (res) -> {
return res.toString() + "-->😄😄";
}, executor);
System.out.println("任务3执行:"+stringCompletableFuture2.get());

7、多任务组合

allOf:等待所有任务完成

1
2
3
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}

anyOf:只要有一个任务完成

1
2
3
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片信息");
return "apple.jpg";
}, executor);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性信息");
return "黑色+256G";
}, executor);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品简介");
return "苹果";
}, executor);
/**
* 等待futureImg、futureAttr、futureDesc所有任务完成
*/
CompletableFuture<Void> allof = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
allof.get(); // 等待所有结果完成
System.out.println("allOf等待所有结果完成: "+futureImg.get()+"=>"+futureAttr.get()+"=>"+futureDesc.get());

/**
* 等待futureImg、futureAttr、futureDesc只要有一个任务完成
*/
CompletableFuture<Object> anyof = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
anyof.get(); // 只要有一个任务完成
System.out.println("anyOf只要有一个任务完成: "+anyof.get());

四、线程池的三种队列

1、SynchronousQueue

1
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), r -> new Thread(r, "ThreadTest"));

SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。拥有公平(FIFO)和非公平(LIFO)策略,非公平侧罗会导致一些数据永远无法被消费的情况?使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。

2、LinkedBlockingQueue

1
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, "ThreadTest"));

LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

注:这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是int类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE,等同于如下:

1
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(Integer.MAX_VALUE), r -> new Thread(r, "ThreadTest"));

3、ArrayBlockingQueue

1
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(32), r -> new Thread(r, "ThreadTest"));

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会执行拒绝策略。


notice

欢迎访问 chenyawei 的博客, 若有问题或者有好的建议欢迎留言,笔者看到之后会及时回复。 评论点赞需要github账号登录,如果没有账号的话请点击 github 注册, 谢谢 !

If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !