操作系统相关概念
CPU
cpu主要是用来解释计算机指令以及处理计算机软件的中的数据
主频,单位是兆赫(MHz)或千兆赫(GHz),用来表示CPU的运算、处理数据的速度。通常,主频越高,CPU处理数据的速度就越快。
缓存,缓存大小也是CPU的重要指标之一,而且缓存的结构和大小对CPU速度的影响非常大。
cpu有L1 Cache和L2 Cache、L3 Cache
物理核
物理核=cpu数(机子上装的cpu的数量)*每个cpu的核心数
单核与多核
都是一个cpu,不同的是每个cpu上的核心数,多核cpu是多个单核cpu的替代方案,多核cpu减小了体积,同时也减少了功耗
一个核心只能同时执行一个线程
进程和线程
对比 | 进程 | 线程 |
---|---|---|
定义 | 进程是操作系统进行资源(包括cpu、内存、磁盘IO等)分配的最小单位 | 线程是进程运行和执行的最小调度单位 |
系统开销 | 创建撤销切换开销大,资源要重新分配和收回 | 仅保存少量寄存器的内容,开销小,在进程的地址空间执行代码 |
安全性 | 进程间相互独立,互不影响 | 线程共享一个进程下面的资源,可以互相通信和影响 |
线程的切换和创建都会增加cpu的开销
串行、并行、并发
串行就是多个任务按顺序执行
并行就是每个线程分配给独立的核心,线程各自运行
并发,多个线程在单个核心运行,同一时间一个线程运行,系统不同切换线程,看起来像同时运行
其实是线程不同切换。
并发是逻辑上的同时发生(simultaneous),而并行是物理上的同时发生。
如果系统只有一个CPU,则它根本不可能真正同时进行一个以上的线程,
它只能把CPU运行时间划分成若干个时间段,再将时间段分配给各个线程执行,
在一个时间段的线程代码运行时,其它线程处于挂起状态.这种方式我们称之为并发(Concurrent)。
当系统有一个以上CPU时,则线程的操作有可能非并发.当一个CPU执行一个线程时,
另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。
io密集与计算密集
io密集主要是IO操作,CPU利用率不高,可以多开点线程
计算密集主要是复杂的逻辑运算,CPU利用率比较高,不需要开太多线程
Java内存模型
cpu高速缓存
计算机程序在运行的时候,每条指令都在cpu中执行的,执行过程中会涉及到数据的读写,多线程共享的变量是存储在主内存,读写主内存的
数据没有CPU指令中执行指令的速度快,如果任何操作都要和主内存交互,会影响效率,所以才会有CPU高速缓存(本地内存),CPU高速缓存为
某个CPU独有,只与在该CPU运行的线程有关。
CPU高速缓存虽然解决了效率问题,但是它会带来一个新的问题:数据一致性
解决缓存一致性方案有两种:
1、通过在总线加LOCK#锁的方式;(volatile)
但是方案1存在一个问题,它是采用一种独占的方式来实现的,即总线加LOCK#锁的话,只能有一个CPU能够运行,其他CPU都得阻塞,效率较为低下。
2、 通过缓存一致性协议。
第二种方案,缓存一致性协议(MESI协议)它确保每个缓存中使用的共享变量的副本是一致的。所以JMM就解决这个问题。
理解java内存模型才能了解线程安全
每一个运行在Java虚拟机里的线程都拥有自己的线程栈。这个线程栈包含了这个线程调用的方法当前执行点相关的信息。
一个线程仅能访问自己的线程栈。一个线程创建的本地变量对其它线程不可见,仅自己可见。
即使两个线程执行同样的代码,这两个线程任然在在自己的线程栈中的代码来创建本地变量。因此,每个线程拥有每个本地变量的独有版本。
所有原始类型的本地变量都存放在线程栈上,因此对其它线程不可见。
一个线程可能向另一个线程传递一个原始类型变量的拷贝,但是它不能共享这个原始类型变量自身。
堆上包含在Java程序中创建的所有对象,无论是哪一个对象创建的。这包括原始类型的对象版本。
如果一个对象被创建然后赋值给一个局部变量,或者用来作为另一个对象的成员变量,这个对象任然是存放在堆上。
java线程相关
为什么使用多线程
想象一下,一个应用程序需要从本地文件系统中读取和处理文件的情景。比方说,从磁盘读取一个文件需要5秒,处理一个文件需要2秒。处理两个文件则需要:
1:5秒读取文件A
2:2秒处理文件A
3:5秒读取文件B
4:2秒处理文件B
总共需要14秒
从磁盘中读取文件的时候,大部分的CPU时间用于等待磁盘去读取数据。在这段时间里,CPU非常的空闲。它可以做一些别的事情。通过改变操作的顺序,就能够更好的使用CPU资源。看下面的顺序:
1:5秒读取文件A
2:5秒读取文件B + 2秒处理文件A
3:2秒处理文件B
4
---------------------
5
总共需要12秒
CPU等待第一个文件被读取完。然后开始读取第二个文件。当第二文件在被读取的时候,CPU会去处理第一个文件。
资源利用率更好
提高系统CPU的使用效率,因为当系统进行磁盘IO或者网络IO的时候CPU是空闲的。
程序设计更简单
程序响应更快
多线程的代价
线程安全
设计更复杂
上下文切换的开销
增加资源消耗
线程数
线程池里面线程数是不是越多越好?答案是否定的,因为线程的切换会增加CPU的开销。
Runtime.getRuntime().availableProcessors()
一般来推荐线程数来说IO密集的都是两倍CPU个数+1,计算密集的都是1CPU个数+1
//线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
Thread的六种状态
NEW
新创建,没有执行start()方法
RUNNABLE
运行状态包含就绪与运行两种状态,因为线程启动只会,不是立即执行的,而是需要系统调度去分配CPU时间片
BLOCKED
线程堵塞了,多个线程竞争带锁的方法,没有获取到锁的方法会被堵塞
WAITING
某个方法调用了wait()或者join() 线程会进入等待唤醒状态
TIMED_WAITING
某个方法调用了wait(time)或者sleep(time),join(time) 线程会进入等待唤醒状态
TERMINATED
线程执行结束状态
线程状态流转
waiting
||
new-------> RUNNABLE ---------> terminated
|| ||
timed_waiting blocked
blocked与wating区别
BLOCKED是被动的标记,而WAITING是主动操作,
处于WAITING状态的线程,被唤醒以后,需要进入同步队列去竞争锁操作,
而在同步队列中,如果已经有其他线程持有锁,则线程会处于BLOCKED状态。所以可以说BLOCKED状态是处于WAITING状态的线程重新唤醒的必经的状态
线程的优先级
线程优先级的大小从1到10,默认是5,优先级越高获得调度的时间可能就越大,但是不一定先执行。
如果超过这个范围就报IllegalArgumentException 优先级调度依赖于具体的操作系统调度方式,
不能完全依赖于优先级进行线程的资源调度,只有在底层平台不支持线程时,JVM才会自己实现线程的管理和调度,
在Linux上,Java线程的调度最终转化为了操作系统中的进程调度.
不要依赖线程的优先级,如果要设置线程的执行顺序,最好试用加锁方式实现
用户线程和守护线程
默认创建的线程是用户线程,而守护线程的优先级比较低,当主程序执行完只剩下守护线程时候,JVM就会退出
守护线程最经典的应用就是Java垃圾回收期
线程组
可以批量管理线程或线程组对象,有效地对线程或线程组对象进行组织
常见的功能比如讲多个线程放到同一个线程组里面,可以查询线程组里面的活跃线程数,
或者当某个线程执行失败,可以让整个线程组里面的线程都interrupt
suspend与resume
为什么suspend与resume被废弃了?
thread.suspend 天生容易引起死锁。
如果目标线程挂起时获取到了锁,它会一直保持对锁的占有,一直到其他的线程调用resume方法,它才能继续向下执行。
假如有A,B两个线程,A线程在获得某个锁之后被suspend阻塞,这时A不能继续执行,
线程B在或者相同的锁之后才能调用resume方法将A唤醒,
但是此时的锁被A占有,B不能继续执行,也就不能及时的唤醒A,
此时A,B两个线程都不能继续向下执行而形成了死锁。这就是suspend被弃用的原因。
stop
为什么stop为被废弃?
stop会即刻停止run()方法中剩余的全部工作,包括在catch或finally语句中,并抛出ThreadDeath异常
会立即释放该线程所持有的所有的锁,导致数据得不到同步的处理,出现数据不一致的问题。
wait与notify
因为suspend会引起死锁,所以我们可以用wait与notify来替代他们。
wait是线程间通信常用的信号量,作用就是让线程暂时停止运行,等待其他线程使用notify来唤醒或者达到一定条件自己苏醒。
<p>
* This method should only be called by a thread that is the owner
* of this object's monitor. A thread becomes the owner of the
* object's monitor in one of three ways:
* <ul>
* <li>By executing a synchronized instance method of that object.
* <li>By executing the body of a {@code synchronized} statement
* that synchronizes on the object.
* <li>For objects of type {@code Class,} by executing a
* synchronized static method of that class.
* </ul>
<p>
Only one thread at a time can own an object's monitor.
wait和notify必须要在同步方法里面,
因为只有在同步方法里面才会有锁的概念,在任意时刻有且仅有一个拥有该对象独占锁的线程能够调用它们。
如果是同步代码块,那么必须是
synchronized(obj){
obj.wait()
//如果直接用wait()会报错
}
如果是synchronized修饰的静态或者非静态方法,那么必须是
public void synchronized test(){
//其实等于this.wait(),默认锁住的是当前类的实例
wait()
}
public static void synchronized test(){
//其实等于this.wait(),默认锁住的是当前类
wait()
}
wait是一个本地方法,属于Object类,其底层实现是JVM内部实现,是基于monitor对象监视锁。
wait是让当前调用线程等待了,比如说如果wait()是在 Thread的run方法里面,那么就是该thread等待并且释放锁
如果wait()在main方法里面,那么就是主线程被wait()
如果wait(0)是在Thread的同步方法里面,当Thread执行完毕后,会被自动唤醒,利用这个原理,可以实现join
原理:
wait方法会将当前线程放入wait set等待被唤醒
1.将当前线程封装成objectwaiter对象node
2.通过objectmonitor::addwaiter方法将node添加到_WaitSet列表中
3.通过ObjectMonitor:exit方法释放当前的ObjectMonitor对象,这样其他竞争线程就可以获取该ObjectMonitor对象
4.最终底层的park方法会挂起线程
notify方法就是随机唤醒等待池中的一个线程
mutex.cpp源码:
bool Monitor::notify() {
assert (_owner == Thread::current(), "invariant") ;
assert (ILocked(), "invariant") ;
if (_WaitSet == NULL) return true ;
NotifyCount ++ ;
// Transfer one thread from the WaitSet to the EntryList or cxq.
// Currently we just unlink the head of the WaitSet and prepend to the cxq.
// And of course we could just unlink it and unpark it, too, but
// in that case it'd likely impale itself on the reentry.
Thread::muxAcquire (_WaitLock, "notify:WaitLock") ;
//从_WaitSet随机取一个
ParkEvent * nfy = _WaitSet ;
if (nfy != NULL) { // DCL idiom
_WaitSet = nfy->ListNext ;
assert (nfy->Notified == 0, "invariant") ;
// push nfy onto the cxq
for (;;) {
const intptr_t v = _LockWord.FullWord ;
assert ((v & 0xFF) == _LBIT, "invariant") ;
nfy->ListNext = (ParkEvent *)(v & ~_LBIT);
if (CASPTR (&_LockWord, v, UNS(nfy)|_LBIT) == v) break;
// interference - _LockWord changed -- just retry
}
// Note that setting Notified before pushing nfy onto the cxq is
// also legal and safe, but the safety properties are much more
// subtle, so for the sake of code stewardship ...
OrderAccess::fence() ;
nfy->Notified = 1;
}
Thread::muxRelease (_WaitLock) ;
if (nfy != NULL && (NativeMonitorFlags & 16)) {
// Experimental code ... light up the wakee in the hope that this thread (the owner)
// will drop the lock just about the time the wakee comes ONPROC.
nfy->unpark() ;
}
assert (ILocked(), "invariant") ;
return true ;
}
join
join 会让线程的执行按照一定顺序,其实本质就是堵塞当前主线程,直到线程执行结束。
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
join的底层其实就用利用wait(0)的特性,当执行thread.join()时候,主线程会释放锁,被block,只有当该thread执行结束后会
自动唤醒当前wait的主线程,从而实现当某个线程执行join的时候,会等待这个线程完全执行完后再执行别的线程。
join代码的注释
* <p> This implementation uses a loop of {@code this.wait} calls
* conditioned on {@code this.isAlive}. As a thread terminates the
* {@code this.notifyAll} method is invoked. It is recommended that
* applications not use {@code wait}, {@code notify}, or
* {@code notifyAll} on {@code Thread} instances.
当一个线程类执行结束之后,会调用this.notifyAll()方法。所以不推荐在Thread实例里面
用wait()和notifyall
自动苏醒例子:
public class WaitTwo extends Thread {
@Override
public void run() {
try {
log.info("thread 执行");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private synchronized void waitZero() {
try {
log.info(Thread.currentThread().getName());
wait();
log.info("thread 执行完毕,自动苏醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
WaitTwo waitTwo = new WaitTwo();
//注释 start就可以看到效果
waitTwo.start();
waitTwo.waitZero();
}
}
synchronized
参考:http://www.hollischuang.com/archives/1883
作用:1)确保线程互斥的访问同步代码(2)保证共享变量的修改能够及时可见(3)有效解决重排序问题
用法:1、修饰方法,方法又分实例方法和static方法。如果是实例方法,那就是对当前实例的对象加锁,如果是静态方法,
那就是对该类进行加锁
2、修饰代码块,是对指定的对象加锁,可以实现局部代码块加锁,提高并发效率。
一般开发的时候使用synchronized对代码块加锁场景多,对方法加锁效率太低了。
同步代码块synchronized底层原理:
例如代码:
public void minus() {
synchronized (this) {
age--;
}
}
反编译后的代码:
public minus()V
TRYCATCHBLOCK L0 L1 L2 null
TRYCATCHBLOCK L2 L3 L2 null
L4
LINENUMBER 15 L4
ALOAD 0
DUP
ASTORE 1
MONITORENTER
L0
LINENUMBER 16 L0
ALOAD 0
DUP
GETFIELD synchronizedthread/SynchronizedByteCode.age : I
ICONST_1
ISUB
PUTFIELD synchronizedthread/SynchronizedByteCode.age : I
L5
LINENUMBER 17 L5
ALOAD 1
MONITOREXIT
我们需要关心的反编译后的字节码命令:
MONITORENTER与MONITOREXIT
MONITORENTER
每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:
1、如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
2、如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
3.如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。
MONITOREXIT
执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
同步方法锁的底层原理:
当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED(acc_synchronized) 访问标志是否被设置,如果设置了,
执行线程将先获取monitor(监视器锁),获取成功之后才能执行方法体,方法执行完后再释放monitor。
在方法执行期间,其他任何线程都无法再获得同一个monitor对象。
其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。
锁存在对象哪里?
Java对象在内存中储存的布局可以分为3块区域:对象头、实例数据、对齐填充。synchronized使用的锁对象储存在对象头中
synchronized锁膨胀过程
monitor对象存储在对象的对象头中,当锁当状态为重量级锁当时候,它当指针指向monitor对象,其他锁的状态就没有用到
monitor对象,这是JVM对锁的优化,我们知道重量级锁是基于底层系统的mutex互斥的,这个开销是很大的,所以JVM
java同步的monitor
为了解决多线程安全的问题,java提供了同步机制,互斥锁机制,保证同一个时刻只能有一个线程访问资源
这个机制的保障来自于监视器锁monitor,让每个对象都拥有自己的监视锁Monitor
在Java虚拟机(HotSpot)中,Monitor是基于C++实现的,由ObjectMonitor实现的
ObjectMonitor中有几个关键属性:
_owner:指向持有ObjectMonitor对象的线程
_WaitSet:存放处于wait状态的线程队列
_EntryList:存放处于等待锁block状态的线程队列
_recursions:锁的重入次数
_count:用来记录该线程获取锁的次数
当多个线程同时访问一段同步代码时,首先会进入_EntryList队列中,
当某个线程获取到对象的monitor后进入_Owner区域并把monitor中的_owner变量设置为当前线程,
同时monitor中的计数器_count加1。即获得对象锁。
若持有monitor的线程调用wait()方法,将释放当前持有的monitor,_owner变量恢复为null,_count自减1,
同时该线程进入_WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor(锁)并复位变量的值,
以便其他线程进入获取monitor(锁)。
为什么说synchroized是重量级锁
Java的线程是映射到操作系统原生线程之上的,如果要阻塞或唤醒一个线程就需要操作系统的帮忙,
这就要从用户态转换到核心态,因此状态转换需要花费很多的处理器时间,
对于代码简单的同步块(如被synchronized修饰的get 或set方法)状态转换消耗的时间有可能比用户代码执行的时间还要长,
所以说synchronized是java语言中一个重量级的操纵
volatile
内存屏障的概念是针对CPU架构级别的,需要在JIT编译器生成机器码的时候才能看到,所以查看字节码看不出来效果
可见性
volatile修饰的变量通过JIT编译器生成机器码,会发现多个lock前缀的指令,
lock 前缀的指令在多核处理器下会引发了两件事情:
1、修改volatile修饰的变量会强制将修改后的值刷新到主内存,
2、这个写回内存的操作会引起在其他CPU里缓存了该内存地址的数据无效。
由volatile修饰的变量在写之后会插入一个store屏障,在读之前插入一个load屏障。
store屏障保证写操作被后面的线程立即可见。load屏障保证所有的读操作之前的写立即生效
有序性
根据happens-before原则,对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
volatile关键字通过提供“内存屏障”的方式来防止指令被重排序,为了实现volatile的内存语义,编译器在生成字节码时,
会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
用户线程与守护线程
setDaemon(true)设置为守护线程,表示该线程是不重要的,进程退出时不需要等待这个线程执行完成
thread默认是用户线程 不会随着主线程推出而退出的。
java线程池
ThreadPoolExecutor
execute()提交任务是不需要返回值的
submit()提交任务是需要返回值的
ThreadPoolExecutor构造方法有以下几个核心参数
corePoolSize
核心线程数,线程池保持的最低线程数
allowCoreThreadTimeOut的值默认是false,也就是说核心线程默认是不可以被回收的
除非设置了allowCoreThreadTimeOut为true,核心线程到了空闲的存活时间也会被回收
maximumPoolSize
最大线程数,线程池允许存在的最大线程数。
keepAliveTime,imeUnit unit
非核心线程的闲置存活时间。
所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率
BlockingQueue workQueue
任务队列
ThreadFactory threadFactory
线程工厂,用来创建线程池里的新的线程。可以用来设置线程别名
Executors默认的是Executors.defaultThreadFactory();
缺点是不能设置线程池pool名称如果有多个线程池就没法区别了,所以一般要自定义
ThreadPoolExecutor
RejectedExecutionHandler handler
拒绝处理策略,当线程数达到线程池的上限并且阻塞队列满了的时候执行。
AbortPolicy 直接拒绝 抛出异常(Executors默认这种模式)
DiscardPolicy 直接丢弃任务 不处理
DiscardOldestPolicy 丢弃队列最老的数据 也就是老的任务
CallerRunsPolicy 如果添加到线程池失败,那么主线程会自己去执行该任务
allowCoreThreadTimeOut
allowCoreThreadTimeOut为true
该值为true,则线程池数量最后销毁到0个。
allowCoreThreadTimeOut为false 默认为false 所以keepAliveTime才起作用
销毁机制:超过核心线程数时,而且(超过最大值或者timeout过),就会销毁。
thread pool 状态设计
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
//对应的高3位值是111
private static final int RUNNING = -1 << COUNT_BITS;
//对应的高3位值是000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//对应的高3位值是001
private static final int STOP = 1 << COUNT_BITS;
//对应的高3位值是010
private static final int TIDYING = 2 << COUNT_BITS;
//对应的高3位值是011
private static final int TERMINATED = 3 << COUNT_BITS;
其中,高3位就表示着"线程池状态",低29位则表示"线程池中的任务数量"
COUNT_BITS = Integer.SIZE - 3;
也就是11111111 11111111 11111100
<<是向左边位移
threadPoolExecutor 中的 shutdown() 、 shutdownNow() 、 awaitTermination() 的用法和区别
shutdown()和shutdownNow()的区别
从字面意思就能理解,shutdownNow()能立即停止线程池,正在跑的和正在等待的任务都停下了。
这样做立即生效,但是风险也比较大;
shutdown()只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停。
shutdown()和awaitTermination()的区别
shutdown()后,不能再提交新的任务进去;但是awaitTermination()后,可以继续提交。
awaitTermination()是阻塞的,返回结果是线程池是否已停止(true/false);shutdown()不阻塞。
shutdown()
1、将线程池状态变更为shutdown 2、停止接受外部的submit任务 3、内部已经在跑的任务会执行完 4、等所有任务完成才会停止
shutdownNow()
1、将线程状态变更为stop 2、停止接受外部的submit任务 3、【忽略队列里等待的任务】 4、【尝试将正在跑的任务interrupt中断】 5、返回为执行的任务列表
awaitTermination()
1、当前线程阻塞,直到所有已提交的任务都执行完 2、已经执行完的线程,等待线程超时时间超过,自动销毁完
线程池监控
(1).taskCount:线程池需要执行的任务数量。
(2).completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
(3).largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
(4).getPoolSize:
当前线程池存活的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
(5).getActiveCount:
活跃的线程数
poolSize >=activeCount
ExecutorService的各种写法
newFixedThreadPool=new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
newFixedThreadPool是固定线程池,线程池可以允许创建的线程数是固定的,所以核心线程和最大线程数一样,
也没有非核心线程回收的概念,所以keepAliveTime为0,因为要保持这些固定的线程执行所有任务,所以队列
使用的是LinkedBlockingQueue,队列的容量是Integer.MaxValue
newSingleThreadExecutor=new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
newSingleThreadExecutor是单个线程,执行所有任务,和上面的newFixedThreadPool类似
newCachedThreadPool=new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>())
newCachedThreadPool是指创建的线程可以被缓存,同时也可以被回收,所有核心线程数是0,最大线程数为Integer.MAX_VALUE
,线程的空闲存活时间为60s,但是阻塞队列用的是SynchronousQueue
isTerminated
isTerminated当调用shutdown()方法后,并且所有提交的任务完成后返回为true;
isTerminated当调用shutdownNow()方法后,成功停止后返回为true;
BlockQueue
Queuel里面一些方法
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
poll 移除并返问队列头部的元素 如果队列为空,则返回null
add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
offer 添加一个元素并返回true 如果队列已满,则返回false
put 添加一个元素 如果队列满,则阻塞
element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
peek 返回队列头部的元素 如果队列为空,则返回null
BlockQueue继承了Collection,所以可以使用集合类的,add,contains等方法,其实底层都是调用AbstractQueue的方法
对于add其实是调用off()方法.
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
直到BlockingQueue里面有空间再继续.
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁
如果需要等待的就使用put()和take(),不需要等待的就是使用offer和poll!
ArrayBlockingQueue
FIFO先入先出,构造函数需要指定大小
LinkedBlockingQueue
和ABQ类似,构造函数可以不指定大小,默认的大小是Integer.MAX_VALUE
PriorityBlockingQueue
和ABQ类似,但是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
SynchronousQueue
和ABQ类似,但是容量只有一个
当往这个Queue放东西时,必须有另外一个线程在从这个Queue里拿,如没有,则直接失败
ConcurrentLinkedQueue
ConcurrentLinkedQueue是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,
来保证元素的一致性。
ThreadLocal
为什么要用threadLocal,自己如何实现变量副本功能
实现原理
如果想实现每个线程都有自己的变量副本,有两种方式,
一种是直接加锁,保证同一时间只有一个线程访问这个变量
还有一种就是每个线程分配一个变量副本,ThreadLocal就是使用这种方式。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
//得到ThreadLocal对应的hash值
int i = key.threadLocalHashCode & (len-1);
//判断当前key 是否已经存在过了,如果存在就是修改,如果不存在
就是获取i的最新值,然后重新set
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
Thread==> ThreadLocal.ThreadLocalMap threadLocals = null;
为什么使用ThreadLocalMap
以前总是不理解为什么Thread上面绑定的是ThreadLocal.ThreadLocalMap而不是ThreadLocal,因为我们设置值
时候无非就是set和get,但当一个线程需要多个局部变量副本的时候,如果线程绑定的是ThreadLocal肯定是不满足
多个变量的场景,所以只能是ThreadLocalMap,而且ThreadLocalMap的key就是ThreadLocal实例.
每一个ThreadLocal都一个唯一的threadLocalHashCode()
为什么使用ThreadLocalMap,而不是普通的Map
Thread类里面定义了一个 ThreadLocal.ThreadLocalMap threadLocals变量
如果绑定的是Map<ThreadLocal,value> 会有什么问题么。
ThreadLocalMap的Entry为什么是软引用
首先了解下
强引用:垃圾回收期不会主动去回收,哪怕Java虚拟机报OutOfMemoryError
软引用:当Java堆内存不够的时候,才会进行回收 (没有任何强引用关联),例如缓存系统
弱引用:比软引用更容易被回收,当垃圾回收期线程执行的时候,不管当前堆内存是否够,都会回收(没有任何强引用关联)
ThreadLocalMap里面Entry key就是threadLocal对象本身,value就是放进threadLocal里面的值
假设ThreadLocalMap的Entry key threadLocal是强引用,当线程创建一个threadLocalA,将threadLocalA置为null,
ThreadLocalMap里面的Entry key 还是会持有threadLocalA的强引用,这时候threadLocalA是始终无法被回收的,除非线程回收了
线程的ThreadLocalMap才会被回收,不然会造成内存泄露,同时在使用线程池的情况下,ThreadLocalMa会一直存在的。
内存泄漏
在配合线程池使用的情况下可能会有内存泄露的风险
ThreadLocalMap内部Entry中key使用的是对ThreadLocal对象的弱引用,当key被回收后,对于的value还是不能被回收,这时候ThreadLocalMap里面就会存在key为null但是value不为null的entry项,虽然ThreadLocalMap提供了set,get,remove方法在一些时机下会对这些Entry项进行清理,但是这是不及时的,也不是每次都会执行的,所以一些情况下还是会发生内存泄露,所以在使用完毕后即使调用remove方法才是解决内存泄露的王道。
首先从ThreadLocal的直接索引位置(通过ThreadLocal.threadLocalHashCode & (table.length-1)运算得到)获取Entry e,如果e不为null并且key相同则返回e;
如果e为null或者key不一致则向下一个位置查询,如果下一个位置的key和当前需要查询的key相等,则返回对应的Entry。否则,如果key值为null,则擦除该位置的Entry,并继续向下一个位置查询。在这个过程中遇到的key为null的Entry都会被擦除,那么Entry内的value也就没有强引用链,自然会被回收。仔细研究代码可以发现,set操作也有类似的思想,将key为null的这些Entry都删除,防止内存泄露。
但是光这样还是不够的,上面的设计思路依赖一个前提条件:要调用ThreadLocalMap的getEntry函数或者set函数。这当然是不可能任何情况都成立的,所以很多情况下需要使用者手动调用ThreadLocal的remove函数,手动删除不再需要的ThreadLocal,防止内存泄露。所以JDK建议将ThreadLocal变量定义成private static的,这样的话ThreadLocal的生命周期就更长,由于一直存在ThreadLocal的强引用,所以ThreadLocal也就不会被回收,也就能保证任何时候都能根据ThreadLocal的弱引用访问到Entry的value值,然后remove它,防止内存泄露。
Thread启动源码
参考:https://www.ibm.com/developerworks/cn/java/j-lo-processthread/#icomments
无论是实现Runnable还是继承Thread启动线程,底层都是调用Thread的本地方法start0
private native void start0();
Thread类有个registerNatives本地方法,然后这个方法就是注册一些本地方法提供给Thread类用。
static {
registerNatives();
}
当线程调用start()方法后,会创建一个本地线程,然后会调用到Thread.c的JVM_StartThread方法,JVM_StartThread会调用,Thread的run()方法,
public void run() {
if (target != null) {
target.run();
}
}
CAS(compare and swap)
利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。其它原子操作都是利用类似的特性完成的。
而整个J.U.C都是建立在CAS之上的,因此对于synchronized阻塞算法,J.U.C在性能上有了很大的提升
private volatile int value;
public final int get() {
return value;
}
public final int incrementAndGet() {
for (;;) {// 这样优于while(true)
int current = get();// 获取当前值
int next = current + 1;// 设置更新值
if (compareAndSet(current, next))
return next;
}
}
AbstractQueuedSynchronizer(AQS)
什么是aqs,aqs其实是个FIFO的双向队列,是实现同步锁的最核心组建,ReetrantLock 、CountDownLatch都有使用到
aqs提供以下两种功能:独占锁(ReetrantLock)与共享锁(ReetrantReadWriteLock)
ReentrantLock 类图
FairSync extends Sync 公平锁
NonfairSync extends Sync 非公平锁
Sync extends AbstractQueuedSynchronizer
ReentrantLock源码分析
lock()过程,以默认的非公平锁为例
总结:
1、由于这里是非公平锁,所以调用lock方法时,先去通过cas去抢占锁,
如果抢占锁成功,保存获得锁成功的当前线程, 抢占锁失败,调用acquire来走锁竞争逻辑
2、通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
3、如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
4、acquireQueued,将Node作为参数,通过自旋去尝试获取锁。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//由于这里是非公平锁,所以调用lock方法时,先去通过cas去抢占锁 如果抢占锁成功,保存获得锁成功的当前线程
//抢占锁失败,调用acquire来走锁竞争逻辑
final void lock() {
//1、cas取抢锁status
if (compareAndSetState(0, 1))
//2、设置当前获取到锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//3、获取锁
acquire(1);
}
//子类调用父类Sync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
//当state=0时,表示无锁状态
//当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,
//所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。
//而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
//tryAcquire 调用子类的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
//5获取锁失败,阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋获取锁
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
AQS内部实现
// https://segmentfault.com/a/1190000017372067
aqs的实现依赖内部的同步队列(fifo双向队列),如果当前线程竞争锁失败,那么aqs会把当前线程以及等待信息构造
成一个Node放到同步队列里面,同时阻塞该线程,只有当线程释放锁时候,会从队列中唤醒一个阻塞的节点(线程)
+------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
public final void acquire(int arg) {
//
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//竞争失败,组装node存放到aqs里面
private Node addWaiter(Node mode) {
//创建新的node 并且指向到队列尾部
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//队列尾部
Node pred = tail;
if (pred != null) {
//当前node的pre 指向以前的队列尾部
node.prev = pred;
//cas 将当前node节点设置成新的尾部
if (compareAndSetTail(pred, node)) {
// 老的尾部next 关联的是当前node
pred.next = node;
return node;
}
}
enq(node);
return node;
}
锁的类型
https://ddnd.cn/2019/03/22/java-synchronized-2/
1.公平锁 / 非公平锁
2.可重入锁 / 不可重入锁
3.独享锁 / 共享锁
4.互斥锁 / 读写锁
5.乐观锁 / 悲观锁
6.分段锁
7.偏向锁 / 轻量级锁 / 重量级锁
8.自旋锁
公平锁与非公平锁
公平锁是指多个线程按照申请锁的顺序获取到锁,非公平锁就是随机
Synchronized:非公平锁
ReetrantLock:
默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
可以通过构造方法改变
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁与不可重入锁
可重入锁是指可重复可递归调用的锁
ReentrantLock和synchronized都是可重入锁
例如:
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}
synchronized void setB() throws Exception{
Thread.sleep(1000);
}
public void stepOne(){
try{
lock.lock();
log.info("stepOne:得到锁");
stepTwo();
}catch (Exception e){
}finally {
log.info("stepOne:释放锁");
lock.unlock();
}
}
public void stepTwo(){
try{
lock.lock();
log.info("stepTwo:得到锁");
}catch (Exception e){
}finally {
log.info("stepTwo:释放锁");
lock.unlock();
}
}
不可重入锁,
同一线程两次调用lock()方法,如果不执行unlock()释放锁的话,第二次调用自旋的时候就会产生死循环
public void lock() {
Thread current = Thread.currentThread();
//这句是很经典的“自旋”语法,AtomicInteger中也有
for (;;) {
if (!owner.compareAndSet(null, current)) {
return;
}
}
}
重入
public void lock() {
Thread current = Thread.currentThread();
if (current == owner.get()) {
state++;
return;
}
//这句是很经典的“自旋”式语法,AtomicInteger中也有
for (;;) {
if (!owner.compareAndSet(null, current)) {
return;
}
}
}
独享锁与共享锁
独享锁:该锁每一次只能被一个线程所持有,例如ReentrantLock和synchronized
共享锁:该锁可被多个线程共有,典型的就是ReentrantReadWriteLock里的读锁,
它的读锁是可以被共享的,但是它的写锁确每次只能被独占。
独享锁与共享锁也是通过AQS(AbstractQueuedSynchronizer,基础又是CAS)来实现的
互斥锁与读写锁
在访问共享资源之前对进行加锁操作,在访问完成之后进行解锁操作。 加锁后,任何其他试图再次加锁的线程会被阻塞,直到当前进程解锁
读写锁既是互斥锁,又是共享锁,read模式是共享,write是互斥(排它锁)的。
读写锁有三种状态:读加锁状态、写加锁状态和不加锁状
乐观锁与悲观锁
乐观锁:
总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据
版本号机制和CAS算法实现,Java里面的atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。
悲观锁:
总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁
行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronized和ReentrantLock等独占锁就是悲观锁思想的实现
分段锁
分段锁其实是一种锁的设计,并不是具体的一种锁,
对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作
并发容器类的加锁机制是基于粒度更小的分段锁,分段锁也是提升多并发程序性能的重要手段之一
偏向锁、轻量级锁、重量级锁
synchronized的锁有四种状态,无锁状态、偏向锁、轻量级锁、重量级锁,锁状态只能升级不能降级,
是jvm1.6为了提高锁的获取与释放效率而做的优化。
无锁 : cas应用
无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。
如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功
偏向锁:(java 对象头)
大多数情况下,锁不存在多线程竞争,例如某段同步代码一直被某一个线程访问,那么为了让线程
获取锁的代价降低而引入了偏向锁,后续该线程访问同步代码块的时候不需要尝试获取锁了
当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID,这样在线程进入与推出时候不再需要cas操作加锁和解锁
,而是检测Mark Word里是否存储着指向当前线程的偏向锁,
轻量级锁:(java 对象头)
一般用于两个线程交替使用锁的时候,当某个线程获取到锁后,其他线程通过cas自旋的方式获取锁,不会阻塞
提供性能。
重量级锁:
轻量级锁的线程虽然是自旋,但不会一直旋转下去,当自旋达到一定次数(默认10次),就会升级为重量级锁
线程只能进行挂起阻塞等待唤醒了,每一个对象都有一个monitor,而monitor依赖操作系统的mutexlock互斥锁
来实现,线程被阻塞后便进入内核调度状态,导致系统在用户态与内核状态直接来回切换,严重印象锁的性能。
自旋锁
自旋锁是指:
当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环
CAS算法,是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步
CAS算法涉及到三个操作数
1.需要读写的内存值 V
2.进行比较的值 A
3.拟写入的新值 B
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B,
否则不会执行任何操作。一般情况下是一个自旋操作,即不断的重试
cas 会出现aba的问题,可以用版本号区别 AtomicStampedReference
自旋锁的优点:
1、自旋锁不会使线程状态发生切换,一直处于用户态,即线程一直都是active的;
不会使线程进入阻塞状态,减少了不必要的上下文切换,执行速度快
2、非自旋锁在获取不到锁的时候会进入阻塞状态,从而进入内核态,当获取到锁的时候需要从内核态恢复,需要线程上下文切换。
(线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响锁的性能)
AtomicInteger 其实就是通过unsafe类获取当前value的内存偏移量
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
//获取当前值
v = getIntVolatile(o, offset);
//如果预期值current与内存值相等
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
public final int getAndIncrement() {
for (;;) {//一直循环直到CAS成功
int current = get();//获取当前值(预期值)
int next = current + 1; //当前值+1(更新值)
if (compareAndSet(current, next))//如果预期值current与内存值相等,设置新值
return current;//返回新增之前的值
}
}
底层代码:
unsaffe.app类:
// cas compareand set
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
atomic_linux_x86.inline.hpp类:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
//程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
用嵌入的汇编实现的, CPU指令是 cmpxchg,程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。
如果程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg).
反之,如果程序是在单处理器上运行,就省略lock前缀(单处理器自身会维护单处理器内的顺序一致性,不需要lock前缀提供的内存屏障效果).
lock前缀的作用说明:1禁止该指令与之前和之后的读和写指令重排序,2把写缓冲区中的所有数据刷新到内存中。
参考文章
使用threadlocal不当可能会导致内存泄露
Java并发性和多线程介绍目录
http://ifeve.com/volatile/
https://www.cnblogs.com/paddix/p/5367116.html