JUC并发编程 1.线程基础知识复习 JUC其实就是java.util.concurrent并发包的缩写
1.start线程开启C源码分析 1 2 3 4 5 6 new Thread (() -> {}).start(); start0();private native void start0 () ;
https://hg.openjdk.org/jdk8/jdk8/hotspot/archive/tip.zip https://hg.openjdk.org/jdk8/jdk8/jdk/archive/tip.zip
openjdk的写JNI一般是一一对应的, thread.java其实对应的就是thread.c的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static JNINativeMethod methods[] = { {"start0" , "()V" , (void *)&JVM_StartThread}, {"stop0" , "(" OBJ ")V" , (void *)&JVM_StopThread}, {"isAlive" , "()Z" , (void *)&JVM_IsThreadAlive}, {"suspend0" , "()V" , (void *)&JVM_SuspendThread}, {"resume0" , "()V" , (void *)&JVM_ResumeThread}, {"setPriority0" , "(I)V" , (void *)&JVM_SetThreadPriority}, {"yield" , "()V" , (void *)&JVM_Yield}, {"sleep" , "(J)V" , (void *)&JVM_Sleep}, {"currentThread" , "()" THD, (void *)&JVM_CurrentThread}, {"countStackFrames" , "()I" , (void *)&JVM_CountStackFrames}, {"interrupt0" , "()V" , (void *)&JVM_Interrupt}, {"isInterrupted" , "(Z)Z" , (void *)&JVM_IsInterrupted}, {"holdsLock" , "(" OBJ ")Z" , (void *)&JVM_HoldsLock}, {"getThreads" , "()[" THD, (void *)&JVM_GetAllThreads}, {"dumpThreads" , "([" THD ")[[" STE, (void *)&JVM_DumpThreads}, {"setNativeName" , "(" STR ")V" , (void *)&JVM_SetNativeThreadName}, };
, 此时点开源代码 #include "jvm.h"
, 说明在jvm.cpp中实现了JVM_StartThread
1 2 3 4 5 6 JNIEXPORT void JNICALLJVM_StartThread (JNIEnv *env, jobject thread) ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 JVM_ENTRY(void , JVM_StartThread(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_StartThread" ); JavaThread *native_thread = NULL ; bool throw_illegal_thread_state = false ; { MutexLocker mu (Threads_lock) ; if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL ) { throw_illegal_thread_state = true ; } else { jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread)); size_t sz = size > 0 ? (size_t ) size : 0 ; native_thread = new JavaThread(&thread_entry, sz); if (native_thread->osthread() != NULL ) { native_thread->prepare(jthread); } } } if (throw_illegal_thread_state) { THROW(vmSymbols::java_lang_IllegalThreadStateException()); } assert(native_thread != NULL , "Starting null thread?" ); if (native_thread->osthread() == NULL ) { delete native_thread; if (JvmtiExport::should_post_resource_exhausted()) { JvmtiExport::post_resource_exhausted( JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS, "unable to create new native thread" ); } THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(), "unable to create new native thread" ); } Thread::start(native_thread); JVM_END
实质上就是调用了个 Thread::start(native_thread);
Thread::start在thread.cpp中, 实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void Thread::start (Thread* thread) { trace("start" , thread); if (!DisableStartThread) { if (thread->is_Java_thread()) { java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(), java_lang_Thread::RUNNABLE); } os::start_thread(thread); } }
2.基础概念复习 (1).并发与并行 并发(12306卖票秒杀):
(2).进程 线程 管程
Monitor其实是一种同步机制, 他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码
3.用户线程和守护线程 (1).Java线程分为用户线程和守护线程 一般情况下不做特别说明配置,默认都是用户线程。
用户线程(User Thread)
守护线程(Daemon Thread)
守护线程(Daemon Thread)也被称之为后台线程或服务线程,守护线程是为用户线程服务的,当程序中的用户线程全部执行结束之后,守护线程也会跟随结束。 守护线程的角色就像“服务员”,而用户线程的角色就像“顾客”,当“顾客”全部走了之后(全部执行结束),那“服务员”(守护线程)也就没有了存在的意义,所以当一个程序中的全部用户线程都结束执行之后,那么无论守护线程是否还在工作都会随着用户线程一块结束,整个程序也会随之结束运行。
(2).线程daemon属性 源码解读:
判断当前线程是否是守护线程, true表示是守护线程, false表示是用户线程
1 2 3 4 5 6 7 8 9 10 public final boolean isDaemon () { return daemon; }
(3).code演示 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package icu.xiamu.juc.base;import java.util.concurrent.TimeUnit;public class DaemonDemo { public static void main (String[] args) { Thread t1 = new Thread (() -> { System.out.println(Thread.currentThread().getName() + "\t 开始运行," + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程" )); while (true ) { } }, "t1" ); t1.setDaemon(true ); t1.start(); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("----------main线程运行完毕" ); } }
默认是false(用户线程), 所以这段程序并没有终止, 因为用户线程还没执行完, 死循环了 之后修改成true, 变成了守护线程, 此时就剩main一个用户线程, 当main线程执行完毕的时候, 此时用户线程没了, 就剩下一个守护线程, java虚拟机就会自动退出 用户线程都死了, 守护线程守护守护护护个鸡毛, 拍拍屁股下班走人
用户线程都死了, 守护线程守护守护护护个鸡毛, 拍拍屁股下班走人
关于setDaemon()源码: 如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final void setDaemon (boolean on) { checkAccess(); if (isAlive()) { throw new IllegalThreadStateException (); } daemon = on; }
2.CompletableFuture 1.Future接口理论知识复习 Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后, 主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。
一句话: Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务
2.Future接口常用实现类FutureTask异步任务 (1).Future接口能干什么 Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。 如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。 主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
Runnable接口 Callable接口 Future接口和FutureTask实现类
目的: 异步多线程任务执行且返回有结果,三个特点: 多线程/有返回/异步任务 (班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package icu.xiamu.juc.cf;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class CompletableFutureDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask <>(new MyThread2 ()); Thread t1 = new Thread (futureTask); t1.start(); System.out.println(futureTask.get()); } }class MyThread implements Runnable { @Override public void run () { } }class MyThread2 implements Callable <String> { @Override public String call () throws Exception { System.out.println("--------come in" ); return "hello Callable" ; } }
(2).Future编码实战和优缺点分析 优点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 package icu.xiamu.juc.cf;import java.util.concurrent.*;public class FutureThreadPoolDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { m3(); } private static void m3 () throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); long startTime = System.currentTimeMillis(); FutureTask<String> futureTask1 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task1 over" ; }); threadPool.submit(futureTask1); FutureTask<String > futureTask2 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task2 over" ; }); threadPool.submit(futureTask2); System.out.println(futureTask1.get()); System.out.println(futureTask2.get()); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } long endTime = System.currentTimeMillis(); System.out.println("---constTime: " + (endTime - startTime) + " 毫秒" ); System.out.println(Thread.currentThread().getName() + "\t ---end" ); threadPool.shutdown(); } private static void m2 () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); long startTime = System.currentTimeMillis(); FutureTask<String> futureTask1 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task1 over" ; }); threadPool.submit(futureTask1); FutureTask<String > futureTask2 = new FutureTask <>(() -> { try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task2 over" ; }); threadPool.submit(futureTask2); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } long endTime = System.currentTimeMillis(); System.out.println("---constTime: " + (endTime - startTime) + " 毫秒" ); System.out.println(Thread.currentThread().getName() + "\t ---end" ); threadPool.shutdown(); } private static void m1 () { long startTime = System.currentTimeMillis(); try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { throw new RuntimeException (e); } long endTime = System.currentTimeMillis(); System.out.println("---constTime: " + (endTime - startTime) + " 毫秒" ); System.out.println(Thread.currentThread().getName() + "\t ---end" ); } }
get阻塞(): 一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package icu.xiamu.juc.cf;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class FutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { FutureTask<String> futureTask = new FutureTask <String>(() -> { System.out.println(Thread.currentThread().getName() + "\t come in" ); TimeUnit.SECONDS.sleep(5 ); return "task over" ; }); Thread t1 = new Thread (futureTask); t1.start(); System.out.println(Thread.currentThread().getName() + "\t 忙其他任务了" ); while (true ) { if (futureTask.isDone()) { System.out.println(futureTask.get()); break ; } else { TimeUnit.MILLISECONDS.sleep(500 ); System.out.println("正在处理中, 不要催了, 越催越慢, 再催熄火" ); } } } }
(3).想完成一些复杂的任务 想完成一些复杂的任务:
3.CompletableFuture对Future的改进 (1).CompletableFuture为什么出现
(2).CompletableFuture和CompletionStage源码分别介绍 类架构说明:
(3).核心的四个静态方法, 来创建一个异步任务 CompletableFuture是一个类, 并且有构造方法, 但是description中描述道者是一个incomplete不完美的CompletableFuture
runAsync 无返回值
1 2 public static CompletableFuture<Void> runAsync (Runnable runnable) { ... }public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) { ... }
supplyAsync 有返回值
1 2 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) { ... }public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) { ... }
上述Executor executor参数说明:
没有指定Executor的方法, 直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码
如果指定线程池, 则使用我们自定义的或者特别指定的线程池执行异步代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package icu.xiamu.juc.cf;import java.util.concurrent.*;public class CompletableFutureBuildDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { demo4(); } private static void demo4 () throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "Hello SupplyAsync" ; }, threadPool); System.out.println(completableFuture.get()); } private static void demo3 () throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "Hello SupplyAsync" ; }); System.out.println(completableFuture.get()); } private static void demo2 () throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }, threadPool); System.out.println(completableFuture.get()); threadPool.shutdown(); } private static void demo1 () throws InterruptedException, ExecutionException { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }); System.out.println(completableFuture.get()); } }
从Java8开始引入了CompletableFuture, 它是Future的功能增强版, 减少阻塞和轮询 可以传入回调函数, 当异步任务完成或者发生异常时, 自动调用回调对象的回调方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 package icu.xiamu.juc.cf;import java.util.concurrent.*;public class CompletableFutureUseDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { demo3(); } private static void demo3 () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); try { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("一秒后出结果: " + result); return result; }, threadPool).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算完成, 更新系统UpdateValue" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println("异常情况: " + e.getCause() + "\t" + e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务" ); } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } private static void demo2 () { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("一秒后出结果: " + result); return result; }).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算完成, 更新系统UpdateValue" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println("异常情况: " + e.getCause() + "\t" + e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务" ); } private static void demo1 () throws InterruptedException, ExecutionException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in" ); int result = ThreadLocalRandom.current().nextInt(10 ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("一秒后出结果: " + result); return result; }); System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务" ); System.out.println(completableFuture.get()); } }
异步任务结束时, 会自动回调某个对象的方法
主线程设置好回调后, 不再关心异步任务的执行, 异步任务之间可以顺序执行
异步任务出错时, 会自动回调某个对象的方法
4.案例精讲-从电商网站的比价需求说开去 (1).函数式编程已经主流 大厂面试题看看
Consumer (BiConsumer)
Consumer 是消费型函数接口
idea插件推荐: GenerateAllSetter
自动生成set方法, 但是这样展示的set还是非常的多, 并且占用了很多行 于是链式调用就出现了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package icu.xiamu.juc.cf;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.experimental.Accessors;public class CompletableFutureMallDemo { public static void main (String[] args) { Student student = new Student (); student.setId(1 ); student.setStudentName("黄磊" ); student.setMajor("软件工程" ); System.out.println(student); Student student2 = student.setId(2 ) .setStudentName("黄磊" ) .setMajor("泌尿科" ); System.out.println(student2); } }@AllArgsConstructor @NoArgsConstructor @Data @Accessors(chain = true) class Student { private Integer id; private String studentName; private String major; }
(2).先说说join和get对比 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 1234" ; }); System.out.println(completableFuture.get()); }public static void main (String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 1234" ; }); System.out.println(completableFuture.join()); }
一个要抛出异常, 另外一个不需要抛出异常, 都是获取结果的
(3).说说你过去工作中的项目亮点 看下节
(4).大厂业务需求说明 切记, 功能 => 性能 电商网站比价需求分析
(5).一波流Java8函数式编程带走-比价案例实战Case 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 package icu.xiamu.juc.cf;import lombok.Getter;import java.util.Arrays;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;public class CompletableFutureMallDemo { static List<NetMall> list = Arrays.asList( new NetMall ("jd" ), new NetMall ("taobao" ), new NetMall ("dangdang" ), new NetMall ("amazon" ), new NetMall ("pdd" ), new NetMall ("tmall" ) ); public static List<String> getPrice (List<NetMall> list, String productName) { return list.stream().map(netMall -> String.format(productName + "in %s price is %.2f" , netMall.getNetMallName(), netMall.calcPrice(productName)) ).collect(Collectors.toList()); } public static List<String> getPriceByCompletableFuture (List<NetMall> list, String productName) { return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> { return String.format(productName + "in %s price is %.2f" , netMall.getNetMallName(), netMall.calcPrice(productName)); })).collect(Collectors.toList()) .stream().map(data -> data.join()) .collect(Collectors.toList()); } public static void main (String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = getPrice(list, "mysql" ); for (String element : list1) { System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("----costTime: " + (endTime - startTime) + "毫秒" ); long startTime2 = System.currentTimeMillis(); List<String> list2 = getPriceByCompletableFuture(list, "mysql" ); for (String element : list2) { System.out.println(element); } long endTime2 = System.currentTimeMillis(); System.out.println("----costTime: " + (endTime2 - startTime2) + "毫秒" ); } }class NetMall { @Getter private String netMallName; public NetMall (String netMallName) { this .netMallName = netMallName; } public double calcPrice (String productName) { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0 ); } }
5.CompletableFuture常用方法 (1).获取结果和触发计算 获取结果
1 2 3 4 public T get () throws InterruptedException, ExecutionException { ... } public T get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { ... } public T join () { ... }public T getNow (T valueIfAbsent) { ... }
getNow 没有计算完成的情况下, 给我一个替代结果 立即获取结果不阻塞
计算完, 返回计算完成后的结果
没算完, 返回设定的valueIfAbsent值
1 public boolean complete (T value) { ... }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class CompletableFutureAPIDemo { public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "abc" ; }); TimeUnit.SECONDS.sleep(1 ); System.out.println(completableFuture.complete("completeValue" ) + "\t" + completableFuture.join()); } }
(2).对计算结果进行处理 thenApply 计算结果存在依赖关系, 这两个线程串行化 异常相关: 由于存在依赖关系(当前步错, 不走下一步), 当前步骤有异常的话就叫停
handle 计算结果存在依赖关系, 这两个线程串行化 异常相关: 有异常也可以往下一步走, 根据带的异常参数可以进一步处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CompletableFutureAPI2Demo { public static void main (String[] args) { thenApply(); } private static void handle () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("111" ); return 1 ; }, threadPool).handle((f, e) -> { System.out.println("222" ); return f + 2 ; }).handle((f, e) -> { System.out.println("333" ); return f + 3 ; }).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算结果" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println(e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "主线程先去忙其它任务" ); threadPool.shutdown(); } private static void thenApply () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("111" ); return 1 ; }, threadPool).thenApply(f -> { int i = 1 / 0 ; System.out.println("222" ); return f + 2 ; }).thenApply(f -> { System.out.println("333" ); return f + 3 ; }).whenComplete((v, e) -> { if (e == null ) { System.out.println("计算结果" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println(e.getMessage()); return null ; }); System.out.println(Thread.currentThread().getName() + "主线程先去忙其它任务" ); threadPool.shutdown(); } }
(3).对计算结果进行消费 接收任务的处理结果, 并消费处理, 无返回结果 thenAccept
1 public CompletableFuture<Void> thenAccept (Consumer<? super T> action) { ... }
对比补充 Code之任务之间的顺序执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); }public CompletableFuture<Void> thenAccept (Consumer<? super T> action) { return uniAcceptStage(null , action); }public <U> CompletableFuture<U> thenApply ( Function<? super T,? extends U> fn) { return uniApplyStage(null , fn); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;public class CompletableFutureAPI3Demo { public static void main (String[] args) { System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenRun(() -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenAccept(r -> System.out.println(r)).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA" ).thenApply(r -> r + "resultB" ).join()); } private static void demo1 () { CompletableFuture.supplyAsync(() -> { return 1 ; }).thenApply(f -> { return f + 2 ; }).thenApply(f -> { return f + 3 ; }).thenAccept(r -> { System.out.println(r); }); } }
以thenRun和thenRunAsync为例, 有什么区别?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CompletableFutureWithThreadPoolDemo { public static void main (String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(5 ); try { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("1号任务\t" + Thread.currentThread().getName()); return "abcd" ; }, threadPool).thenRunAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("2号任务\t" + Thread.currentThread().getName()); }).thenRun(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("3号任务\t" + Thread.currentThread().getName()); }).thenRun(() -> { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("4号任务\t" + Thread.currentThread().getName()); }); System.out.println(completableFuture.get(2L , TimeUnit.SECONDS)); } catch (Exception e) { throw new RuntimeException (e); } finally { threadPool.shutdown(); } } }
(4).对计算速度进行选用 谁快用谁 applyToEither
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;public class CompletableFutureFastDemo { public static void main (String[] args) { CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> { System.out.println("A come in" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "playA" ; }); CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> { System.out.println("B come in" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "playB" ; }); CompletableFuture<String> result = playA.applyToEither(playB, f -> { return f + " is winner" ; }); System.out.println(Thread.currentThread().getName() + "\t" + result.join()); } }
(5).对计算结果进行合并 两个CompletionStage任务完成后, 最终能把两个任务的结果一起交给thenCombine来处理 先完成的先等着, 等待其它分支任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 package icu.xiamu.juc.cf;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;public class CompletableFutureCombineDemo { public static void main (String[] args) { demo2(); } private static void demo2 () { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 10 ; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 20 ; }), (x, y) -> { System.out.println("开始两个结果合并" ); return x + y; }); System.out.println(completableFuture.join()); } private static void demo1 () { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 10 ; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "启动" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 20 ; }); CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> { System.out.println("开始两个结果合并" ); return x + y; }); System.out.println(result.join()); } }
3.说说Java”锁”事 1.大厂面试题复盘
2.从轻松的乐观锁和悲观锁开讲 1.悲观锁 认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。 synchronized关键字和Lock的实现类都是悲观锁
适合写操作多的场景,先加锁可以保证写操作时数据正确 显式的锁定之后再操作同步资源 一句话: 狼性锁
适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。 乐观锁则直接去操作同步资源,是一种无锁算法,得之我幸不得我命,再努力就是 一句话: 佛系锁 乐观锁一般有两种实现方式:
CAS(Compare-and-Swap, 即比较并交换)算法实现
3.通过8种情况演示锁运行案例, 看看我们到底锁的是什么 阿里巴巴开发手册
(1).锁相关的8种案例演示code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package icu.xiamu.juc.locks;import java.util.concurrent.TimeUnit;public class Lock8Demo { public static void main (String[] args) { Phone phone = new Phone (); Phone phone2 = new Phone (); new Thread (() -> { phone.sendEmail(); }, "a" ).start(); new Thread (() -> { phone.sendSMS(); }, "b" ).start(); } }class Phone { public static synchronized void sendEmail () { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("sendEmail" ); } public static synchronized void sendSMS () { System.out.println("sendSMS" ); } public void hello () { System.out.println("hello" ); } }
synchronized function() {} 对象锁 static synchronized function() {} 类锁 类锁的锁的粒度比对象锁更大
(3).从字节码角度分析synchronized实现 javap -c ***.class 文件反编译 -c 表示对代码进行反汇编 假如需要更多信息, javap -v ***.class 文件反编译 -v -verbose 输出附加信息(包括行号, 本地变量表, 反汇编等详细信息)
synchronized同步代码块 javap -c ***.class 文件反编译
synchronized同步代码块, 一定是一个enter对应两个exit吗?
synchronized普通同步方法 synchronized静态同步方法
(4).反编译synchronized锁的是什么 (5).对于Synchronized关键字 后续章节讲解
4.公平锁和非公平锁 5.可重入锁(又名递归锁) 6.死锁及排查 7.写锁(独占锁)/读锁(共享锁) 8.自旋锁SpinLock 9.无锁 -> 独占锁 -> 读写锁 -> 邮戳锁 10.无锁 -> 偏向锁 -> 轻量锁 -> 重量锁 4.LockSupport与线程中断 5.Java内存模型之JMM 6.volatile与JMM 7.CAS 8…
