Tuesday, 16 February 2016

BlockingQueue in Java Concurrency

BlockingQueue is an interface added in Java 5 within the java.util.concurrent package, which provides many other concurrent utilities like CyclicBarrier, Phaser, ConcurrentHashMap, ReentrantLock etc.

BlockingQueue, as the name suggests, is a queue whose operations can block. That means BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

For that, the BlockingQueue interface has two specific methods -

  1. put(E e) - Inserts the specified element into this queue, waiting if necessary for space to become available.
  2. take() - Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
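
To see the waiting behaviour of these two methods, here is a minimal sketch. The class name PutTakeDemo, the helper method takeAfterDelayedPut and the "hello" value are made up for illustration. A producer thread sleeps for a while before calling put(), so the calling thread's take() has to wait until the element shows up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PutTakeDemo {

    // Hypothetical helper: starts a producer that sleeps for delayMillis before
    // calling put(), and returns whatever take() hands back to the caller.
    static String takeAfterDelayedPut(long delayMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // The queue is normally still empty at this point, so take()
            // waits until the producer's put() has stored an element.
            String value = queue.take();
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Received: " + takeAfterDelayedPut(200));
    }
}
```

Both put() and take() declare InterruptedException, which is why every call sits inside a try-catch block; restoring the interrupt flag in the catch block is a common convention rather than a requirement.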

BlockingQueue Methods

BlockingQueue methods come in four forms -

           Throws exception   Special value   Blocks           Times out
Insert     add(e)             offer(e)        put(e)           offer(e, time, unit)
Remove     remove()           poll()          take()           poll(time, unit)
Examine    element()          peek()          not applicable   not applicable

  1. Methods in the first column throw an exception if the operation cannot be executed immediately i.e. these methods won't block.
  2. Methods in the second column return a special value (either null or false, depending on the operation) if the operation cannot be performed immediately.
  3. Methods in the third column block the current thread indefinitely until the operation can succeed.
  4. Methods in the fourth column block for only a given maximum time limit before giving up.
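
The following sketch tries the first, second and fourth forms against a queue where the operation cannot succeed (the third form would simply block forever here, so it is left out). The class name FourFormsDemo, the helper method names and the 100 millisecond timeout are assumptions picked for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FourFormsDemo {

    // Tries the non-blocking and timed insert forms against a queue that is already full.
    static List<String> insertIntoFullQueue() {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("A"); // capacity is 1, so the queue is now full

        try {
            queue.add("B");
        } catch (IllegalStateException e) {
            log.add("add: IllegalStateException");
        }
        log.add("offer: " + queue.offer("B"));
        try {
            // Waits up to 100 ms for space, then gives up
            log.add("timed offer: " + queue.offer("B", 100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    // Tries the non-blocking and timed remove/examine forms against an empty queue.
    static List<String> removeFromEmptyQueue() {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        try {
            queue.remove();
        } catch (NoSuchElementException e) {
            log.add("remove: NoSuchElementException");
        }
        try {
            queue.element();
        } catch (NoSuchElementException e) {
            log.add("element: NoSuchElementException");
        }
        log.add("poll: " + queue.poll());
        log.add("peek: " + queue.peek());
        try {
            // Waits up to 100 ms for an element, then gives up
            log.add("timed poll: " + queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        insertIntoFullQueue().forEach(System.out::println);
        removeFromEmptyQueue().forEach(System.out::println);
    }
}
```

Only the timed forms declare InterruptedException, since they are the only ones here that may actually wait.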

No nulls

A BlockingQueue does not accept null elements. Implementations (like LinkedBlockingQueue or ArrayBlockingQueue) throw NullPointerException on attempts to add, put or offer a null.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BQDemo {
    public static void main(String[] args) {
        BlockingQueue<String> arrayBlockingQ = new ArrayBlockingQueue<String>(2);
        try {
            // put(null) throws NullPointerException, which the catch block below does not handle
            arrayBlockingQ.put(null);
        } catch (InterruptedException e) {
            System.out.println("Exception occurred " + e);
        }
    }
}

Output

Exception in thread "main" java.lang.NullPointerException
 at java.util.concurrent.ArrayBlockingQueue.checkNotNull(Unknown Source)
 at java.util.concurrent.ArrayBlockingQueue.put(Unknown Source)
 at org.netjs.prgrm.BQDemo.main(BQDemo.java:10)

BlockingQueue Superinterfaces

BlockingQueue extends the Queue interface, which in turn extends the Collection and Iterable interfaces, so it inherits all Collection and Queue methods.

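For example, add(E e) comes from the Collection interface and remove() from the Queue interface. They differ from put() and take() in that they don't block: add() throws an IllegalStateException if the queue is full and remove() throws a NoSuchElementException if the queue is empty. The remove(Object o) method, also inherited from Collection, removes a single instance of the given element and simply returns false if it is not present.

The poll() and peek() operations come from the Queue interface, where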

  • poll() - Retrieves and removes the head of this queue, or returns null if this queue is empty.
  • peek() - Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty.

BlockingQueue implementations are thread-safe

BlockingQueue implementations like ArrayBlockingQueue and LinkedBlockingQueue are thread-safe. All queuing methods use internal locks or other forms of concurrency control to achieve their effects atomically.

Since the BlockingQueue interface also extends the Collection interface, it inherits the Collection operations as well. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.
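
A small sketch of such a partial addAll, assuming an ArrayBlockingQueue of capacity 2 (the class name PartialAddAllDemo and the helper method are made up for the example). As far as I can tell, in the standard JDK ArrayBlockingQueue adds the elements one by one, so the first two go in before the third one fails.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PartialAddAllDemo {

    // Calls addAll with three elements on a queue that holds only two and
    // returns the queue so the caller can inspect what was actually added.
    static BlockingQueue<String> addThreeToCapacityTwo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList("A", "B", "C"));
        } catch (IllegalStateException e) {
            // "C" did not fit; the elements added before the failure stay in the queue
            System.out.println("addAll failed: " + e.getMessage());
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println("Queue after addAll: " + addThreeToCapacityTwo());
    }
}
```

So a caller that catches the exception cannot assume the queue is unchanged; it has to check which elements made it in.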

BlockingQueue capacity

A BlockingQueue may be capacity bounded or unbounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking.

For a bounded BlockingQueue implementation the queue has to be created with a given (fixed) capacity.

For example, ArrayBlockingQueue, for which the capacity has to be specified.

BlockingQueue<String> arrayBlockingQ = new ArrayBlockingQueue<String>(2);

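LinkedBlockingQueue is optionally bounded, so it can be created with or without a capacity.

BlockingQueue<String> boundedLinkedQ = new LinkedBlockingQueue<String>(2);

BlockingQueue<String> unboundedLinkedQ = new LinkedBlockingQueue<String>();

So both are ok. PriorityBlockingQueue, on the other hand, is always unbounded; the int argument accepted by some of its constructors is only an initial capacity, not a limit.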

Note that a BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE.
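
A quick way to check this is to compare a bounded ArrayBlockingQueue with a PriorityBlockingQueue, which has no capacity limit. This is only a sketch; the class name RemainingCapacityDemo and the helper method names are assumptions for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class RemainingCapacityDemo {

    // Remaining capacity of a bounded queue of the given size after adding one element.
    static int boundedRemainingAfterOneAdd(int capacity) {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(capacity);
        bounded.add("A");
        return bounded.remainingCapacity();
    }

    // Remaining capacity reported by an unbounded queue after adding one element.
    static int unboundedRemainingAfterOneAdd() {
        BlockingQueue<String> unbounded = new PriorityBlockingQueue<>();
        unbounded.add("A");
        return unbounded.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println("Bounded (capacity 2): " + boundedRemainingAfterOneAdd(2));
        System.out.println("Unbounded: " + unboundedRemainingAfterOneAdd());
        System.out.println("Integer.MAX_VALUE: " + Integer.MAX_VALUE);
    }
}
```

One caveat, as far as I can tell: a LinkedBlockingQueue created without a capacity is treated as bounded at Integer.MAX_VALUE, so the value it reports shrinks as elements are added.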

BlockingQueue Usage

BlockingQueue implementations are designed to be used primarily for producer-consumer queues because the blocking methods put() and take() facilitate inter-thread communication.

It can also be used as a bounded buffer. Let's say you have an ArrayBlockingQueue of capacity 10. One thread can keep putting values in it while another thread takes values out of it, and the queue never holds more than 10 elements, thus creating a bounded buffer.

If all 10 slots are filled, put() will block until a slot is freed, and in the same way take() will block if there are no elements to read.
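
A producer-consumer pair over such a bounded buffer could look roughly like the sketch below. The class name BoundedBufferDemo, the transfer method and the DONE marker (-1, assumed never to be a real value) are hypothetical choices for the example, not part of the BlockingQueue API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedBufferDemo {

    // Hypothetical end-of-stream marker; -1 is assumed never to be a real value.
    private static final int DONE = -1;

    // Runs one producer and one consumer over a bounded buffer and returns
    // the values in the order the consumer received them.
    static List<Integer> transfer(int itemCount, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    buffer.put(i); // blocks while the buffer is full
                }
                buffer.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = buffer.take(); // blocks while the buffer is empty
                    if (value == DONE) {
                        break;
                    }
                    received.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            // Wait for both threads so the received list is complete before returning
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(25, 10));
    }
}
```

Neither thread needs explicit wait/notify calls or locks; the queue itself makes the faster thread wait for the slower one.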

Example Code

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BQDemo {
    public static void main(String[] args) {
        BlockingQueue<String> arrayBlockingQ = new ArrayBlockingQueue<String>(2);
        try {
            // Fill the queue to its capacity of 2
            arrayBlockingQ.put("A");
            arrayBlockingQ.put("B");
            System.out.println("------ 1 -------");
            arrayBlockingQ.forEach(a -> System.out.println(a));
            // take() removes the head ("A"), freeing a slot so put("C") does not block
            arrayBlockingQ.take();
            arrayBlockingQ.put("C");
            System.out.println("------ 2 -------");
            arrayBlockingQ.forEach(a -> System.out.println(a));
        } catch (InterruptedException e) {
            System.out.println("Exception occurred " + e);
        }
    }
}

Output

------ 1 -------
A
B
------ 2 -------
B
C

Here it can be seen that elements are added at the tail of the queue, while take() retrieves the element from the head of the queue.

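Implementing classes of the BlockingQueue

The Java 8 API documentation lists the following implementing classes of BlockingQueue -

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingDeque
  • LinkedBlockingQueue
  • LinkedTransferQueue
  • PriorityBlockingQueue
  • SynchronousQueue

Source : https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html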

That's all for the topic BlockingQueue in Java. If you have any doubts or suggestions, please drop a comment. Thanks!

