So far I’ve written two articles on the Producer/Consumer concept on Crunchify: the first explains Java Semaphore and Mutex, and the second explains Concurrent Read/Write.
In this Java tutorial we will go over the same Producer/Consumer concept to explain BlockingQueue in Java.
What are the advantages of Blocking Queue in Java?
A java.util.concurrent.BlockingQueue is a java.util.Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
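To make the waiting behavior concrete, here is a minimal sketch, assuming a queue with room for a single element. The class name BlockingPutTakeSketch and the method demo are hypothetical and are not part of the four classes in this tutorial. The second put() cannot complete until the helper thread has called take().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not one of the tutorial's classes.
class BlockingPutTakeSketch {

    // Returns the element the helper thread took, then the element left in the queue.
    static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // room for one element
        final String[] taken = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);       // give the main thread time to reach the second put()
                taken[0] = queue.take(); // frees the single slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            queue.put("first");  // succeeds at once, the queue is now full
            consumer.start();
            queue.put("second"); // waits until the consumer has taken "first"
            consumer.join();     // makes the consumer's write to taken[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken[0] + "," + queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints first,second
    }
}
```

The same idea works in the other direction: take() on an empty queue waits until a producer adds an element.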
We need to create four Java Classes:
- CrunchifyMessage.java to put and get message
- CrunchifyBlockingProducer.java to put message into queue
- CrunchifyBlockingConsumer.java to get message from queue
- CrunchifyBlockingMain.java to start test
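BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically, using internal locks or other forms of concurrency control. Bulk operations such as addAll are not necessarily atomic unless an implementation says so.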
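As a rough illustration of that thread-safety, the sketch below lets several threads call offer() on the same queue without any external synchronization and then checks that no element was lost. The class name ConcurrentOfferSketch and the method fillFromThreads are assumptions made for this example only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not one of the tutorial's classes.
class ConcurrentOfferSketch {

    // Starts `threads` workers that each offer `perThread` elements, then returns the queue size.
    static int fillFromThreads(int threads, int perThread) {
        // Created without a capacity, so it is effectively unbounded and offer() always succeeds.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.offer(i); // no synchronized block or explicit lock needed
                }
            });
            workers[t].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(fillFromThreads(4, 1000)); // prints 4000
    }
}
```

With a plain, non-thread-safe queue such as java.util.LinkedList, the same test could lose elements or throw.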
Let’s get started on a thread-safe BlockingQueue implementation in Java.
Step-1
Create class CrunchifyMessage.java. This is a simple Java object.
package com.crunchify.example;

/**
 * @author Crunchify.com
 * simple Message class to put and get message into queue
 */
public class CrunchifyMessage {
    private String crunchifyMsg;

    public CrunchifyMessage(String string) {
        this.crunchifyMsg = string;
    }

    public String getMsg() {
        return crunchifyMsg;
    }
}
Step-2
Create producer CrunchifyBlockingProducer.java, which creates simple messages and puts them into the queue.
package com.crunchify.example;

import java.util.concurrent.BlockingQueue;

/**
 * @author Crunchify.com
 *
 */
public class CrunchifyBlockingProducer implements Runnable {
    private BlockingQueue<CrunchifyMessage> crunchQueue;

    public CrunchifyBlockingProducer(BlockingQueue<CrunchifyMessage> queue) {
        this.crunchQueue = queue;
    }

    @Override
    public void run() {
        // producing CrunchifyMessage messages
        for (int i = 1; i <= 5; i++) {
            CrunchifyMessage msg = new CrunchifyMessage("i'm msg " + i);
            try {
                Thread.sleep(10);
                // put() waits if the queue is full
                crunchQueue.put(msg);
                System.out.println("CrunchifyBlockingProducer: Message - " + msg.getMsg() + " produced.");
            } catch (InterruptedException e) {
                System.out.println("Exception:" + e);
                // preserve the interrupt status and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }

        // adding exit message
        // the text says 50 although the loop above produces 5 messages;
        // it is kept verbatim so that it matches the Result shown later
        CrunchifyMessage msg = new CrunchifyMessage("All done from Producer side. Produced 50 CrunchifyMessages");
        try {
            crunchQueue.put(msg);
            System.out.println("CrunchifyBlockingProducer: Exit Message - " + msg.getMsg());
        } catch (InterruptedException e) {
            System.out.println("Exception:" + e);
            Thread.currentThread().interrupt();
        }
    }
}
Step-3
Create class CrunchifyBlockingConsumer.java, which consumes messages from the queue.
package com.crunchify.example;

import java.util.concurrent.BlockingQueue;

/**
 * @author Crunchify.com
 *
 */
public class CrunchifyBlockingConsumer implements Runnable {
    private BlockingQueue<CrunchifyMessage> queue;

    public CrunchifyBlockingConsumer(BlockingQueue<CrunchifyMessage> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            CrunchifyMessage msg;
            // consuming messages until a message with the text "exit" is received;
            // strings are compared with equals(), not with != (which compares references)
            // note: the producer in Step-2 never sends the text "exit", so after the
            // last message this loop keeps waiting on take()
            while (!"exit".equals((msg = queue.take()).getMsg())) {
                Thread.sleep(10);
                System.out.println("CrunchifyBlockingConsumer: Message - " + msg.getMsg() + " consumed.");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // preserve the interrupt status for the caller
            Thread.currentThread().interrupt();
        }
    }
}
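The consumer loop above stops only when it receives a message whose text is "exit". One common way to make such a loop terminate is to have the producer send that sentinel as its last message (often called a poison pill). The following is a minimal sketch of that idea, assuming plain String messages instead of CrunchifyMessage; the class name PoisonPillSketch, the constant EXIT and the method run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not one of the tutorial's classes.
class PoisonPillSketch {

    // Sentinel text that tells the consumer to stop (an assumption for this sketch).
    static final String EXIT = "exit";

    // Produces `count` messages plus the sentinel and returns what the consumer received.
    static List<String> run(int count) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        List<String> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread consumer = new Thread(() -> {
            try {
                String msg;
                // equals() compares the text; the loop ends when the sentinel arrives
                while (!EXIT.equals(msg = queue.take())) {
                    consumed.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= count; i++) {
                queue.put("msg " + i);
            }
            queue.put(EXIT); // last message: ask the consumer to stop
            consumer.join(); // safe to read `consumed` after this returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [msg 1, msg 2, msg 3]
    }
}
```

With several consumers, one sentinel per consumer would be needed, since each take() removes the element it returns.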
Step-4
Create a simple CrunchifyBlockingMain.java class with a main method that runs the BlockingQueue test. Run this program to check the BlockingQueue behavior.
package com.crunchify.example;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author Crunchify.com
 *
 */
public class CrunchifyBlockingMain {

    public static void main(String[] args) {
        // Creating BlockingQueue of size 10
        // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and
        // wait for space to become available in the queue when storing an element.
        BlockingQueue<CrunchifyMessage> crunchQueue = new ArrayBlockingQueue<>(10);
        CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer(crunchQueue);
        CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer(crunchQueue);

        // starting producer to produce messages in queue
        new Thread(crunchProducer).start();

        // starting consumer to consume messages from queue
        new Thread(crunchConsumer).start();

        System.out.println("Let's get started. Producer / Consumer Test Started.\n");
    }
}
A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null.
A null is used as a sentinel value to indicate failure of poll operations.
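Both statements can be checked with a few lines. This is a small sketch, assuming an ArrayBlockingQueue; the class name NullElementSketch and its two methods are hypothetical names chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not one of the tutorial's classes.
class NullElementSketch {

    // Returns true if offering null is rejected with a NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.offer(null);
            return false; // not reached for ArrayBlockingQueue
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // poll() does not wait; on an empty queue it returns null to signal "nothing there".
    static String pollEmpty() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull()); // prints true
        System.out.println(pollEmpty());   // prints null
    }
}
```

Because null already means "no element" for poll(), allowing null as a real element would make that return value ambiguous.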
Result:
Let's get started. Producer / Consumer Test Started.
CrunchifyBlockingProducer: Message - i'm msg 1 produced.
CrunchifyBlockingProducer: Message - i'm msg 2 produced.
CrunchifyBlockingConsumer: Message - i'm msg 1 consumed.
CrunchifyBlockingConsumer: Message - i'm msg 2 consumed.
CrunchifyBlockingProducer: Message - i'm msg 3 produced.
CrunchifyBlockingConsumer: Message - i'm msg 3 consumed.
CrunchifyBlockingProducer: Message - i'm msg 4 produced.
CrunchifyBlockingConsumer: Message - i'm msg 4 consumed.
CrunchifyBlockingProducer: Message - i'm msg 5 produced.
CrunchifyBlockingProducer: Exit Message - All done from Producer side. Produced 50 CrunchifyMessages
CrunchifyBlockingConsumer: Message - i'm msg 5 consumed.
CrunchifyBlockingConsumer: Message - All done from Producer side. Produced 50 CrunchifyMessages consumed.
When should we use java.util.concurrent.BlockingQueue?
- When you want to throttle incoming requests, a bounded BlockingQueue is a good fit.
- Producers can get far ahead of consumers with an unbounded queue. If the consumers cannot keep up with the producers, the queue keeps growing and may eventually cause an OutOfMemoryError. In situations like these, it may be better to signal a would-be producer that the queue is full, and to give up quickly with a failure.
- In other words: with a bounded queue, the producers are naturally throttled.
- BlockingQueue is normally used in concurrent applications.
- It provides a correct, thread-safe implementation.
- A bounded BlockingQueue also keeps memory consumption limited.
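The throttling points above can be sketched with the non-blocking offer() method, which returns false right away when a bounded queue is full instead of waiting. The class name BoundedOfferSketch and the method acceptedOutOf are assumptions made for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not one of the tutorial's classes.
class BoundedOfferSketch {

    // Tries to enqueue `attempts` elements into a queue of the given capacity
    // (no consumer is running) and returns how many were accepted.
    static int acceptedOutOf(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() never waits: true if the element was added, false if the queue is full
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedOutOf(3, 10)); // prints 3
    }
}
```

A producer that prefers to wait rather than fail can call put(), or offer(element, timeout, unit) to wait only for a limited time.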