JUC并发编程 1.线程基础知识复习 JUC其实就是java.util.concurrent并发包的缩写
1.start线程开启C源码分析 1 2 3 4 5 6 new Thread (() -> {}).start(); start0();private native void start0 () ;
native源码的下载地址:https://hg.openjdk.org/jdk8
https://hg.openjdk.org/jdk8/jdk8/hotspot/archive/tip.zip https://hg.openjdk.org/jdk8/jdk8/jdk/archive/tip.zip
openjdk的写JNI一般是一一对应的, thread.java其实对应的就是thread.c的源码
其中thread.c的部分源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static JNINativeMethod methods[] = { {"start0" , "()V" , (void *)&JVM_StartThread}, {"stop0" , "(" OBJ ")V" , (void *)&JVM_StopThread}, {"isAlive" , "()Z" , (void *)&JVM_IsThreadAlive}, {"suspend0" , "()V" , (void *)&JVM_SuspendThread}, {"resume0" , "()V" , (void *)&JVM_ResumeThread}, {"setPriority0" , "(I)V" , (void *)&JVM_SetThreadPriority}, {"yield" , "()V" , (void *)&JVM_Yield}, {"sleep" , "(J)V" , (void *)&JVM_Sleep}, {"currentThread" , "()" THD, (void *)&JVM_CurrentThread}, {"countStackFrames" , "()I" , (void *)&JVM_CountStackFrames}, {"interrupt0" , "()V" , (void *)&JVM_Interrupt}, {"isInterrupted" , "(Z)Z" , (void *)&JVM_IsInterrupted}, {"holdsLock" , "(" OBJ ")Z" , (void *)&JVM_HoldsLock}, {"getThreads" , "()[" THD, (void *)&JVM_GetAllThreads}, {"dumpThreads" , "([" THD ")[[" STE, (void *)&JVM_DumpThreads}, {"setNativeName" , "(" STR ")V" , (void *)&JVM_SetNativeThreadName}, };
start0其实就是JVM_StartThread
, 此时点开源代码 #include "jvm.h"
在jvm.h中声明了JVM_StartThread
, 说明在jvm.cpp中实现了JVM_StartThread
jvm.h中声明如下
1 2 3 4 5 6 JNIEXPORT void JNICALLJVM_StartThread (JNIEnv *env, jobject thread) ;
jvm.cpp实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 JVM_ENTRY(void , JVM_StartThread(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_StartThread" ); JavaThread *native_thread = NULL ; bool throw_illegal_thread_state = false ; { MutexLocker mu (Threads_lock) ; if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL ) { throw_illegal_thread_state = true ; } else { jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread)); size_t sz = size > 0 ? (size_t ) size : 0 ; native_thread = new JavaThread(&thread_entry, sz); if (native_thread->osthread() != NULL ) { native_thread->prepare(jthread); } } } if (throw_illegal_thread_state) { THROW(vmSymbols::java_lang_IllegalThreadStateException()); } assert(native_thread != NULL , "Starting null thread?" ); if (native_thread->osthread() == NULL ) { delete native_thread; if (JvmtiExport::should_post_resource_exhausted()) { JvmtiExport::post_resource_exhausted( JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS, "unable to create new native thread" ); } THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(), "unable to create new native thread" ); } Thread::start(native_thread); JVM_END
实质上就是调用了个 Thread::start(native_thread);
Thread::start在thread.cpp中, 实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void Thread::start (Thread* thread) { trace("start" , thread); if (!DisableStartThread) { if (thread->is_Java_thread()) { java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(), java_lang_Thread::RUNNABLE); } os::start_thread(thread); } }
Thread::start(native_thread);的实现调用的是操作系统级别的os::start_thread(thread);
2.基础概念复习 (1).并发与并行 并发(12306卖票秒杀):
是在同一实体上的多个事件
是在一台处理器上“同时”处理多个任务
同一时刻,其实是只有一个事件在发生
并行(泡方便面烧热水):
是在不同实体上的多个事件
是在多台处理器上同时处理多个任务
同一时刻,大家真的都在做事情,你做你的,我做我的,但是我们都在做
区别:
(2).进程 线程 管程
进程是资源分配的最小单位,线程是程序执行的最小单位。
进程:
简单的说,在系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源
线程:
也被称为轻量级进程,在同一个进程内会有1个或多个线程是大多数操作系统进行时序调度的基本单元。
管程:
Monitor(监视器),也就是我们平时所说的锁
JVM第3版
Monitor其实是一种同步机制, 他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码
3.用户线程和守护线程 (1).Java线程分为用户线程和守护线程 一般情况下不做特别说明配置,默认都是用户线程。
用户线程(User Thread)
是系统的工作线程,它会完成这个程序需要完成的业务操作
守护线程(Daemon Thread)
守护线程作为一个服务线程,没有服务对象就没有必要继续运行了,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了系统可以退出了。所以假如当系统只剩下守护线程的时候,java虚拟机会自动退出。
守护线程(Daemon Thread)也被称之为后台线程或服务线程,守护线程是为用户线程服务的,当程序中的用户线程全部执行结束之后,守护线程也会跟随结束。 守护线程的角色就像“服务员”,而用户线程的角色就像“顾客”,当“顾客”全部走了之后(全部执行结束),那“服务员”(守护线程)也就没有了存在的意义,所以当一个程序中的全部用户线程都结束执行之后,那么无论守护线程是否还在工作都会随着用户线程一块结束,整个程序也会随之结束运行。
作者:磊哥聊编程 链接:https://www.zhihu.com/question/282487583/answer/2391589312
(2).线程daemon属性 源码解读:
Thread.java中的方法isDaemon()
判断当前线程是否是守护线程, true表示是守护线程, false表示是用户线程
1 2 3 4 5 6 7 8 9 10 public final boolean isDaemon () { return daemon; }
true表示是守护线程
false表示是用户线程
(3).code演示 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package icu.xiamu.juc.base;import java.util.concurrent.TimeUnit;public class DaemonDemo { public static void main (String[] args) { Thread t1 = new Thread (() -> { System.out.println(Thread.currentThread().getName() + "\t 开始运行," + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程" )); while (true ) { } }, "t1" ); t1.setDaemon(true ); t1.start(); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("----------main线程运行完毕" ); } }
默认是false(用户线程), 所以这段程序并没有终止, 因为用户线程还没执行完, 死循环了 之后修改成true, 变成了守护线程, 此时就剩main一个用户线程, 当main线程执行完毕的时候, 此时用户线程没了, 就剩下一个守护线程, java虚拟机就会自动退出 用户线程都死了, 守护线程守护守护护护个鸡毛, 拍拍屁股下班走人
(4).小总结
如果用户线程全部结束意味着程序需要完成的业务操作已经结束了,守护线程随着JVM一同结束工作
用户线程都死了, 守护线程守护守护护护个鸡毛, 拍拍屁股下班走人
setDaemon(true)方法必须在start()之前设置,否则报IllegalThreadStateException异常
关于setDaemon()源码: 如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final void setDaemon (boolean on) { checkAccess(); if (isAlive()) { throw new IllegalThreadStateException (); } daemon = on; }
2.CompletableFuture 1.Future接口理论知识复习 Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后, 主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。
举例上课买水案例给大家说明补充。。。。。。
查看Future接口内容
一句话: Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务
2.Future接口常用实现类FutureTask异步任务 (1).Future接口能干什么 Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。 如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。 主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
代码说话:
Runnable接口 Callable接口 Future接口和FutureTask实现类
目的: 异步多线程任务执行且返回有结果,三个特点: 多线程/有返回/异步任务 (班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package icu.xiamu.juc.cf;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class CompletableFutureDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask <>(new MyThread2 ()); Thread t1 = new Thread (futureTask); t1.start(); System.out.println(futureTask.get()); } }class MyThread implements Runnable { @Override public void run () { } }class MyThread2 implements Callable <String> { @Override public String call () throws Exception { System.out.println("--------come in" ); return "hello Callable" ; } }
(2).Future编码实战和优缺点分析 优点:
future+线程池异步多线程任务配合,能显著提高程序的执行效率。
结合线程池提示性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 package icu.xiamu.juc.cf;import java.util.concurrent.*;public class FutureThreadPoolDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { m3(); } private static void m3 () throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); long startTime = System.currentTimeMillis(); FutureTask<String> futureTask1 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task1 over" ; }); threadPool.submit(futureTask1); FutureTask<String > futureTask2 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task2 over" ; }); threadPool.submit(futureTask2); System.out.println(futureTask1.get()); System.out.println(futureTask2.get()); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } long endTime = System.currentTimeMillis(); System.out.println("---constTime: " + (endTime - startTime) + " 毫秒" ); System.out.println(Thread.currentThread().getName() + "\t ---end" ); threadPool.shutdown(); } private static void m2 () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); long startTime = System.currentTimeMillis(); FutureTask<String> futureTask1 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task1 over" ; }); threadPool.submit(futureTask1); FutureTask<String > futureTask2 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task2 over" ; }); threadPool.submit(futureTask2); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } long endTime = System.currentTimeMillis(); System.out.println("---constTime: " + (endTime - startTime) + " 毫秒" ); System.out.println(Thread.currentThread().getName() + "\t ---end" ); threadPool.shutdown(); } private static void m1 () { long startTime = System.currentTimeMillis(); try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } long endTime = System.currentTimeMillis(); System.out.println("---constTime: " + (endTime - startTime) + " 毫秒" ); System.out.println(Thread.currentThread().getName() + "\t ---end" ); } }
缺点:
get阻塞(): 一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞。
isDone()轮询:
轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果
如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞
get阻塞情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package icu.xiamu.juc.cf;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class FutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { FutureTask<String> futureTask = new FutureTask <String>(() -> { System.out.println(Thread.currentThread().getName() + "\t come in" ); TimeUnit.SECONDS.sleep(5 ); return "task over" ; }); Thread t1 = new Thread (futureTask); t1.start(); System.out.println(Thread.currentThread().getName() + "\t 忙其他任务了" ); while (true ) { if (futureTask.isDone()) { System.out.println(futureTask.get()); break ; } else { TimeUnit.MILLISECONDS.sleep(500 ); System.out.println("正在处理中, 不要催了, 越催越慢, 再催熄火" ); } } } }
结论:
Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
(3).想完成一些复杂的任务 想完成一些复杂的任务:
对于简单的业务场景使用Future完全OK
回调通知
应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
通过轮询的方式去判断任务是否完成这样非常占CPU并且代码也不优雅
创建异步任务
多个任务前后依赖可以组合处理(水煮鱼)
想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果
对计算速度选最快
当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。
etc
使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求
从i到i++,o(n_n)o哈哈~
Future能干的,CompletableFuture都能干
3.CompletableFuture对Future的改进 (1).CompletableFuture为什么出现
(2).CompletableFuture和CompletionStage源码分别介绍 类架构说明:
CompletionStage接口:
类CompletableFuture:
(3).核心的四个静态方法, 来创建一个异步任务 CompletableFuture是一个类, 并且有构造方法, 但是description中描述道者是一个incomplete不完美的CompletableFuture
通常创建CompletableFuture的方式采用如下几种:
runAsync 无返回值
1 2 public static CompletableFuture<Void> runAsync (Runnable runnable) { ... }public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) { ... }
supplyAsync 有返回值
1 2 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) { ... }public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) { ... }
上述Executor executor参数说明:
没有指定Executor的方法, 直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码
如果指定线程池, 则使用我们自定义的或者特别指定的线程池执行异步代码
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package icu.xiamu.juc.cf;import java.util.concurrent.*;public class CompletableFutureBuildDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { demo4(); } private static void demo4 () throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "Hello SupplyAsync" ; }, threadPool); System.out.println(completableFuture.get()); } private static void demo3 () throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "Hello SupplyAsync" ; }); System.out.println(completableFuture.get()); } private static void demo2 () throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }, threadPool); System.out.println(completableFuture.get()); threadPool.shutdown(); } private static void demo1 () throws InterruptedException, ExecutionException { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }); System.out.println(completableFuture.get()); } }
从Java8开始引入了CompletableFuture, 它是Future的功能增强版, 减少阻塞和轮询 可以传入回调函数, 当异步任务完成或者发生异常时, 自动调用回调对象的回调方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 package icu.xiamu.juc.cf;import java.util.concurrent.*;public class CompletableFutureUseDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { demo3(); } private static void demo3 () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); try { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("一秒后出结果: " + result); return result; }, threadPool).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算完成, 更新系统UpdateValue" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println("异常情况: " + e.getCause() + "\t" + e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务" ); } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } private static void demo2 () { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("一秒后出结果: " + result); return result; }).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算完成, 更新系统UpdateValue" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println("异常情况: " + e.getCause() + "\t" + e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务" ); } private static void demo1 () throws InterruptedException, ExecutionException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("一秒后出结果: " + result); return result; }); System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务" ); System.out.println(completableFuture.get()); } }
CompletableFuture的优点:
异步任务结束时, 会自动回调某个对象的方法
主线程设置好回调后, 不再关心异步任务的执行, 异步任务之间可以顺序执行
异步任务出错时, 会自动回调某个对象的方法
4.案例精讲-从电商网站的比价需求说开去 (1).函数式编程已经主流 大厂面试题看看
Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程
Lambda的几个参数类型
Runnable
Function
Consumer (BiConsumer)
Supplier
Runnable
Function
Consumer 是消费型函数接口
BiConsumer
Supplier
总结:
idea插件推荐: GenerateAllSetter
自动生成set方法, 但是这样展示的set还是非常的多, 并且占用了很多行 于是链式调用就出现了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package icu.xiamu.juc.cf;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.experimental.Accessors;public class CompletableFutureMallDemo { public static void main (String[] args) { Student student = new Student (); student.setId(1 ); student.setStudentName("黄磊" ); student.setMajor("软件工程" ); System.out.println(student); Student student2 = student.setId(2 ) .setStudentName("黄磊" ) .setMajor("泌尿科" ); System.out.println(student2); } }@AllArgsConstructor @NoArgsConstructor @Data @Accessors(chain = true) class Student { private Integer id; private String studentName; private String major; }
(2).先说说join和get对比 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 1234" ; }); System.out.println(completableFuture.get()); }public static void main (String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 1234" ; }); System.out.println(completableFuture.join()); }
一个要抛出异常, 另外一个不需要抛出异常, 都是获取结果的
(3).说说你过去工作中的项目亮点 看下节
(4).大厂业务需求说明 切记, 功能 => 性能 电商网站比价需求分析
(5).一波流Java8函数式编程带走-比价案例实战Case 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 package icu.xiamu.juc.cf;import lombok.Getter;import java.util.Arrays;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;public class CompletableFutureMallDemo { static List<NetMall> list = Arrays.asList( new NetMall ("jd" ), new NetMall ("taobao" ), new NetMall ("dangdang" ), new NetMall ("amazon" ), new NetMall ("pdd" ), new NetMall ("tmall" ) ); public static List<String> getPrice (List<NetMall> list, String productName) { return list.stream().map(netMall -> String.format(productName + "in %s price is %.2f" , netMall.getNetMallName(), netMall.calcPrice(productName)) ).collect(Collectors.toList()); } public static List<String> getPriceByCompletableFuture (List<NetMall> list, String productName) { return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> { return String.format(productName + "in %s price is %.2f" , netMall.getNetMallName(), netMall.calcPrice(productName)); })).collect(Collectors.toList()) .stream().map(data -> data.join()) .collect(Collectors.toList()); } public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = getPrice(list, "mysql" ); for (String element : list1) { System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("----costTime: " + (endTime - startTime) + "毫秒" ); long startTime2 = System.currentTimeMillis(); List<String> list2 = getPriceByCompletableFuture(list, "mysql" ); for (String element : list2) { System.out.println(element); } long endTime2 = System.currentTimeMillis(); System.out.println("----costTime: " + (endTime2 - startTime2) + "毫秒" ); } }class NetMall { @Getter private String netMallName; public NetMall (String netMallName) { this .netMallName = netMallName; } public double calcPrice (String productName) { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0 ); } }
5.CompletableFuture常用方法 (1).获取结果和触发计算 获取结果
1 2 3 4 public T get () throws InterruptedException, ExecutionException { ... } public T get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { ... } public T join () { ... }public T getNow (T valueIfAbsent) { ... }
getNow 没有计算完成的情况下, 给我一个替代结果 立即获取结果不阻塞
计算完, 返回计算完成后的结果
没算完, 返回设定的valueIfAbsent值
主动触发计算
1 public boolean complete (T value) { ... }
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class CompletableFutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "abc" ; }); TimeUnit.SECONDS.sleep(1 ); System.out.println(completableFuture.complete("completeValue" ) + "\t" + completableFuture.join()); } }
(2).对计算结果进行处理 thenApply 计算结果存在依赖关系, 这两个线程串行化 异常相关: 由于存在依赖关系(当前步错, 不走下一步), 当前步骤有异常的话就叫停
handle 计算结果存在依赖关系, 这两个线程串行化 异常相关: 有异常也可以往下一步走, 根据带的异常参数可以进一步处理
总结:
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CompletableFutureAPI2Demo { public static void main (String[] args) { thenApply(); } private static void handle () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("111" ); return 1 ; }, threadPool).handle((f, e) -> { System.out.println("222" ); return f + 2 ; }).handle((f, e) -> { System.out.println("333" ); return f + 3 ; }).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算结果" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println(e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "主线程先去忙其它任务" ); threadPool.shutdown(); } private static void thenApply () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("111" ); return 1 ; }, threadPool).thenApply(f -> { int i = 1 / 0 ; System.out.println("222" ); return f + 2 ; }).thenApply(f -> { System.out.println("333" ); return f + 3 ; }).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算结果" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println(e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "主线程先去忙其它任务" ); threadPool.shutdown(); } }
(3).对计算结果进行消费 接收任务的处理结果, 并消费处理, 无返回结果 thenAccept
消费型
1 public CompletableFuture<Void> thenAccept (Consumer<? super T> action) { ... }
对比补充 Code之任务之间的顺序执行
thenRun
thenAccept
thenApply
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); }public CompletableFuture<Void> thenAccept (Consumer<? super T> action) { return uniAcceptStage(null , action); }public <U> CompletableFuture<U> thenApply ( Function<? super T,? extends U> fn) { return uniApplyStage(null , fn); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;public class CompletableFutureAPI3Demo { public static void main (String[] args) { System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenRun(() -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenAccept(r -> System.out.println(r)).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenApply(r -> r + "resultB" ).join()); } private static void demo1 () { CompletableFuture.supplyAsync(() -> { return 1 ; }).thenApply(f -> { return f + 2 ; }).thenApply(f -> { return f + 3 ; }).thenAccept(r -> { System.out.println(r); }); } }
CompletableFuture和线程池说明
以thenRun和thenRunAsync为例, 有什么区别?
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CompletableFutureWithThreadPoolDemo { public static void main (String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(5 ); try { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("1号任务\t" + Thread.currentThread().getName()); return "abcd" ; }, threadPool).thenRunAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("2号任务\t" + Thread.currentThread().getName()); }).thenRun(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("3号任务\t" + Thread.currentThread().getName()); }).thenRun(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("4号任务\t" + Thread.currentThread().getName()); }); System.out.println(completableFuture.get(2L , TimeUnit.SECONDS)); } catch (Exception e) { throw new RuntimeException (e); } finally { threadPool.shutdown(); } } }
小总结:
源码分析:
(4).对计算速度进行选用 谁快用谁 applyToEither
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;public class CompletableFutureFastDemo { public static void main (String[] args) { CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> { System.out.println("A come in" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "playA" ; }); CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> { System.out.println("B come in" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "playB" ; }); CompletableFuture<String> result = playA.applyToEither(playB, f -> { return f + " is winner" ; }); System.out.println(Thread.currentThread().getName() + "\t" + result.join()); } }
(5).对计算结果进行合并 两个CompletionStage任务完成后, 最终能把两个任务的结果一起交给thenCombine来处理 先完成的先等着, 等待其它分支任务
thenCombine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;public class CompletableFutureCombineDemo { public static void main (String[] args) { demo2(); } private static void demo2 () { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 10 ; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 20 ; }), (x, y) -> { System.out.println("开始两个结果合并" ); return x + y; }); System.out.println(completableFuture.join()); } private static void demo1 () { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 10 ; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 20 ; }); CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> { System.out.println("开始两个结果合并" ); return x + y; }); System.out.println(result.join()); } }
3.说说Java”锁”事 1.大厂面试题复盘
2.从轻松的乐观锁和悲观锁开讲 1.悲观锁 认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。 synchronized关键字和Lock的实现类都是悲观锁
适合写操作多的场景,先加锁可以保证写操作时数据正确 显式的锁定之后再操作同步资源 一句话: 狼性锁
2.乐观锁
适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。 乐观锁则直接去操作同步资源,是一种无锁算法,得之我幸不得我命,再努力就是 一句话: 佛系锁 乐观锁一般有两种实现方式:
采用Version版本号机制
CAS(Compare-and-Swap, 即比较并交换)算法实现
伪代码说明:
3.通过8种情况演示锁运行案例, 看看我们到底锁的是什么 阿里巴巴开发手册
(1).锁相关的8种案例演示code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package icu.xiamu.juc.locks;import java.util.concurrent.TimeUnit;public class Lock8Demo { public static void main (String[] args) { Phone phone = new Phone (); Phone phone2 = new Phone (); new Thread (() -> { phone.sendEmail(); }, "a" ).start(); new Thread (() -> { phone.sendSMS(); }, "b" ).start(); } }class Phone { public static synchronized void sendEmail () { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("sendEmail" ); } public static synchronized void sendSMS () { System.out.println("sendSMS" ); } public void hello () { System.out.println("hello" ); } }
总结:
对于普通同步方法,锁的是当前实例对象,通常指this,所有的同步方法用的都是同一把锁—>实例对象本身
对于静态同步方法,锁的时当前类的Class对象
对于同步方法块,锁的时synchronized括号内的对象
synchronized function() {} 对象锁 static synchronized function() {} 类锁 类锁的锁的粒度比对象锁更大
(2).synchronized有三种应用方式
作用于实例方法,当前实例加锁,进入同步代码块前要获得当前实例的锁;
作用于代码块,对括号里配置的对象加锁
作用于静态方法,当前类加锁,进去同步代码前要获得当前类对象的锁
(3).从字节码角度分析synchronized实现 javap -c ***.class 文件反编译 -c 表示对代码进行反汇编 假如需要更多信息, javap -v ***.class 文件反编译 -v -verbose 输出附加信息(包括行号, 本地变量表, 反汇编等详细信息)
synchronized同步代码块 javap -c ***.class 文件反编译
synchronized同步代码块, 一定是一个enter对应两个exit吗?
synchronized普通同步方法 synchronized静态同步方法
(4).反编译synchronized锁的是什么 (5).对于Synchronized关键字 后续章节讲解
4.公平锁和非公平锁 5.可重入锁(又名递归锁) 6.死锁及排查 7.写锁(独占锁)/读锁(共享锁) 8.自旋锁SpinLock 9.无锁 -> 独占锁 -> 读写锁 -> 邮戳锁 10.无锁 -> 偏向锁 -> 轻量锁 -> 重量锁 4.LockSupport与线程中断 5.Java内存模型之JMM 6.volatile与JMM 7.CAS 8… 在线笔记:
智造建筑 珠海金智维 和众汇富(二面挂) 昌硕科技 三泰信息 海中信科技