Java 5.0 introduces the java.util.concurrent API, which leverages hardware-level constructs to give Java programs lock-free and wait-free thread-safety mechanisms without resorting to native code. These lock-free and wait-free algorithms are more efficient than object-level monitors (the synchronized keyword). In addition to the lock-free and wait-free concurrency algorithms, a slew of new classes and interfaces has been introduced for locks, queues, and much more. You can learn more about threads in the Java Concurrency in Practice book (on Safari Books Online).
If you are comfortable with the object-level monitors and synchronization mechanisms of prior Java versions, it can make sense to continue using those techniques. However, the new atomic concurrency APIs offer some advantages:
- There is considerable overhead to using object-level monitors and the synchronized keyword. The atomic concurrency API, by contrast, has very little runtime overhead.
- It’s not easy to create really fine-grained synchronization or locking with object-level monitors. Along with lower overhead, the atomic concurrency API makes it possible to create very fine-grained critical sections.
- When lots of threads are running through code that uses the atomic concurrency API, it scales much better than code that uses object-level monitors/synchronization. Because Java’s synchronization mechanism makes code wait, when many threads run through your critical sections a substantial amount of CPU time is spent managing the synchronization mechanism itself (waiting, notifying, etc.). Since the new API uses hardware-level constructs (atomic variables) and wait-free and lock-free algorithms to implement thread-safety, much more CPU time is spent “doing stuff” rather than managing synchronization.
- These new APIs not only offer better throughput, but they also provide greater resistance to liveness problems such as deadlock and priority inversion.
There are some clear advantages to using the new Java 5.0 atomic concurrency API. However, there are a few caveats:
- Make sure that you understand the new API before using it. If you are an expert with the synchronized keyword, don’t rewrite all your code just to get the runtime advantages before you understand the new constructs. It’s better to have slower, safer code than faster, less stable code :).
- Make sure that there’s no requirement for backward compatibility with older VMs in the code that you generate. These new APIs only run on Java 5.0 and beyond.
If you are not bound by any of the constraints above, then the new APIs are a joy to use! They are fast at runtime and easy to program with. There is also a tremendous number of new and useful constructs to make your life easier when solving concurrency problems in your code. It’s a great addition to Java, and makes it an even more useful platform!
I will provide just a simple overview of how the new concurrency APIs deliver thread-safety without using Java’s object-level synchronization mechanism; for a more in-depth discussion of this topic, read the book Java Concurrency in Practice.
Modern CPUs support multiprocessing: they provide support for multiple processes to share memory, attached peripherals, etc. Most of them also provide a compare-and-swap (CAS) instruction, which allows processes to update shared variables in a way that detects or prevents concurrent access from other processes (running on the same processor, or on multiple processors).
Essentially, CAS instructions allow an algorithm to execute a read-modify-write sequence on a variable without fear of another thread modifying the variable in the meantime, because if another thread did modify the variable, the CAS would detect it (and fail) and the algorithm could retry the operation. CAS operations are very lightweight, so they don’t carry a big performance penalty. The new concurrency algorithms leverage the CAS hardware construct to provide wait-free and lock-free synchronization functionality to your Java code.
A CAS operation includes three operands — a memory location (V), the expected old value (A), and a new value (B). The processor will atomically update the location to the new value if the value that is there matches the expected old value, otherwise it will do nothing. In either case, it returns the value that was at that location prior to the CAS instruction. (Some flavors of CAS will instead simply return whether or not the CAS succeeded, rather than fetching the current value.) CAS effectively says “I think location V should have the value A; if it does, put B in it, otherwise, don’t change it but tell me what value is there now.”
The natural way to use CAS for synchronization is to read a value A from an address V, perform a multi step computation to derive a new value B, and then use CAS to change the value of V from A to B. The CAS succeeds if the value at V has not been changed in the meantime.
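In Java, the CAS primitive is exposed through the atomic classes in java.util.concurrent.atomic. As a sketch of the read-modify-write retry loop described above (AtomicInteger already provides incrementAndGet(); this just shows how such a loop is built on compareAndSet()):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasIncrement {
    // Lock-free increment built directly on CAS: read the current
    // value (A), compute the new value (B), and retry if another
    // thread changed the variable (V) in the meantime.
    public static int increment(AtomicInteger counter) {
        while (true) {
            int current = counter.get();   // read A from V
            int next = current + 1;        // compute B
            if (counter.compareAndSet(current, next)) {
                return next;               // CAS succeeded
            }
            // CAS failed: another thread updated the value; retry.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) increment(counter);
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(counter.get()); // 20000, without any locks
    }
}
```

No matter how the two threads interleave, no increment is ever lost, because a concurrent update makes the CAS fail and the loop retries.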
Also, for a quick overview of how wait-free and lock-free algorithms are implemented, you can find more information about the compare-and-swap (CAS) hardware instruction and atomic variables here.
So how do you use this new API in practice? If you are familiar with Java’s synchronized keyword and object-level monitors, how do you leverage that knowledge to get up to speed quickly with these new APIs? Fortunately, the Java creators made the new APIs feel very natural to people familiar with object-level monitors. Here are some rules to guide you in writing code that uses the new API:
Rule 1:

| Existing API | New API |
| --- | --- |
| synchronized (object-level monitor) | Lock.lock() / Lock.unlock() |
Instead of using Java’s object-level monitors, via the synchronized keyword, you now surround your critical sections of code (the parts that need thread-safety) with calls to Lock.lock() and Lock.unlock(). So, instead of relying on acquiring an object-level monitor, the new API relies on acquiring a lock on a java.util.concurrent.locks.Lock object. Lock is an interface, and a few implementations are provided – ReentrantLock, ReentrantReadWriteLock.ReadLock, and ReentrantReadWriteLock.WriteLock. In your code, for every call you make to Lock.lock(), you must remember to make a corresponding call to Lock.unlock(). This is why the call to Lock.unlock() is made in the finally block of a try-finally block. The code that would go in your critical section, previously protected by synchronized, now goes inside the try block.
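As a minimal sketch of Rule 1, here is a counter guarded by a ReentrantLock, with the unlock() call in a finally block so the lock is released even if the critical section throws:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockCounter {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    // Old style: public synchronized void increment() { count++; }
    // New style: every lock() is paired with unlock() in finally.
    public void increment() {
        lock.lock();
        try {
            count++;            // critical section
        } finally {
            lock.unlock();      // always released, even on exception
        }
    }

    public int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
```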
Rule 2:

| Existing API | New API |
| --- | --- |
| wait() | Condition.await() |
| notify() / notifyAll() | Condition.signal() / Condition.signalAll() |
In sections of your code that require wait() and notify()/notifyAll(), you now use condition variables. With object-level monitors, threads wait() in queues until they are signaled by notify()/notifyAll(). Since you are no longer using object-level monitors, you use a condition variable (of type java.util.concurrent.locks.Condition) instead. Condition is an interface, and before you can use a condition variable, you have to request one from the Lock object that you acquire to enter the critical section. You can create as many condition variables as you wish from a single Lock object. Once you have the desired condition variable, calling await() on it gives you the same functionality as Object.wait(). Just as you would typically evaluate a condition in a while loop and wait() depending on the outcome of that evaluation, with the new API you evaluate the condition in a while loop and, if it’s necessary to wait, call Condition.await(). Similarly, where you would call notify()/notifyAll() to wake a thread that’s wait()ing on a monitor, you call signal()/signalAll() instead.
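To sketch Rule 2, here is a hypothetical one-slot mailbox (not from the original article) where the condition is re-checked in a while loop around await(), exactly as it would be around Object.wait():

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// A one-slot mailbox: put() waits while the slot is occupied,
// take() waits while it is empty.
public class Mailbox<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private T item;                      // null means empty

    public void put(T value) throws InterruptedException {
        lock.lock();
        try {
            while (item != null) {       // was: while (full) wait();
                notFull.await();
            }
            item = value;
            notEmpty.signal();           // was: notify()
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (item == null) {       // was: while (empty) wait();
                notEmpty.await();
            }
            T value = item;
            item = null;
            notFull.signal();
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```

Note that both condition variables are created from the same Lock, since they protect the same shared state.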
If you want to interrupt() your threads to terminate them gracefully, the behavior is the same as it was before. The concurrency API doesn’t change this aspect of Java threads – you can’t really preempt the execution of a thread; you can only cooperatively signal it that it has been interrupted (an InterruptedException is thrown, which can be caught by the thread that’s waiting). The only issue with using interrupt() to stop threads arises when threads are blocked on I/O. In these cases, you have to close the InputStream or OutputStream the thread is blocked on, and catch the resulting IOException to terminate the thread that was blocked in the I/O operation.
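A small sketch of this cooperative shutdown pattern (an illustrative worker, not code from the article): the worker loops until it is interrupted, and InterruptedException thrown out of a blocking call is caught to let run() return cleanly.

```java
public class InterruptDemo {
    public static Thread startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(50); // blocking call; interrupt() lands here
                }
            } catch (InterruptedException e) {
                // Cooperative shutdown: we were interrupted while blocked;
                // fall through and let run() return.
            }
        });
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = startWorker();
        Thread.sleep(200);      // let it run for a while
        worker.interrupt();     // request graceful termination
        worker.join();          // wait for it to actually die
        System.out.println("worker terminated: " + !worker.isAlive());
    }
}
```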
The code example provided below uses the concurrency API to demonstrate the classic producer-consumer synchronization problem, using Rules 1 and 2, and it shows you how to terminate threads gracefully using interrupt().
In the Java VM, when multiple threads are running through sections of your code, each thread may not see the same value for a variable in that critical section: there’s the value of the variable in “main memory” vs. the “thread copy” of that variable’s value. To avoid this kind of confusion, when you have multiple threads running through a section of your code and you want them all to see the same value for a particular variable, you must declare that variable volatile. In the code example below, a volatile boolean variable is used as a flag to shut down threads. For more details on the volatile keyword, “main memory”, and the “thread copy of a variable”, please visit this link.
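As a minimal sketch of the volatile shutdown-flag idiom (an illustrative class, not the article’s listing): without volatile, the worker thread might cache the flag and never observe the writer’s update.

```java
public class VolatileFlag {
    // volatile guarantees the worker always reads the flag's
    // current value from main memory, not a stale thread copy.
    private volatile boolean shuttingDown = false;

    public Thread startWorker() {
        Thread t = new Thread(() -> {
            while (!shuttingDown) {
                Thread.yield(); // simulated work
            }
        });
        t.start();
        return t;
    }

    public void shutdown() {
        shuttingDown = true; // visible to the worker on its next check
    }
}
```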
The ReentrantLockTest.java class file provided below illustrates the use of the new concurrency API to implement a bounded buffer for use by multiple producer and consumer threads. Please note that the new concurrency API already provides classes that implement this functionality; the purpose of this tutorial is to show what you can do with Locks and Condition variables. Parts 2 and 3 of the tutorial will show more examples of classes that are already provided for your convenience, to make you really productive when writing code to tackle common concurrency issues.
The ReentrantLockTest class creates a set of producer threads and consumer threads. The producer threads add objects to a bounded buffer, which the consumer threads consume. Since producer threads produce at a different rate than consumer threads consume, and since there are different numbers of each, threads have to wait before they can put objects into, or remove objects from, the bounded buffer. Also, to illustrate interrupt(), the ReentrantLockTest class terminates all the threads after running them for a while (all these parameters are configurable in the code).
Notes on the code:
2. Another thing to note is the use of the join() method. Once the main thread interrupt()s all the producer and consumer threads, it then join()s each of those threads to ensure that they all die before the main thread moves forward past the join() calls. This is important if you want to wait until all your threads have cleanly terminated before proceeding with the remainder of the shutdown sequence.
ProducerTask and ConsumerTask inner classes (the two condition variables are shared between ProducerTask and ConsumerTask threads, since there is only one shared buffer):
1. Condition variable waiting_on_full_buffer: The ProducerTask thread creates objects that it puts in a shared bounded buffer. So, when this bounded buffer is full, it has to wait until another thread removes an object from the shared buffer before it can produce again. This condition variable is used to wait on this “buffer is full” condition. The await() method is called on waiting_on_full_buffer when the buffer is full. There is complementary code in the ConsumerTask inner class that calls signal() on this condition variable when a consumer thread has removed an object from the shared bounded buffer (so the producer can create another object to put in the buffer).
2. Condition variable waiting_on_empty_buffer: The ConsumerTask thread removes objects from the shared bounded buffer. So, when this bounded buffer is empty, it has to wait until another thread puts an object into the shared buffer before it can consume again. This condition variable is used to wait on this “buffer is empty” condition. The await() method is called on waiting_on_empty_buffer when the buffer is empty. There is complementary code in the ProducerTask inner class that calls signal() on this condition variable when a producer thread has added an object to the shared bounded buffer (so the consumer can remove another object from the buffer).
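As a minimal sketch (not the original ReentrantLockTest code) of how the shared bounded buffer might combine a single ReentrantLock with the two condition variables just described, with the condition-variable names taken from the text above:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    // Both conditions come from the same Lock, since a single shared
    // buffer is protected by a single lock.
    private final Condition waiting_on_full_buffer = lock.newCondition();
    private final Condition waiting_on_empty_buffer = lock.newCondition();

    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Called by producer threads.
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == capacity) {
                waiting_on_full_buffer.await();  // buffer is full: wait
            }
            buffer.add(item);
            waiting_on_empty_buffer.signal();    // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Called by consumer threads.
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                waiting_on_empty_buffer.await(); // buffer is empty: wait
            }
            T item = buffer.remove();
            waiting_on_full_buffer.signal();     // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```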
Here’s a listing of ReentrantLockTest.java:
The following class, Test.java, runs the class above.