线程
大约 13 分钟
线程
进程&线程
- 进程:资源分配的最小单位,指在系统中运行的一个应用程序;程序一旦运行就是进程
- 线程:程序执行的最小单位,系统分配处理器时间资源的基本单元,或者说进程内独立执行的一个单元执行流
wait&sleep
- sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何对象实例都能调用
- sleep 不需要释放锁,他也不需要占用锁,wait 会释放锁,但调用他的前提是当前线程占有锁(即代码要在 synchronized 中) 他们都可以被 interrupted 方法中断
并发&并行
- 并发:同一时刻多个线程在访问同一个资源,多个线程对同一个点
- 并行:多项工作一起执行,之后在汇总
线程
- 用户线程:用户自定义线程 主线程结束了,用户线程仍能继续执行,jvm 存活
- 守护线程:垃圾回收等守护程序运行的线程 主线程结束了,守护线程也会结束,jvm 结束
Sychronized 关键字(同步锁)
- 修饰代码块,被修饰的代码块称为同步语句块,其作用范围是{}括起来的代码,作用的对象是调用这个代码块的对象
- 修饰方法,被修饰的方法称为同步方法,其作用方位是整个方法,作用的对象是调用这个方法的对象
多线程编程步骤(高内聚,低耦合)
- 创建资源类,在资源类创建属性和操作方法
- 在资源类操作方法(1)判断(2)干活(3)通知(4)防止虚假唤醒
- 创建多个线程,调用资源类的操作方法
例:3 个售票员,卖出 30 张票
//1.资源类
class Ticket {
//票数
private int number = 30;
//2.操作方法 卖票
public synchronized void sale() {
//判断是否有票
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ":卖出:" + (number--) + " 剩下:" + number);
}
}
}
public class SynchroTickets {
public static void main(String[] args) {
Ticket ticket = new Ticket();
//3.创建 3 个线程,操作资源类
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"AA").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"BB").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"CC").start();
}
}
Lock&Synchronized
- Lock 不是 java 语言内置的,synchronized 是 java 语言的关键字
- lock 是一个类,通过这个类可实现同步访问 第 2 页 JUC 高并发编程
- 采用 synchronized 不需要用户去主动释放锁,当同步方法或者同步代码块执行完 以后,系统会自动让线程释放对锁的占用,而 lock 必须用户去主动释放锁,如果没有释放就 可能出现死锁的现象
- 通过 lock 可以知道有没有成功获取锁,而 synchronized 无法办到
- lock 可以提高多个线程进行读操作效率
例:3 个售票员,卖出 30 张票
class Ticket {
// 票数
private int number = 30;
// 操作方法 卖票
private final ReentrantLock lock = new ReentrantLock();
public void sale() {
lock.lock();
// 判断是否有票
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ":卖出:" + (number--) + " 剩下:" + number);
}
} finally {
lock.unlock();
}
}
}
public class LockTickets {
public static void main(String[] args) {
Ticket ticket = new Ticket();
// 2.创建 3 个线程,操作资源类
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "BB").start();
new Thread(() -> {// 调用 start 方法并不一定立马创建线程,看操作系统
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "CC").start();
}
}
例:加 1 减 1 轮换操作:synchronized
//第一步:创建资源类,定义属性和操作方法
class Share {
// 初始值
private int number = 0;
// +1 的方法
public synchronized void incr() throws InterruptedException {
// 第二部操作:判断,干活,通知
while (number != 0) {// 不是 0,等待
this.wait();// 第四部防止虚假唤醒 wait 在哪里睡在哪里醒(while)
}
number++;
System.out.println(Thread.currentThread().getName() + "::" + number);
this.notifyAll();
}
// -1 的方法
public synchronized void decr() throws InterruptedException {
// 第二部操作:判断,干活,通知
while (number != 1) {// 不是 0,等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "::" + number);
this.notifyAll();
}
}
public class ThreadDemo1 {
// 创建多个线程,调用资源类
public static void main(String[] args) {
Share share = new Share();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "CC").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "DD").start();
}
}
例:加 1 减 1 轮换操作:lock
class Share {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void incr() throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (number != 0) {
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "::" + number);
// 通知
condition.signalAll();
} finally {
// 解锁
lock.unlock();
}
}
public void decr() throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (number != 1) {
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "::" + number);
// 通知
condition.signalAll();
} finally {
// 解锁
lock.unlock();
}
}
}
public class ThreadDemo2 {
// 创建多个线程,调用资源类
public static void main(String[] args) {
Share share = new Share();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "CC").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "DD").start();
}
}
例:定向通知
//创建资源类
class ShareResource {
// 定义标志位
private int flag = 1;
private Lock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
public void print5(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 1) {
c1.await();
}
for (int i = 0; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + " :轮数" + loop);
}
flag = 2;
c2.signal();
} finally {
lock.unlock();
}
}
public void print10(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 2) {
c2.await();
}
for (int i = 0; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + " :轮数" + loop);
}
flag = 3;
c3.signal();
} finally {
lock.unlock();
}
}
public void print15(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 3) {
c3.await();
}
for (int i = 0; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + " :轮数" + loop);
}
flag = 1;
c1.signal();
} finally {
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print5(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print10(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print15(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "CC").start();
}
}
集合线程安全
- Vector
- Collections.synchronizedList(new ArrayList<>())
- new CopyOnWriteArrayList<>()
- new CopyOnWriteArraySet<>()
- new ConcurrentHashMap<>()
问题8锁8 锁(Lock_8)
- 标准访问,先打印短信还是邮件
- AA---------sengSMS
- BB---------sengEmail
- 停 4 秒在短信方法内,先打印短信还是邮件
- AA---------sengSMS
- BB---------sengEmail
- 新增普通微信方法,先打印短信还是微信
- BB---------sengWeiXin
- AA---------sengSMS
- 现在有两部手机,先打印短信还是先打印邮件
- BB---------sengEmail
- AA---------sengSMS
- 两个静态同步方法,1 不手机,先打印短信还是先打印邮件
- AA---------sengSMS
- BB---------sengEmail
- 两个静态同步方法,2 部手机,先打印短信还是先打印邮件
- AA---------sengSMS
- BB---------sengEmail
- 1 个静态同步方法,1 个普通同步方法,1 部手机,先打印邮件还是先打印短信
- BB---------sengEmail
- AA---------sengSMS
- 1 个静态同步方法,1 个普通同步方法,2 部手机,先打印邮件还是先打印短信
- BB---------sengEmail
- AA---------sengSMS
class Phone {
public static synchronized void sengSMS() {
try {
TimeUnit.SECONDS.sleep(4);
System.out.println(Thread.currentThread().getName() + "---------sengSMS");
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized void sengEmail() {
System.out.println(Thread.currentThread().getName() + "---------sengEmail");
}
public void sengWeiXin() {
System.out.println(Thread.currentThread().getName() + "---------sengWeiXin");
}
}
public class Lock_8 {
public static void main(String[] args) {
Phone phone = new Phone();
Phone phone1 = new Phone();
new Thread(() -> {
phone.sengSMS();
}, "AA").start();
new Thread(() -> {
phone1.sengEmail();
// phone.sengWeiXin();
}, "BB").start();
}
}
syncronized 实现同步的基础
java 中的每一个对象都可以作为锁,具体表现形式为:
- 对于普通同步方法,锁是当前实例对象
- 对于静态同步方法,锁是当前类的 Class 对象
- 对于同步方法块,锁是 Syncronized 括号例配置的对象
可重入锁(递归锁)
注意:使用 lock 完成可重入锁时必须上锁和解锁成对出现,否则其他线程一直处于等待释放锁的过程
死锁
- 两个或者两个以上的进程在执行过程中,因为争夺资源而造成的一种相互等待的现象,如果没有外力干涉,他们无法在执行下去
- 产生原因:
- 系统资源不足
- 进程运行推进顺序不合适
- 资源分配不当
public class DeadLock {
// 创建两个对象
static Object a = new Object();
static Object b = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (a) {
System.out.println(Thread.currentThread().getName() + " 持有锁 a,试图获取锁 b");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (b) {
System.out.println(Thread.currentThread().getName() + " 获取锁到锁 b");
}
}
}, "A").start();
new Thread(() -> {
synchronized (b) {
System.out.println(Thread.currentThread().getName() + " 持有锁 b,试图获取锁 a");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (a) {
System.out.println(Thread.currentThread().getName() + " 获取锁到锁 a");
}
}
}, "B").start();
}
}
创建线程的 4 种方式
- 继承 Thread 类
- 实现 Runnable 接口
- Callable 接口(可以得到返回结果)
- 线程池方式
Runnable&Callable
- 是否有返回值,callable 有返回值
- 是否会抛出异常,callable 会抛出异常
- 实现方法名不同,一个是 run 方法,一个是 call 方法
- Runnable 接口有实现类 FutureTask,FutureTask 的构造器可以传 Callable
class MyThread1 implements Runnable {
@Override
public void run() {
}
}
class MyThread2 implements Callable {
@Override
public Object call() throws Exception {
return 200;
}
}
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());
// 简化
FutureTask<Integer> futureTask2 = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " come in callable");
return 1024;
});
// 创建一个线程
new Thread(futureTask2, "AA").start();
while (!futureTask2.isDone()) {
System.out.println("wait.......");
}
// 调用 FutureTask 方法
System.out.println(futureTask2.get());
System.out.println(Thread.currentThread().getName() + " come over");
}
}
FutureTask 未来任务
在不影响主线程的情况下,单开一个线程去执行任务,主线程所需资源另开线程去获取
CountDownLatch 减少计数
- CountDownLatch 主要有两个方法,当一个或者多个线程调用 await 方法时,这些线程会阻塞
- 其他线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程不会阻塞)
- 当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行
例:6 个同学陆续离开教室后,班长才可以锁门
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch c = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 号位同学离开教室");
c.countDown();
}, String.valueOf(i)).start();
}
c.await();
System.out.println(Thread.currentThread().getName() + " 班长锁门走人");
}
}
CyclicBarrier 循环栅栏
例:集齐 7 龙珠召唤神龙
/**
* 集齐 7 龙珠召唤神龙
*/
public class CyclicBarrierDemo {
private static final int NUMBER = 7;
public static void main(String[] args) {
CyclicBarrier c = new CyclicBarrier(NUMBER, () -> {
System.out.println("集齐 7 龙珠召唤神龙");
});
// 集齐 7 颗龙珠的过程
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 星龙珠被收集到了");
try {
c.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, String.valueOf(i)).start();
}
}
}
Semaphore 信号量
例:6 辆汽车停 3 个停车位
public class SemaphoreDemo {
public static void main(String[] args) {
// 创建 3 个停车位
Semaphore semaphore = new Semaphore(3);
// 模拟 6 辆汽车
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢到了车位");
// 设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + " ---离开了车位----");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
乐观锁&悲观锁
- 悲观锁:效率低,不支持并发操作
- 乐观锁:携带数据版本号控制
表锁&行锁
行锁会发生死锁
读锁&写锁
- 读锁:共享锁,会发生死锁(同一条记录:线程 1 的写在等待线程 2 读完成,线程 2的写在等待线程 1 的读完成)
- 写锁:独占锁,会发生死锁(两条记录:线程 1 在写第一条记录,等待写第二条记录,线程 2 此时在写第二条记录;线程 2 在写第二条记录,等待写第一条记录,线程 1 此时在写 第一条记录;)
- 一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享。
- 可能会造成锁饥饿,一直读,没有写操作,读时不能进行写操作,只有读完成之后才可以写,写操作时可以读
锁降级
将写入锁降级为读锁,jdk8 说明:
- 获取写锁
- 获取读锁(write.lock 后可以 read.lock,反过来不行)
- 释放写锁
- 释放读锁
阻塞队列
通过一个共享的队列,可以使得数据有队列的一端输入另一端输出
特点:当队列是空的,从队列中获取元素的操作将会被阻塞当队列是满的,从队列中添加元素的操作将会被阻塞
好处:不用关心什么时候需要被唤醒线程,什么时候需要被阻塞线程
分类:
- ArrayBlockingQueue(常用,由数组结构组成的有界阻塞队列)
- LinkedBlockingQueue(常用,由链表结构组成的有界阻塞队列)
- DelayQueue(使用优先级队列实现的延迟无界阻塞队列)
- PriorityBlockingQueue(支持优先级排序的阻塞队列)
- SynchroniousQueue(不存储元素的阻塞队列,即单个元素的队列)
- LinkedTransferQueue(由链表组成的无界阻塞队列)
- LinkedBlockingDeque(由链表组成的双向阻塞队列)
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
test4();
}
private static void test1() {
BlockingQueue<String> a = new ArrayBlockingQueue(3);
// 第一组方法
System.out.println(a.add("a"));
System.out.println(a.add("b"));
System.out.println(a.add("c"));
System.out.println(a.remove("d"));
System.out.println(a.element());
// System.out.println(a.add("d"));//抛出异常
System.out.println(a.remove());
System.out.println(a.remove());
System.out.println(a.remove());
System.out.println(a.remove());// 抛出异常
}
private static void test2() {
BlockingQueue<String> a = new ArrayBlockingQueue(3);
// 第二组方法
System.out.println(a.offer("a"));
System.out.println(a.offer("b"));
System.out.println(a.offer("c"));
System.out.println(a.offer("d"));// 添加失败返回 false
// System.out.println(a.poll());
System.out.println(a.poll());
System.out.println(a.poll());
System.out.println(a.poll());// 取出失败返回 null
}
private static void test3() throws Exception {
BlockingQueue<String> a = new ArrayBlockingQueue(3);
// 第三组方法
a.put("a");
a.put("b");
a.put("c");
// a.put("d");//放不进去阻塞中
System.out.println(a.take());
System.out.println(a.take());
System.out.println(a.take());
System.out.println(a.take());// 取不出来阻塞中
}
private static void test4() throws Exception {
BlockingQueue<String> a = new ArrayBlockingQueue(3);
// 第三组方法
System.out.println(a.offer("a"));
System.out.println(a.offer("b"));
System.out.println(a.offer("c"));
System.out.println(a.offer("222", 3L, TimeUnit.SECONDS));// 放不进去阻塞 3 秒
System.out.println(a.poll());
System.out.println(a.poll());
System.out.println(a.poll());
System.out.println(a.poll(3L, TimeUnit.SECONDS));// 取不出来阻塞 3 秒
}
}
ThreadPool 线程池
特点:
- 降低资源消耗,通过重复利用一创建的线程降低线程创建和销毁的消耗
- 提高响应速度,当任务到达时,任务可以不需要等待线程创建就能立即执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控
//演示线程池 3 中基础分类(底层都用到了 ThreadPoolExecutor)
public class ThreadPoolDemo {
public static void main(String[] args) {
// 一池 n 线程
// ExecutorService pool1 = Executors.newFixedThreadPool(5);//5 个窗口
// 一池 1 线程
// ExecutorService pool1 = Executors.newSingleThreadExecutor();//1 个窗口
// 可扩容
ExecutorService pool1 = Executors.newCachedThreadPool();
// 10 个顾客请求
try {
for (int i = 1; i <= 10; i++) {
pool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool1.shutdown();
}
}
}
线程池的 7 个参数介绍
- corePoolSize:核心线程数量
- maximumPoolSize:最大线程数量
- keepAliveTime:线程存活时间
- TimeUnit:线程存活时间单位
BlockingQueue<Runnable>
:阻塞队列- ThreadFactory:线程工厂
- RejectdExecutionHandler:拒绝策略
自定义线程池
Executors 返回的线程池对象弊端如下:
- FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,从而导致 oom
- CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量线程,从而导致 oom
package com.game.server.controller;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService pool1 = new ThreadPoolExecutor(2, // corePoolSize
5, // maximumPoolSize
2L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 10 个顾客请求
try {
for (int i = 1; i <= 10; i++) {
pool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool1.shutdown();
}
}
}