简单谈谈Juc并发编程
前言
本课程学习与B站狂神说Java的JUC并发编程
本课程的代码都放在了我的个人gitee仓库 上了
什么是JUC?
java.util.concurrent juc
java.util.concurrent.atomic 原子性
java.util.concurrent.locks 锁
平时业务中可能用Thread
或者像Runnable接口实现,没有返回值,而且效率相对于callable较低
java.util.concurrent
Interface Callable
进程与线程
我们都知道计算机的核心是CPU,它承担了所有的计算任务,而操作系统是计算机的管理者,它负责任务的调度,资源的分配和管理,统领整个计算机硬件
进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体
线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。一个进程可以有一个或多个线程,各个线程之间共享程序的内存空间(也就是所在进程的内存空间)
进程:一个程序,QQ.exe Music.exe 程序的集合;
一个进程往往可以包含多个线程,至少包含一个!
Java默认有几个线程? 2 个 main、GC
线程:开了一个进程 Typora,写字,自动保存(线程负责的)
对于Java而言:Thread、Runnable、Callable
Java 真的可以开启线程吗? 开不了
1 2 private native void start0 () ;
线程有几个状态?
Thread.State可以看到,是一个枚举
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
并发和并行
并发是指两个或多个事件在同一时间间隔发生->交替进行
并行是指两个或者多个事件在同一时刻发生 ->同时进行
并发编程的目标是充分的利用cpu的每一个核,以达到最高的处理性能
1 2 System.out.println(Runtime.getRuntime().availableProcessors());
wait/sleep 区别
1、来自不同的类
wait => Object
sleep => Thread
2、关于锁的释放
wait 会释放锁
sleep 睡觉了,抱着锁睡觉,不会释放!
3、使用的范围是不同的
wait必须在synchronized同步代码块中使用
sleep可以在任何地方睡
4、是否需要捕获异常 (存疑)
throws InterruptedException
wait 也需要捕获异常(实测提示需要捕获异常,且不捕获会报错!)
sleep 必须要捕获异常
Lock锁
只要是并发编程,就一定需要有锁!
传统的synchronized锁
此处不谈线程池,讲普通的方法
解耦线程类,不必要再去写一个单独的线程类继承Runnable接口
而是使用lambda表达式()->{}
实现Runnable接口来创建线程
1 2 3 new Thread(()->{ },"Name" ).start();
然后synchronized锁方法上,锁住这个对象
1 2 3 4 public synchronized void sale () { if (num<=0 )return ; System.out.println(Thread.currentThread().getName()+" 买到第" +(num--)+"张票,剩下" +num+"张票" ); }
还是老生常谈的卖票
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public static void main (String[] args) throws InterruptedException { Ticket ticket = new Ticket(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ ticket.sale(); try { Thread.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } },"A" ).start(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ ticket.sale(); try { Thread.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } },"B" ).start(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ ticket.sale(); try { Thread.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } },"C" ).start(); } static class Ticket { private int num=100 ; public synchronized void sale () { if (num<=0 )return ; System.out.println(Thread.currentThread().getName()+" 买到第" +(num--)+"张票,剩下" +num+"张票" ); } }
Lock锁
java.util.concurrent.locks.Lock
的Lock是一个接口
建议的做法是始终 立即跟随lock
与try
块的通话
最常见的是在之前/之后的上锁lock.lock();
和解锁lock.unlock()
它有几个实现类:
ReentrantLock可重入锁
ReentrantReadWriteLock.ReadLock读锁
ReentrantReadWriteLock.WriteLock写锁
我们来看看可重入锁ReentrantLock
的构造器
1 2 3 4 5 6 7 8 public ReentrantLock () { sync = new NonfairSync(); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平锁:公平:需要先来后到
非公平锁:不公平:可以插队 (默认)
使用三部曲
Lock lock=new ReentrantLock();
实例化锁对象
lock.lock();
上锁
在finally中 lock.unlock();
解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static class Ticket { private int num=100 ; Lock lock=new ReentrantLock(); public void sale () { lock.lock(); try { if (num<=0 )return ; System.out.println(Thread.currentThread().getName()+" 买到第" +(num--)+"张票,剩下" +num+"张票" ); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
synchronized和lock的区别
Synchronized 内置的Java关键字 , Lock 是一个Java类
Synchronized 无法判断获取锁的状态 ,Lock 可以判断是否获取到了锁
Synchronized 会自动释放 锁,lock 必须要手动释放 锁!如果不释放锁,死锁
如果在Synchronized中出现异常,会自动释放锁
Synchronized 线程 1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去;
lock.tryLock()
尝试获取锁 ,获取不到就自己掉头走了不会等下去
Synchronized 可重入锁,不可以中断的,非公平;Lock ,可重入锁,可以判断锁,非公平(可以自己设置)
Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码!
生产者、消费者问题
面试:单例模式、排序算法、生产消费者、死锁
老版的synchronized实现
当num为0时,消费者等待,生产者生成消息
当num>=0时,生产者等待,消费者进行消费
我们先来看一下这段问题代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public static void main (String[] args) throws InterruptedException { Data data = new Data(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ try { data.pro(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ProducerA" ).start(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ try { data.con(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ConsumerA" ).start(); } static class Data { private int num=0 ; public synchronized void pro () throws InterruptedException { if (num!=0 ){ System.out.println(Thread.currentThread().getName()+"正在等待" ); this .wait(); } num++; System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num=" +num); this .notifyAll(); } public synchronized void con () throws InterruptedException { if (num==0 ){ System.out.println(Thread.currentThread().getName()+"正在等待" ); this .wait(); } num--; System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num=" +num); this .notifyAll(); } }
这个时候代码会正确运行嘛,结论是会的
那如果我们放置多个 producer和consumer呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 new Thread(()->{ for (int i=0 ;i<100 ;i++){ try { data.pro(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ProducerA" ).start(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ try { data.pro(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ProducerB" ).start(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ try { data.con(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ConsumerA" ).start(); new Thread(()->{ for (int i=0 ;i<100 ;i++){ try { data.con(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ConsumerB" ).start();
可以看见有很大几率会出问题
ConsumerA 消费者消费了一条消息,此时num=-92
ConsumerA 消费者消费了一条消息,此时num=-93
ConsumerA 消费者消费了一条消息,此时num=-94
ConsumerA 消费者消费了一条消息,此时num=-95
ConsumerA 消费者消费了一条消息,此时num=-96
ConsumerB 消费者消费了一条消息,此时num=-97
ProducerB 生产者生产了一条消息,此时num=-96
正在等待
这里出现的就是虚假唤醒
查看Object的wait方法的api文档可以看见
线程也可以唤醒,而不会被通知,中断或超时,即所谓的虚假唤醒
比如说买货,如果商品本来没有货物,突然进了一件商品,这是所有的线程都被唤醒了 ,但是只能一个人买,所以其他人都是假唤醒,获取不到对象的锁
虽然这在实践中很少会发生,但应用程序必须通过测试应该使线程被唤醒的条件来防范,并且如果条件不满足则继续等待。
换句话说,等待应该总是出现在循环中
为什么if块会存在虚假唤醒的情况?
在if块中使用wait方法,是非常危险的,因为一旦线程被唤醒,并得到锁,就不会再判断if条件,而执行if语句块外的代码
所以建议,凡是先要做条件判断,再wait的地方,都使用while循环来做
1 2 3 4 5 synchronized (obj) { while (<condition does not hold>) obj.wait(timeout); ... }
所以我们将原有的代码将if改为while
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public synchronized void pro () throws InterruptedException { while (num!=0 ){ System.out.println(Thread.currentThread().getName()+"正在等待" ); this .wait(); } num++; System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num=" +num); this .notifyAll(); } public synchronized void con () throws InterruptedException { while (num==0 ){ System.out.println(Thread.currentThread().getName()+"正在等待" ); this .wait(); } num--; System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num=" +num); this .notifyAll(); }
juc版的生产者和消费者实现
使用Lock和Condition两个接口
其中lock对象我们这使用ReentrantLock
实例化
condition对象使用lock.newCondition()
获取
Condition实现可以提供Object监视器方法的行为和语义,例如有保证的通知顺序,或者在执行通知时不需要
锁定
一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其
newCondition()方法
Condition因素出Object监视器方法( wait,notify 和 notifyAll )成不同的对象,以得到具有多个等待集的每个对象,通过将它们与使用任意的组合的效果Lock个实现。
Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 static class Data { private int num=0 ; Lock lock=new ReentrantLock(); Condition condition = lock.newCondition(); public void pro () { lock.lock(); try { while (num!=0 ){ System.out.println(Thread.currentThread().getName()+"正在等待" ); condition.await(); } num++; System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num=" +num); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void con () { lock.lock(); try { while (num==0 ){ System.out.println(Thread.currentThread().getName()+"正在等待" ); condition.await(); } num--; System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num=" +num); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
也许会有人问:既然synchronized更简洁,这里反而还多加了一层condition,岂不是更麻烦了?
当然不是
Lock+Condition与synchronized的区别
设置多个Condition监视器可以实现精准的通知 和唤醒线程
个人理解:不用就等待,需要则唤醒
Condition监视器的精准唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class Test03 { public static void main (String[] args) { Data data=new Data(); new Thread(()->{ for (int i=0 ;i<10 ;i++){ data.soutA(); } },"A" ).start(); new Thread(()->{ for (int i=0 ;i<10 ;i++){ data.soutB(); } },"B" ).start(); new Thread(()->{ for (int i=0 ;i<10 ;i++){ data.soutC(); } },"C" ).start(); } static class Data { private Lock lock=new ReentrantLock(); private Condition condition1=lock.newCondition(); private Condition condition2=lock.newCondition(); private Condition condition3=lock.newCondition(); private int num=1 ; public void soutA () { lock.lock(); try { while (num!=1 ){ condition1.await(); } System.out.println("AAAAAA" ); num=2 ; condition2.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void soutB () { lock.lock(); try { while (num!=2 ){ condition2.await(); } System.out.println("BBBBBB" ); num=3 ; condition3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void soutC () { lock.lock(); try { while (num!=3 ){ condition3.await(); } System.out.println("CCCCCC" ); num=1 ; condition1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } }
结果
AAAAAA
BBBBBB
CCCCCC
AAAAAA
BBBBBB
CCCCCC
其实condition还有awaitNanos超时等待 和awaitUntil超时时间等待,下文ArrayBlockingQueue会讲到
八锁问题
1.锁对象的同步锁synchronized
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import java.util.concurrent.TimeUnit;public class Test1 { public static void main (String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.mail(); },"A" ).start(); try { TimeUnit.SECONDS.sleep(1 ); }catch (Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B" ).start(); } static class Phone { public synchronized void mail () { try { TimeUnit.SECONDS.sleep(4 ); }catch (Exception e){ e.printStackTrace(); } System.out.println("发邮件" ); } public synchronized void call () { System.out.println("打电话" ); } } }
2.synchronized和普通方法不同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import java.util.concurrent.TimeUnit;public class Test2 { public static void main (String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.mail(); },"A" ).start(); try { TimeUnit.SECONDS.sleep(1 ); }catch (Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B" ).start(); } static class Phone { public synchronized void mail () { try { TimeUnit.SECONDS.sleep(4 ); }catch (Exception e){ e.printStackTrace(); } System.out.println("发邮件" ); } public void call () { System.out.println("打电话" ); } } }
3.锁class的同步锁synchronized
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 import java.util.concurrent.TimeUnit;public class Test3 { public static void main (String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.mail(); },"A" ).start(); try { TimeUnit.SECONDS.sleep(1 ); }catch (Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B" ).start(); Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(()->{ phone1.mail(); },"A" ).start(); try { TimeUnit.SECONDS.sleep(1 ); }catch (Exception e){ e.printStackTrace(); } new Thread(()->{ phone2.call(); },"B" ).start(); } static class Phone { public static synchronized void mail () { try { TimeUnit.SECONDS.sleep(4 ); }catch (Exception e){ e.printStackTrace(); } System.out.println("发邮件" ); } public static synchronized void call () { System.out.println("打电话" ); } } }
4.静态synchronized和普通synchronized锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import java.util.concurrent.TimeUnit;public class Test4 { public static void main (String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.mail(); },"A" ).start(); try { TimeUnit.SECONDS.sleep(1 ); }catch (Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B" ).start(); } static class Phone { public static synchronized void mail () { try { TimeUnit.SECONDS.sleep(4 ); }catch (Exception e){ e.printStackTrace(); } System.out.println("发邮件" ); } public synchronized void call () { System.out.println("打电话" ); } } }
总结
synchronized在代码块 或者普通方法 中,锁住的是方法的调用者 (实例化对象)
在静态方法 中,锁住的类的class
对象锁和class锁不同,所以不需要同步
不安全的List类
我们之前使用的集合都是在单线程情况下,所以没有出现问题,但是其实很多都是不安全的
例如我们平时经常使用的ArrayList
1 2 3 4 5 6 7 8 9 List<String> stringList=new ArrayList<>(); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ stringList.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(stringList); },i+"" ).start(); }
单线程玩多了,乍一看没什么问题,可是这是在多线程的情况下,就会出现并发修改异常
如何优化让他变成线程安全的呢?
1.使用Vector代替
Vector的增删改查都加上了同步锁synchronized,使得线程安全
但是效率怎么样呢?我们下文再说
1 2 3 4 5 6 7 8 List<String> stringVector=new Vector<>(); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ stringVector.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(stringVector); },i+"" ).start(); }
2.使用Collections转synchronizedList
使用Collections.synchronizedList()方法将普通list转为线程安全的list
1 2 3 4 5 6 7 8 9 10 List<String> stringList=new ArrayList<>(); List<String> synchronizedList = Collections.synchronizedList(stringList); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ synchronizedList.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(synchronizedList); },i+"" ).start(); }
如何既保证线程安全,效率也高呢
使用JUC的CopyOnWriteArrayList
JUC:使用CopyOnWriteArrayList,解决并发
COW :写入时复制,一种优化策略
list是唯一固定的,多个线程读取时是固定的,但是写入时有可能会覆盖
COW写入时避免了覆盖,防止了数据问题
1 2 3 4 5 6 7 8 9 10 List<String> list = new CopyOnWriteArrayList<>(); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(list); },i+"" ).start(); }
怎么解决的?
写入时先复制一份长度+1的数组,然后末尾插入数据,再把数组赋给原数组完成插入
插入源码为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
CopyOnWriteArrayList比vector好在哪?
Vector 的增删改查 方法都加上了synchronized锁 ,保证同步的情况下,每个方法都要去获得锁,所以性能会下降
CopyOnWriteArrayList 方法只是在增删改 方法上增加了ReentrantLock锁
但是他的读方法不加锁 ,==读写分离==,所以在读的方面就要比Vector性能要好
CopyOnWriteArrayList适合读多写少 的并发情况
不安全的Set类
和上面list差不多,我就不多做讲解了,直接贴代码
多线程下报错
1 2 3 4 5 6 7 8 HashSet<String> set = new HashSet<>(); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(set); },i+"" ).start(); }
解决方案
1.转synchronizedSet
1 2 3 4 5 6 7 8 HashSet<String> set = new HashSet<>(); Set<String> synchronizedSet = Collections.synchronizedSet(set); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ synchronizedSet.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(synchronizedSet); },i+"" ).start(); }
2.使用CopyOnWriteArraySet
1 2 3 4 5 6 7 Set<String> set = new CopyOnWriteArraySet<>(); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(set); },i+"" ).start(); }
简单说明一下HashSet的实现
这里提一嘴HashSet的实现,说了不一定加分,但是说不出来一定扣分
本质就是就是new的HashMap ,然后new的Object当做HashMap的value,add的参数当做key
因为是hash算法,所以HashSet是无序的
因为key不能重复,所以HashSet的的元素是不能重复的
1 2 3 4 5 6 7 8 9 10 private transient HashMap<E,Object> map; private static final Object PRESENT = new Object();public HashSet () { map = new HashMap<>(); } public boolean add (E e) { return map.put(e, PRESENT)==null ; }
不安全的Map类
单线程中我们经常使用的HashMap在多线程下也是不安全的
1 2 3 4 5 6 7 8 HashMap<String, String> map = new HashMap<>(); for (int i=1 ;i<=100 ;i++){ new Thread(()->{ map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 ,5 )); System.out.println(map); }).start(); }
1.用Hashtable代替
1 2 3 4 5 6 7 Map<String, String> hashtable = new Hashtable<>(); for (int i = 1 ; i <= 100 ; i++) { new Thread(() -> { hashtable.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 , 5 )); System.out.println(hashtable); }).start(); }
和之前的Vector代替ArrayList一样,用synchronized简单粗暴的加上同步保证线程安全,只是效率可能会低一些
2.转synchronizedMap
1 2 3 4 5 6 7 8 HashMap<String, String> map = new HashMap<>(); Map<String, String> synchronizedMap = Collections.synchronizedMap(map); for (int i = 1 ; i <= 100 ; i++) { new Thread(() -> { synchronizedMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 , 5 )); System.out.println(synchronizedMap); }).start(); }
3.使用ConcurrentHashMap
使用java.util.concurrent.ConcurrentHashMap
并发的HashMap,在保证了线程安全的情况下也保证了效率的高效,推荐使用
对ConcurrentHashMap不熟悉的小伙伴可以看看我的《简单谈谈ConcurrentHashMap》
1 2 3 4 5 6 7 Map<String, String> concurrentHashMap = new ConcurrentHashMap<>(); for (int i = 1 ; i <= 100 ; i++) { new Thread(() -> { concurrentHashMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 , 5 )); System.out.println(concurrentHashMap); }).start(); }
走进Callable
返回结果并可能引发异常的任务。 实现者定义一个没有参数的单一方法,称为call
Callable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的
然而,A Runnable不返回结果,也不能抛出被检查的异常
可以有返回值
可以抛出异常
方法不同
Runnable是run()
Callable是call()
老版本创建线程的两种方式
1.extends Thread
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) { new MyThread().start(); } static class MyThread extends Thread { @Override public void run () { System.out.println("class MyThread extends Thread" ); } }
2.实现Runnable接口
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { new Thread(new MyRun()).start(); } static class MyRun implements Runnable { @Override public void run () { System.out.println("class MyRun implements Runnable" ); } }
使用Callable创建线程
我们这里需要一个适配类FutureTask
这个类实现了RunnableFuture类,FutureTask<V> implements RunnableFuture<V>
RunnableFuture类继承了Runnable,RunnableFuture<V> extends Runnable, Future<V>
所以Thread(Runnable target)
可以将其传入
注意:futureTask.get()
可以获取返回结果 ,但是可能会抛异常,需要捕获或抛出
因为要等待执行完毕才返回,所以有可能会阻塞,最好把它放在最后,或者异步通信来处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public static void main (String[] args) { MyCall callable = new MyCall(); FutureTask<Integer> futureTask = new FutureTask<>(callable); new Thread(futureTask).start(); try { Integer integer = futureTask.get(); System.out.println(integer); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } static class MyCall implements Callable <Integer > { @Override public Integer call () throws Exception { System.out.println("class MyCall implements Callable<Integer>" ); return 1024 ; } }
FutureTask的状态
如果我们此时用同一个FutureTask传入两条线程,会输出两次结果吗?
1 2 3 4 5 6 7 8 9 MyCall callable = new MyCall(); FutureTask<Integer> futureTask = new FutureTask<>(callable); new Thread(futureTask,"A" ).start();new Thread(futureTask,"B" ).start();
答案是:不会 ,只会执行一次!
为什么?
我们看看源码
可以看到FutureTask有一个state表示状态变量,还有很多int类型的常量表示具体状态
这里我们暂时只关注NEW和COMPLETING
1 2 3 4 5 6 7 8 9 public class FutureTask <V > implements RunnableFuture <V > {private volatile int state;private static final int NEW = 0 ;private static final int COMPLETING = 1 ;private static final int NORMAL = 2 ;private static final int EXCEPTIONAL = 3 ;private static final int CANCELLED = 4 ;private static final int INTERRUPTING = 5 ;private static final int INTERRUPTED = 6 ;
在构造器中,默认给state为NEW
1 2 3 4 5 6 public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; }
第一条线程进入
在run方法中,执行了callable的call方法后,会将判断变量ran设置为trueif (ran) set(result);
而在set方法中UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
将NEW状态变为了COMPLETING
也就是说此条FutureTask已经完成了他的使命,变为COMPLETING完成状态
当下一条线程进来判断state != NEW
时,直接return
所以执行了一次之后,其他的线程都无法继续执行run,也就是Callable的call方法了
所以可以得出结论:正常情况下,一个FutureTask只能执行一次call
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
常用的辅助类
CountDownLatch减法计数器
java.util.concurrent.CountDownLatch
减法计数器
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助类
可用于某些线程的强制执行
CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。
A CountDownLatch是一种通用的同步工具,可用于多种用途。 一个CountDownLatch为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开。 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。
CountDownLatch一个有用的属性是,它不要求调用countDown线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程可以通过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(12 ); for (int i=1 ;i<=6 ;i++){ try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第" +i+"次执行countDown" ); countDownLatch.countDown(); } countDownLatch.await(); System.out.println("关门" ); } }
CyclicBarrier加法计数器
java.util.concurrent.CyclicBarrier
加法计数器
允许一组线程全部等待彼此达到共同屏障点的同步辅助类
可以用于某些线程的强制等待
循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
A CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo { public static void main (String[] args) throws InterruptedException, BrokenBarrierException { CyclicBarrier cyclicBarrier = new CyclicBarrier(7 ,()->{ System.out.println("集齐7颗龙珠召唤神龙成功" ); }); for (int i=1 ;i<=7 ;i++){ Thread.sleep(500 ); int finalI = i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"收集了第" +finalI+"颗龙珠" ); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },i+"" ).start(); } } }
Semaphore信号量
一个计数信号量,在概念上,信号量维持一组许可证。
如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方
但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行
信号量通常用于限制线程数 ,而不是访问某些(物理或逻辑)资源
我们这假设有3个车位和6辆车需要停车,所以只能有3台车能停进去,其他的车需要等待车位空出才能停
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class SemaphoreDemo { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); for (int i=1 ;i<=6 ;i++){ new Thread(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"线程抢到车位" ); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()+"线程离开车位" ); }catch (Exception e){ e.printStackTrace(); }finally { semaphore.release(); } },i+"" ).start(); } } }
ReadWriteLock读写锁
ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入
读锁和写锁之间是互斥的 ,同一时间只能有一个在运行
读的时候可以被多个线程同时读
写的时候只能由一个线程来写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo { public static void main (String[] args) throws InterruptedException { MyCache myCache=new MyCache(); for (int i=1 ;i<=10 ;i++){ final int temp=i; new Thread(()->{ myCache.put(temp+"" ,temp); },"write" +i).start(); } for (int i=1 ;i<=10 ;i++){ int temp=i; new Thread(()->{ myCache.get(temp + "" ); },"read" +i).start(); } } static class MyCache { private volatile Map<String,Object> map=new HashMap<>(); private ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); public void put (String key,Object value) { Lock writeLock = readWriteLock.writeLock(); writeLock.lock(); try { System.out.println(Thread.currentThread().getName()+"写入key = " +key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入完毕" ); }catch (Exception e){ e.printStackTrace(); }finally { writeLock.unlock(); } } public void get (String key) { Lock readLock = readWriteLock.readLock(); readLock.lock(); try { System.out.println(Thread.currentThread().getName()+"读取key = " +key+",value = " +map.get(key)); }catch (Exception e){ e.printStackTrace(); }finally { readLock.unlock(); } } } }
阻塞队列BlockingQueue
什么时候会用到阻塞队列:多线程并发处理、线程池
和生产者消费者问题有点相似
写入:如果队列满了,就必须阻塞等待
取出:如果是队列是空的,必须阻塞等待生产
BlockingQueue的4组Api
方式
抛出异常
不抛异常且有返回值
阻塞等待
超时等待
添加
add()
offer()
put()
offer(,)
移除
remove()
poll()
take()
poll(,)
判断队列首
element()
peek
-
-
添加操作都会进行判空,所以不能放null
1 2 3 4 5 6 checkNotNull(e); private static void checkNotNull (Object v) { if (v == null ) throw new NullPointerException(); }
1.抛出异常
设定好队列大小后,这些操作都会抛出异常
队列满再添加:IllegalStateException: Queue full
队列空再取出:NoSuchElementException
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static void throwException () { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3 ); System.out.println(blockingQueue.add("a" )); System.out.println(blockingQueue.add("b" )); System.out.println(blockingQueue.add("c" )); System.out.println("blockingQueue.element() = " + blockingQueue.element()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println("blockingQueue.element() = " + blockingQueue.element()); }
这里的情况和普通LinkedList的队列的异常一模一样
我们查看一下ArrayBlockingQueue
的源码
可以看到ArrayBlockingQueue
调用了他的父类AbstractQueue
的add()方法
而这个add方法调用的是offer方法 ,并在添加失败时主动抛出异常
(implement BlockingQueue的offer方法)
1 2 3 4 5 6 7 8 9 10 11 public boolean add (E e) { return super .add(e); } public boolean add (E e) { if (offer(e)) return true ; else throw new IllegalStateException("Queue full" ); }
element方法也调用的是peek方法
为空手动抛异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public E element () { E x = peek(); if (x != null ) return x; else throw new NoSuchElementException(); } public E peek () { final ReentrantLock lock = this .lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }
2.不会抛出异常且有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void noExceptionAndReturn () { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3 ); System.out.println(blockingQueue.offer("a" )); System.out.println(blockingQueue.offer("b" )); System.out.println(blockingQueue.offer("c" )); System.out.println("blockingQueue.peek() = " + blockingQueue.peek()); System.out.println(blockingQueue.offer("d" )); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println("blockingQueue.peek() = " + blockingQueue.peek()); }
3.阻塞等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public static void blockWait () throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3 ); System.out.println("开始put" ); new Thread(()->{ try { blockingQueue.put("a" ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { blockingQueue.put("b" ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { blockingQueue.put("c" ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1 ); new Thread(()->{ try { blockingQueue.put("d" ); System.out.println(Thread.currentThread().getName()+"put完毕" ); } catch (InterruptedException e) { e.printStackTrace(); } },"阻塞线程" ).start(); new Thread(()->{ try { blockingQueue.take(); System.out.println("再开一个" +Thread.currentThread().getName()+"take出来,阻塞线程能不能输出?可以" ); } catch (InterruptedException e) { e.printStackTrace(); } },"救急线程" ).start(); System.out.println("主线程能不能输出?能输出,不影响,因为阻塞的是上上面的那个阻塞线程" ); System.out.println("如果在主线程put的话,这里一样会被阻塞\n====================" ); }
因为put()
和take()
用了Condition 监视器,调用await
和single
实现了精准睡眠和唤醒
下面是解析
成员变量condition
上文有讲到,这里不再赘述
private final Condition notFull;
put方法
while (count == items.length)
队列满时,notFull.await();
线程等待
enqueue方法中notEmpty.signal();
唤醒阻塞的take线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
take方法
while (count == 0)
队列空时,notEmpty.await();
线程等待
dequeue方法中,notFull.signal();
唤醒阻塞的put线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
4.超时等待
和阻塞等待类比就很好理解了
阻塞等待就是会一直死等 ,直到有其他线程操作队列才有可能被唤醒
超时等待在设定的时间内会等待,超时则放弃
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void outTimeWait () throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3 ); blockingQueue.offer("a" ); blockingQueue.offer("b" ); blockingQueue.offer("c" ); System.out.println("普通offer方法,立即放弃->" +blockingQueue.offer("d" )); System.out.println("带参offer方法,等待后放弃->" +blockingQueue.offer("d" ,3 ,TimeUnit.SECONDS)); blockingQueue.poll(); blockingQueue.poll(); blockingQueue.poll(); System.out.println("普通poll方法,立即放弃->" + blockingQueue.poll()); System.out.println("带参poll方法,等待后放弃->" + blockingQueue.poll(3 ,TimeUnit.SECONDS)); }
我们来看看带参数offer源码
long nanos = unit.toNanos(timeout);
获取了超时时间
if (nanos <= 0)
判断计时是否结束
nacos<0,倒计时结束,return false;
放弃等待直接返回(心灰意冷)
nacos>0,还在计时nanos = notFull.awaitNanos(nanos);
,调用condition的awaitNanos继续计时等待(抱有希望)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0 ) return false ; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true ; } finally { lock.unlock(); } }
同步队列SynchronizedQueue
和其他的BlockingQueue不同,他不是用于储存元素
put了一个元素后就必须去take出来,不然就会等待(相当于只有1个空间的BlockingQueue?)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import java.util.concurrent.BlockingQueue;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;public class SynchronizedQueueTest { public static void main (String[] args) { BlockingQueue<String> synchronousQueue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put a" ); synchronousQueue.put("a" ); System.out.println(Thread.currentThread().getName()+"put b" ); synchronousQueue.put("b" ); System.out.println(Thread.currentThread().getName()+"put c" ); synchronousQueue.put("c" ); } catch (InterruptedException e) { e.printStackTrace(); } },"Thead-put" ).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1 ); System.out.println(Thread.currentThread().getName()+"take = " + synchronousQueue.take()); TimeUnit.SECONDS.sleep(1 ); System.out.println(Thread.currentThread().getName()+"take = " + synchronousQueue.take()); TimeUnit.SECONDS.sleep(1 ); System.out.println(Thread.currentThread().getName()+"take = " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"Thread-take" ).start(); } }
线程池
平时使用线程时需要创建、销毁。十分浪费资源和时间
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我
线程池的好处 :
降低资源的消耗
提高响应的速度
方便管理
线程复用 、可以控制最大并发数 、管理线程
线程池:3大方法 、7大参数 、**4种拒绝策略 **
阿里java规范中关于线程池写到
【==强制==】线程池不允许使用Executors
去创建 ,而是通过ThreadPoolExecutor
的方式
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端 如下:
FixedThreadPool
和SingleThreadPool
:
允许的请求队列长度为Integer.MAX_VALUE
, 可能会堆积大量的请求,从而导致OOM
CachedThreadPool
和ScheduledThreadPool
:
允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,从而导致OOM
OOM:out of memory内存溢出
Executors也是new的ThreadPoolExecutor,只是默认规定了一些参数
3大方法
1、单线程执行器SingleThreadExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ExecutorService service1 = Executors.newSingleThreadExecutor(); try { for (int i=1 ;i<=5 ;i++){ service1.execute(()->{ System.out.println(Thread.currentThread().getName()+"在执行" ); }); } }catch (Exception e){ e.printStackTrace(); }finally { service1.shutdown(); TimeUnit.SECONDS.sleep(1 ); System.out.println("========== service1已关闭 ==========" ); }
执行结果
pool-1-thread-1在执行
pool-1-thread-1在执行
pool-1-thread-1在执行
pool-1-thread-1在执行
pool-1-thread-1在执行
2、固定线程池FixedThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ExecutorService service2 = Executors.newFixedThreadPool(5 ); try { for (int i=1 ;i<=10 ;i++){ service2.execute(()->{ System.out.println(Thread.currentThread().getName()+"在执行" ); }); } }catch (Exception e){ e.printStackTrace(); }finally { service2.shutdown(); TimeUnit.SECONDS.sleep(1 ); System.out.println("========== service2已关闭 ==========" ); }
执行结果
pool-2-thread-2在执行
pool-2-thread-2在执行
pool-2-thread-1在执行
pool-2-thread-1在执行
pool-2-thread-1在执行
pool-2-thread-1在执行
pool-2-thread-1在执行
pool-2-thread-3在执行
pool-2-thread-4在执行
pool-2-thread-5在执行
3、缓存线程池CachedThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ExecutorService service3 = Executors.newCachedThreadPool(); try { for (int i=1 ;i<=10 ;i++){ service3.execute(()->{ System.out.println(Thread.currentThread().getName()+"在执行" ); }); } }catch (Exception e){ e.printStackTrace(); }finally { service3.shutdown(); TimeUnit.SECONDS.sleep(1 ); System.out.println("========== service3已关闭 ==========" ); }
执行结果
pool-3-thread-1在执行
pool-3-thread-2在执行
pool-3-thread-3在执行
pool-3-thread-4在执行
pool-3-thread-5在执行
pool-3-thread-7在执行
pool-3-thread-8在执行
pool-3-thread-9在执行
pool-3-thread-2在执行
pool-3-thread-6在执行
7大参数
查看源码可以发现其实调用Executor的这3个方法中,也是new的ThreadPoolExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
只是他们默认给了一些固定的参数,例如maximumPoolSize=Integer.MAX_VALUE
俗话说得好,适合的才是最好的,有时候他默认给的参数并不一定是适合的,所以阿里java规范中让我们调用原生的线程池帮助类去创建线程池
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, //初始核心线程池大小 int maximumPoolSize, //最大线程池大小(核心线程不够,增加非核心线程) long keepAliveTime, //保持活跃时间(超时无调用的非核心线程则释放) TimeUnit unit, //超时时间的单位 BlockingQueue<Runnable> workQueue,//阻塞队列(候客区) ThreadFactory threadFactory,//线程工厂,用于创建线程的,一般用默认的 RejectedExecutionHandler handler)
线程池运行原理简述
可以看到,请求打到线程池时,线程池首先根据初始化时创建的核心线程去处理请求,当核心线程都在使用时,接下来的请求将会放入阻塞队列中去
当核心线程都在处理,且阻塞队列占满时,会根据maximumPoolSize
最大线程池大小继续创建非核心线程
keepAliveTime
和unit
决定了非核心线程能够在没有业务调用时的存活时间
非核心线程在keepAliveTime
结束后会进行收回
如果所有线程(核心与非核心)都被取走使用,且阻塞队列也占满的情况下就会采取拒绝策略
这里默认的拒绝策略是AbortPolicy
,超出承受范围就不接收,并抛出异常
1 2 3 4 5 6 7 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class CallerRunsPolicy implements RejectedExecutionHandler public static class AbortPolicy implements RejectedExecutionHandler public static class DiscardPolicy implements RejectedExecutionHandler public static class DiscardOldestPolicy implements RejectedExecutionHandler
代码简单实现
我们这初始化2个核心线程,最大线程数为5,超时时间为5秒,阻塞队列大小为3,默认的线程工厂和中止策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class MyPool { public static void main (String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 5 , 5 , TimeUnit.SECONDS, new LinkedBlockingQueue<>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); try { for (int i=1 ;i<=9 ;i++){ threadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+"正在运行" ); }); } }catch (Exception e){ e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); } } }
结果显示,此时都是2个核心线程在运转
因为5个请求有3个放入了阻塞队列中
pool-1-thread-1正在运行
pool-1-thread-2正在运行
pool-1-thread-1正在运行
pool-1-thread-2正在运行
pool-1-thread-1正在运行
此时我们将循环次数调到8次,可以看到已经创建多出了3,4,5线程,也就是根据最大线程数来决定创建的非核心线程(5-2=3),也就是说能够多创建出3个非核心线程来排忧解难
pool-1-thread-2正在运行
pool-1-thread-5正在运行
pool-1-thread-3正在运行
pool-1-thread-5正在运行
pool-1-thread-1正在运行
pool-1-thread-2正在运行
pool-1-thread-3正在运行
pool-1-thread-4正在运行
大家知道,我们最大有5个线程和队列中的3个位置,也就是一共可以并发8个请求,那我们将循环调到9次时会发生什么呢?
pool-1-thread-2正在运行
pool-1-thread-4正在运行
pool-1-thread-3正在运行
pool-1-thread-1正在运行
pool-1-thread-3正在运行
pool-1-thread-4正在运行
pool-1-thread-5正在运行
pool-1-thread-2正在运行
java.util.concurrent.RejectedExecutionException(一大段文字省略)
没错,就是当并发请求处理不过来的时候,我们选取的拒绝策略是AbortPolicy中止策略
会直接拒绝请求并抛出异常RejectedExecutionException
4种拒绝策略
1、中止策略AbortPolicy
当并发请求处理不过来的时候,AbortPolicy中止策略
会直接丢弃任务并抛出异常java.util.concurrent.RejectedExecutionException
2、调用者运行策略CallerRunsPolicy
哪来的回哪去
线程池表示:我没资源继续处理你的请求了,谁将你放进来的就让谁去执行你把
pool-1-thread-1正在运行
main正在运行
pool-1-thread-1正在运行
pool-1-thread-3正在运行
pool-1-thread-2正在运行
pool-1-thread-3正在运行
pool-1-thread-1正在运行
pool-1-thread-5正在运行
pool-1-thread-4正在运行
3、丢弃策略DiscardPolicy
资源不足,直接丢弃任务且不抛出异常,(直接摆烂,哪有任务,我怎么不知道?)
4、饱和策略DiscardOldestPolicy
丢弃线程中旧的任务,将新的任务添加
将最早进入队列的任务删除,之后再尝试加入队列
当任务被拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去
在rejectedExecution中先从任务队列中弹出最先加入的任务,空出一个位置,然后再次执行execute方法把任务加入队列
1 2 3 4 5 6 public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
最大线程数如何定义?
CPU密集型(CPU bound)
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多
此时,系统运作大部分的状况是CPU Loading100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound
CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间
线程数一般设置为:
**线程数 = CPU核数+1 **(现代CPU支持超线程,利用等待空闲)
IO密集型(I/O bound)
IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。
I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。
线程数一般设置为:
**线程数 = CPU总核心数 * 2 +1 **
java虚拟机的最大可用的处理器数量,决不会小于一个
1 Runtime.getRuntime().availableProcessors()
也可以任务管理器->性能->cpu->逻辑处理器,查看
四大函数式接口
新时代的程序员需要掌握:lambda表达式、链式编程、函数式接口、Stream流式计算
什么是函数式接口
只有一个方法的interface接口
典型的就是Runnable和Callable接口了
可以看到他们都是被注解@FunctionalInterface
标注的,这个注解直译也是叫函数式接口
1 2 3 4 5 6 7 8 9 @FunctionalInterface public interface Runnable { public abstract void run () ; } @FunctionalInterface public interface Callable <V > { V call () throws Exception ; }
1、函数型接口Function
有输入T,和输出R
1 2 3 4 @FunctionalInterface public interface Function <T , R > { R apply (T t) ; }
1 2 3 4 5 6 7 8 9 10 11 12 Function<String,String> function=new Function<String, String>() { @Override public String apply (String s) { return s; } }; System.out.println(function.apply("test" )); Function<String,String> functionL= (str)->{return str;}; System.out.println(functionL.apply("lambda" ));
甚至还能更简易
1 2 3 Function<String,String> functionLS= str-> str; System.out.println(functionLS.apply("simple" ));
2、断定型接口Predicate
有输入,返回boolean
1 2 3 4 @FunctionalInterface public interface Predicate <T > { boolean test (T t) ; }
1 2 3 4 5 6 7 8 9 10 11 Predicate<Integer> predicate = new Predicate<Integer>(){ @Override public boolean test (Integer num) { return num.equals(1 ); } }; System.out.println(predicate.test(2 )); System.out.println(predicate.test(1 )); Predicate<Integer> predicateL= num-> num.equals(1 ); System.out.println("predicateL.test(1) = " +predicateL.test(1 ));
3、消费型接口Consumer
只有输入,没有返回值
1 2 3 4 @FunctionalInterface public interface Consumer <T > { void accept (T t) ; }
sout甚至可以更加简化System.out::println
1 2 3 4 5 6 Consumer<String> consumer= str-> System.out.println(str); consumer.accept("out" ); Consumer<String> consumerS= System.out::println; consumerS.accept("out" );
4、供给型接口Supplier
只有返回值,没有参数
1 2 3 4 @FunctionalInterface public interface Supplier <T > { T get () ; }
1 2 Supplier<String> supplier= ()-> "asd" ; System.out.println(supplier.get());
Stream流式计算
stream流是io流?
**并不是!!!**这个是io流,是java.io包下的
而本处要讲的是Stream流,是java.util包下的
1 2 import java.io.InputStream;import java.util.stream.Stream;
什么是Stream流式计算?
大数据=计算+存储
存储:集合、mysql数据库。。。
计算:stream流
而这些流里面有很多很多的参数都是使用的函数式接口
话不多说,直接上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import java.util.Arrays;import java.util.List;public class Test01 { public static void main (String[] args) { User user1 = new User(1 ,"a" ,21 ); User user2 = new User(2 ,"b" ,22 ); User user3 = new User(3 ,"c" ,23 ); User user4 = new User(4 ,"d" ,24 ); User user5 = new User(6 ,"e" ,25 ); List<User> list= Arrays.asList(user1,user2,user3,user4,user5); list.stream() .filter(user-> user.getId()%2 ==0 ) .filter(user-> user.getAge()>23 ) .map(user -> user.getName().toUpperCase()) .sorted((u1,u2)->u2.compareTo(u1)) .limit(1 ) .forEach(System.out::println); } }
ForkJoin分支合并计算
什么是ForkJoin?
ForkJoin在jdk1.7中,并行执行任务,提高效率,大数据量!
大数据:Map Reduce (把大任务拆分为小任务)
ForkJoin本质:分而治之
一个大任务分为了许多的小任务,最后将结果汇总得到解答
ForkJoin特点:工作窃取
当B线程更先完成时,A还未完成,那B就会去从A的任务里拿一些工作来做,帮助分担压力,提高效率
这里面维护的都是双端队列Dequeue
如何使用ForkJoin
最好是大数据量的情况下使用 才可以提升效率,小数据量的情况下还不如直接for循环
直接上代码,感觉方式像递归一样,加入了forkjoin的工作队列的机制
有3种调用
public void execute(ForkJoinTask<?> task),直接调用,没有返回值
public ForkJoinTask submit(ForkJoinTask task),提交任务执行 ,返回得到ForkJoinTask类
public final V get(),返回的ForkJoinTask类的get方法可以得到执行结果
public T invoke(ForkJoinTask task),直接invoke得到执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;import java.util.stream.LongStream;public class ForkJoinDemo extends RecursiveTask <Long > { public static void main (String[] args) throws ExecutionException, InterruptedException { long start=1L ; long end=10_0000_0000L ; ForkJoinDemo forkJoinDemo = new ForkJoinDemo(start,end); ForkJoinPool forkJoinPool=new ForkJoinPool(); Long sum = forkJoinPool.invoke(forkJoinDemo); System.out.println("sum = " + sum); } private Long start; private Long end; private Long temp=10000L ; public ForkJoinDemo (Long start, Long end) { this .start = start; this .end = end; } @Override protected Long compute () { if ((end-start)<temp){ long sum=0L ; for (long i=start;i<=end;i++){ sum+=i; } return sum; } long mid=(start+end)/2 ; ForkJoinDemo fj1=new ForkJoinDemo(start,mid); fj1.fork(); ForkJoinDemo fj2=new ForkJoinDemo(mid+1 ,end); fj2.fork(); return fj1.join()+fj2.join(); } }
也可以使用stream并行流
1 2 3 4 5 Long sum = LongStream.rangeClosed(start, end) .parallel() .reduce(0 ,Long::sum); System.out.println("sum = " +sum1);
异步调用
我们采用了CompletableFuture
类的两个方法,runAsync()
和supplyAsync()
1 public class CompletableFuture <T > implements Future <T >, CompletionStage <T >
无返回值的异步调用
1 2 3 4 5 6 7 CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println("CompletableFuture.runAsync" ); }); runAsync.get(); System.out.println("==============" );
有返回值的异步调用
这里和ajax、axios有点相似,提交请求的业务后需要进行成功回调和失败回调
supplyAsync()方法的参数是一个函数式接口Supplier<U> supplier
1 2 3 4 @FunctionalInterface public interface Supplier <T > { T get () ; }
就是说在其中执行业务逻辑
然后可以通过成功回调whenComplete
和失败回调exceptionally
来决定业务的处理
代码如下,注释写的很清楚了,这里排列出来更方便查看
成功回调whenComplete,无论是否异常都执行,所以判断t和u来决定代码逻辑
参数t,正常的返回结果,t不为null则正常执行,t为null时则执行失败(出现异常)
参数u,错误信息,u为null则正常执行,否则就是异常信息
没有返回值(因为业务成功执行的返回值200在supplyAsync中已经写了)
失败回调exceptionally,只有在出现异常时才会执行
参数e,一般是Exception e
有返回值(一般根据异常的不同决定不同的返回值)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> { System.out.println("CompletableFuture.supplyAsync中执行业务逻辑" ); return 200 ; }) .whenComplete((t, u) -> { if (!Objects.isNull(t)&&Objects.isNull(u)){ System.out.println("t = " + t); }else { System.out.println("异常信息u = " + u); } }) .exceptionally((e) -> { System.out.println("异常回调->e.getMessage() = " + e.getMessage()); return 400 ; }); System.out.println("result.get() = " +result.get());
执行结果
正常执行时
CompletableFuture.supplyAsync中执行业务逻辑
t = 200
result.get() = 200
执行失败时
CompletableFuture.supplyAsync中执行业务逻辑
异常信息u = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
异常回调->e.getMessage() = java.lang.ArithmeticException: / by zero
result.get() = 400
理解JMM
之前我们经常接触到JVM,Java Virtual Machine,Java虚拟机
那JMM又是什么呢?
Java Memory Model,java内存模型
它是一个不存在的模型,相当于一种概念、一种约定
关于JMM的一些同步的约定
线程解锁前,必须把共享变量==立刻==刷新回主存
线程加锁前,必须读取主存中的最新值到工作内存中!
加锁和解锁是同一把锁
JVM在设计时候考虑到,如果JAVA线程每次读取和写入变量都直接操作主内存,对性能影响比较大,所以每条线程拥有各自的工作内存,工作内存中的变量是主内存中的一份拷贝,线程对变量的读取和写入,直接在工作内存中操作, 而不能直接去操作主内存中的变量。但是这样就会出现一个问题,当一个线程修改了自己工作内存中变量,对其他线程是不可见的,会导致线程不安全的问题。因为JMM制定了一套标准来保证开发者在编写多线程程序的时候,能够控制什么时候内存会被同步给其他线程
我们来看下面两个线程对于主内存的操作
内存交互操作
内存交互操作有8种 ,虚拟机实现必须保证每一个操作都是原子的 ,不可在分的
(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
不允许read和load、store和write操作之一单独出现
即使用了read必须load ,使用了store必须write
不允许线程丢弃他最近的assign操作
不允许一个线程将没有assign的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量
就是对变量实施use、store操作之前,必须经过assign和load操作
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
Volatile
Volatile 是 Java 虚拟机提供轻量级的同步机制 ,是一个java的关键字,但是volatile 并不能 保证线程安全性
1、保证可见性
我们来看看这一个问题代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import java.util.concurrent.TimeUnit;public class JmmTest { private static int num=0 ; public static void main (String[] args) throws InterruptedException { new Thread(()->{ while (num==0 ){ } System.out.println("线程结束" ); }).start(); TimeUnit.SECONDS.sleep(1 ); num=1 ; System.out.println("num = " +num); } }
此时会输出“线程”结束的语句嘛?并不会
因为线程并不知道num已经被main线程改变了,工作内存中的num还是0,所以一直在循环
如果我们将num加上关键字volatile呢?
private static volatile int num=0;
可以看到,线程结束了
线程结束
num = 1
Process finished with exit code 0
2、不保证原子性
ACID ,是指数据库管理系统(DBMS)在写入或更新资料的过程中,为保证事务(transaction)是正确可靠的,所必须具备的四个特性:
原子性 (atomicity,或称不可分割性)
一个事务中的所有操作,要么全部完成,要么全部不完成 ,不会结束在中间某个环节
一致性 (consistency)
在事务开始之前和事务结束以后,数据库的完整性没有被破坏
隔离性 (isolation,又称独立性)
数据库允许多个并发事务同时对其数据进行读写和修改的能力,隔离性可以**防止多个事务并发执行时由于交叉执行而导致数据的不一致 **
事务隔离分为不同级别,包括读未提交(Read uncommitted)、读提交(read committed)、可重复读(repeatable read)和串行化(Serializable)
持久性 (durability)
事务处理结束后,对数据的修改就是永久的 ,即便系统故障也不会丢失
我们看一下下面的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class VolatileTest { private static int num=0 ; private static volatile int vnum=0 ; public static void add () { num++; vnum++; } private static int snum=0 ; public synchronized static void sAdd () { snum++; } private static int lnum=0 ; private static Lock lock=new ReentrantLock(); public static void lAdd () { lock.lock(); try { lnum++; }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public static void main (String[] args) { for (int i = 0 ; i < 20 ; i++) { new Thread(()->{ for (int j = 0 ; j < 1000 ; j++) { add(); sAdd(); lAdd(); } }).start(); } while (Thread.activeCount()>2 ){ Thread.yield(); } System.out.println("结束,普通的num = " +num+",加了volatile的vnum = " +vnum); System.out.println("结束,synchronized方法的snum = " +snum); System.out.println("结束,lock方法的lnum = " +lnum); } }
可以看见,不管是普通的num还是加了volatile的vnum都是不是正确的20000结果
加了lock和synchronized的方法中的lnum和snum的结果是正确的20000结果
结束,普通的num = 19986,加了volatile的vnum = 19986
结束,synchronized方法的snum = 20000
结束,lock方法的lnum = 20000
因为volatile不保证原子性 ,num++
自增也不是一个原子性操作
反编译查看
我们使用javap -c VolatileTest.class
命令反编译一下这个class文件查看一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void add () ; Code: 0 : getstatic #2 3 : iconst_1 4 : iadd 5 : putstatic #2 8 : getstatic #3 11 : iconst_1 12 : iadd 13 : putstatic #3 16 : return
原子类Atomic
那如果要求不使用synchronized和lock怎么保证原子性实现这个方法?
使用java.util.concurrent.atomic包下的原子类
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!
代码举例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import java.util.concurrent.atomic.AtomicInteger;public class AtomicTest { private static AtomicInteger num=new AtomicInteger(0 ); public static void add () { num.getAndIncrement(); } public static void main (String[] args) { long startTime=System.currentTimeMillis(); for (int i = 0 ; i < 20 ; i++) { new Thread(()->{ for (int j = 0 ; j < 1000 ; j++) { add(); } }).start(); } while (Thread.activeCount()>2 ){ Thread.yield(); } long endTime=System.currentTimeMillis(); System.out.println("程序运行时间: " +(endTime-startTime)+"ms" ); System.out.println("num.get() = " +num.get()); } }
结果正常达到20000且效率也不低
程序运行时间: 44ms
num.get() = 20000
我们来看看之前的synchronized和lock的
程序运行时间: 46ms
结束,synchronized方法的snum = 20000
程序运行时间: 47ms
结束,lock方法的lnum = 20000
现在看起来三者效率都不错
当我们把循环次数提起来后
1 2 3 4 5 6 7 for (int i = 0 ; i < 20000 ; i++) { new Thread(()->{ for (int j = 0 ; j < 10000 ; j++) { } }).start(); }
程序运行时间: 2136ms
结束,synchronized方法的snum = 200000000
程序运行时间: 6287ms
结束,lock方法的lnum = 200000000
程序运行时间: 3881ms
num.get() = 200000000
可以看到是synchronized占优势,atomic其次,lock反而是时间最长的
3、禁止指令重排
什么是指令重排?
我们写的程序,计算机并不是按照你写的那样去执行的
为了性能考虑, 编译器和CPU可能会对指令重新排序
什么是指令重排:不影响结果的前提下,对某些指令优先执行,提高效率
源代码–>编译器优化的重排–> 指令并行也可能会重排–> 内存系统也会重排—> 执行
as-if-serial语义
不管怎么重排序,单线程程序的执行结果不能被改变
编译器、runtime和处理器都必须遵守as-if-serial语义
处理器在进行指令重排的时候考虑:数据之间的依赖性 !
1 2 3 4 5 6 int x = 1 ; int y = 2 ; x = x + 5 ; y = x * x; 我们所期望的:1234 但是可能执行的时候回变成 2134 1324 不可能是 4123 !因为y赋值依赖于x!
只要加了volatile就可以避免指令重排
对于内存区的读写都加内存屏障 :静止上下指令的顺序交换
作用:
保证特定的操作的执行顺序!
可以保证某些变量的内存可见性 (利用这些特性volatile实现了可见性)
Volatile 是可以保持可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
单例模式
关于单例模式的话,可以去看看我写的单例模式的笔记
理解CAS
什么是CAS?
CAS,compare and swap的缩写,中文翻译成比较并交换
CAS是CPU的并发原语,是操作系统层面的原子性操作
我们查看AtomicInteger类中有一个Unsafe
类
1 2 public class AtomicInteger extends Number implements java .io .Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe();
大家知道,java不能直接操作系统,而是通过关键字native
操作C++来操作系统底层
而这个Unsafe类就是java留的“后门”,可以直接操作系统的内存
在讲cas之前,我想先看看compareAndSet
方法,交换并赋值
1 2 private volatile static AtomicInteger num=new AtomicInteger(0 );num.compareAndSet(1 ,2 );
就是当num的值为1时,将其替换为2,其实CAS的结果和这个比较相似
我们来看一下上一小节的AtomicInteger是怎么原子性自增的
1 2 3 4 5 6 private volatile static AtomicInteger num=new AtomicInteger(0 );public static void add () { num.getAndIncrement(); }
然后查看这个getAndIncrement
方法
可以看到就是调用的Unsafe类的getAndAddInt
方法
1 2 3 4 private static final Unsafe unsafe = Unsafe.getUnsafe(); public final int getAndIncrement () { return unsafe.getAndAddInt(this , valueOffset, 1 ); }
我们继续查看Unsafe类的getAndAddInt
方法的源码
1 2 3 4 5 6 7 8 public final int getAndAddInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
也就是说我们调用了是将this(AtomicInteger对象),valueOffset(内存地址偏移值)和需要加的值i进行操作
简单来说其实也就是上一小节讲的
获取static的num放入操作栈顶
把常量1放入操作栈顶
当前操作栈顶中两个值相加并且把结果放入操作栈顶(num=num+1)
操作栈顶的结果赋值给static的num
样例的CAS源码的简单解释
根据var1实例对象
与其的内存地址var2
可以取出现在元素的值var5
然后循环 调用CAS操作compareAndSwapInt
,去比较var5
的值是否尚未发生改变,如果还是原值,则交换成新值
这个我们也称其为自旋 操作,或者叫自旋锁
比较当前工作内存中的值和主内存中的值
如果这个值是期望的,那么则执行操作!
如果不是就一直循环!
1 public final native boolean compareAndSwapInt (Object var1, long var2, int var4, int var5) ;
如果根据var1
和var2
取出来的值,还是与var5
相同,那我就将var5
替换为var5 + var4
并返回
方法调用的native,也就是CAS原语了,(这里var4就是1)
CAS的缺点
循环会耗时
一次性只能保证一个共享变量的原子性
ABA问题
什么是ABA问题?
在其他线程不知情的情况,来了一手狸猫换太子又换狸猫 ,但是别的线程并不知道这个被替换过,也不知道这个狸猫是否还是原来的那个狸猫
好比现在A=1
然后B线程调用cas(1,3)和cas(3,1)
先把1换为了3,再把3换为了1
对于线程A来说,A的值还是1,但是他可能并不是原来的那个1了
对于基本类型来说没有太大影响,因为指向常量池的位置
如果是引用类型来说,可能就有问题了,传递的值也许没变,可是对象变了!
ABA问题的解决方法
带版本号 的原子操作,详见下一节“原子引用”
原子引用
乐观锁的实现不仅只有CAS操作,还有一个版本号机制 也可以实现
我们这里采用AtomicStampedReference
类来实现版本号机制
需要注意的是:compareAndSet
方法底层用的==判断相等,所以使用Integer的话只能使用缓存区间-128~127!
1 2 3 4 5 6 7 8 9 static final int low = -128 ; static final int high; assert IntegerCache.high >= 127 ;public static Integer valueOf (int i) { if (i >= IntegerCache.low && i <= IntegerCache.high) return IntegerCache.cache[i + (-IntegerCache.low)]; return new Integer(i); }
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicStampedReference;public class CASDemo { public static void main (String[] args) { AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(1 , 1 ); new Thread(()->{ int stamp = atomicInteger.getStamp(); System.out.println("一开始的A - stamp = " + stamp); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("A -> " +atomicInteger.compareAndSet(1 , 2 , stamp, (stamp + 1 ))); System.out.println("结束的的A - stamp = " + atomicInteger.getStamp()); System.out.println("=======================" ); },"A" ).start(); new Thread(()->{ int stamp = atomicInteger.getStamp(); System.out.println("B - stamp = " + stamp); System.out.println("B -> " +atomicInteger.compareAndSet(1 , 2 , stamp, stamp + 1 )); System.out.println("结束的的B - stamp = " + atomicInteger.getStamp()); System.out.println("=======================" ); },"B" ).start(); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("最后getStamp = " +atomicInteger.getStamp()); System.out.println("atomicInteger值 = " +atomicInteger.get(new int []{atomicInteger.getStamp()})); } }
各种锁的理解
1、公平锁和非公平锁
这个咱们在学习Lock类时应该就接触到了
Lock lock=new ReentrantLock();
我们来看看可重入锁ReentrantLock
的构造器
1 2 3 4 5 6 7 8 public ReentrantLock () { sync = new NonfairSync(); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平锁:公平:需要先来后到
非公平锁:不公平:可以插队 (默认)
2、可重入锁
可重入:某个线程已经获得某个锁,可以再次获取锁而不会出现死锁
synchronized和ReentrantLock都是可重入的
隐式锁 (即synchronized关键字使用的锁)默认是可重入锁
显式锁 (即Lock)也有ReentrantLock这样的可重入锁
可重入锁的意义之一在于防止死锁
当然,有一次lock()也得要一次unlock(),即加锁次数和释放次数要一样
实现原理实现是通过为每个锁关联一个请求计数器和一个占有它的线程
当计数为0时,认为锁是未被占有的,线程请求一个未被占有的锁时,JVM将记录锁的占有者,并且将请求计数器置为1
如果同一个线程再次请求这个锁,计数器将递增
每次占用线程退出同步块,计数器值将递减。直到计数器为0,锁被释放
现有阶段的锁默认都是可重入锁 (也称递归锁)
如果要实现不可重入的效果,可以自己设置一个继承Lock的类
成员变量绑定一个线程,第一次调用将当前线程赋值给绑定线程,然后后续的调用lock时,去判断绑定线程是不是当前线程,如果当前线程就是绑定线程则给他wait,在unlock方法中就清除绑定线程即可
具体实现可以参考一下这篇博客https://blog.csdn.net/wb_zjp283121/article/details/88973970
3、自旋锁
其实之前讲CAS的时候,就讲到了自旋操作
1 2 3 4 5 6 7 8 public final int getAndAddInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
“自旋”可以理解为“自我旋转”,这里的“旋转”指“循环”,比如 while 循环或者 for 循环
“自旋”就是自己在这里不停地循环,直到目标达成
而不像普通的锁那样,如果获取不到锁就进入阻塞
非自旋锁和自旋锁最大的区别,就是如果它遇到拿不到锁的情况,它会把线程阻塞,直到被唤醒。而自旋锁会不停地尝试
自旋锁的好处在于,自旋锁用循环去不停地尝试获取锁,让线程始终处于 Runnable 状态,节省了线程状态切换带来的开销
可是如果临界区很大,线程一旦拿到锁,很久才会释放的话,那就不合适用自旋锁,因为自旋会一直占用 CPU 却无法拿到锁,白白消耗资源
4、死锁
什么是死锁
死锁:多线程下,由于竞争资源或者由于彼此通信而造成的一种阻塞现象,若无外力作用,它们都讲无法推进下去
此时称系统处于死锁状态或系统产生了死锁,这些永远在相互等待的进程成为死锁进程
出现死锁的条件
必须是两个或者两个以上进程(线程)
必须有竞争资源
一张图带你看懂死锁!
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import java.util.concurrent.TimeUnit;public class DeadLockDemo { public static void main (String[] args) { Object lockA=new Object(); Object lockB=new Object(); new Thread(()->{ synchronized (lockA){ System.out.println(Thread.currentThread().getName()+"获取到A锁" ); try { TimeUnit.SECONDS.sleep(1 ); }catch (Exception e){ e.printStackTrace(); } synchronized (lockB){ System.out.println(Thread.currentThread().getName()+"获取到B锁" ); } } },"A" ).start(); new Thread(()->{ synchronized (lockB){ System.out.println(Thread.currentThread().getName()+"获取到B锁" ); try { TimeUnit.SECONDS.sleep(1 ); }catch (Exception e){ e.printStackTrace(); } synchronized (lockA){ System.out.println(Thread.currentThread().getName()+"获取到A锁" ); } } },"B" ).start(); } }
代码应该很清晰明了了,中间sleep是因为怕一个线程同时抢了两把锁导致不成功
A抢到A锁进入睡眠,B抢到B锁进入睡眠,然后A唤醒后尝试获取B锁,当然获取不到啊,B也尝试获取A锁,当然也获取不到,需要的锁都在对方的手中,自然陷入死锁
A获取到A锁
B获取到B锁
如何排查死锁?
定位进程号:
在windows命令窗口,使用 jps -l
查看当前的java进程的pid,通过包路径很容易区分出自己开发的程序进程
找到线程状态和问题代码:
查看到pid,输入jstack -l 15528
,15528是进程pid
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 >jps -l 14720 1464 org.jetbrains.jps.cmdline.Launcher15528 com.ifyyf.test.deadlock.DeadLockDemo4040 org.jetbrains.idea.maven.server.RemoteMavenServer369176 sun.tools.jps.Jps>jstack -l 15528 Found one Java-level deadlock: ============================= "B" : waiting to lock monitor 0 x0000000002e39fe8 (object 0 x000000076b614298, a java.lang.Object), which is held by "A" "A" : waiting to lock monitor 0 x0000000002e3c928 (object 0 x000000076b6142a8, a java.lang.Object), which is held by "B" Java stack information for the threads listed above: =================================================== "B" : at com.ifyyf.test.deadlock.DeadLockDemo.lambda$main $1 (DeadLockDemo.java:42 ) - waiting to lock <0 x000000076b614298> (a java.lang.Object) - locked <0 x000000076b6142a8> (a java.lang.Object) at com.ifyyf.test.deadlock.DeadLockDemo$ $Lambda $2 /1078694789 .run(Unknown Source) at java.lang.Thread.run(Thread.java:748 ) "A" : at com.ifyyf.test.deadlock.DeadLockDemo.lambda$main $0 (DeadLockDemo.java:26 ) - waiting to lock <0 x000000076b6142a8> (a java.lang.Object) - locked <0 x000000076b614298> (a java.lang.Object) at com.ifyyf.test.deadlock.DeadLockDemo$ $Lambda $1 /990368553 .run(Unknown Source) at java.lang.Thread.run(Thread.java:748 ) Found 1 deadlock.
完结撒花
本篇juc的学习并没有特别深入,可以说是简单入个门吧
本篇代码都放在我的gitee仓库 了,需要的可以自取
本个人博客提供的内容仅用于个人学习,不保证内容的正确性。通过使用本站内容随之而来的风险与本站无关!