BlockingQueue and its Implementations | Synchronized Collections
Multithreading in Java

BlockingQueue and its Implementations

Basic BlockingQueue Implementations

We won't go through each implementation in detail, as that would take a lot of time and you are unlikely to need them all. Instead, we will cover the general concepts and the constructors each implementation offers.

Real Life Example

Imagine a factory where one thread, the producer, creates parts, and another thread, the consumer, processes them. The producer places the parts into a queue, while the consumer retrieves and processes them from the queue. If the queue runs out of parts, the consumer waits for the producer to add more. Conversely, if the queue is full, the producer waits for the consumer to make space.

Note

We will implement this scenario in code later in this chapter.

Differences from Other Collection Types

A BlockingQueue provides built-in synchronization: it manages thread access to the queue internally, so no manual synchronization is required. It also supports blocking operations for adding and retrieving items, a feature not found in ordinary collections such as ArrayList or LinkedList.
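To make the difference concrete, here is a minimal sketch in which two threads add items to the same LinkedBlockingQueue without any synchronized block. The class name SharedQueueDemo and the method fillFromTwoThreads are illustrative assumptions, not part of the course code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch: class and method names are assumptions, not course code.
class SharedQueueDemo {

    // Starts two threads that each add `perThread` items, then returns the final queue size.
    static int fillFromTwoThreads(int perThread) {
        // Default capacity is Integer.MAX_VALUE, so offer() always succeeds here.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Runnable writer = () -> {
            for (int i = 0; i < perThread; i++) {
                queue.offer(i); // thread-safe without a synchronized block
            }
        };

        Thread first = new Thread(writer);
        Thread second = new Thread(writer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(fillFromTwoThreads(1_000)); // prints 2000
    }
}
```

No insertion is lost because the queue does its own locking. The same loop over a plain ArrayList would need external synchronization to give a reliable count.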

BlockingQueue Implementations

ArrayBlockingQueue: A size-limited queue that uses an array to store items.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Constructor with fixed capacity
    BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(5);

    // Constructor with fixed capacity and fair access
    BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(5, true);

    // Constructor with fixed capacity and initial collection of elements
    Collection<String> initialElements = Arrays.asList("One", "Two", "Three");
    BlockingQueue<String> queue3 = new ArrayBlockingQueue<>(5, false, initialElements);

Explanation

Passing true as the second argument enables a fair access policy: threads blocked on the queue are served in FIFO order, so the thread that has waited longest goes first.

LinkedBlockingQueue: A queue based on linked nodes that can be bounded or unbounded.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Constructor without an explicit capacity (defaults to Integer.MAX_VALUE)
    BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();

    // Constructor with fixed capacity
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(5);

    // Constructor with initial collection of elements
    Collection<String> initialElements = Arrays.asList("One", "Two", "Three");
    BlockingQueue<String> queue3 = new LinkedBlockingQueue<>(initialElements);

PriorityBlockingQueue: An unbounded prioritized queue where elements are retrieved according to their natural order or as specified by a comparator.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Comparator;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.PriorityBlockingQueue;

    // Constructor without initial capacity (default is 11)
    BlockingQueue<Integer> queue1 = new PriorityBlockingQueue<>();

    // Constructor with initial capacity
    BlockingQueue<Integer> queue2 = new PriorityBlockingQueue<>(5);

    // Constructor with initial capacity and comparator
    Comparator<Integer> comparator = Integer::compareTo;
    BlockingQueue<Integer> queue3 = new PriorityBlockingQueue<>(5, comparator);

    // Constructor with initial collection of elements
    Collection<Integer> initialElements = Arrays.asList(1, 3, 2);
    BlockingQueue<Integer> queue4 = new PriorityBlockingQueue<>(initialElements);
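The effect of the ordering is easiest to see by draining a queue. The following is a small sketch, with the hypothetical names PriorityOrderDemo and drainInOrder, that retrieves the same elements once in natural order and once with a reversed comparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Illustrative sketch: class and method names are assumptions, not course code.
class PriorityOrderDemo {

    // Builds a queue from `items` and returns the elements in the order they are retrieved.
    static List<Integer> drainInOrder(List<Integer> items, Comparator<Integer> comparator) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(11, comparator);
        queue.addAll(items);

        List<Integer> result = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) { // poll() returns null once the queue is empty
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(3, 1, 2);
        System.out.println(drainInOrder(items, Comparator.naturalOrder())); // prints [1, 2, 3]
        System.out.println(drainInOrder(items, Comparator.reverseOrder())); // prints [3, 2, 1]
    }
}
```

Insertion order plays no role here: the head of the queue is always the smallest element according to the comparator.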

DelayQueue: An unbounded queue of delayed elements, where an item can be retrieved only after its delay has expired.

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class DelayedElement implements Delayed {
        private final long expirationTime; // The time when the element will be available

        public DelayedElement(long delay, TimeUnit unit) {
            this.expirationTime = System.currentTimeMillis() + unit.toMillis(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long delay = expirationTime - System.currentTimeMillis(); // Calculate the remaining delay
            return unit.convert(delay, TimeUnit.MILLISECONDS); // Convert the delay to the specified time unit
        }

        @Override
        public int compareTo(Delayed o) {
            // Assumes the queue holds only DelayedElement instances
            return Long.compare(this.expirationTime, ((DelayedElement) o).expirationTime);
        }
    }

The DelayedElement class implements the Delayed interface so that its instances can be stored in a DelayQueue. Its getDelay method calculates the remaining delay, and its compareTo method orders elements by their expiration time.

The main method creates two queues: queue1, an empty delay queue, and queue2, a queue initialized with two elements that have delays of 5 seconds and 1 second, respectively.

Items in a DelayQueue become available for retrieval only after their specified delay has elapsed.
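As a rough illustration of the two constructors and of the retrieval behavior just described, here is a self-contained sketch. LabeledDelay is a hypothetical variant of DelayedElement with a label added so the result is readable, and firstAvailable is likewise an assumed helper name.

```java
import java.util.Arrays;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: class and method names are assumptions, not course code.
class DelayQueueDemo {

    // Same idea as DelayedElement, plus a label (hypothetical helper class).
    static class LabeledDelay implements Delayed {
        final String label;
        private final long expirationTime;

        LabeledDelay(String label, long delay, TimeUnit unit) {
            this.label = label;
            this.expirationTime = System.currentTimeMillis() + unit.toMillis(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expirationTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Fills a queue with a slow and a fast element and returns the label of the first one retrieved.
    static String firstAvailable(long slowMillis, long fastMillis) {
        DelayQueue<LabeledDelay> queue = new DelayQueue<>(Arrays.asList(
                new LabeledDelay("slow", slowMillis, TimeUnit.MILLISECONDS),
                new LabeledDelay("fast", fastMillis, TimeUnit.MILLISECONDS)));
        try {
            return queue.take().label; // blocks until the shortest delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        DelayQueue<LabeledDelay> queue1 = new DelayQueue<>(); // empty delay queue
        System.out.println(queue1.poll());                    // prints null: nothing to retrieve
        System.out.println(firstAvailable(5_000, 1_000));     // prints fast, after roughly one second
    }
}
```

Although the slow element is added first, take() hands out the element whose delay expires first.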

SynchronousQueue: A queue without capacity, where each insert operation must wait for a corresponding remove operation by another thread, and vice versa.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;

    // Constructor without fair access
    BlockingQueue<String> queue1 = new SynchronousQueue<>();

    // Constructor with fair access
    BlockingQueue<String> queue2 = new SynchronousQueue<>(true);
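Because a SynchronousQueue never stores anything, it helps to see a hand-off in action. The sketch below uses the hypothetical names HandoffDemo and handOff: one thread calls put() and the calling thread calls take(), and each waits for the other.

```java
import java.util.concurrent.SynchronousQueue;

// Illustrative sketch: class and method names are assumptions, not course code.
class HandoffDemo {

    // Hands `message` from a producer thread to the calling thread and returns what was received.
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                queue.put(message); // blocks until another thread calls take()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String received = queue.take(); // blocks until the producer calls put()
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted during hand-off", e);
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<String> idle = new SynchronousQueue<>();
        System.out.println(idle.offer("nobody is waiting")); // prints false: no consumer is ready
        System.out.println(idle.size());                     // prints 0: the queue never holds elements
        System.out.println(handOff("part #1"));              // prints part #1
    }
}
```

The non-blocking offer() fails immediately when no consumer is waiting, which is the practical meaning of "a queue without capacity".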

The Main Methods of the BlockingQueue:

Adding elements:

The void put(E e) method inserts an item into the queue, blocking the thread for as long as the queue is full. Alternatively, the boolean offer(E e, long timeout, TimeUnit unit) method attempts to add an item, waiting up to the specified time for space to become available and returning false if the timeout expires first.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class BlockingQueueExample {
        public static void main(String[] args) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

            try {
                queue.put("Element 1"); // Insert the first element, no blocking.
                queue.put("Element 2"); // Insert the second element, no blocking.

                // Try to add the third element with a 2-second timeout.
                // Since the queue is full and nothing removes an element, it waits 2 seconds.
                boolean success = queue.offer("Element 3", 2, TimeUnit.SECONDS); // false
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

This example demonstrates inserting two elements into a BlockingQueue without blocking, followed by an attempt to add a third element with a 2-second timeout using the offer() method, which will wait if the queue is full.

Element Retrieval:

The E take() method retrieves and returns an item from the queue, blocking the thread if the queue is empty. Alternatively, the E poll(long timeout, TimeUnit unit) method attempts to retrieve an item from the queue, waiting for the specified time if the queue is empty.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class BlockingQueueRetrievalExample {
        public static void main(String[] args) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

            try {
                // Adding elements to the queue
                queue.put("Element 1");
                queue.put("Element 2");

                // Retrieve and remove the first element, no blocking since the queue is not empty
                String item1 = queue.take(); // Returns "Element 1"

                // Attempt to retrieve and remove the next element with a 2-second timeout
                String item2 = queue.poll(2, TimeUnit.SECONDS); // Returns "Element 2"

                // Attempt to retrieve an element when the queue is empty, this will block for 2 seconds
                String item3 = queue.poll(2, TimeUnit.SECONDS); // Returns `null` after timeout
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

This code adds two elements to a BlockingQueue, retrieves and removes the first element immediately, attempts to retrieve the next element with a 2-second timeout, and finally tries to retrieve an element from an empty queue, which results in a null after the timeout.

Checking and Removing Elements:

The boolean remove(Object o) method removes the specified element from the queue if it is present. On the other hand, the boolean contains(Object o) method checks if the specified element is present in the queue without removing it.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BlockingQueueCheckRemoveExample {
        public static void main(String[] args) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

            try {
                // Adding elements to the queue
                queue.put("Element 1");
                queue.put("Element 2");

                // Check if "Element 1" is in the queue, should return `true`
                boolean containsElement1 = queue.contains("Element 1"); // true

                // Remove "Element 1" from the queue, should return `true`
                boolean removedElement1 = queue.remove("Element 1"); // true

                // Check if "Element 1" is still in the queue, should return `false`
                boolean containsElement1AfterRemoval = queue.contains("Element 1"); // false

                // Try to remove an element that is not in the queue, should return `false`
                boolean removedElement3 = queue.remove("Element 3"); // false
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

This code adds two elements to a BlockingQueue, checks for the presence of "Element 1", removes it, checks again to confirm its removal, and then attempts to remove a nonexistent element.

Querying the State of the Queue:

The int size() method returns the number of elements currently in the queue. To determine if the queue is empty, you can use the boolean isEmpty() method, which checks whether the queue has no elements. For queues with a fixed capacity, the int remainingCapacity() method provides the number of remaining spaces available in the queue.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BlockingQueueCapacityExample {
        public static void main(String[] args) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

            try {
                // Adding elements to the queue
                queue.put("Element 1");
                queue.put("Element 2");

                // Get the number of elements in the queue
                int currentSize = queue.size(); // 2

                // Check if the queue is empty
                boolean isQueueEmpty = queue.isEmpty(); // false

                // Get the remaining capacity in the queue
                int remainingSpace = queue.remainingCapacity(); // 1

                // Add another element to fill the queue
                queue.put("Element 3");

                // Check the size and remaining capacity after adding the third element
                currentSize = queue.size(); // 3
                remainingSpace = queue.remainingCapacity(); // 0
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

This code adds elements to a BlockingQueue, checks the current size, verifies whether the queue is empty, and determines the remaining capacity, then updates these values after filling the queue completely.

Implementing the Real-Life Example in Code
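The factory scenario from the start of the chapter can be sketched as follows. This is one possible implementation under stated assumptions, not the course's own listing: the names FactoryDemo and runFactory are hypothetical, and the DONE marker is an assumed convention for telling the consumer that no more parts are coming.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch: class, method, and marker names are assumptions, not course code.
class FactoryDemo {

    private static final String DONE = "DONE"; // hypothetical end-of-work marker

    // Runs one producer and one consumer over a bounded queue; returns the processed parts in order.
    static List<String> runFactory(int partCount, int queueCapacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
        List<String> processed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= partCount; i++) {
                    queue.put("Part " + i); // waits while the queue is full
                }
                queue.put(DONE); // tell the consumer that no more parts are coming
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String part = queue.take(); // waits while the queue is empty
                    if (part.equals(DONE)) {
                        break;
                    }
                    processed.add(part + " processed");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting for the factory", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runFactory(3, 2));
        // prints [Part 1 processed, Part 2 processed, Part 3 processed]
    }
}
```

Neither thread contains a synchronized block or a wait/notify call. The bounded queue alone makes the producer wait when it is full and the consumer wait when it is empty.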

😭 Limitations

One key limitation is performance: due to the locking operations involved, performance may be reduced compared to non-synchronized collections. Additionally, resources can become a concern as large queues demand more memory and CPU time to handle the locks and synchronization processes.

💪 Advantages

On the positive side, BlockingQueue is thread-safe, offering safe communication between threads without requiring manual synchronization management. It also simplifies code by avoiding complex synchronization and blocking constructs. Furthermore, the variety of BlockingQueue implementations means that one can be chosen to suit each usage scenario.

1. What is a BlockingQueue in Java?
2. Which main BlockingQueue methods block a thread?
3. What is BlockingQueue useful for in multithreaded applications?