JUC
1
2
3
4
5
6
7
| public class Test1 {
public static void main(String[] args) {
// 获取cpu核数
// cpu密集型 io密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
|
- ==wait会释放锁,sleep不会释放==
- wait必须在同步代码块中,sleep任意位置
- wait不要捕获异常,sleep必须捕获
锁的分类
可重入锁
如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。
可中断锁
synchronized就不是可中断锁,而Lock是可中断锁。如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,可以让它中断自己或者在别的线程中中断它。
公平锁
公平锁即尽量以请求锁的顺序来获取锁。多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得锁,这种就是公平锁。非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。 synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。
集合类不安全
- ConcurrentModificationException并发修改异常
- 解决办法
- List不安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
class ListTest {
public static void main(String[] args) {
/**
* 并发编程下ArrayList不安全
* 1.List<String> list = new Vector<>();
* 2.List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3.List<String> list = new CopyOnWriteArrayList<>();
*/
// CopyOnWrite 写入时复制
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
class SetTest {
public static void main(String[] args) {
/**
* 1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
* 2.Set<String> set = new CopyOnWriteArraySet<>();
*/
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
|
1
2
3
4
5
6
7
| public HashSet() {
map = new HashMap<>();
}
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
class MapTest {
public static void main(String[] args) {
// 默认等价与new HashMap<>(16, 0.75); 初始化容量和加载因子
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(map);
}, String.valueOf(i)).start();
}
}
}
|
Callable
- Runnable没有返回值:public abstract void run();,Callable可以有返回值:V call() throws Exception;
- 可以抛出异常
- 方法不同,run()/call()
- 源码
1
2
3
4
5
6
7
8
9
| public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
|
Future
- Future只是一个接口
- 能判断任务是否完成;能够中断任务;能够获取任务执行结果。
- 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
| public interface Future<V> {
// 如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
boolean cancel(boolean mayInterruptIfRunning);
// 是否被取消成功
boolean isCancelled();
// 是否已经完成
boolean isDone();
// 获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
V get() throws InterruptedException, ExecutionException;
// 如果在指定时间内,还没获取到结果,就直接返回null
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
|
FutureTask
1
2
3
| public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
|
1
| public class FutureTask<V> implements RunnableFuture<V>
|
RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
1
2
3
4
5
6
7
8
9
10
11
| public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| import java.util.concurrent.*;
class MyCallable {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();
TimeUnit.SECONDS.sleep(1);
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果" + result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
TimeUnit.SECONDS.sleep(2);
int sum = 0;
for (int i = 0; i < 100; i++)
sum += i;
return sum;
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class MyCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask<V>()).start();
// new Thread(new FutureTask<V>( Callable )).start();
MyThread myThread = new MyThread();
FutureTask<Integer> integerFutureTask = new FutureTask<>(myThread); // 适配类
new Thread(integerFutureTask, "A").start();
new Thread(integerFutureTask, "B").start(); // 结果会被缓存,只会打印一个call
Integer o = integerFutureTask.get(); // 返回值 此方法可能会产生阻塞,最好放在最后一行
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("call()");
return 7;
}
}
|
CountDownLatch减法计数器
- CountDownLatch 允许一个或多个线程等待一些特定的操作完成,而这些操作是在其它的线程中进行的
- CountDownLatch 构造函数中有一个 count 参数,表示有多少个线程需要被等待。其他线程调用countDown()方法,每调用一次 countDown 方法就表示有一个被等待的线程到达,count 变为 0 时,latch(闩shuan锁)就会被打开,处于等待状态的那些线程接着可以执行
- 一个线程中可以调用多次countDown()
- CountDownLatch 是一次性使用的,没有为 count 变量提供 set 的方法
- CountDownLatch非常适合于对任务进行拆分,使其并行执行,总的执行时间将决定于执行最慢的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| import java.util.concurrent.CountDownLatch;
class MyCountDownLatch {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
new Thread(()->{
try {
countDownLatch.await();
// 可以多个线程一起等待
System.out.println("over1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName());
// 计数器减一
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
// 等待计数器归零才会往下执行
countDownLatch.await();
System.out.println("over2");
}
}
|
CyclicBarrier栅栏
- CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
- CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
- CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class MyCyclicBarrier {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("over");
});
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " " + temp);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
|
Semaphore
1
2
3
4
5
6
7
| public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
class MySemaphore {
public static void main(String[] args) {
// 线程数量 限流
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
// P获取
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获得资源");
// 6个线程一开始有三个获取到临界资源,剩下三个阻塞
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "释放资源");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// V释放
semaphore.release();
}
}).start();
}
}
}
|
ReadWriteLock读写锁
- ReadWriteLock只允许一个线程修改,但是get()方法只读,不修改数据,实际上允许多个线程同时调用,使用
ReadWriteLock
可以解决这个问题 适合读多写少的场景
- 源码
1
2
3
4
| public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
|
- 实现类:ReentrantReadWriteLock
- 读锁(共享锁),写锁(独占锁)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyReadWriteLock {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 6; i++) {
final int temp = i;
new Thread(() -> {
// lambda无法访问到外部的变量
myCache.put(temp + "", "value:" + temp);
}, String.valueOf(i)).start();
}
for (int i = 0; i < 6; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "");
}, String.valueOf(i)).start();
}
}
}
class MyCache {
private final Map<String, Object> map = new HashMap<>();
// 读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 只有一个可以写
public void put(String key, Object value) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + value);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
// 可以多个同时读
public void get(String key) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取" + map.get(key));
System.out.println(Thread.currentThread().getName() + "读完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
}
}
|
BlockingQueue
- Collection->Queue->BlockingQueue
- 使用场景:多线程并发处理,线程池
- Queue源码
1
2
3
4
5
6
7
8
| public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
|
ArrayBlockingQueue
方式 | 抛出异常 | 不抛出异常 | 阻塞等待 | 超时等待 |
---|
添加 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove() | poll() | take() | poll(timeout, timeunit) |
判断队首 | element() | peek() | — | — |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
| import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
class MyBlockingQueue {
public static void main(String[] args) throws InterruptedException {
// 抛出异常
// test1();
// 不抛出异常
// test2();
// 阻塞等待
// test3();
// 超时等待
test4();
}
public static void test1() {
// 队列大小
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.add("a")); // true
System.out.println(arrayBlockingQueue.add("b")); // true
System.out.println(arrayBlockingQueue.add("c")); // true
// System.out.println(arrayBlockingQueue.add("d")); // IllegalStateException队列满
System.out.println(arrayBlockingQueue.element()); // 查看队首
System.out.println(arrayBlockingQueue.remove()); // ture
System.out.println(arrayBlockingQueue.remove()); // ture
System.out.println(arrayBlockingQueue.remove()); // ture
// System.out.println(arrayBlockingQueue.remove()); // NoSuchElementException
}
public static void test2() {
// 队列大小
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println(arrayBlockingQueue.offer("d")); // 不跑出异常,返回false
System.out.println(arrayBlockingQueue.peek());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll()); // 不抛出异常,返回null
}
public static void test3() throws InterruptedException {
// 队列大小
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
// 没有返回值
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");
// arrayBlockingQueue.put("d");// 会一直等
System.out.println(arrayBlockingQueue.take()); // a
System.out.println(arrayBlockingQueue.take()); // b
System.out.println(arrayBlockingQueue.take()); // c
// System.out.println(arrayBlockingQueue.take());// 会一直等
}
public static void test4() throws InterruptedException {
// 队列大小
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.offer("a");
arrayBlockingQueue.offer("b");
arrayBlockingQueue.offer("c");
arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS);// 两秒后结束等待
System.out.println(arrayBlockingQueue.poll()); // a
System.out.println(arrayBlockingQueue.poll()); // b
System.out.println(arrayBlockingQueue.poll()); // c
System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS)); // 两秒后结束等待,返回null
System.out.println("over");
}
}
|
SynchronousQueue同步队列
1
2
3
4
5
6
7
8
9
| public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 公平:队尾匹配队头出队
// 非公平:先入栈后匹配
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
|
- 它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
class MySynchronousQueue {
public static void main(String[] args) {
// 同步队列
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put\t1");
// 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put\t2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put\t3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Producer").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
// 取出并删除element,取不到东西他会一直等
System.out.println(Thread.currentThread().getName() + " take\t" +blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " take\t" +blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " take\t" +blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer").start();
}
}
|
LinkedBlockingQueue
1
2
3
4
5
6
7
8
9
10
| // 默认容量
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
|
PriorityBlockingQueue
- 带优先级的无界阻塞队列
- 每次出队都返回优先级最高的元素,是二叉树最小堆的实现
线程池
- 3大方法,7大参数,4大拒绝策略
- 降低资源消耗
- 提高响应速度
- 方便管理
- ==线程复用,控制最大并发数,管理线程==
- 最大线程定义
- CPU密集型:几核cpu就设置为几,通过代码去获取Runtime().getRuntime().availableProcessors()
- IO密集型:判断程序中十分耗费IO的线程
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class Demo1 {
public static void main(String[] args) {
// 单个线程
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 可伸缩的
// ExecutorService threadPool = Executors.newCachedThreadPool();
// 固定大小
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// CPU核心数
System.out.println(Runtime.getRuntime().availableProcessors() + "核");
try {
for (int i = 0; i < 10; i++) {
// 使用线程池创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}
1
2
3
| public interface Executor {
void execute(Runnable command);
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
|
- Executors部分函数全是调用了ThreadPoolExecutors的构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| public static ExecutorService newSingleThreadExecutor () {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool ( int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool () {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public ThreadPoolExecutor(int corePoolSize, // 核心线程大小
int maximumPoolSize, // 最大大小
long keepAliveTime, // 超时了没人调用就会释放
TimeUnit unit, // 超时单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler) { // 拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
|
拒绝策略(阻塞队列满了后触发拒绝策略)
- AbortPolicy():抛出异常
- CallerRunsPolicy():把任务队列中的任务放在调用者线程当中运行
- DiscardPolicy() :丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交
- DiscardOldestPolicy():尝试和最早的线程竞争,不会抛出异常
自定义线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class Demo1 {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, // 核心线程大小
5, // 最大大小(阻塞队列满了后会启用新线程)
3, // 超时时间(超时释放的是非核心的线程)
TimeUnit.SECONDS, // 超时时间单位
new LinkedBlockingQueue<>(3), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy()); // 阻塞队列满了后的拒绝策略
try {
// 最大承载 = 队列 + max
for (int i = 0; i < 10; i++) {
// 使用线程池创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}
|
1
2
3
4
| ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(() -> {
}, 0, 50, TimeUnit.MILLISECONDS);
|
1
2
3
4
5
6
7
| ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(111);
}
}, 3, TimeUnit.SECONDS);
|
- 阻塞队列
- new LinkedBlockingQueue<>(3):设置固定的容量,此例中:i < 5时,阻塞队列刚刚满,5个任务使用两个核心线程;i < 6时,多了一个线程,6>队列+core,此时会新开辟一个线程(总线程数不大于5),6个任务使用两个核心线程和一个新开辟的线程
- new LinkedBlockingQueue<>():无界阻塞队列。默认容量为Integer.MAX_VALUE,核心线程都被使用后,新来的线程全都放在阻塞队列中,此时相当于设置的maximumPoolSize无效,并发数固定为corePoolSize
- new SynchronousQueue<>():创建的线程数大于maximumPoolSize时,直接执行拒绝策略
- new ArrayBlockingQueue<>(3):有界阻塞队列。类似new LinkedBlockingQueue<>(3)
- new PriorityBlockingQueue<>():优先级阻塞队列
Stream流式计算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
| package juc;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
public class Demo6 {
/**
* 有五个用户,进行筛选
* 1.id为偶数
* 2.年龄大于23
* 3.用户名转大写
* 4.用户名字母倒序
* 5.只输出一个用户
*/
public static void main(String[] args) {
User u1 = new User(1,"a",21 );
User u2 = new User(2,"b",22 );
User u3 = new User(3,"c",23 );
User u4 = new User(4,"d",24 );
User u5 = new User(6,"e",25 );
// 集合就是存储
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
// 计算交给Stream
// 链式编程,lambda表达式,函数式接口,Stream流式计算
list.stream()
.filter(u ->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;})
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);
}
}
class User{
private int id;
private String name;
private int age;
public User(){
}
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
|
ForkJoin
- 工作窃取,里面维护的都是双端队列,大数据量的时候使用
package juc;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
public class MyForkJoin extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp = 10000L;// 临界值
public MyForkJoin(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else{
// 分支合并计算
long middle = (start+end) / 2;// 中间值
MyForkJoin task1 = new MyForkJoin(start, middle);
task1.fork();// 拆分任务,把任务压入队列
MyForkJoin task2 = new MyForkJoin(middle+1, end);
task2.fork();// 拆分任务,把任务压入队列
return task1.join() + task2.join();
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1();// 5483
// test2();// 3799
test3();// 124
}
// 普通的
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("总时间:" + (end-start) + " sum=" + sum);
}
// 使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new MyForkJoin(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("总时间:" + (end-start) + " sum=" + sum);
}
// Stream并行流
public static void test3(){
long start = System.currentTimeMillis();
Long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
.parallel()
.reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("总时间:" + (end-start) + " sum=" + sum);
}
}
异步回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| package juc.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class MyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// // 没有返回值的异步回调
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName() + "runAsync->void");
// });
//
// System.out.println("77777");
// completableFuture.get();// 获取执行结果
// 有返回值的异步回调
CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync->Integer");
int i = 10/0;
return 1024;
});
System.out.println(uCompletableFuture.whenComplete((t, u) -> {
System.out.println("t=" + t);// 正常的返回
System.out.println("u=" + u);// 错误信息
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 404;// 可以获取到错误的返回结果
}).get());
}
}
|
JMM
- java内存模型,不存在,只是概念、约定
- 线程解锁前:必须把共享变量立刻刷回主存
- 线程加锁前:必须读取主存中最新值到工作内存中
- 加锁解锁必须是同一把锁
- 八种操作
- lock:作用于主内存的变量,它把有个变量表示为一个线程独占的状态
- unlock:作用于主内存的变量,它把一个锁定状态的变量解锁,解锁之后的变量才可以被其他变量锁定
- read:作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存当中
- load:作用于工作内存的变量,它把 load 操作从主内存中得到的变量的值放入工作内存的变量副本当中
- use:作用于工作内存的变量,它把工作内存中的变量传递给执行引擎,每当虚拟机需要使用一个变量的值时就会只执行这个操作
- assign:作用于工作内存的变量,它把从执行引擎接收到的值赋给工作内存中的变量,每当虚拟机遇到一个给变量赋值的字节码是执行操作
- store:作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便 write 使用
- write:作用于主内存的变量,它把 store 操作从工作内存中得到的变量的值放到主内存的变量中
- 要求
- 不允许 read 和 load、store 和 write 操作之一单独出现,即:不允许一个变量从主内存中读取但是工作内存不接受,或者从工作内存发起会写了但是主内存不接受的情况出现
- 不允许一个线程丢失了它的最近的 assign 操作,即:变量在工作内存中改变了之后必须把该变量同步到主内存当中
- 不允许一个线程无原因(没有发生过任何 assign 操作)把数据从线程的工作内存同步回主内存 一个新的变量只能在主内存中诞生(初始化)
- 不允许在工作内存中直接使用一个未被初始化(load 或 assign)的变量,即:就是对一个变量实施 use、store 操作之前,必须先执行 assign 和 load 操作
- 一个变量在同一个时刻只允许一个 lock 对其操作,但是 lock 操作可以被一个线程多次执行。执行多次 lock 之后只有执行相同的 unlock 变量才可以被其他线程使用
- 如果对一个变量执行 lock 操作,那将会清空工作内存中此变量的值,在执行引擎使用这个变量之前,需要重新执行 load 或 assign 操作进行初始化
- 如果一个变量实现没有被执行 lock 操作,那么无法执行 unlock 操作
- 对一个变量执行 unlock 操作之前必须先把此变量同步到主内存当中(store、write 操作)
Volatile
- jvm提供的==轻量级的同步机制==
- 保证可见性
- ==不保证原子性==
- 禁止指令重排:源代码,编译器优化的重排,指令并行重排,内存系统重排,执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| package juc;
import java.util.concurrent.TimeUnit;
public class MyVolatile {
// 加volatile可以保证可见性
private volatile static int num = 0;
public static void main(String[] args) {
new Thread(()->{// 线程对主内存的变化不知道
while (num == 0) {
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| package juc;
public class MyV {
private volatile static int num = 0;
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);// 结果小于20000
}
public static void add(){// 加synchronized可以保证原子性
num++;
}
}
|
CAS
CAS
缺点:循环会耗时;一次性只能保持一个共享变量的原子性;ABA问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| package juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
// CAS compareAndSet比较并交换 是CPU的并发原语
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
// 如果是期望的值就更新,否则一直循环
System.out.println(atomicInteger.compareAndSet(2020, 2021));// true
System.out.println(atomicInteger.get());
atomicInteger.getAndIncrement();// 2021
System.out.println(atomicInteger.compareAndSet(2020, 2021));// false
System.out.println(atomicInteger.get());// 2022
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| package juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
// A对资源操作过了,又改回去了
System.out.println(atomicInteger.compareAndSet(2020, 2021));// true
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021, 2020));// true
System.out.println(atomicInteger.get());
// B不知情
System.out.println(atomicInteger.compareAndSet(2020, 2022));// true
System.out.println(atomicInteger.get());
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| package juc.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class CASDemo {
public static void main(String[] args) {
// !如果泛型是一个包装类,注意对象的引用问题
// 带版本号的原子操作
AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(1, 1);
new Thread(()->{
// 获得版本号
System.out.println("A1->" + atomicInteger.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicInteger.compareAndSet(1, 2,
atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
System.out.println("A2->" + atomicInteger.getStamp());
System.out.println(atomicInteger.compareAndSet(2, 1,
atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
System.out.println("A3->" + atomicInteger.getStamp());
},"A").start();
// 乐观锁原理相同
new Thread(()->{
int stamp = atomicInteger.getStamp();// 获得版本号
System.out.println("B1->" + atomicInteger.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicInteger.compareAndSet(1, 6, stamp, stamp + 1));
System.out.println("B2->" + atomicInteger.getStamp());
},"B").start();
}
}
|
Unsafe类
1
2
3
4
5
6
7
8
| @IntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));// 获取内存地址中的值,自旋锁
return v;
}
|
1
2
3
| public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
|
各种锁的理解
- 公平锁:不能插队,必须先来后到
- 非公平锁:可以插队,默认都是非公平
1
2
3
4
5
6
7
| public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
|
- 可重入锁:拿到外面的锁,就可以拿到里面的锁,自动获得
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| package juc.lock;
public class Demo1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sms();
}, "A").start();
new Thread(()->{
phone.sms();
}, "B").start();
/**
* Asms
* Acall
* Bsms
* Bcall
*/
}
}
class Phone{
public synchronized void sms(){
System.out.println(Thread.currentThread().getName() + "sms");
call();
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName() + "call");
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| package juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Demo2 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(()->{
phone.sms();
}, "A").start();
new Thread(()->{
phone.sms();
}, "B").start();
/**
* Asms
* Acall
* Bsms
* Bcall
*/
}
}
class Phone2{
Lock lock = new ReentrantLock();
public void sms(){
lock.lock();// 和call的锁不同,是两把锁
try {
System.out.println(Thread.currentThread().getName() + "sms");
call();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "call");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| package juc.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class MySpinlock {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
// 加锁
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + " -> mylock");
// 自旋锁
while (!atomicReference.compareAndSet(null, thread)){
}
}
// 解锁
public void myUnLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + " -> myUnlock");
atomicReference.compareAndSet(thread, null);
}
public static void main(String[] args) throws InterruptedException {
MySpinlock mySpinlock = new MySpinlock();
new Thread(()->{
mySpinlock.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
mySpinlock.myUnLock();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
mySpinlock.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
mySpinlock.myUnLock();
}
}, "B").start();
/**
* A -> mylock
* B -> mylock
* A -> myUnlock
* B -> myUnlock
*/
}
}
|
- 死锁
- 使用“jps -l”定位进程号
- 使用“jstack 进程号”寻找死锁问题