Multithreading in Java
BlockingQueue and its Implementations
Basic BlockingQueue Implementations
We won't go through each implementation in detail, as it would take a lot of time, and it's unlikely you'll need them all. Instead, I will cover the general concepts and the constructors each implementation offers.
Real Life Example
Imagine a factory where one thread, the producer, creates parts, and another thread, the consumer, processes them. The producer places the parts into a queue, while the consumer retrieves and processes them from the queue. If the queue runs out of parts, the consumer waits for the producer to add more. Conversely, if the queue is full, the producer waits for the consumer to make space.
Note
We will implement this scenario in code later in this lesson.
Differences from Other Collection Types
The BlockingQueue provides built-in synchronization: it manages thread access to the queue, so no manual synchronization is required. It also supports blocking operations for adding and retrieving items, a feature not found in other collections like ArrayList or LinkedList.
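To make the difference concrete, here is a minimal sketch that contrasts a plain LinkedList used as a queue with a LinkedBlockingQueue. The class name BlockingVsPlainDemo, the 200 ms sleep, and the "late item" string are assumptions chosen for illustration only.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingVsPlainDemo {

    // A plain queue never waits: poll() on an empty LinkedList returns null at once.
    static String pollPlainQueue() {
        Queue<String> plain = new LinkedList<>();
        return plain.poll();
    }

    // A blocking queue can wait: take() suspends the caller until an element arrives.
    static String takeFromBlockingQueue() throws InterruptedException {
        BlockingQueue<String> blocking = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);          // simulate a slow producer
                blocking.put("late item");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String item = blocking.take();      // waits until the producer adds the item
        producer.join();
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pollPlainQueue());        // null
        System.out.println(takeFromBlockingQueue()); // late item
    }
}
```

With the plain queue, the caller would have to retry or add its own waiting logic; with the blocking queue, the waiting is part of the take() call.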
BlockingQueue Implementations
ArrayBlockingQueue: A bounded queue that uses an array to store items.
Main
// Constructor with fixed capacity
BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(5);

// Constructor with fixed capacity and fair access
BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(5, true);

// Constructor with fixed capacity and initial collection of elements
Collection<String> initialElements = java.util.Arrays.asList("One", "Two", "Three");
BlockingQueue<String> queue3 = new ArrayBlockingQueue<>(5, false, initialElements);
Explanation
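Passing true as the second argument enables a fair access policy: threads blocked on insertion or removal are served in FIFO order, in the order they started waiting. Fairness avoids starvation but generally lowers throughput.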
LinkedBlockingQueue: A queue based on linked nodes that can be bounded or unbounded.
Main
// Constructor without capacity bounds
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();

// Constructor with fixed capacity
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(5);

// Constructor with initial collection of elements
Collection<String> initialElements = java.util.Arrays.asList("One", "Two", "Three");
BlockingQueue<String> queue3 = new LinkedBlockingQueue<>(initialElements);
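The practical effect of the capacity argument can be checked with a short sketch. It assumes a made-up class name, LinkedQueueCapacityDemo, and uses the non-blocking offer(E e) method so that nothing waits.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LinkedQueueCapacityDemo {

    // A queue created without a bound reports Integer.MAX_VALUE as its capacity.
    static int unboundedCapacity() {
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        return unbounded.remainingCapacity();
    }

    // offer(e) without a timeout returns false immediately when a bounded queue is full.
    static boolean offerToFullQueue() {
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);
        bounded.offer("One");        // accepted, the queue is now full
        return bounded.offer("Two"); // rejected without blocking
    }

    public static void main(String[] args) {
        System.out.println(unboundedCapacity() == Integer.MAX_VALUE); // true
        System.out.println(offerToFullQueue());                       // false
    }
}
```

An unbounded queue never blocks producers, so it is worth setting a bound whenever producers might outpace consumers.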
PriorityBlockingQueue: An unbounded prioritized queue where elements are retrieved according to their natural order or as specified by a comparator.
Main
// Constructor without initial capacity (default is 11)
BlockingQueue<Integer> queue1 = new PriorityBlockingQueue<>();

// Constructor with initial capacity
BlockingQueue<Integer> queue2 = new PriorityBlockingQueue<>(5);

// Constructor with initial capacity and comparator
Comparator<Integer> comparator = Integer::compareTo;
BlockingQueue<Integer> queue3 = new PriorityBlockingQueue<>(5, comparator);

// Constructor with initial collection of elements
Collection<Integer> initialElements = java.util.Arrays.asList(1, 3, 2);
BlockingQueue<Integer> queue4 = new PriorityBlockingQueue<>(initialElements);
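The constructors above only build the queues, so a short sketch of the retrieval order may help. The class PriorityOrderDemo and its helper methods are hypothetical names; the sketch drains one queue in natural order and one built with Comparator.reverseOrder().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderDemo {

    // Removes elements with poll() until the queue is empty, keeping retrieval order.
    static List<Integer> drainInOrder(PriorityBlockingQueue<Integer> queue) {
        List<Integer> result = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            result.add(next);
        }
        return result;
    }

    // Natural ordering: the smallest element is retrieved first.
    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(Arrays.asList(1, 3, 2));
        return drainInOrder(queue);
    }

    // Custom comparator: the largest element is retrieved first.
    static List<Integer> reverseOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(5, Comparator.reverseOrder());
        queue.addAll(Arrays.asList(1, 3, 2));
        return drainInOrder(queue);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder()); // [1, 2, 3]
        System.out.println(reverseOrder()); // [3, 2, 1]
    }
}
```

Note that the insertion order (1, 3, 2) has no influence on the order in which elements come out.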
DelayQueue: A delayed queue where items can only be retrieved after their delay has expired.
DelayedElement
DelayQueueConstructors
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedElement implements Delayed {
    private final long expirationTime; // The time when the element will be available

    public DelayedElement(long delay, TimeUnit unit) {
        this.expirationTime = System.currentTimeMillis() + unit.toMillis(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long delay = expirationTime - System.currentTimeMillis(); // Calculate the remaining delay
        return unit.convert(delay, TimeUnit.MILLISECONDS); // Convert the delay to the specified time unit
    }

    @Override
    public int compareTo(Delayed o) {
        // Compare remaining delays; this avoids a cast that fails for other Delayed types
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
This code defines the DelayedElement class, which implements the Delayed interface so that its instances can be stored in a DelayQueue. The DelayedElement class defines a getDelay method to calculate the remaining delay time and a compareTo method to order elements by how soon their delay expires.
The main method creates two queues: queue1, an empty delay queue, and queue2, a queue initialized with two elements whose delays are 5 seconds and 1 second, respectively.
The items in a DelayQueue become available for retrieval only after the specified delay time has elapsed.
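The main method described above is not shown in the listing, so the following is only a sketch of what it might look like, not the original code. It assumes a nested LabeledElement class (a variation of DelayedElement with a label field added for printing) and a hypothetical helper, newQueueWithTwoElements.

```java
import java.util.Arrays;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueConstructorsSketch {

    // Assumed variation of DelayedElement: same logic plus a label for output.
    static class LabeledElement implements Delayed {
        final String label;
        private final long expirationTime;

        LabeledElement(String label, long delay, TimeUnit unit) {
            this.label = label;
            this.expirationTime = System.currentTimeMillis() + unit.toMillis(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expirationTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Delay queue initialized from a collection, with delays of 5 seconds and 1 second.
    static DelayQueue<LabeledElement> newQueueWithTwoElements() {
        return new DelayQueue<>(Arrays.asList(
                new LabeledElement("five", 5, TimeUnit.SECONDS),
                new LabeledElement("one", 1, TimeUnit.SECONDS)));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<LabeledElement> queue1 = new DelayQueue<>(); // empty delay queue
        DelayQueue<LabeledElement> queue2 = newQueueWithTwoElements();

        System.out.println(queue1.poll());       // null: the queue is empty
        System.out.println(queue2.poll());       // null: no delay has expired yet
        System.out.println(queue2.peek().label); // one: the element that expires first
        System.out.println(queue2.take().label); // waits about 1 second, then prints one
    }
}
```

The sketch relies on two documented behaviors: poll() returns null while no element has expired, and peek() returns the element that will expire next even if it is not yet available.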
SynchronousQueue: A queue without capacity, where each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
Main
// Constructor without fair access
BlockingQueue<String> queue1 = new SynchronousQueue<>();

// Constructor with fair access
BlockingQueue<String> queue2 = new SynchronousQueue<>(true);
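Because a SynchronousQueue stores nothing, its behavior is easiest to see with two threads. The sketch below uses an assumed class name, SynchronousHandoffDemo, and a 100 ms timeout picked only to keep the example quick.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class SynchronousHandoffDemo {

    // With no consumer waiting, a timed offer gives up and returns false.
    static boolean offerWithoutConsumer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("part", 100, TimeUnit.MILLISECONDS);
    }

    // With a consumer calling take(), put() hands the element over directly.
    static String handOff(String item) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put(item);  // blocks until the consumer takes the item
        consumer.join();  // after join(), the consumer's write to received[0] is visible
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(handOff("part"));        // part
    }
}
```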
The Main Methods of the BlockingQueue:
Adding Elements:
The void put(E e) method inserts an item into the queue, blocking the thread if the queue is full. Alternatively, the boolean offer(E e, long timeout, TimeUnit unit) method attempts to add an item to the queue, waiting up to the specified time if the queue is full.
Main
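import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.put("Element 1"); // Insert the first element, no blocking.
            queue.put("Element 2"); // Insert the second element, no blocking.

            // Try to add the third element with a 2-second timeout.
            // Since the queue is full and nothing removes elements,
            // the call waits for 2 seconds and then returns false.
            boolean success = queue.offer("Element 3", 2, TimeUnit.SECONDS);
            System.out.println("Element 3 added: " + success); // false
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
        }
    }
}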
This example demonstrates inserting two elements into a BlockingQueue without blocking, followed by an attempt to add a third element using the offer() method with a 2-second timeout. Because the queue is full, the call waits for up to 2 seconds and returns false if no space becomes available.
Element Retrieval:
The E take() method retrieves and returns an item from the queue, blocking the thread if the queue is empty. Alternatively, the E poll(long timeout, TimeUnit unit) method attempts to retrieve an item from the queue, waiting up to the specified time if the queue is empty.
Main
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueRetrievalExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            // Adding elements to the queue
            queue.put("Element 1");
            queue.put("Element 2");

            // Retrieve and remove the first element, no blocking since the queue is not empty
            String item1 = queue.take(); // Returns "Element 1"

            // Attempt to retrieve and remove the next element with a 2-second timeout
            String item2 = queue.poll(2, TimeUnit.SECONDS); // Returns "Element 2"

            // Attempt to retrieve an element when the queue is empty, this will block for 2 seconds
            String item3 = queue.poll(2, TimeUnit.SECONDS); // Returns `null` after timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
        }
    }
}
This code adds two elements to a BlockingQueue, retrieves and removes the first element immediately, retrieves the next element using a 2-second timeout, and finally tries to retrieve an element from the now empty queue, which results in null after the timeout.
Checking and Removing Elements:
The boolean remove(Object o) method removes the specified element from the queue if it is present. The boolean contains(Object o) method checks whether the specified element is present in the queue without removing it.
Main
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueCheckRemoveExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            // Adding elements to the queue
            queue.put("Element 1");
            queue.put("Element 2");

            // Check if "Element 1" is in the queue, should return `true`
            boolean containsElement1 = queue.contains("Element 1"); // true

            // Remove "Element 1" from the queue, should return `true`
            boolean removedElement1 = queue.remove("Element 1"); // true

            // Check if "Element 1" is still in the queue, should return `false`
            boolean containsElement1AfterRemoval = queue.contains("Element 1"); // false

            // Try to remove an element that is not in the queue, should return `false`
            boolean removedElement3 = queue.remove("Element 3"); // false
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
        }
    }
}
This code adds two elements to a BlockingQueue, checks for the presence of "Element 1", removes it, checks again to confirm its removal, and then attempts to remove a nonexistent element.
Querying the State of the Queue:
The int size() method returns the number of elements currently in the queue. To determine if the queue is empty, you can use the boolean isEmpty() method, which checks whether the queue has no elements. For queues with a fixed capacity, the int remainingCapacity() method returns the number of additional elements the queue can accept without blocking.
Main
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueCapacityExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        try {
            // Adding elements to the queue
            queue.put("Element 1");
            queue.put("Element 2");

            // Get the number of elements in the queue
            int currentSize = queue.size(); // 2

            // Check if the queue is empty
            boolean isQueueEmpty = queue.isEmpty(); // false

            // Get the remaining capacity in the queue
            int remainingSpace = queue.remainingCapacity(); // 1

            // Add another element to fill the queue
            queue.put("Element 3");

            // Check the size and remaining capacity after adding the third element
            currentSize = queue.size(); // 3
            remainingSpace = queue.remainingCapacity(); // 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
        }
    }
}
This code adds elements to a BlockingQueue, checks the current size, verifies whether the queue is empty, and determines the remaining capacity, then reads these values again after filling the queue completely.
Implementing the Real-Life Example in Code
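The factory scenario from the beginning of the lesson can be sketched as follows. This is one possible implementation under stated assumptions, not the only way to do it: the class name FactoryDemo, the "STOP" sentinel used to tell the consumer to finish, and the part names are all invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FactoryDemo {
    // Assumed sentinel value that signals "no more parts" to the consumer.
    private static final String STOP = "STOP";

    static List<String> runFactory(int partCount, int queueCapacity) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
        List<String> processed = new ArrayList<>(); // written only by the consumer thread

        // Producer: creates parts; put() blocks whenever the queue is full.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= partCount; i++) {
                    queue.put("Part " + i);
                }
                queue.put(STOP);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: processes parts; take() blocks whenever the queue is empty.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String part = queue.take();
                    if (part.equals(STOP)) {
                        break;
                    }
                    processed.add(part + " processed");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's results are visible here
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 2 is smaller than the number of parts, so the producer must wait at times.
        System.out.println(runFactory(5, 2));
    }
}
```

Neither thread contains any explicit locking or waiting logic; the put() and take() calls handle both the full-queue and the empty-queue case.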
😭 Limitations
One key limitation is performance: because operations involve locking and thread coordination, throughput may be lower than with non-synchronized collections. Resource usage is another concern: large queues demand more memory, an unbounded queue can keep growing if producers outpace consumers, and heavy contention costs CPU time in handling the locks and synchronization.
💪 Advantages
On the positive side, BlockingQueue is thread-safe, offering safe communication between threads without requiring manual synchronization management. It also simplifies code by avoiding complex synchronization and blocking constructs. Furthermore, the variety of BlockingQueue implementations means that one can be chosen to suit each usage scenario.
1. What is a BlockingQueue in Java?
2. Which of the main BlockingQueue methods block a thread?
3. What is BlockingQueue useful for in multithreaded applications?