Java Concurrency: Atomic Variables
http://www.baptiste-wicht.com/2010/09/java-concurrency-atomic-variables/
http://ifeve.com/atomic-operation/
AtomicStampedReference
使用循环CAS实现原子操作
在Java并发包中有一些并发框架也使用了自旋CAS的方式来实现原子操作,比如LinkedTransferQueue类的Xfer方法。CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作。
DK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
public class Stack {
private final AtomicReference<Element> head = new AtomicReference<Element>(null);
public void push(String value){
Element newElement = new Element(value);
while(true){
Element oldHead = head.get();
newElement.next = oldHead;
//Trying to set the new element as the head
if(head.compareAndSet(oldHead, newElement)){
return;
}
}
}
}
http://sns.ruanko.com/space.php?uid=60391&do=blog&id=186531
workerCount:当前活动的线程数;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这个是用一个int来表示workerCount和runState的,其中runState占int的高3位,其它29位为workerCount的值。
用AtomicInteger是因为其在并发下使用compareAndSet效率非常高;
当改变当前活动的线程数时只对低29位操作,如每次加一减一,workerCount的值变了,但不会影响高3位的runState的值。
当改变当前状态的时候,只对高3位操作,不会改变低29位的计数值。
http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/java/util/concurrent/ThreadPoolExecutor.java#ThreadPoolExecutor
因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
* 保留参数的低29位,也就是workerCount的值
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int runStateOf(int c) { return c & ~CAPACITY; }
~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
* 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
* 将runState和workerCount存到同一个int中
private static int ctlOf(int rs, int wc) { return rs | wc; }
不知道大家有没有注意到,当当前活动的线程数量 >= corePoolSize 的时候,都是优先添加到队列中,直到队列满了才会去创建新的线程
http://blog.csdn.net/novelly/article/details/16344105
http://www.ticmy.com/?p=243
1. 添加一个task的过程
当要添加一个新的task,如果当前线程数小于 corePoolSize,直接添加一个线程,即使当前有空闲的线程。否则添加队列中。如果队列满了呢?则会判断当前线程数是否小于maximumPoolSize,如是则添加一个新的线程用来执行该task。如果超出最大线程数,那就只能reject了。
http://xiaoz5919.iteye.com/blog/1746217
http://tutorials.jenkov.com/java-util-concurrent/threadpoolexecutor.html
https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
Java Fork/Join
ForkJoinPool:
http://www.javabeat.net/simple-introduction-to-fork-join-framework-in-java-7/
Fork/Join addresses the need for divide-and-conquer, or recursive task-processing in Java programs
Fork/Join's logic is very simple: (1) separate (fork) each large task into smaller tasks; (2) process each task in a separate thread (separating those into even smaller tasks if necessary); (3) join the results.
It is an ExecutorService for running ForkJoinTasks and also managing and monitoring the tasks. It employs worker-stealing algorithm where in idle threads can pick subtasks created by other active tasks and execute them. In this way there is efficient execution of tasks spawned by other tasks.
http://ifeve.com/talk-concurrency-forkjoin/
Fork/Join框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool :ForkJoinTask
需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
public class ForkJoinPool extends AbstractExecutorService
* Array serving as submission queue. Initialized upon construction.
private ForkJoinTask<?>[] submissionQueue;
private final ForkJoinWorkerThreadFactory factory;
pushTask方法把当前任务存放在ForkJoinTask 数组queue里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务
ForkJoinTask的join方法实现原理。Join方法的主要作用是阻塞当前线程并等待获取结果
http://coopsoft.com/ar/CalamityArticle.html
http://www.ibm.com/developerworks/library/j-jtp11137/
Merge sort is an example of a divide-and-conquer algorithm, where a problem is recursively broken down into subproblems, and the solutions to the subproblems are combined to arrive at the final result. Divide-and-conquer algorithms are often useful in sequential environments but can become even more effective in parallel environments because the subproblems can often be solved concurrently.
Result solve(Problem problem) {
if (problem.size < SEQUENTIAL_THRESHOLD)
return solveSequentially(problem);
else {
Result left, right;
INVOKE-IN-PARALLEL {
left = solve(extractLeftHalf(problem));
right = solve(extractRightHalf(problem));
}
return combine(left, right);
}
}
This kind of parallel decomposition is often called fork-join because executing a task forks (starts) multiple subtasks and then joins (waits for completion) with them.
Note that the subproblem() method does not copy the elements; it merely copies the array reference and offsets into an existing data structure.
A ForkJoinExecutor is like an Executor in that it is designed for running tasks, except that it specifically designed for computationally intensive tasks that do not ever block except to wait for another task being processed by the same ForkJoinExecutor.
Conventional thread pools are designed for tasks that are independent of each other and are also designed with potentially blocking, coarse-grained tasks in mind — fork-join solutions produce neither. Fine-grained tasks in conventional thread pools can also generate excessive contention for the task queue shared among all workers.
Work stealing
Fork-join is a technique that makes it easy to express divide-and-conquer parallel algorithms.
Fork-join embodies the technique of divide-and-conquer; take a problem and recursively break it down into subproblems until the subproblems are small enough that they can be more effectively solved sequentially. The recursive step involves dividing a problem into two or more subproblems, queueing the subproblems for solution (the fork step), waiting for the results of the subproblems (the join step), and merging the results.
http://www.java-allandsundry.com/2012/08/mergesort-using-forkjoin-framework.html
http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
The issue is that of parallelism: When a Callable waits for the result of another Callable, it is put in a waiting state, thus wasting an opportunity to handle another Callable queued for execution.
http://www.baptiste-wicht.com/2010/09/java-concurrency-atomic-variables/
http://ifeve.com/atomic-operation/
AtomicStampedReference
使用循环CAS实现原子操作
在Java并发包中有一些并发框架也使用了自旋CAS的方式来实现原子操作,比如LinkedTransferQueue类的Xfer方法。CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作。
DK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
public class Stack {
private final AtomicReference<Element> head = new AtomicReference<Element>(null);
public void push(String value){
Element newElement = new Element(value);
while(true){
Element oldHead = head.get();
newElement.next = oldHead;
//Trying to set the new element as the head
if(head.compareAndSet(oldHead, newElement)){
return;
}
}
}
}
http://sns.ruanko.com/space.php?uid=60391&do=blog&id=186531
workerCount:当前活动的线程数;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这个是用一个int来表示workerCount和runState的,其中runState占int的高3位,其它29位为workerCount的值。
用AtomicInteger是因为其在并发下使用compareAndSet效率非常高;
当改变当前活动的线程数时只对低29位操作,如每次加一减一,workerCount的值变了,但不会影响高3位的runState的值。
当改变当前状态的时候,只对高3位操作,不会改变低29位的计数值。
http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/java/util/concurrent/ThreadPoolExecutor.java#ThreadPoolExecutor
因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
* 保留参数的低29位,也就是workerCount的值
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int runStateOf(int c) { return c & ~CAPACITY; }
~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
* 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
* 将runState和workerCount存到同一个int中
private static int ctlOf(int rs, int wc) { return rs | wc; }
不知道大家有没有注意到,当当前活动的线程数量 >= corePoolSize 的时候,都是优先添加到队列中,直到队列满了才会去创建新的线程
http://blog.csdn.net/novelly/article/details/16344105
http://www.ticmy.com/?p=243
1. 添加一个task的过程
当要添加一个新的task,如果当前线程数小于 corePoolSize,直接添加一个线程,即使当前有空闲的线程。否则添加队列中。如果队列满了呢?则会判断当前线程数是否小于maximumPoolSize,如是则添加一个新的线程用来执行该task。如果超出最大线程数,那就只能reject了。
http://xiaoz5919.iteye.com/blog/1746217
http://tutorials.jenkov.com/java-util-concurrent/threadpoolexecutor.html
https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
Java Fork/Join
ForkJoinPool:
http://www.javabeat.net/simple-introduction-to-fork-join-framework-in-java-7/
Fork/Join addresses the need for divide-and-conquer, or recursive task-processing in Java programs
Fork/Join's logic is very simple: (1) separate (fork) each large task into smaller tasks; (2) process each task in a separate thread (separating those into even smaller tasks if necessary); (3) join the results.
It is an ExecutorService for running ForkJoinTasks and also managing and monitoring the tasks. It employs worker-stealing algorithm where in idle threads can pick subtasks created by other active tasks and execute them. In this way there is efficient execution of tasks spawned by other tasks.
http://ifeve.com/talk-concurrency-forkjoin/
Fork/Join框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool :ForkJoinTask
需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
public class ForkJoinPool extends AbstractExecutorService
* Array serving as submission queue. Initialized upon construction.
private ForkJoinTask<?>[] submissionQueue;
private final ForkJoinWorkerThreadFactory factory;
pushTask方法把当前任务存放在ForkJoinTask 数组queue里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务
ForkJoinTask的join方法实现原理。Join方法的主要作用是阻塞当前线程并等待获取结果
http://coopsoft.com/ar/CalamityArticle.html
http://www.ibm.com/developerworks/library/j-jtp11137/
Merge sort is an example of a divide-and-conquer algorithm, where a problem is recursively broken down into subproblems, and the solutions to the subproblems are combined to arrive at the final result. Divide-and-conquer algorithms are often useful in sequential environments but can become even more effective in parallel environments because the subproblems can often be solved concurrently.
Result solve(Problem problem) {
if (problem.size < SEQUENTIAL_THRESHOLD)
return solveSequentially(problem);
else {
Result left, right;
INVOKE-IN-PARALLEL {
left = solve(extractLeftHalf(problem));
right = solve(extractRightHalf(problem));
}
return combine(left, right);
}
}
This kind of parallel decomposition is often called fork-join because executing a task forks (starts) multiple subtasks and then joins (waits for completion) with them.
Note that the subproblem() method does not copy the elements; it merely copies the array reference and offsets into an existing data structure.
A ForkJoinExecutor is like an Executor in that it is designed for running tasks, except that it specifically designed for computationally intensive tasks that do not ever block except to wait for another task being processed by the same ForkJoinExecutor.
Conventional thread pools are designed for tasks that are independent of each other and are also designed with potentially blocking, coarse-grained tasks in mind — fork-join solutions produce neither. Fine-grained tasks in conventional thread pools can also generate excessive contention for the task queue shared among all workers.
Work stealing
Fork-join is a technique that makes it easy to express divide-and-conquer parallel algorithms.
Fork-join embodies the technique of divide-and-conquer; take a problem and recursively break it down into subproblems until the subproblems are small enough that they can be more effectively solved sequentially. The recursive step involves dividing a problem into two or more subproblems, queueing the subproblems for solution (the fork step), waiting for the results of the subproblems (the join step), and merging the results.
http://www.java-allandsundry.com/2012/08/mergesort-using-forkjoin-framework.html
http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
The issue is that of parallelism: When a Callable waits for the result of another Callable, it is put in a waiting state, thus wasting an opportunity to handle another Callable queued for execution.
No comments:
Post a Comment