Java’s concurrent package is very powerful and many people have not really utilized it to the fullest yet. I am trying to take a simple example to demonstrate how we can leverage this powerful implementation.
Here is a brief description about concurrent Blocking Queue from Java API docs
An ArrayBlockingQueue is a implementation of blocking queue with an array used to store the queued objects. The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
ArrayBlockingQueue requires you to specify the capacity of queue at the object construction time itself. Once created, the capacity cannot be increased.
This is a classic “bounded buffer” (fixed size buffer), in which a fixed-sized array holds elements inserted by producers and extracted by consumers.
Attempts to put an element to a full queue will result in the put operation blocking; attempts to retrieve an element from an empty queue will be blocked.
The implementation of ArrayBlockingQueue supports both blocking and non-blocking operations for publishing to queue and reading from queue.
Here are few important methods to keep in mind while programming with ArrayBlockingQueue
1. Methods for Publishing
The Non-blocking offer(E) method to publish – This method inserts the specified element at the tail of this queue if possible, returning immediately if this queue is full.
The Timed-blocking offer(E o, long time-out, TimeUnit unit) method to publish – This method inserts the specified element at the tail of this queue, waiting if necessary up to the specified wait time for space to become available.
The blocking put(E) method to publish – This method adds the specified element to the tail of this queue, waiting if necessary for space to become available.
1. Methods for Consuming
The non-blocking peek() method to read Retrieves, but does not remove, the head of this queue, returning null if this queue is empty.
The Non-blocking poll() method to read & remove from queue – This method retrieves and removes the head of this queue, or null if this queue is empty.
The Timed-blocking poll(long time-out, TimeUnit unit) method to read & remove from queue – This method retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements are present on this queue.
The blocking take() method to read & remove from queue – This method retrieves and removes the head of this queue, waiting if no elements are present on this queue.
Below is a simple producer consumer example with various scenarios. The scenarios of producer and consumer may vary based on the speed and concurrency of producers and consumers.
Consumer.java is the example code for Consumer using ArrayBlockingQueue implementation.
package queue; import java.util.concurrent.BlockingQueue; /** * This is a simple Consumer example class which uses ArrayBlockingQueue. * * @author swiki * */ public class Consumer implements Runnable { private final BlockingQueue queue; private volatile boolean stopConsuming = false; private int timeToConsume = 1; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { Object objectFromQueue = queue.poll(); /** * The non-blocking poll() method returns null if the queue is * empty */ if (objectFromQueue == null) { long start = System.currentTimeMillis(); /** * Now use the blocking take() method which can wait for the * object to be available in queue. */ objectFromQueue = queue.take(); System.out .println("It seems Producer is slow. Consumer waited for " + (System.currentTimeMillis() - start) + "ms"); } if (objectFromQueue != null) { consume(objectFromQueue); } if (stopConsuming) { break; } } } catch (InterruptedException ex) { } } void consume(Object x) { try { Thread.sleep(timeToConsume); } catch (Throwable t) { } } public void setStopConsuming(boolean stopConsuming) { this.stopConsuming = stopConsuming; } public void setTimeToConsume(int timeToConsume) { this.timeToConsume = timeToConsume; } }
Producer.java is the example code for Producer using ArrayBlockingQueue implementation.
package queue; import java.util.concurrent.BlockingQueue; /** * This is a simple Producer example class which uses ArrayBlockingQueue. * * @author swiki * */ public class Producer implements Runnable { private final BlockingQueue queue; private int timeToProduce = 1; private volatile boolean stopProducing = false; Producer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { Object objectForQueue = produce(); if (!queue.offer(objectForQueue)) { /* * The non-blocking offer() method returns false if it was * not possible to add the element to this queue. */ long start = System.currentTimeMillis(); /* * Now use the put method as its a blocking call and it wail * until the queue space is available. */ queue.put(objectForQueue); System.out .println("It seems Consumer is slow. Producer waited for " + (System.currentTimeMillis() - start) + "ms"); } if (stopProducing) { break; } } } catch (InterruptedException ex) { } } /** * Produces something in timeToProduce ms * * @return */ public Object produce() { try { Thread.sleep(timeToProduce); } catch (Throwable t) { } return "product"; } public void setTimeToProduce(int timeToProduce) { this.timeToProduce = timeToProduce; } }
TestArrayBlockingQueue.java file is a example class which is used to test the producer/consumer example.
package queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * This is a simple example class which tests the Producer/Consumer example * using ArrayBlockingQueue. * * @author swiki * */ public class TestArrayBlockingQueue { public static void main(String args[]) { // testSlowProducer(); testSlowConsumer(); } /** * This test uses 2 consumers and 1 producer which will make consumers * faster then producer. * * Only for demonstration purpose 1. You can also try * Consumer.setTimeToConsume() method to explicitly slow down/speed up the * consumer. * * 2. You can also try Producer.setTimeToProduce() method to explicitly slow * down/speed up the producer. * */ public static void testSlowProducer() { BlockingQueue q = new ArrayBlockingQueue(100); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } /** * This test uses 2 producers and 1 consumer which will make consumers * slower then producer. * * Only for demonstration purpose 1. You can also try * Consumer.setTimeToConsume() method to explicitly slow down/speed up the * consumer. * * 2. You can also try Producer.setTimeToProduce() method to explicitly slow * down/speed up the producer. * */ public static void testSlowConsumer() { BlockingQueue q = new ArrayBlockingQueue(100); Producer p = new Producer(q); Producer p2 = new Producer(q); Consumer c1 = new Consumer(q); new Thread(p).start(); new Thread(p2).start(); new Thread(c1).start(); } }
Using combination of above described methods may help you get better control over the situation. If you see the Producer implementation I have first called non-blocking offer() method and if the offer fails I start my waiting counter and use the blocking put() method.
Similarly in case of consumer I have called the non-blocking method poll which returns null in case the queue is empty and then I start my waiting counter and use the blocking take() method. This way you can get live status of the producer consumer situation and take action based on which end is slow/fast.
Scenario 1 – Here is a sample output when I run the TestArrayBlockingQueue class. This is a scenario when I am assuming the producer is slow and taking more time to produce then consumers are consuming. In this test I have used 2 consumers and 1 producer, which will make consumers faster then producer. Its only for demonstration purpose, so if you want to try more options you can try following.
1. You can also try Consumer.setTimeToConsume() method to explicitly speed up the consumer (say 0 ms).
2. You can also try Producer.setTimeToProduce() method to explicitly slow down the producer. (say 5 ms)
It seems Producer is slow. Consumer waited for 15ms It seems Producer is slow. Consumer waited for 15ms It seems Producer is slow. Consumer waited for 0ms It seems Producer is slow. Consumer waited for 0ms It seems Producer is slow. Consumer waited for 0ms It seems Producer is slow. Consumer waited for 0ms It seems Producer is slow. Consumer waited for 16ms
Scenario 2 – Here is a sample output when I run the TestArrayBlockingQueue class. This is a scenario when I am assuming the producer is faster and taking less time to produce then consumers are consuming. In this test I have used 2 producers and 1 consumer, which will make consumers faster then producer. Its only for demonstration purpose, so if you want to try more options you can try following.
1. You can try Consumer.setTimeToConsume() method to explicitly slow down the consumer(say 5 ms).
2. You can try Producer.setTimeToProduce() method to explicitly speed up the producer. (say 0 ms)
It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 15ms It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 0ms It seems Consumer is slow. Producer waited for 16ms
In this example I have used ArrayBlockingQueue implementation, you can try different implementations like DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue etc to experiment more on same.