JUC并发编程

JUC并发编程

1.线程基础知识复习

JUC其实就是java.util.concurrent并发包的缩写

1.start线程开启C源码分析

1
2
3
4
5
6
new Thread(() -> {}).start();
// 对着这行代码Ctrl B一下
// 可以发现核心代码就是start0()
start0();
// 接着Ctrl B发现下一个函数就是一个native修饰的方法
private native void start0();

native源码的下载地址:
https://hg.openjdk.org/jdk8

https://hg.openjdk.org/jdk8/jdk8/hotspot/archive/tip.zip
https://hg.openjdk.org/jdk8/jdk8/jdk/archive/tip.zip

openjdk的写JNI一般是一一对应的, thread.java其实对应的就是thread.c的源码

其中thread.c的部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

start0其实就是JVM_StartThread, 此时点开源代码 #include "jvm.h"
在jvm.h中声明了JVM_StartThread, 说明在jvm.cpp中实现了JVM_StartThread

jvm.h中声明如下

1
2
3
4
5
6
/*
* java.lang.Thread
*/
JNIEXPORT void JNICALL
JVM_StartThread(JNIEnv *env, jobject thread);

jvm.cpp实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;

// We cannot hold the Threads_lock when we throw an exception,
// due to rank ordering issues. Example: we might need to grab the
// Heap_lock while we construct the exception.
bool throw_illegal_thread_state = false;

// We must release the Threads_lock before we can post a jvmti event
// in Thread::start.
{
// Ensure that the C++ Thread and OSThread structures aren't freed before
// we operate.
MutexLocker mu(Threads_lock);

// Since JDK 5 the java.lang.Thread threadStatus is used to prevent
// re-starting an already started thread, so we should usually find
// that the JavaThread is null. However for a JNI attached thread
// there is a small window between the Thread object being created
// (with its JavaThread set) and the update to its threadStatus, so we
// have to check for this
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
// We could also check the stillborn flag to see if this thread was already stopped, but
// for historical reasons we let the thread detect that itself when it starts running

jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
// Allocate the C++ Thread structure and create the native thread. The
// stack size retrieved from java is signed, but the constructor takes
// size_t (an unsigned type), so avoid passing negative values which would
// result in really large stacks.
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);

// At this point it may be possible that no osthread was created for the
// JavaThread due to lack of memory. Check for this situation and throw
// an exception if necessary. Eventually we may want to change this so
// that we only grab the lock if the thread was created successfully -
// then we can also do this check and throw the exception in the
// JavaThread constructor.
if (native_thread->osthread() != NULL) {
// Note: the current thread is not being used within "prepare".
native_thread->prepare(jthread);
}
}
}

if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}

assert(native_thread != NULL, "Starting null thread?");

if (native_thread->osthread() == NULL) {
// No one should hold a reference to the 'native_thread'.
delete native_thread;
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}

Thread::start(native_thread);

JVM_END

实质上就是调用了个 Thread::start(native_thread);

Thread::start在thread.cpp中, 实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void Thread::start(Thread* thread) {
trace("start", thread);
// Start is different from resume in that its safety is guaranteed by context or
// being called from a Java method synchronized on the Thread object.
if (!DisableStartThread) {
if (thread->is_Java_thread()) {
// Initialize the thread state to RUNNABLE before starting this thread.
// Can not set it after the thread started because we do not know the
// exact thread state at that time. It could be in MONITOR_WAIT or
// in SLEEPING or some other state.
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread);
}
}

Thread::start(native_thread);的实现调用的是操作系统级别的os::start_thread(thread);

2.基础概念复习

(1).并发与并行

并发(12306卖票秒杀):

  • 是在同一实体上的多个事件
  • 是在一台处理器上“同时”处理多个任务
  • 同一时刻,其实是只有一个事件在发生

并行(泡方便面烧热水):

  • 是在不同实体上的多个事件
  • 是在多台处理器上同时处理多个任务
  • 同一时刻,大家真的都在做事情,你做你的,我做我的,但是我们都在做

区别:

(2).进程 线程 管程

进程是资源分配的最小单位,线程是程序执行的最小单位。

进程:

  • 简单的说,在系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源

线程:

  • 也被称为轻量级进程,在同一个进程内会有1个或多个线程是大多数操作系统进行时序调度的基本单元。

管程:

  • Monitor(监视器),也就是我们平时所说的锁
  • JVM第3版

Monitor其实是一种同步机制, 他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码

3.用户线程和守护线程

(1).Java线程分为用户线程和守护线程

一般情况下不做特别说明配置,默认都是用户线程。

用户线程(User Thread)

  • 是系统的工作线程,它会完成这个程序需要完成的业务操作

守护线程(Daemon Thread)

  • 守护线程作为一个服务线程,没有服务对象就没有必要继续运行了,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了系统可以退出了。所以假如当系统只剩下守护线程的时候,java虚拟机会自动退出。

守护线程(Daemon Thread)也被称之为后台线程或服务线程,守护线程是为用户线程服务的,当程序中的用户线程全部执行结束之后,守护线程也会跟随结束。 守护线程的角色就像“服务员”,而用户线程的角色就像“顾客”,当“顾客”全部走了之后(全部执行结束),那“服务员”(守护线程)也就没有了存在的意义,所以当一个程序中的全部用户线程都结束执行之后,那么无论守护线程是否还在工作都会随着用户线程一块结束,整个程序也会随之结束运行。

作者:磊哥聊编程
链接:https://www.zhihu.com/question/282487583/answer/2391589312

(2).线程daemon属性

源码解读:

Thread.java中的方法isDaemon()
判断当前线程是否是守护线程, true表示是守护线程, false表示是用户线程

1
2
3
4
5
6
7
8
9
10
/**
* Tests if this thread is a daemon thread.
*
* @return <code>true</code> if this thread is a daemon thread;
* <code>false</code> otherwise.
* @see #setDaemon(boolean)
*/
public final boolean isDaemon() {
return daemon;
}

true表示是守护线程

false表示是用户线程

(3).code演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package icu.xiamu.juc.base;

import java.util.concurrent.TimeUnit;

public class DaemonDemo {
public static void main(String[] args) { // 一切方法运行的入口
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 开始运行," + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程"));
while (true) {
}
}, "t1");

/*
// 默认t1.setDaemon(false);
// 运行结果:
t1 开始运行,用户线程
----------main线程运行完毕
*/
t1.setDaemon(true);
/*
// t1.setDaemon(true);
// 运行结果:
t1 开始运行,守护线程
----------main线程运行完毕
*/
t1.start();

// 3秒钟后主线程再运行
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("----------main线程运行完毕");
}
}

默认是false(用户线程), 所以这段程序并没有终止, 因为用户线程还没执行完, 死循环了
之后修改成true, 变成了守护线程, 此时就剩main一个用户线程, 当main线程执行完毕的时候, 此时用户线程没了, 就剩下一个守护线程, java虚拟机就会自动退出
用户线程都死了, 守护线程守护守护护护个鸡毛, 拍拍屁股下班走人

(4).小总结

  • 如果用户线程全部结束意味着程序需要完成的业务操作已经结束了,守护线程随着JVM一同结束工作
  • 用户线程都死了, 守护线程守护守护护护个鸡毛, 拍拍屁股下班走人
  • setDaemon(true)方法必须在start()之前设置,否则报IllegalThreadStateException异常

关于setDaemon()源码: 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Marks this thread as either a {@linkplain #isDaemon daemon} thread
* or a user thread. The Java Virtual Machine exits when the only
* threads running are all daemon threads.
*
* <p> This method must be invoked before the thread is started.
*
* @param on
* if {@code true}, marks this thread as a daemon thread
*
* @throws IllegalThreadStateException
* if this thread is {@linkplain #isAlive alive}
*
* @throws SecurityException
* if {@link #checkAccess} determines that the current
* thread cannot modify this thread
*/
public final void setDaemon(boolean on) {
checkAccess();
if (isAlive()) {
throw new IllegalThreadStateException();
}
daemon = on;
}

2.CompletableFuture

1.Future接口理论知识复习

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,
主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。

举例上课买水案例给大家说明补充。。。。。。

查看Future接口内容

一句话: Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务

2.Future接口常用实现类FutureTask异步任务

(1).Future接口能干什么

Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。
主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

代码说话:

Runnable接口
Callable接口
Future接口和FutureTask实现类

目的: 异步多线程任务执行且返回有结果,三个特点: 多线程/有返回/异步任务
(班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package icu.xiamu.juc.cf;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyThread2());
Thread t1 = new Thread(futureTask);
t1.start();

// get返回值
System.out.println(futureTask.get());
}
}

class MyThread implements Runnable {
@Override
public void run() {

}
}

class MyThread2 implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("--------come in");
return "hello Callable";
}
}

(2).Future编码实战和优缺点分析

优点:

  • future+线程池异步多线程任务配合,能显著提高程序的执行效率。

结合线程池提示性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package icu.xiamu.juc.cf;

import java.util.concurrent.*;

public class FutureThreadPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 3个任务, 目前只有一个线程main来处理, 请问耗时多少?
// 运行结果
/*
---constTime: 1128 毫秒
main ---end
*/
// m1();

// 3个任务, 目前开启多个异步任务线程来处理, 请问耗时多少?
// 运行结果
/*
---constTime: 387 毫秒
main ---end
*/
// m2();

// 3个任务, 目前开启多个异步任务线程来处理, 并且接收子线程返回值, 请问耗时多少?
// 运行结果
/*
task1 over
task2 over
---constTime: 901 毫秒
main ---end
*/
m3();
}

private static void m3() throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long startTime = System.currentTimeMillis();

FutureTask<String> futureTask1 = new FutureTask<>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task1 over";
});
threadPool.submit(futureTask1);

FutureTask<String > futureTask2 = new FutureTask<>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task2 over";
});
threadPool.submit(futureTask2);

System.out.println(futureTask1.get());
System.out.println(futureTask2.get());

try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

long endTime = System.currentTimeMillis();
System.out.println("---constTime: " + (endTime - startTime) + " 毫秒");

System.out.println(Thread.currentThread().getName() + "\t ---end");
// 关闭资源
threadPool.shutdown();
}

private static void m2() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long startTime = System.currentTimeMillis();

FutureTask<String> futureTask1 = new FutureTask<>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task1 over";
});
threadPool.submit(futureTask1);

FutureTask<String > futureTask2 = new FutureTask<>(() -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task2 over";
});
threadPool.submit(futureTask2);

try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

long endTime = System.currentTimeMillis();
System.out.println("---constTime: " + (endTime - startTime) + " 毫秒");

System.out.println(Thread.currentThread().getName() + "\t ---end");
// 关闭资源
threadPool.shutdown();
}

private static void m1() {
long startTime = System.currentTimeMillis();
// 暂停毫秒
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

long endTime = System.currentTimeMillis();
System.out.println("---constTime: " + (endTime - startTime) + " 毫秒");

System.out.println(Thread.currentThread().getName() + "\t ---end");
}
}

缺点:

  • get阻塞(): 一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞。
  • isDone()轮询:
    • 轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果
    • 如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞

get阻塞情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package icu.xiamu.juc.cf;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<String>(() -> {
System.out.println(Thread.currentThread().getName() + "\t come in");
TimeUnit.SECONDS.sleep(5);
return "task over";
});

Thread t1 = new Thread(futureTask);
t1.start();

// System.out.println(futureTask.get()); // 2.阻塞main线程
// System.out.println(futureTask.get(3, TimeUnit.SECONDS)); // 3.设置阻塞超时时间
System.out.println(Thread.currentThread().getName() + "\t 忙其他任务了");
// System.out.println(futureTask.get()); // 1.不阻塞main线程

// 4.通过轮询方式获取结果
while (true) {
if (futureTask.isDone()) {
System.out.println(futureTask.get());
break;
} else {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("正在处理中, 不要催了, 越催越慢, 再催熄火");
}
}
}
}


/*
// 运行结果
// 2.阻塞main线程
Thread-0 come in
task over
main 忙其他任务了

// 3.设置阻塞超时时间
Thread-0 come in
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at icu.xiamu.juc.cf.FutureAPIDemo.main(FutureAPIDemo.java:20)

// 1.不阻塞main线程
main 忙其他任务了
Thread-0 come in
task over

总结: System.out.println(futureTask.get());
1. get 容易导致阻寒,一般建议放在程序后面,一旦调用不见不做,非要等到结果才会离开,不管你是否计算完成,容易程序堵寒
2. 假如我不愿意等待很长时间,我希望过时不候,可以自动离开.

// 4.通过轮询方式获取结果
main 忙其他任务了
Thread-0 come in
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
正在处理中, 不要催了, 越催越慢, 再催熄火
task over
*/

结论:

  • Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

(3).想完成一些复杂的任务

想完成一些复杂的任务:

  • 对于简单的业务场景使用Future完全OK
  • 回调通知
    • 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
    • 通过轮询的方式去判断任务是否完成这样非常占CPU并且代码也不优雅
  • 创建异步任务
    • Future+线程池配合
  • 多个任务前后依赖可以组合处理(水煮鱼)
    • 想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
    • 将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果
  • 对计算速度选最快
    • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。
  • etc
    • 使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求
    • 从i到i++,o(n_n)o哈哈~
    • Future能干的,CompletableFuture都能干

3.CompletableFuture对Future的改进

(1).CompletableFuture为什么出现

(2).CompletableFuture和CompletionStage源码分别介绍

类架构说明:

CompletionStage接口:

类CompletableFuture:

(3).核心的四个静态方法, 来创建一个异步任务

CompletableFuture是一个类, 并且有构造方法, 但是description中描述道者是一个incomplete不完美的CompletableFuture

通常创建CompletableFuture的方式采用如下几种:

runAsync 无返回值

1
2
public static CompletableFuture<Void> runAsync(Runnable runnable) { ... }
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { ... }

supplyAsync 有返回值

1
2
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { ... }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { ... }

上述Executor executor参数说明:

  • 没有指定Executor的方法, 直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码
  • 如果指定线程池, 则使用我们自定义的或者特别指定的线程池执行异步代码

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package icu.xiamu.juc.cf;

import java.util.concurrent.*;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// runAsync()使用默认的线程池
// demo1();

// runAsync()使用自己定义的线程池
// demo2();

// supplyAsync()使用默认线程池
// demo3();

// supplyAsync()使用自定义线程池
demo4();
}

private static void demo4() throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// pool-1-thread-1
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello SupplyAsync";
}, threadPool);
// Hello SupplyAsync
System.out.println(completableFuture.get());
}

private static void demo3() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// ForkJoinPool.commonPool-worker-1
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return "Hello SupplyAsync";
});

// Hello SupplyAsync
System.out.println(completableFuture.get());
}

private static void demo2() throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
// pool-1-thread-1
System.out.println(Thread.currentThread().getName());

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, threadPool);

// null
System.out.println(completableFuture.get());

// 养成关闭线程池的好习惯
threadPool.shutdown();
}

private static void demo1() throws InterruptedException, ExecutionException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
// ForkJoinPool.commonPool-worker-1
System.out.println(Thread.currentThread().getName());

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

// null
System.out.println(completableFuture.get());
}
}

从Java8开始引入了CompletableFuture, 它是Future的功能增强版, 减少阻塞和轮询
可以传入回调函数, 当异步任务完成或者发生异常时, 自动调用回调对象的回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package icu.xiamu.juc.cf;

import java.util.concurrent.*;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureUseDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// demo1();
// 运行结果:
/*
main线程先去忙其它任务
ForkJoinPool.commonPool-worker-1come in
一秒后出结果: 1
1
*/

// whenComplete + 默认线程池
// demo2();
// 运行结果:
/*
ForkJoinPool.commonPool-worker-1 come in
main线程先去忙其它任务
一秒后出结果: 3
计算完成, 更新系统UpdateValue3
*/

// whenComplete + 自定义的线程池
demo3();

// 运行结果
/*
pool-1-thread-1 come in
main线程先去忙其它任务
一秒后出结果: 5
计算完成, 更新系统UpdateValue5
*/
// 演示出错
/*
pool-1-thread-1 come in
main线程先去忙其它任务
一秒后出结果: 4
异常情况: java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArithmeticException: / by zero
at icu.xiamu.juc.cf.CompletableFutureUseDemo.lambda$demo3$0(CompletableFutureUseDemo.java:48)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
*/
}

private static void demo3() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("一秒后出结果: " + result);

// 演示出错场景
// int i = 10/0;
return result;
}, threadPool).whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算完成, 更新系统UpdateValue" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况: " + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 养成关闭线程池的好习惯
threadPool.shutdown();
}

// 解释下为什么默认线程池关闭, 自定义线程池记得关闭
// 主线程不要立刻结束, 否则 CompletableFuture 默认使用线程池会立刻关闭, 暂停3秒钟线程
// 使用了自定义的线程池, 所以可以不用睡眠了
}

private static void demo2() {

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("一秒后出结果: " + result);
return result;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算完成, 更新系统UpdateValue" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况: " + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");

// 解释下为什么默认线程池关闭, 自定义线程池记得关闭
// 主线程不要立刻结束, 否则 CompletableFuture 默认使用线程池会立刻关闭, 暂停3秒钟线程
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
}

private static void demo1() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("一秒后出结果: " + result);
return result;
});

System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
System.out.println(completableFuture.get());
}
}

CompletableFuture的优点:

  • 异步任务结束时, 会自动回调某个对象的方法
  • 主线程设置好回调后, 不再关心异步任务的执行, 异步任务之间可以顺序执行
  • 异步任务出错时, 会自动回调某个对象的方法

4.案例精讲-从电商网站的比价需求说开去

(1).函数式编程已经主流

大厂面试题看看

Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程

Lambda的几个参数类型

  • Runnable
  • Function
  • Consumer (BiConsumer)
  • Supplier

Runnable

Function

Consumer 是消费型函数接口

BiConsumer

Supplier

总结:

idea插件推荐: GenerateAllSetter

自动生成set方法, 但是这样展示的set还是非常的多, 并且占用了很多行
于是链式调用就出现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package icu.xiamu.juc.cf;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureMallDemo {
public static void main(String[] args) {
Student student = new Student();

student.setId(1);
student.setStudentName("黄磊");
student.setMajor("软件工程");

System.out.println(student);

Student student2 = student.setId(2)
.setStudentName("黄磊")
.setMajor("泌尿科");
System.out.println(student2);

// 运行结果:
/*
Student(id=1, studentName=黄磊, major=软件工程)
Student(id=2, studentName=黄磊, major=泌尿科)
*/
}
}

@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true)
class Student {
private Integer id;
private String studentName;
private String major;

// 链式调用本质基本上就是返回值返回了当前函数对象本身
/*
public void setId(Integer id) {
this.id = id;
}

public Student setId(Integer id) {
this.id = id;
return this;
}
*/
}

(2).先说说join和get对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws ExecutionException, InterruptedException {
// chain链式调用
// chain();

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 1234";
});
System.out.println(completableFuture.get());
}

public static void main(String[] args) {
// chain链式调用
// chain();

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 1234";
});
// System.out.println(completableFuture.get());
System.out.println(completableFuture.join());
}

一个要抛出异常, 另外一个不需要抛出异常, 都是获取结果的

(3).说说你过去工作中的项目亮点

看下节

(4).大厂业务需求说明

切记, 功能 => 性能
电商网站比价需求分析

(5).一波流Java8函数式编程带走-比价案例实战Case

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package icu.xiamu.juc.cf;

import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureMallDemo {

static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("taobao"),
new NetMall("dangdang"),
new NetMall("amazon"),
new NetMall("pdd"),
new NetMall("tmall")
);

/**
* step by step 一家家搜查
* @param list
* @param productName
* @return
*/
public static List<String> getPrice(List<NetMall> list, String productName) {
return list.stream().map(netMall -> String.format(productName + "in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName))
).collect(Collectors.toList());
}

public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> {
return String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName));
})).collect(Collectors.toList())
.stream().map(data -> data.join())
.collect(Collectors.toList());
}

public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list1 = getPrice(list, "mysql");
for (String element : list1) {
System.out.println(element);
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + "毫秒");

// 运行结果
/*
mysqlin jd price is 110.68
mysqlin taobao price is 110.77
mysqlin dangdang price is 109.00
mysqlin amazon price is 109.02
mysqlin pdd price is 109.35
mysqlin tmall price is 110.51
----costTime: 6154毫秒
*/

long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByCompletableFuture(list, "mysql");
for (String element : list2) {
System.out.println(element);
}
long endTime2 = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime2 - startTime2) + "毫秒");

// 运行结果
/*
mysqlin jd price is 109.49
mysqlin taobao price is 109.91
mysqlin dangdang price is 109.19
mysqlin amazon price is 109.23
mysqlin pdd price is 109.79
mysqlin tmall price is 110.17
----costTime: 1015毫秒
*/
}
}

class NetMall {
@Getter
private String netMallName;

public NetMall(String netMallName) {
this.netMallName = netMallName;
}

public double calcPrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}

5.CompletableFuture常用方法

(1).获取结果和触发计算

获取结果

1
2
3
4
public T get() throws InterruptedException, ExecutionException { ... } // 不见不散
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { ... } // 过时不候
public T join() { ... }
public T getNow(T valueIfAbsent) { ... }

getNow 没有计算完成的情况下, 给我一个替代结果
立即获取结果不阻塞

  • 计算完, 返回计算完成后的结果
  • 没算完, 返回设定的valueIfAbsent值

主动触发计算

1
public boolean complete(T value) { ... } // 是否打断get方法立即返回括号值

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package icu.xiamu.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "abc";
});

// abc
// System.out.println(completableFuture.get());

// 未超时: abc
// 超时: java.util.concurrent.TimeoutException
// System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));

// abc
// System.out.println(completableFuture.join());

// 任务没有完成直接返回 : "黄磊不帅"
// 任务完成之后返回: "abc"
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// System.out.println(completableFuture.getNow("黄磊不帅"));

// 是否被打断,
// 已经算出结果了, 没有打断 => false abc
// 还没有结果, 打断了 , 直接要结果 => true completeValue
TimeUnit.SECONDS.sleep(1);
System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
}
}

(2).对计算结果进行处理

thenApply
计算结果存在依赖关系, 这两个线程串行化
异常相关: 由于存在依赖关系(当前步错, 不走下一步), 当前步骤有异常的话就叫停

handle
计算结果存在依赖关系, 这两个线程串行化
异常相关: 有异常也可以往下一步走, 根据带的异常参数可以进一步处理

总结:

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package icu.xiamu.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureAPI2Demo {
public static void main(String[] args) {

thenApply();
// 运行结果
/*
main主线程先去忙其它任务
111
222
333
计算结果6

main主线程先去忙其它任务
111
java.lang.ArithmeticException: / by zero
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
*/

// handle();
// 运行结果
/*
main主线程先去忙其它任务
111
222
333
计算结果6

main主线程先去忙其它任务
111
333
java.lang.NullPointerException
java.util.concurrent.CompletionException: java.lang.NullPointerException
*/
}

private static void handle() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("111");
return 1;
}, threadPool).handle((f, e) -> {
// int i = 1 / 0;
System.out.println("222");
return f + 2;
}).handle((f, e) -> {
System.out.println("333");
return f + 3;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算结果" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "主线程先去忙其它任务");
threadPool.shutdown();
}

private static void thenApply() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
// 暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("111");
return 1;
}, threadPool).thenApply(f -> {
int i = 1 / 0;
System.out.println("222");
return f + 2;
}).thenApply(f -> {
System.out.println("333");
return f + 3;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算结果" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});

System.out.println(Thread.currentThread().getName() + "主线程先去忙其它任务");

// 养成关闭线程池的好习惯
threadPool.shutdown();
}
}

(3).对计算结果进行消费

接收任务的处理结果, 并消费处理, 无返回结果
thenAccept

消费型

1
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { ... }

对比补充 Code之任务之间的顺序执行

  • thenRun
  • thenAccept
  • thenApply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 任务A执行完执行B, 并且B不需要A的结果
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

// 任务A执行完执行B, B需要A的结果, 但是任务B无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}

// 任务A执行完执行B, B需要A的结果, 同时任务B有返回值
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package icu.xiamu.juc.cf;

import java.util.concurrent.CompletableFuture;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureAPI3Demo {
public static void main(String[] args) {
// demo1();

// null
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());

// resultA
// null
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());

// resultAresultB
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
}

private static void demo1() {
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 3;
}).thenAccept(r -> {
System.out.println(r);
});
}
}

CompletableFuture和线程池说明

以thenRun和thenRunAsync为例, 有什么区别?

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package icu.xiamu.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureWithThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);

try {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("1号任务\t" + Thread.currentThread().getName());
return "abcd";
}, threadPool).thenRunAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("2号任务\t" + Thread.currentThread().getName());
}).thenRun(() -> {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("3号任务\t" + Thread.currentThread().getName());
}).thenRun(() -> {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("4号任务\t" + Thread.currentThread().getName());
});

System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
threadPool.shutdown();
}
}
}

小总结:

源码分析:

(4).对计算速度进行选用

谁快用谁
applyToEither

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package icu.xiamu.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureFastDemo {
public static void main(String[] args) {
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
System.out.println("A come in");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "playA";
});

CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
System.out.println("B come in");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "playB";
});

CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + " is winner";
});

System.out.println(Thread.currentThread().getName() + "\t" + result.join());
// A come in
// B come in
// main playA is winner
}
}

(5).对计算结果进行合并

两个CompletionStage任务完成后, 最终能把两个任务的结果一起交给thenCombine来处理
先完成的先等着, 等待其它分支任务

thenCombine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package icu.xiamu.juc.cf;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*/
public class CompletableFutureCombineDemo {
public static void main(String[] args) {
// 标准版
// demo1();

// 链式表达式
demo2();
}

private static void demo2() {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 20;
}), (x, y) -> {
System.out.println("开始两个结果合并");
return x + y;
});

System.out.println(completableFuture.join());
// ForkJoinPool.commonPool-worker-1启动
// ForkJoinPool.commonPool-worker-2启动
// 开始两个结果合并
// 30
}

private static void demo1() {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10;
});

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 20;
});

CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("开始两个结果合并");
return x + y;
});

System.out.println(result.join());

// ForkJoinPool.commonPool-worker-1启动
// ForkJoinPool.commonPool-worker-2启动
// 开始两个结果合并
// 30
}
}

3.说说Java”锁”事

1.大厂面试题复盘

2.从轻松的乐观锁和悲观锁开讲

1.悲观锁
认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。
synchronized关键字和Lock的实现类都是悲观锁

适合写操作多的场景,先加锁可以保证写操作时数据正确
显式的锁定之后再操作同步资源
一句话: 狼性锁

2.乐观锁

适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。
乐观锁则直接去操作同步资源,是一种无锁算法,得之我幸不得我命,再努力就是
一句话: 佛系锁
乐观锁一般有两种实现方式:

  • 采用Version版本号机制
  • CAS(Compare-and-Swap, 即比较并交换)算法实现

伪代码说明:

3.通过8种情况演示锁运行案例, 看看我们到底锁的是什么

阿里巴巴开发手册

(1).锁相关的8种案例演示code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package icu.xiamu.juc.locks;

import java.util.concurrent.TimeUnit;

/**
* @author 肉豆蔻吖
* @date 2024/4/13
*
* 题目: 谈谈你对多线程锁的理解, 8种锁案例说明
* 口诀: 线程操作资源类
* 8锁案例说明:
* 1. 标准访问ab两个线程,请问先打印邮件还是短信? --------先邮件,后短信 共用一个对象锁
* 2. sendEmail钟加入暂停3秒钟,请问先打印邮件还是短信?---------先邮件,后短信 共用一个对象锁
* 3. 添加一个普通的hello方法,请问先打印普通方法还是邮件? --------先hello,再邮件
* 4. 有两部手机,请问先打印邮件还是短信? ----先短信后邮件 资源没有争抢,不是同一个对象锁 * 5. 有两个静态同步方法,一步手机, 请问先打印邮件还是短信?---------先邮件后短信 共用一个类锁
* 6. 有两个静态同步方法,两部手机, 请问先打印邮件还是短信? ----------先邮件后短信 共用一个类锁
* 7. 有一个静态同步方法 一个普通同步方法,请问先打印邮件还是短信? ---------先短信后邮件 一个用类锁一个用对象锁
* 8. 有一个静态同步方法,一个普通同步方法,两部手机,请问先打印邮件还是短信? -------先短信后邮件 一个类锁一个对象锁
*/
public class Lock8Demo {
public static void main(String[] args) {
Phone phone = new Phone();
Phone phone2 = new Phone();

new Thread(() -> {
phone.sendEmail();
}, "a").start();

new Thread(() -> {
phone.sendSMS();
}, "b").start();
}
}

class Phone { // 资源类 (线程操作资源类)
public static synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("sendEmail");
}
public static synchronized void sendSMS() {
System.out.println("sendSMS");
}
public void hello() {
System.out.println("hello");
}
}

/**
*
* ============================================
* 1-2
* * 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,
* * 其它的线程都只能等待 换句话说,某一个时刻内,只能有唯一的一个线程去访问这些synchronized方法
* * 锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
【synchronized锁的是当前对象this,即phone1】
* 3-4
* * 加个普通方法后发现和同步锁无关
* * 锁的不一样对象(2部手机),因为邮件要等2s,所以先短信
*
* 5-6 都换成静态同步方法后,情况又变化
* 三种 synchronized 锁的内容有一些差别:
* 对于普通同步方法,锁的是当前实例对象,通常指this,具体的一部部手机,所有的普通同步方法用的都是同一把锁——实例对象本身,
* 对于【静态同步方法(static synchronized),锁的是当前类的Class对象】,如Phone.class唯一的一个模板
* 对于同步方法块,锁的是 synchronized 括号内的对象
* synchronized修饰静态方法锁的是整个类,即class,而不是对象

* 7-8
* 当一个线程试图访问同步代码时它首先必须得到锁,退出或抛出异常时必须释放锁。
* *
* * 所有的普通同步方法用的都是同一把锁——实例对象本身,就是new出来的具体实例对象本身,本类this
* * 也就是说如果一个实例对象的普通同步方法获取锁后,该实例对象的其他普通同步方法必须等待获取锁的方法释放锁后才能获取锁。
* *
* * 所有的静态同步方法用的也是同一把锁——类对象本身,就是我们说过的唯一模板Class
* * 具体实例对象this和唯一模板Class,这两把锁是两个不同的对象,所以静态同步方法与普通同步方法之间是不会有竞态条件的
* * 但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。
**/

总结:

  • 对于普通同步方法,锁的是当前实例对象,通常指this,所有的同步方法用的都是同一把锁—>实例对象本身
  • 对于静态同步方法,锁的时当前类的Class对象
  • 对于同步方法块,锁的时synchronized括号内的对象

synchronized function() {} 对象锁
static synchronized function() {} 类锁
类锁的锁的粒度比对象锁更大

(2).synchronized有三种应用方式

  • 作用于实例方法,当前实例加锁,进入同步代码块前要获得当前实例的锁;
  • 作用于代码块,对括号里配置的对象加锁
  • 作用于静态方法,当前类加锁,进去同步代码前要获得当前类对象的锁

(3).从字节码角度分析synchronized实现

javap -c ***.class 文件反编译
-c 表示对代码进行反汇编
假如需要更多信息, javap -v ***.class 文件反编译
-v -verbose 输出附加信息(包括行号, 本地变量表, 反汇编等详细信息)

synchronized同步代码块
javap -c ***.class 文件反编译

synchronized同步代码块, 一定是一个enter对应两个exit吗?

synchronized普通同步方法
synchronized静态同步方法

(4).反编译synchronized锁的是什么

(5).对于Synchronized关键字

后续章节讲解

4.公平锁和非公平锁

5.可重入锁(又名递归锁)

6.死锁及排查

7.写锁(独占锁)/读锁(共享锁)

8.自旋锁SpinLock

9.无锁 -> 独占锁 -> 读写锁 -> 邮戳锁

10.无锁 -> 偏向锁 -> 轻量锁 -> 重量锁

4.LockSupport与线程中断

5.Java内存模型之JMM

6.volatile与JMM

7.CAS

8…

在线笔记:

智造建筑
珠海金智维
和众汇富(二面挂)
昌硕科技
三泰信息
海中信科技


JUC并发编程
https://xiamu.icu/Java/JUC并发编程/
作者
肉豆蔻吖
发布于
2024年2月14日
许可协议