Thursday, 28 January 2016

Phaser in Java concurrency

Phaser is also one of the synchronization aid provided by java concurrency util which is similar to other synchronization barrier utils like CountDownLatch and CyclicBarrier. What sets Phaser apart is it is reusable (like CyclicBarrier) and more flexible in usage. In both CountDownLatch and CyclicBarrier number of parties (thread) that are registered for waiting can't change where as in Phaser that number can vary. Also note that Phaser has been introduced in Java 7.

Phaser is more suitable for use where it is required to synchronize threads over one or more phases of activity. Though Phaser can be used to synchronize a single phase, in that case it acts more like a CyclicBarrier. But it is more suited where threads should wait for a phase to finish, then advance to next phase, wait again for that phase to finish and so on.

How Phaser is more flexible

Unlike the case for other barriers, the number of parties registered to synchronize on a phaser may vary over time. Tasks may be registered at any time (using methods register(), bulkRegister(int), or by specifying initial number of parties in the constructor). Tasks may also be optionally deregistered upon any arrival (using arriveAndDeregister()).

How Phaser works

Phaser class has 4 constructors

  • Phaser() - Creates a new phaser with no initially registered parties, no parent, and initial phase number 0.
  • Phaser(int parties) - Creates a new phaser with the given number of registered unarrived parties, no parent, and initial phase number 0.
  • Phaser(Phaser parent) - Creates a new phaser with the given parent with no initially registered parties.
  • Phaser(Phaser parent, int parties) - Creates a new phaser with the given parent and number of registered unarrived parties.

So first thing is to create a new instance of Phaser.

Next thing is to register one or more parties with the Phaser. That can be done using register(), bulkRegister(int) or by specifying number of parties in the constructor.

resgister() method

public int register()

Adds a new unarrived party to this phaser. It returns the arrival phase number to which this registration applied.

Now since Phaser is a synchronization barrier so we have to make phaser wait until all registered parties finish a phase. That waiting can be done using arrive() or any of the variants of arrive() method. When the number of arrivals is equal to the parties which are registered that phase is considered completed and it advances to next phase (if there is any), or terminate.

arrive() method

public int arrive()

Arrives at this phaser, without waiting for others to arrive. Note that arrive() method does not suspend execution of the calling thread. Returns the arrival phase number, or a negative value if terminated. Note that this method should not be called by an unregistered party.

arriveAndDeregister

public int arriveAndDeregister()

Arrives at this phaser and deregisters from it without waiting for others to arrive. Returns the arrival phase number, or a negative value if terminated.

arriveAndAwaitAdvance

public int arriveAndAwaitAdvance()

Arrives at this phaser and awaits others. Returns the arrival phase number, or the (negative) current phase if terminated. If you want to wait for all the other registered parties to complete a given phase then use this method.

Note that each generation of a phaser has an associated phase number. The phase number starts at zero, and advances when all parties arrive at the phaser, wrapping around to zero after reaching Integer.MAX_VALUE.

Phaser termination

A phaser may enter a termination state, that may be checked using method isTerminated(). Upon termination, all synchronization methods immediately return without waiting for advance, as indicated by a negative return value. Similarly, attempts to register upon termination have no effect.

Phaser Tiering

Phasers may be tiered (i.e., constructed in tree structures) to reduce contention. Phasers with large numbers of parties may experience heavy synchronization contention costs. These may be set up as a groups of sub-phasers which share a common parent. This may greatly increase throughput even though it incurs greater per-operation overhead.

Phaser Monitoring

Phaser class has several methods for monitoring. These methods can be called by any caller not only by registered parties.

  • getRegisteredParties() - Returns the number of parties registered at this phaser.
  • getArrivedParties() - Returns the number of registered parties that have arrived at the current phase of this phaser.
  • getUnarrivedParties() - Returns the number of registered parties that have not yet arrived at the current phase of this phaser.
  • getPhase() - Returns the current phase number.

Phaser example code

Let's try to make things clearer through an example. So we'll have two phase in the application where in first phase let's say we have three threads reading 3 different files, parsing and storing them in DB, then in second phase 2 threads are started to query the DB table on the inserted records. Let's assume that one of the field is age in the DB table and we want to query count of those having age greater than 40 using one thread and in another thread we want to get the count of those having age less than or equal to 40.

public class PhaserDemo {

    public static void main(String[] args) {
        Phaser ph = new Phaser(1);
        // Threads for first phase
        new FileReaderThread("thread-1", "file-1", ph);
        new FileReaderThread("thread-2", "file-2", ph);
        new FileReaderThread("thread-3", "file-3", ph);
        int curPhase;
        curPhase = ph.getPhase();
        ph.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " completed");
        
        // This will be the second phase where 
        // threads are deregistered from the phaser
        curPhase = ph.getPhase();
        ph.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " completed");
        
        // Threads for third phase
        new QueryThread("thread-1", 40, ph);
        new QueryThread("thread-2", 40, ph);
        curPhase = ph.getPhase();
        ph.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " completed");
        // deregistering the main thread
        ph.arriveAndDeregister();
    }
}
class FileReaderThread implements Runnable {
    private String threadName;
    private String fileName;
    private Phaser ph;

    FileReaderThread(String threadName, String fileName, Phaser ph){
        this.threadName = threadName;
        this.fileName = fileName;
        this.ph = ph;
        ph.register();
        new Thread(this).start();
    }
    @Override
    public void run() {
        System.out.println("This is phase " + ph.getPhase());
        System.out.println("Reading file " + fileName + " thread " + threadName + "parsing and storing to DB ");
        // Using await and advance so that all thread wait here
        ph.arriveAndAwaitAdvance();
        try {
            // Just for creating some delay, not 
            // actually required 
Thread.sleep(20);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Deregistering");
        ph.arriveAndDeregister();
    }
}
class QueryThread implements Runnable {
    private String threadName;
    private int param;
    private Phaser ph;
    
    QueryThread(String threadName, int param, Phaser ph){
        this.threadName = threadName;
        this.param = param;
        this.ph = ph;
        ph.register();
        new Thread(this).start();
    }
    
    @Override
    public void run() {
        System.out.println("This is phase " + ph.getPhase());
        System.out.println("Querying DB using param " + param + " Thread " + threadName);
        ph.arriveAndAwaitAdvance();
    }
}

Output

This is phase 0
This is phase 0
This is phase 0
Reading file file-2 thread thread-2parsing and storing to DB 
Reading file file-1 thread thread-1parsing and storing to DB 
Reading file file-3 thread thread-3parsing and storing to DB 
Phase 0 completed
Deregistering
Deregistering
Deregistering
Phase 1 completed
This is phase 2
Querying DB using param 40 Thread thread-1
This is phase 2
Querying DB using param 40 Thread thread-2
Phase 2 completed

Here it can be seen that first a Phaser instance ph is created with initial party count as 1, which corresponds to the main thread. Then in the first set of 3 threads which are used in the first phase ph object is also passed which is used for synchronization. Then in the second phase the set of three threads used in the first phase are deregistered. In the third phase another set of two threads are created which used the same phaser object ph for synchronization. Main logic for reading the file, parsing the file and storing it in the DB is not given here. Also the queries used in the second thread are not given. The scenario used here is to explain Phaser so that's where the concentration is.

Overriding onAdvance() method

If you want to perform an action before advancing from one phase to another, it can be done by overriding the onAdvance() method of the Phaser class. This method is invoked when the Phaser advances from one phase to another.
If this method returns true, this phaser will be set to a final termination state upon advance, and subsequent calls to isTerminated() will return true.
If this method returns false, phaser will be kept alive.

onAdvance() method

protected boolean onAdvance(int phase, int registeredParties)
Here
  • phase - current phase number on entry to this method, before this phaser is advanced.
  • registeredParties - the current number of registered parties.

One of the use case to override onAdvance() method is to ensure that your phaser executes a given number of phases and then stop.

So we'll create a class called TestPhaser that will extend Phaser and override the onAdvance() method to ensure that specified number of phases are executed.

Example code overriding onAdvance() method

public class PhaserAdvance extends Phaser{
    PhaserAdvance(int parties){
        super(parties);
    }
    
    // Overriding the onAdvance method
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("In onAdvance method, current phase which is completed is " + phase );
        // Want to ensure that phaser runs for 2 phases i.e. phase 1 
        // or the no. of registered parties become zero
        if(phase == 1 || registeredParties == 0){
            System.out.println("phaser will be terminated ");
            return true;
        }else{
            System.out.println("phaser will continue ");
            return false;
        }     
    }
    
    public static void main(String... args) {
        // crating phaser instance
        PhaserAdvance ph = new PhaserAdvance(1);
        // creating three threads
        new TestThread("thread-1", ph);
        new TestThread("thread-2", ph);
        new TestThread("thread-3", ph);
        
         while(!ph.isTerminated()){
             ph.arriveAndAwaitAdvance();
         }
         System.out.println("In main method, phaser is terminated");
    }
}


class TestThread implements Runnable {
    private String threadName;
    private Phaser ph;

    TestThread(String threadName, Phaser ph){
        this.threadName = threadName;
        this.ph = ph;
        // register new unarrived party to this phaser
        ph.register();
        new Thread(this).start();
    }
    @Override
    public void run() {
        // be in the loop till the phaser is terminated
         while(!ph.isTerminated()){
            System.out.println("This is phase " + ph.getPhase() + " And Thread - "+ threadName);
            // Using await and advance so that all thread wait here
            ph.arriveAndAwaitAdvance();
         }
        
    }
}

Output

This is phase 0 And Thread - thread-1
This is phase 0 And Thread - thread-2
This is phase 0 And Thread - thread-3
In onAdvance method, current phase which is completed is 0
phaser will continue 
This is phase 1 And Thread - thread-3
This is phase 1 And Thread - thread-2
This is phase 1 And Thread - thread-1
In onAdvance method, current phase which is completed is 1
phaser will be terminated 
In main method, phaser is terminated

Here it can be seen that a new class PhaserAdvance is created extending the Phaser class. This PhaserAdvance class overrides the onAdvance() method of the Phaser class. In the overridden onAdvance() method it is ensured that 2 phases are executed thus the if condition with phase == 1 (phase count starts from 0).

That's all for this topic Phaser in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. ConcurrentHashMap in Java
  2. CopyOnWriteArrayList in Java
  3. CountDownLatch in Java concurrency
  4. CyclicBarrier in Java concurrency
  5. Difference between CountDownLatch and CyclicBarrier
  6. Java Concurrency interview questions

You may also like -

Wednesday, 27 January 2016

Difference between CountDownLatch and CyclicBarrier in Java

Though both CountDownLatch and CyclicBarrier are used as a synchronization aid that allows one or more threads to wait but there are certain differences between them that you should know in order to know when one of these utilities will serve you better and of course it is a good java interview question too.

CountDownLatch Vs CyclicBarrier

  1. One of the most important difference is When you are using a CountDownLatch, you specify the number of calls to the countdown() method when creating a CountDownLatch object. So a CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
    What this means is you can use CountDownLatch with only a single thread and using countdown() to decrement as and when the specified even occur.

    When you are using CyclicBarrier you specify the number of threads that should call await() method in order to trip the barrier. That means if you have a CyclicBarrier initialized to 3 that means you should have at least 3 threads to call await().

  2. A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. So with CyclicBarrier you have an option to have an Action class specified in the CyclicBarrier constructor that will be run after the last thread has called await().This barrier action is useful for updating shared-state before any of the parties continue.

    public CyclicBarrier(int parties, Runnable barrierAction)
    

    CountDownLatch doesn't provide any such constructor to specify a runnable action.

  3. CountDownLatch can't be reused, when count reaches zero it cannot be reset.

    CyclicBarrier can be reused after the waiting threads are released.

  4. In a CyclicBarrier, if a thread encounters a problem (timeout, interruption), all the other threads that have reached await() get an exception.

    According to Java docs

    The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).

    In CountDownLatch only the current thread that has a problem throws exception. According to the description of await() method in CountDownLatch

    If the current thread:

    • has its interrupted status set on entry to this method; or
    • is interrupted while waiting,
    • then InterruptedException is thrown and the current thread's interrupted status is cleared.

That's all for this topic Difference between CountDownLatch and CyclicBarrier in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. CountDownLatch in Java concurrency
  2. CyclicBarrier in Java concurrency
  3. ConcurrentHashMap in Java
  4. CopyOnWriteArrayList in Java
  5. Synchronization in Java multithreading
  6. Java Concurrency interview questions

You may also like -

Monday, 25 January 2016

CyclicBarrier in Java concurrency

There are scenarios in concurrent programming when you want set of threads to wait for each other at a common point until all threads in the set have reached that common point, concurrent util provides a synchronization aid CyclicBarrier to handle such scenarios where you want set of threads to wait for each other to reach a common barrier point.

The barrier is called cyclic because it can be re-used after the waiting threads are released.

Note that CyclicBarrier was introduced in Java 5 along with other concurrent classes like CountDownLatch, ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue with in java.util.Concurrent package.

How CyclicBarrier is used

CyclicBarrier class has following two constructors -

CyclicBarrier(int parties)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and does not perform a predefined action when the barrier is tripped.

CyclicBarrier(int parties, Runnable barrierAction)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.

Here parties parameter signifies the number of threads that must invoke await() before the barrier is tripped.

barrierAction specifies a thread that will be executed when the barrier is reached.

So first thing is to create a CyclicBarrier object using any of the two constructors, specifying the number of threads that should wait for each other.

When each thread reaches the barrier (common point) call await() method on the CyclicBarrier object. This will suspend the thread until all the thread also call the await() method on the same CyclicBarrier object.

Once all the specified threads have called await() method that will trip the barrier and all threads can resume operation.

If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue.

await() method has following two forms -

public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

In the second form it Waits until all parties have invoked await on this barrier, or the specified waiting time elapses.

If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:

  • The last thread arrives; or
  • The specified timeout elapses; (In case of second form) or
  • Some other thread interrupts the current thread; or
  • Some other thread interrupts one of the other waiting threads; or
  • Some other thread times out while waiting for barrier; or
  • Some other thread invokes reset() on this barrier.

Await() method returns int which is the arrival index of the current thread, where index (Number of specified threads - 1) indicates the first to arrive and zero indicates the last to arrive.

CyclicBarrier example code

Let's take a scenario where your application needs to read 3 files, parse the read lines and only after reading and parsing all the three files the application should call another thread for further processing. In this scenario we can use CyclicBarrier and provide a runnable action to execute thread once all the threads reach the barrier.

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(3, new AfterAction());
        // Initializing three threads to read 3 different files.
        Thread t1 = new Thread(new TxtReader("thread-1", "file-1", cb));
        Thread t2 = new Thread(new TxtReader("thread-2", "file-2", cb));
        Thread t3 = new Thread(new TxtReader("thread-3", "file-3", cb));
        t1.start();
        t2.start();
        t3.start();
        
        System.out.println("Done ");
    }

}

class TxtReader implements Runnable {
    private String threadName;
    private String fileName;
    private CyclicBarrier cb;
    TxtReader(String threadName, String fileName, CyclicBarrier cb){
        this.threadName = threadName;
        this.fileName = fileName;
        this.cb = cb;        
    }
    @Override
    public void run() {
        System.out.println("Reading file " + fileName + " thread " + threadName);    
        try{
            // calling await so the current thread suspends
            cb.await();
            
        } catch (InterruptedException e) {
            System.out.println(e);
        } catch (BrokenBarrierException e) {
            System.out.println(e);
        }
    }
}

class AfterAction implements Runnable {
    @Override
    public void run() {
        System.out.println("In after action class, start further processing as all files are read");
    }
}

Output

Done 
Reading file file-2 thread thread-2
Reading file file-1 thread thread-1
Reading file file-3 thread thread-3
In after action class, start further processing as all files are read

One thing to note here is that main thread doesn't block as can be seen from the "Done" printed even before the threads start. Also it can be seen the AfterAction class is executed once all the three threads call the await() method.

Now if you want to block the main thread then you have to call the await() on the main thread too. Let's take another example where two services are started using two separate threads and main thread should start process only after both the services are executed.

public class CBExample {

    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(3);
        // Creating two threads with CyclicBarrier obj as param
        Thread t1 = new Thread(new FirstService(cb));
        Thread t2 = new Thread(new SecondService(cb));
        System.out.println("starting threads ");
        t1.start();
        t2.start();
        
        try {
            // Calling await for main thread
            cb.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // once await is called for all the three threads, execution starts again
        System.out.println("In main thread, processing starts again ..... ");
    }

}

class FirstService implements Runnable {
    CyclicBarrier cb;
    FirstService(CyclicBarrier cb){
        this.cb = cb;
    }
    @Override
    public void run() {
        System.out.println("In First service, thread " + Thread.currentThread().getName());
        try {
            // Calling await for Thread-0
            cb.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
    
}

class SecondService implements Runnable {
    CyclicBarrier cb;
    SecondService(CyclicBarrier cb){
        this.cb = cb;
    }
    @Override
    public void run() {
        System.out.println("In Second service, thread " + Thread.currentThread().getName());
        try {
            // Calling await for Thread-1
            cb.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }    
    }    
}

Output

starting threads 
In First service, thread Thread-0
In Second service, thread Thread-1
In main thread, processing starts again .....

Here it can be seen that main thread starts only after both the services are executed.

CyclicBarrier can be reused

Unlike CountDownLatch, CyclicBarrier can be reused after the waiting threads are released.

Let's reuse the same example as above where three threads were used to read 3 files. Now three more threads are added to read 3 more files and the same CyclicBarrier object is used with initial count as 3.

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(3, new AfterAction());
        // Initializing three threads to read 3 different files.
        Thread t1 = new Thread(new TxtReader("thread-1", "file-1", cb));
        Thread t2 = new Thread(new TxtReader("thread-2", "file-2", cb));
        Thread t3 = new Thread(new TxtReader("thread-3", "file-3", cb));
        
        t1.start();
        t2.start();
        t3.start();
        
        System.out.println("Start another set of threads ");
        
        Thread t4 = new Thread(new TxtReader("thread-4", "file-4", cb));
        Thread t5 = new Thread(new TxtReader("thread-5", "file-5", cb));
        Thread t6 = new Thread(new TxtReader("thread-6", "file-6", cb));
        t4.start();
        t5.start();
        t6.start();
        
        
    }

}

class TxtReader implements Runnable {
    private String threadName;
    private String fileName;
    private CyclicBarrier cb;
    TxtReader(String threadName, String fileName, CyclicBarrier cb){
        this.threadName = threadName;
        this.fileName = fileName;
        this.cb = cb;        
    }
    @Override
    public void run() {
        System.out.println("Reading file " + fileName + " thread " + threadName);    
        try{
            // calling await so the current thread suspends
            cb.await();
            
        } catch (InterruptedException e) {
            System.out.println(e);
        } catch (BrokenBarrierException e) {
            System.out.println(e);
        }
    }
}

class AfterAction implements Runnable {
    @Override
    public void run() {
        System.out.println("In after action class, start further processing as all files are read");
    }
}

Output

Start another set of threads 
Reading file file-1 thread thread-1
Reading file file-2 thread thread-2
Reading file file-3 thread thread-3
In after action class, start further processing as all files are read
Reading file file-4 thread thread-4
Reading file file-5 thread thread-5
Reading file file-6 thread thread-6
In after action class, start further processing as all files are read

Here it can be seen that specified runnableAction class is called twice as the CyclicBarrier is reused here. Note that the thread order may be different while executing the code.

Points to note

  • A CyclicBarrier initialized to N, using its constructor, can be used to make N threads wait using await() and the barrier will be broken once all the N threads call await() method.
  • A barrierAction can also be provided while creating CyclicBarrier object. This barrierAction will be executed once the barrier is tripped. This barrier action is useful for updating shared-state before any of the parties continue.
  • If the current thread is not the last to arrive then it is paused after calling await() and lies dormant until the last thread arrives, current thread or some other waiting thread is interrupted by any other thread, specified timeout elapses (as provided in await()) or some thread calls reset() method.
  • reset() method resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException.
  • The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (orInterruptedException if they too were interrupted at about the same time).

That's all for this topic CyclicBarrier in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. CountDownLatch in Java concurrency
  2. Difference between CountDownLatch and CyclicBarrier
  3. ConcurrentHashMap in Java
  4. Semaphore in Java concurrency
  5. Exchanger in Java concurrency
  6. Java Concurrency interview questions

You may also like -

Friday, 22 January 2016

CountDownLatch in Java concurrency

There are scenarios in an application when you want one or more threads to wait until one or more events being performed in other threads complete. CountDownLatch provided in the concurrent API helps in handling such scenarios.

Note that CountDownLatch was introduced in Java 5 along with other concurrent classes like CyclicBarrier, ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue with in java.util.Concurrent package.

How CountDownLatch is used

CountDownLatch as the name suggests can be visualized as a latch that is released only after the given number of events occur.
CountDownLatch is initialized with that count (given number of events).

Each time one of those events occur count is decremented, for that countdown() method is used.

Thread(s) that are waiting for the latch to release (current count reaches zero due to invocations of the countDown()method) are blocked using await() method.

CountDownLatch Constructor

CountDownLatch(int count)
Constructs a CountDownLatch initialized with the given count. Here count specifies the number of events that must happen in order for the latch to open.

await() method

A thread that waits on the latch to open calls await() method, await() method has two forms -

public void await() throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted. If the current count is zero then this method returns immediately.

If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happen:

  • The count reaches zero due to invocations of the countDown() method
  • Some other thread interrupts the current thread.
public boolean await(long timeout, TimeUnit unit) throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted, or the specified waiting time elapses, the waiting time is specified by an object of TimeUnit enumeration.

If the current count is zero then this method returns immediately with the value true. If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happen:

  • The count reaches zero due to invocations of the countDown() method.
  • Some other thread interrupts the current thread.
  • The specified waiting time elapses.

countdown() method

Threads which are executing the events signal the completion of the event by calling countDown()method.

public void countDown()
Decrements the count of the latch, releasing all waiting threads if the count reaches zero.

CountDownLatch Example program

That's a lot of theory so let's see an example to make it clearer and see how await(), countdown() and the constructor to provide count are actually used.

Let's take a scenario where your application needs to read 3 files, parse the read lines and only after reading and parsing all the three files the application should move ahead to do some processing with the parsed objects.
So here we'll have three separate threads reading three separate files and the main thread awaits until all the three threads finish and call countdown().

public class CountdownlatchDemo {

    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(3);
        // Initializing three threads to read 3 different files.
        Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
        Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
        Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));
        t1.start();
        t2.start();
        t3.start();
        try {
            // main thread waiting till all the files are read
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Files are read ... Start further processing");
    }

}

class FileReader implements Runnable {
    private String threadName;
    private String fileName;
    private CountDownLatch cdl;
    FileReader(String threadName, String fileName, CountDownLatch cdl){
        this.threadName = threadName;
        this.fileName = fileName;
        this.cdl = cdl;        
    }
    @Override
    public void run() {
        System.out.println("Reading file " + fileName + " thread " + threadName);
        // do countdown here
        cdl.countDown();
    } 
}

Output

Reading file file-1 thread thread-1
Reading file file-3 thread thread-3
Reading file file-2 thread thread-2
Files are read ... Start further processing

Here it can be seen that inside main() method, CountDownLatch instance cdl is created with an initial count of 3. Then three instances of FileReader are created that start three new threads. Then the min thread calls await() on cdl, which causes the main thread to wait until cdl count has been decremented three times. Notice that cdl instance is passed as a parameter to the FileReader constructor that cdl instance is used to call countdown() method in order to decrement the count. Once the countdown reaches zero, the latch opens allowing the main thread to resume.

You can comment the code where await() is called, then main thread will resume even before all the 3 files are read, so you see in these type of scenarios where you want the thread to resume only after certain events occur then CountDownLatch is a powerful synchronization aid that allows one or more threads to wait for certain events to finish in other threads.

From the above example if you got the feeling that whatever count you have given in the CountDownLatch, you should spawn the same number of threads for countdown then that is a wrong understanding. As I have mentioned it depends on the number of events, so you can very well have a single thread with a loop and decrementing the count there.

Let's change the example used above to have single thread and use for loop to countdown.

public class CountdownlatchDemo {

    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(3);
        // Initializing three threads to read 3 different files.
        Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
        /*Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
        Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));*/
        t1.start();
        /*t2.start();
        t3.start();*/
        try {
            // main thread waiting till all the files are read
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Files are read ... Start further processing");
    }
}

class FileReader implements Runnable {
    private String threadName;
    private String fileName;
    private CountDownLatch cdl;
    FileReader(String threadName, String fileName, CountDownLatch cdl){
        this.threadName = threadName;
        this.fileName = fileName;
        this.cdl = cdl;        
    }
    @Override
    public void run() {
        for(int i = 0; i < 3; i++){
            System.out.println("Reading file " + fileName + " thread " + threadName);
            // do countdown here
            cdl.countDown();
        }
    }
}

Output

Reading file file-1 thread thread-1
Reading file file-1 thread thread-1
Reading file file-1 thread thread-1
Files are read ... Start further processing

Here you can see that only a single thread is used and countdown is done on the number of events. So it is true both ways. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

Usage of CountDownLatch

As you have seen in the example you can use CountDownLatch when you want to break your code is such a way that more than one thread can process the part of the code but you can start further processing only when all the threads which are working on some part of the code have finished. Once all the threads have finished main thread can come out of the await (as the latch is released) and start further processing.

You can also use CountDownLatch to test concurrency by giving a certain count in the CountDownLatch Constructor and start that many threads. Also there may be more than one waiting thread, so that scenario how waiting threads behave once the countdown reaches zero (as all of them will be released at once) can also be tested.

If you have some external dependencies and once all the dependencies are up and running then only you should start processing in your application. That kind of scenario can also be handled with CountDownLatch.

CountDownLatch can not be reused

One point to remember is CountDownLatch cannot be reused. Once the countdown reaches zero any further call to await() method won't block any thread. It won't throw any exception either.

Let's see an example. We'll use the same example as above and spawn 3 more threads once the first three set of threads are done.

public class CountdownlatchDemo {

    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(3);
        // Initializing three threads to read 3 different files.
        Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
        Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
        Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));
        t1.start();
        t2.start();
        t3.start();
        try {
            // main thread waiting till all the files are read
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Files are read ... Start further processing");
        Thread t4 = new Thread(new FileReader("thread-4", "file-4", cdl));
        Thread t5 = new Thread(new FileReader("thread-5", "file-5", cdl));
        Thread t6 = new Thread(new FileReader("thread-6", "file-6", cdl));
        t4.start();
        t5.start();
        t6.start();
        try {
            // main thread waiting till all the files are read
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Files are read again ... Start further processing");
    }
}

class FileReader implements Runnable {
    private String threadName;
    private String fileName;
    private CountDownLatch cdl;
    FileReader(String threadName, String fileName, CountDownLatch cdl){
        this.threadName = threadName;
        this.fileName = fileName;
        this.cdl = cdl;        
    }
    @Override
    public void run() {
        System.out.println("Reading file " + fileName + " thread " + threadName);
        // do countdown here
        cdl.countDown();
    }
}

Output

Reading file file-2 thread thread-2
Reading file file-3 thread thread-3
Reading file file-1 thread thread-1
Files are read ... Start further processing
Files are read again ... Start further processing
Reading file file-4 thread thread-4
Reading file file-6 thread thread-6
Reading file file-5 thread thread-5

Here note that await() is called again after starting thread4, thread5 and thread6 but it doesn't block main thread as it did for the first three threads. "Files are read again ... Start further processing" is printed even before the next three threads are processed. Another concurrent utility CyclicBarrier can be resued infact that is one of the difference between CountDownLatch and CyclicBarrier.

Points to note

  • A CountDownLatch initialized to N, using its constructor, can be used to make one (or more) thread wait until N threads have completed some action, or some action has been completed N times.
  • countDown() method is used to decrement the count, once the count reaches zero the latch is released.
  • await() method is used to block the thread(s) waiting for the latch to release.
  • CountDownLatch cannot be reused. Once the countdown reaches zero any further call to await() method won't block any thread.

That's all for this topic CountDownLatch in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. CyclicBarrier in Java concurrency
  2. Difference between CountDownLatch and CyclicBarrier
  3. ConcurrentHashMap in Java
  4. Inter-thread communication using wait, notify and notifyAll
  5. Synchronization in Java multithreading
  6. Java Concurrency interview questions

You may also like -

Thursday, 14 January 2016

Optional class in Java 8

Optional class, added in Java 8, provides another way to handle situations when value may or may not be present. Till now you would be using null to indicate that no value is present but it may lead to problems related to null references. This new class java.util.Optional introduced in Java 8 can alleviate some of these problems.

General structure of Optional class

Class Optional<T>

Here T is the type of the value stored in the Optional instance. Note that Optional instance may contain value or it may be empty, but if it contains value then it is of type T.

Let's see an example how null checks may end up being the bulk of our logic. Let's say we have a class Hospital, which has a cancer ward and cancer ward has patients.

So if we need id of a patient following code will do just fine.

String id = hospital.getCancerWard().getPatient().getId();

But many hospitals don't have a separate cancer ward, what if null reference is returned to indicate that there is no cancer ward. That would mean getPatient() will be called on a null reference, which will result in NullPointerException at runtime.

In this we'll have to add null checks to avoid null pointer exception.

if(hospital != null){
    Ward cancerWard = hospital.getCancerWard();
    if(cancerWard != null){
        Patient patient = cancerWard.getPatient();
        if(patient != null){
            String id = patient.getId();
        }
    }
}

Now let us see how Optional can help in this case and in many other cases -

How to create Optional objects

Optional doesn't have any constructors, there are several static methods for the purpose of creating Optional objects.

If you want to create an Optional instance which doesn't have any value, you can use empty method.

As Exp.

Optional<String> op = Optional.empty();

If you want to create an Optional instance with a given value, you can use of method.

Optional<String> op = Optional.of("Hello");

Or if you want to create instance of Optional with Ward class (as mentioned above) object

Ward ward = new Ward();
Optional<Ward> op = Optional.of(ward);

Note that, in this case if Ward is null, NullPointerException will be thrown immediately.

There is also ofNullable method which returns an Optional describing the specified value, if non-null, otherwise returns an empty Optional.

How to use Optional Values

Usage of Optional is more appropriate in the cases where you need some default action to be taken if there is no value.

Suppose for class Patient, if patient is not null then you return the id otherwise return the default id as 9999.
This can be written typically as -

String id = patient != null ? patient.getId() : "9999";

Using an Optional object same thing can be written using orElse() method which returns the value if present, otherwise returns the default value.

Optional<String> op1 = Optional.ofNullable(patient.getId());
String id = op1.orElse("9999");

You can use orElseGet() which will execute the given logic to get the default.

As exp. You may way want to get System property if not already there

String country = op.orElseGet(()-> System.getProperty("user.country"));

Note that lambda expression is used here to implement a functional interface.

You can use orElseThrow() method if you want to throw an exception if there is no value.

op1.orElseThrow(IllegalStateException::new);

Note that here double colon operator (method reference) is used to create exception.

There are scenarios when you want to execute some logic only if some value is present or do nothing. ifPresent() method can be used in such scenarios.

As exp. You want to add value to the list if there is value.

op1.ifPresent(v->numList.add(v));

How not to use Optional Value

There is a get() method provided by Optional class which returns the value, if a value is present in this Optional, otherwise throws NoSuchElementException. Using it directly without first ascertaining whether value is there or not is not safer than normally using value without checking for null.

Optional<Hospital> op = Optional.of(hospital);
op.get().getCancerWard();

Second wrong usage is using both isPresent and get method which is as good as null checks.

if (op.isPresent()){
    op.get().getCancerWard();
}

Better way would be to use map or flatMap.

Using flatMap

If we get back to the classes mentioned in the beginning; Hospital, Ward and Patient then if we use Optional the classes will look like this -

public class Hospital {
    private Optional<Ward> cancerWard;

    public Optional<Ward> getCancerWard() {
        return cancerWard;
    }

    public void setCancerWard(Optional<Ward> cancerWard) {
        this.cancerWard = cancerWard;
    }

}
public class Ward {
    private Optional<Patient> patient;

    public Optional<Patient> getPatient() {
        return patient;
    }

    public void setPatient(Optional<Patient> patient) {
        this.patient = patient;
    }   
}
public class Patient {
    private String id;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

So now if we have to write this

 
String id = hospital.getCancerWard().getPatient().getId();

With Optional then we have to use flatMap() method. There is a map() method too but writing something like this

 
String id = op.map(Hospital::getCancerWard).map(Ward::getPatient).map(Patient::getId).orElse("9999");

Will result in compiler error as the return type of map is Optional<U> and the return type of getCancerWard() is Optional<Ward> so it will make the result of the map of type Optional<Optional<ward>>. That is why you will get compiler error with map method in this case.

Using flatMap will apply the provided Optional-bearing mapping function to it i.e. flatten the 2 level Optional into one.

Thus the correct usage in this chaining is

 
String id = op.flatMap(Hospital::getCancerWard).flatMap(Ward::getPatient).map(Patient::getId).orElse("9999");

That's all for this topic Optional class in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!

Related Topics

  1. interface default methods in Java 8
  2. interface static methods in Java 8
  3. Functional interface annotation in Java 8
  4. Functional interfaces & lambda expression
  5. Lambda expression examples in Java 8
  6. Method reference in Java 8

You may also like -

>>>Go to Java advance topics page

Monday, 11 January 2016

CopyOnWriteArrayList in Java

Though we have an option to synchronize the collections like List or Set using synchronizedList or synchronizedSet methods respectively of the Collections class but there is a drawback of this synchronization; very poor performance as the whole collection is locked and only a single thread can access it at a given time. On the other hand there is Vector class too which is thread safe but that thread safety is achieved by having all its method as synchronized which again results in poor performance.

From Java 5 CopyOnWriteArrayList is introduced as a thread-safe variant of ArrayList. It is designed for concurrent access from multiple threads. CopyOnWriteArrayList provides a thread-safe alternative for ArrayList same way ConcurrentHashMap provides a thread-safe alternative for HashMap and CopyOnWriteArraySet for HashSet.

Also check other concurrent classes like CyclicBarrier, ReentrantLock, CountDownLatch with in java.util.Concurrent package.

CopyOnWriteArrayList and thread-safety

CopyOnWriteArrayList is also an implementation of the List interface but it is a thread safe variant. This thread safety is achieved by making a fresh copy of the underlying array with every mutative operations (add, set, and so on). It is evident from the name also Copy on write whenever value is change create a copy.
You may argue that this way of creating a fresh copy whenever any mutative operation is performed must be very costly. Yes it is that is why using CopyOnWriteArrayList provides better performance in scenarios where there are more iterations of the list than mutations.

That brings us to the second point "snapshot style" iterator in CopyOnWriteArrayList.

CopyOnWriteArrayList has a fail-safe iterator

Iterators for the CopyOnWriteArrayList uses a reference to the state of the array at the point that the iterator was created. You know by now any mutation will result in a fresh copy of the underlying array. Thus the array that the iterator has a reference to never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException.

The iterator will not reflect additions, removals, or changes to the list since the iterator was created thus it is also known as "snapshot style" iterator.

Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException.

Since iterator is not affected by the mutations thus multiple threads can iterate the collection without interference from one another or from threads wanting to modify the collection.

Example Code

Let's see it with an example. In the code there is an arraylist and 2 threads are operating on it. While one of the thread is iterating the list the second thread will add a new element to the list. This will result in a ConcurrentModificationException.

public class FailFastDemo {

    public static void main(String[] args) {

        List<String> numList = new ArrayList<String>();
        numList.add("1");
        numList.add("2");
        numList.add("3");
        numList.add("4");
        
        //This thread will iterate the list
        Thread thread1 = new Thread(){ 
            public void run(){ 
                try{ 
                    Iterator<String> i = numList.iterator(); 
                    while (i.hasNext()){ 
                        System.out.println(i.next()); 
                        // Using sleep to simulate concurrency
                        Thread.sleep(1000); 
                    }     
                }catch(ConcurrentModificationException e){ 
                    System.out.println("thread1 : Concurrent modification detected on this list"); 
                    e.printStackTrace();
                }catch(InterruptedException e){
                    
                } 
            } 
        }; 
        thread1.start(); 
        
         // This thread will try to add to the collection,
        // while the collection is iterated by another thread.
        Thread thread2 = new Thread(){ 
            public void run(){ 
              try{ 
                // Using sleep to simulate concurrency
                  Thread.sleep(2000);
                  // adding new value to the shared list
                  numList.add("5"); 
                  System.out.println("new value added to the list"); 
              }catch(ConcurrentModificationException e){ 
                  System.out.println("thread2 : Concurrent modification detected on the List"); 
              } catch(InterruptedException e){}
          } 
        }; 
        thread2.start(); 
    }
}

Output

1
2
new value added to the list
thread1 : Concurrent modification detected on this list
java.util.ConcurrentModificationException
 at java.util.ArrayList$Itr.checkForComodification(Unknown Source)
 at java.util.ArrayList$Itr.next(Unknown Source)
 at org.netjs.prog.FailFastDemo$1.run(FailFastDemo.java:24)

Here it can be seen that the ConcurrentModificationException is thrown because the list is changed by a thread while it has been iterated by another thread.

Now in the same code change the ArrayList to CopyOnWriteArrayList. Also added on sysout after adding new element to the list.

public class FailFastDemo {

    public static void main(String[] args) {

        List<String> numList = new CopyOnWriteArrayList<String>();
        numList.add("1");
        numList.add("2");
        numList.add("3");
        numList.add("4");
        
        //This thread will iterate the list
        Thread thread1 = new Thread(){ 
            public void run(){ 
                try{ 
                    Iterator<String> i = numList.iterator(); 
                    while (i.hasNext()){ 
                        System.out.println(i.next()); 
                        // Using sleep to simulate concurrency
                        Thread.sleep(1000); 
                    }     
                }catch(ConcurrentModificationException e){ 
                    System.out.println("thread1 : Concurrent modification detected on this list"); 
                    e.printStackTrace();
                }catch(InterruptedException e){
                    
                } 
            } 
        }; 
        thread1.start(); 
        
        // This thread will try to add to the collection,
        // while the collection is iterated by another thread.
        Thread thread2 = new Thread(){ 
            public void run(){ 
              try{ 
                // Using sleep to simulate concurrency
                  Thread.sleep(2000);
                  // adding new value to the shared list
                  numList.add("5"); 
                  System.out.println("new value added to the list"); 
                  System.out.println("List " + numList);
              }catch(ConcurrentModificationException e){ 
                  System.out.println("thread2 : Concurrent modification detected on the List"); 
              } catch(InterruptedException e){}
          } 
        }; 
        thread2.start();    
    }
}

Output

1
2
3
new value added to the list
List [1, 2, 3, 4, 5]
4

Here ConcurrentModificationException is not thrown as CopyOnWriteArrayList is used now. Also note that, though one of the thread adds a new element and at that time the list prints all the elements from 1-5. But the iterator has the reference to the old copy of the list and it prints from 1-4.

Points to note

  • CopyOnWriteArrayList provides a thread-safe alternative to the normal ArrayList.
  • In CopyOnWriteArrayList thread safety is achieved in a different way from a thread safe collection like Vector. In CopyOnWriteArrayList fresh copy of the underlying array is created with every mutative operations (add, set, and so on).
  • That approach makes it giving better performance in case there are more threads iterating the list than mutating it. Several threads can iterate CopyOnWriteArrayList concurrently.
  • CopyOnWriteArrayList's iterator is fail-safe and guaranteed not to throw ConcurrentModificationException.
  • Iterators for the CopyOnWriteArrayList uses a reference to the state of the array at the point that the iterator was created.
  • The iterator will not reflect additions, removals, or changes to the list since the iterator was created thus it is also known as "snapshot style" iterator.
  • All elements are permitted, including null.

That's all for this topic CopyOnWriteArrayList in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. ConcurrentHashMap in Java
  2. CyclicBarrier in Java concurrency
  3. CountDownLatch in Java concurrency
  4. How ArrayList works internally in Java
  5. fail-fast Vs fail-safe iterator in Java
  6. Java Concurrency interview questions

You may also like -

Wednesday, 6 January 2016

ConcurrentHashMap in Java

Though we have an option to synchronize the collections like List or Set using synchronizedList or synchronizedSet method respectively of the Collections class but there is a drawback of this synchronization; very poor performance as the whole collection is locked and only a single thread can access it at a given time. On the other hand there is HashTable too which is thread safe but that thread safety is achieved by having all its method as synchronized which again results in poor performance.

From Java 5 ConcurrentHashMap is introduced as another alternative for HashTable(or explicitly synchronizing the map using synchronizedMap method).

Why another Map

First thing that comes to mind is why another map when we already have HashMap or HashTable (if thread safe data structure is required). The answer is better performance though still providing a thread safe alternative. So it can be said that ConcurrentHashMap is a hash map whose operations are threadsafe.

How performance is improved

As I said ConcurrentHashMap is also a hash based map like HashMap, how it differs is the locking strategy used by ConcurrentHashMap. Unlike HashTable (or synchronized HashMap) it doesn't synchronize every method on a common lock. ConcurrentHashMap uses separate lock for separate buckets thus locking only a portion of the Map. Just for information ConcurrentHashMap uses ReentrantLock for locking

If you have idea about the internal implementation of the HashMap you must be knowing that by default there are 16 buckets. Same concept is used in ConcurrentHashMap and by default there are 16 buckets and also separate locks for separate buckets. So the default concurrency level is 16.

Which is effectively this constructor

ConcurrentHashMap()
Creates a new, empty map with a default initial capacity (16), load factor (0.75) and concurrencyLevel (16).

There are several other constructors, one of them is

ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)

Creates a new, empty map with the specified initial capacity, load factor and concurrency level. So you can see that you can provide your own initial capacity (default is 16), own load factor and also concurrency level (Which by default is 16).

Since there are 16 buckets having separate locks of their own which effectively means at any given time 16 threads can operate on the map concurrently provided all these threads are operating on separate buckets.

So you see how ConcurrentHashMap provides better performance by locking only the portion of the map rather than blocking the whole map resulting in greater shared access.

That is not all, performance is further improved by providing read access concurrently without any blocking. Retrieval operations (including get) generally do not block, so may overlap with update operations (including put and remove). Retrievals reflect the results of the most recently completed update operations which may mean that retrieval operations may not fetch the current/in-progress value (Which is one drawback). Memory visibility for the read operations is ensured by volatile reads. You can see in the ConcurrentHashMap code that the val and next fields of Node are volatile.

Also for aggregate operations such as putAll and clear which works on the whole map, concurrent retrievals may reflect insertion or removal of only some entries (another drawback of separate locking). Because read operations are not blocking but some of the writes (which are on the same bucket) may still be blocking.

Simple example using ConcurrentHashMap

At this juncture let's see a simple example of ConcurrentHashMap.

public class CHMDemo {

    public static void main(String[] args) {
        // Creating ConcurrentHashMap
        Map<String, String> cityTemperatureMap = new ConcurrentHashMap<String, String>();
        
        // Storing elements
        cityTemperatureMap.put("Delhi", "24");
        cityTemperatureMap.put("Mumbai", "32");
        //cityTemperatureMap.put(null, "26");
        cityTemperatureMap.put("Chennai", "35");
        cityTemperatureMap.put("Bangalore", "22" );
        
        for (Map.Entry e : cityTemperatureMap.entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}

Null is not allowed

Though HashMap allows one null as key but ConcurrentHashMap doesn't allow null as key. In the previous example you can uncomment the line which has null key. While trying to execute the program it will throw null pointer exception.

Exception in thread "main" java.lang.NullPointerException
 at java.util.concurrent.ConcurrentHashMap.putVal(Unknown Source)
 at java.util.concurrent.ConcurrentHashMap.put(Unknown Source)
 at org.netjs.prog.CHMDemo.main(CHMDemo.java:16)

Atomic operations

ConcurrentHashMap provides a lot of atomic methods, let's see it with an example how these atomic methods help. Note that from Java 8 many new atomic methods are added.

Suppose you have a word Map that counts the frequency of every word where key is the word and count is the value, in a multi-threaded environment, even if ConcurrentHashMap is used, there may be a problem as described in the code snippet.

public class CHMAtomicDemo {

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> wordMap = new ConcurrentHashMap<>();
        ..
        ..
        // Suppose one thread is interrupted after this line and 
        // another thread starts execution
        Integer prevValue = wordMap.get(word); 
        
        Integer newValue = (prevValue == null ? 1 : prevValue + 1);
        // Here the value may not be correct after the execution of 
        // both threads
        wordMap.put(word, newValue);  

    }

}

To avoid these kind of problems you can use atomic method, one of the atomic method is compute which can be used here.

wordMap.compute(word, (k,v)-> v == null ? 1 : v + 1);

If you see the general structure of the Compute method

compute(K key, BiFunction<? super K,? super V,? extendsV> remappingFunction)
Here BiFunction functional interface is used which can be implemented as a lambda expression.

So here rather than having these lines -

Integer prevValue = wordMap.get(word); 
Integer newValue = (prevValue == null ? 1 : prevValue + 1);
wordMap.put(word, newValue);
you can have only this line
wordMap.compute(word, (k,v)-> v == null ? 1 : v + 1);

The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress.

There are several other atomic operations like computeIfAbsent, computeIfPresent, merge, putIfAbsent.

Fail-safe iterator

The iterator generated by the ConcurrentHashMap is fail-safe which means it will not throw ConcurrentModificationException.

Example code

public class CHMDemo {

    public static void main(String[] args) {
        // Creating ConcurrentHashMap
        Map<String, String> cityTemperatureMap = new ConcurrentHashMap<String, String>();
        
        // Storing elements
        cityTemperatureMap.put("Delhi", "24");
        cityTemperatureMap.put("Mumbai", "32");
        cityTemperatureMap.put("Chennai", "35");
        cityTemperatureMap.put("Bangalore", "22" );
        
        Iterator<String> iterator = cityTemperatureMap.keySet().iterator();   
        while (iterator.hasNext()){
            System.out.println(cityTemperatureMap.get(iterator.next()));
            // adding new value, it won't throw error
            cityTemperatureMap.put("Kolkata", "34");
            
        }
    }
}

Output

24
35
34
32
22

According to the JavaDocs - The view's iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

When ConcurrentHashMap is a better choice

ConcurrentHashMap is a better choice when there are more reads than writes. As mentioned above retrieval operations are non-blocking so many concurrent threads can read without any performance problem. If there are more writes and that too many threads operating on the same segment then the threads will block which will deteriorate the performance.

Points to note

  • ConcurrentHashMap is also a hash based map like HashMap, but ConcurrentHashMap is thread safe.
  • In ConcurrentHashMap thread safety is ensured by having separate locks for separate buckets, resulting in better performance.
  • By default the bucket size is 16 and the concurrency level is also 16.
  • No null keys are allowed in ConcurrentHashMap.
  • Iterator provided by the ConcurrentHashMap is fail-safe, which means it will not throw ConcurrentModificationException.
  • Retrieval operations (like get) don't block so may overlap with update operations (including put and remove).

That's all for this topic ConcurrentHashMap in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Difference between HashMap and ConcurrentHashMap in Java
  2. CopyOnWriteArrayList in Java
  3. How HashMap internally works in Java
  4. Lambda expression examples in Java 8
  5. CountDownLatch in Java concurrency
  6. Java Concurrency interview questions

You may also like -