Notice: This page requires JavaScript to function properly.
Please enable JavaScript in your browser settings or update your browser.
Lära BlockingQueue and its Implementations | Synchronized Collections
Multithreading in Java

book
BlockingQueue and its Implementations

Basic BlockingQueue Implementations

We won't go through each realization in detail, as it would take a lot of time, and it's unlikely you'll need them all. I will talk about the general concepts and what constructors they have.

Real Life Example

Imagine a factory where one thread, the producer, creates parts, and another thread, the consumer, processes them. The producer places the parts into a queue, while the consumer retrieves and processes them from the queue. If the queue runs out of parts, the consumer waits for the producer to add more. Conversely, if the queue is full, the producer waits for the consumer to make space.

Note

A little below we will implement this task in code.

Differences from Other Collection Types

The BlockingQueue provides automated synchronization, managing thread access to the queue without requiring manual synchronization. It also supports blocking operations for adding and retrieving items, a feature not found in other collections like ArrayList or LinkedList.

BlockingQueue Implementations

ArrayBlockingQueue: A size-limited queue that uses an array to store items.

Main.java

Main.java

copy
// Constructor with fixed capacity
BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(5);

// Constructor with fixed capacity and fair access
BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(5, true);

// Constructor with fixed capacity and initial collection of elements
Collection<String> initialElements = java.util.Arrays.asList("One", "Two", "Three");
BlockingQueue<String> queue3 = new ArrayBlockingQueue<>(5, false, initialElements);
123456789
// Constructor with fixed capacity BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(5); // Constructor with fixed capacity and fair access BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(5, true); // Constructor with fixed capacity and initial collection of elements Collection<String> initialElements = java.util.Arrays.asList("One", "Two", "Three"); BlockingQueue<String> queue3 = new ArrayBlockingQueue<>(5, false, initialElements);

Explanation

The true parameter enables a fair access policy by providing a FIFO order for thread access.

LinkedBlockingQueueue: A queue based on linked nodes that can be restricted or unrestricted.

Main.java

Main.java

copy
// Constructor without capacity bounds
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();

// Constructor with fixed capacity
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(5);

// Constructor with initial collection of elements
Collection<String> initialElements = java.util.Arrays.asList("One", "Two", "Three");
BlockingQueue<String> queue3 = new LinkedBlockingQueue<>(initialElements);
123456789
// Constructor without capacity bounds BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(); // Constructor with fixed capacity BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(5); // Constructor with initial collection of elements Collection<String> initialElements = java.util.Arrays.asList("One", "Two", "Three"); BlockingQueue<String> queue3 = new LinkedBlockingQueue<>(initialElements);

PriorityBlockingQueue: An unbounded prioritized queue where elements are retrieved according to their natural order or as specified by a comparator.

Main.java

Main.java

copy
// Constructor without initial capacity (default is 11)
BlockingQueue<Integer> queue1 = new PriorityBlockingQueue<>();

// Constructor with initial capacity
BlockingQueue<Integer> queue2 = new PriorityBlockingQueue<>(5);

// Constructor with initial capacity and comparator
Comparator<Integer> comparator = Integer::compareTo;
BlockingQueue<Integer> queue3 = new PriorityBlockingQueue<>(5, comparator);

// Constructor with initial collection of elements
Collection<Integer> initialElements = java.util.Arrays.asList(1, 3, 2);
BlockingQueue<Integer> queue4 = new PriorityBlockingQueue<>(initialElements)
12345678910111213
// Constructor without initial capacity (default is 11) BlockingQueue<Integer> queue1 = new PriorityBlockingQueue<>(); // Constructor with initial capacity BlockingQueue<Integer> queue2 = new PriorityBlockingQueue<>(5); // Constructor with initial capacity and comparator Comparator<Integer> comparator = Integer::compareTo; BlockingQueue<Integer> queue3 = new PriorityBlockingQueue<>(5, comparator); // Constructor with initial collection of elements Collection<Integer> initialElements = java.util.Arrays.asList(1, 3, 2); BlockingQueue<Integer> queue4 = new PriorityBlockingQueue<>(initialElements)

DelayQueue: A delayed queue where items can only be retrieved after their delay has expired.

DelayedElement.java

DelayedElement.java

DelayQueueConstructors.java

DelayQueueConstructors.java

copy
class DelayedElement implements Delayed {
private final long expirationTime; // The time when the element will be available

public DelayedElement(long delay, TimeUnit unit) {
this.expirationTime = System.currentTimeMillis() + unit.toMillis(delay);
}

@Override
public long getDelay(TimeUnit unit) {
long delay = expirationTime - System.currentTimeMillis(); // Calculate the remaining delay
return unit.convert(delay, TimeUnit.MILLISECONDS); // Convert the delay to the specified time unit
}

@Override
public int compareTo(Delayed o) {
return Long.compare(this.expirationTime, ((DelayedElement) o).expirationTime);
}
}
123456789101112131415161718
class DelayedElement implements Delayed { private final long expirationTime; // The time when the element will be available public DelayedElement(long delay, TimeUnit unit) { this.expirationTime = System.currentTimeMillis() + unit.toMillis(delay); } @Override public long getDelay(TimeUnit unit) { long delay = expirationTime - System.currentTimeMillis(); // Calculate the remaining delay return unit.convert(delay, TimeUnit.MILLISECONDS); // Convert the delay to the specified time unit } @Override public int compareTo(Delayed o) { return Long.compare(this.expirationTime, ((DelayedElement) o).expirationTime); } }

This code demonstrates the use of the DelayedElement class, which implements the Delayed interface, and the DelayQueue delay queue in Java. The DelayedElement class defines a getDelay method to calculate the remaining delay time and a compareTo method to compare objects based on the delay expiration time.

The main method creates two queues: queue1, an empty delay queue, and queue2, a queue initialized with elements that have a delay of 5 and 1 second, respectively.

The items in DelayQueueue become available for retrieval after the specified delay time has elapsed.

SynchronousQueueue: A queue without capacity, where each insert operation must wait for the corresponding extract operation and vice versa.

Main.java

Main.java

copy
// Constructor without fair access
BlockingQueue<String> queue1 = new SynchronousQueue<>();

// Constructor with fair access
BlockingQueue<String> queue2 = new SynchronousQueue<>(true);
12345
// Constructor without fair access BlockingQueue<String> queue1 = new SynchronousQueue<>(); // Constructor with fair access BlockingQueue<String> queue2 = new SynchronousQueue<>(true);

The Main Methods of the BlockingQueue:

Adding elements:

The void put(E e) method inserts an item into the queue, blocking the thread if the queue is full. Alternatively, the boolean offer(E e, long timeout, TimeUnit unit) method attempts to add an item to the queue, waiting for the specified time if the queue is full.

Main.java

Main.java

copy
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

try {
queue.put("Element 1"); // Insert the first element, no blocking.
queue.put("Element 2"); // Insert the second element, no blocking.

// Try to add the third element with a 2-second timeout.
// Since the queue is full, it will wait for 2 seconds.
boolean success = queue.offer("Element 3", 2, TimeUnit.SECONDS);

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1234567891011121314151617
public class BlockingQueueExample { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); try { queue.put("Element 1"); // Insert the first element, no blocking. queue.put("Element 2"); // Insert the second element, no blocking. // Try to add the third element with a 2-second timeout. // Since the queue is full, it will wait for 2 seconds. boolean success = queue.offer("Element 3", 2, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }

This example demonstrates inserting two elements into a BlockingQueue without blocking, followed by an attempt to add a third element with a 2-second timeout using the offer() method, which will wait if the queue is full.

Element Retrieval:

The E take() method retrieves and returns an item from the queue, blocking the thread if the queue is empty. Alternatively, the E poll(long timeout, TimeUnit unit) method attempts to retrieve an item from the queue, waiting for the specified time if the queue is empty.

Main.java

Main.java

copy
public class BlockingQueueRetrievalExample {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

try {
// Adding elements to the queue
queue.put("Element 1");
queue.put("Element 2");

// Retrieve and remove the first element, no blocking since the queue is not empty
String item1 = queue.take(); // Returns "Element 1"

// Attempt to retrieve and remove the next element with a 2-second timeout
String item2 = queue.poll(2, TimeUnit.SECONDS); // Returns "Element 2"

// Attempt to retrieve an element when the queue is empty, this will block for 2 seconds
String item3 = queue.poll(2, TimeUnit.SECONDS); // Returns `null` after timeout

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1234567891011121314151617181920212223
public class BlockingQueueRetrievalExample { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); try { // Adding elements to the queue queue.put("Element 1"); queue.put("Element 2"); // Retrieve and remove the first element, no blocking since the queue is not empty String item1 = queue.take(); // Returns "Element 1" // Attempt to retrieve and remove the next element with a 2-second timeout String item2 = queue.poll(2, TimeUnit.SECONDS); // Returns "Element 2" // Attempt to retrieve an element when the queue is empty, this will block for 2 seconds String item3 = queue.poll(2, TimeUnit.SECONDS); // Returns `null` after timeout } catch (InterruptedException e) { e.printStackTrace(); } } }

This code adds two elements to a BlockingQueue, retrieves and removes the first element immediately, attempts to retrieve the next element with a 2-second timeout, and finally tries to retrieve an element from an empty queue, which results in a null after the timeout.

Checking and Removing Elements:

The boolean remove(Object o) method removes the specified element from the queue if it is present. On the other hand, the boolean contains(Object o) method checks if the specified element is present in the queue without removing it.

Main.java

Main.java

copy
public class BlockingQueueCheckRemoveExample {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

try {
// Adding elements to the queue
queue.put("Element 1");
queue.put("Element 2");

// Check if "Element 1" is in the queue, should return `true`
boolean containsElement1 = queue.contains("Element 1"); // true

// Remove "Element 1" from the queue, should return `true`
boolean removedElement1 = queue.remove("Element 1"); // true

// Check if "Element 1" is still in the queue, should return `false`
boolean containsElement1AfterRemoval = queue.contains("Element 1"); // false

// Try to remove an element that is not in the queue, should return `false`
boolean removedElement3 = queue.remove("Element 3"); // false

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1234567891011121314151617181920212223242526
public class BlockingQueueCheckRemoveExample { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); try { // Adding elements to the queue queue.put("Element 1"); queue.put("Element 2"); // Check if "Element 1" is in the queue, should return `true` boolean containsElement1 = queue.contains("Element 1"); // true // Remove "Element 1" from the queue, should return `true` boolean removedElement1 = queue.remove("Element 1"); // true // Check if "Element 1" is still in the queue, should return `false` boolean containsElement1AfterRemoval = queue.contains("Element 1"); // false // Try to remove an element that is not in the queue, should return `false` boolean removedElement3 = queue.remove("Element 3"); // false } catch (InterruptedException e) { e.printStackTrace(); } } }

This code adds two elements to a BlockingQueue, checks for the presence of "Element 1", removes it, checks again to confirm its removal, and then attempts to remove a nonexistent element.

Polls the State of the Queue:

The int size() method returns the number of elements currently in the queue. To determine if the queue is empty, you can use the boolean isEmpty() method, which checks whether the queue has no elements. For queues with a fixed capacity, the int remainingCapacity() method provides the number of remaining spaces available in the queue.

Main.java

Main.java

copy
public class BlockingQueueCapacityExample {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

try {
// Adding elements to the queue
queue.put("Element 1");
queue.put("Element 2");

// Get the number of elements in the queue
int currentSize = queue.size(); // 2

// Check if the queue is empty
boolean isQueueEmpty = queue.isEmpty(); // false

// Get the remaining capacity in the queue
int remainingSpace = queue.remainingCapacity(); // 1

// Add another element to fill the queue
queue.put("Element 3");

// Check the size and remaining capacity after adding the third element
currentSize = queue.size(); // 3
remainingSpace = queue.remainingCapacity(); // 0

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
123456789101112131415161718192021222324252627282930
public class BlockingQueueCapacityExample { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); try { // Adding elements to the queue queue.put("Element 1"); queue.put("Element 2"); // Get the number of elements in the queue int currentSize = queue.size(); // 2 // Check if the queue is empty boolean isQueueEmpty = queue.isEmpty(); // false // Get the remaining capacity in the queue int remainingSpace = queue.remainingCapacity(); // 1 // Add another element to fill the queue queue.put("Element 3"); // Check the size and remaining capacity after adding the third element currentSize = queue.size(); // 3 remainingSpace = queue.remainingCapacity(); // 0 } catch (InterruptedException e) { e.printStackTrace(); } } }

This code adds elements to a BlockingQueue, checks the current size, verifies whether the queue is empty, and determines the remaining capacity, then updates these values after filling the queue completely.

Realizing a Real-Life Example in Code

😭 Limitations

One key limitation is performance: due to the locking operations involved, performance may be reduced compared to non-synchronized collections. Additionally, resources can become a concern as large queues demand more memory and CPU time to handle the locks and synchronization processes.

💪 Advantages

On the positive side, the system is secure in multi-threading, offering safe communication between threads without requiring manual synchronization management. It also simplifies code by avoiding complex synchronization and blocking constructs. Furthermore, the flexibility of different BlockingQueue implementations means that they can be suited to various usage scenarios.

1. What is a BlockingQueueue in Java?

2. What are the main methods of BlockingQueue blocking a thread?

3. What is BlockingQueue useful for in multithreaded applications?

question mark

What is a BlockingQueueue in Java?

Select the correct answer

question mark

What are the main methods of BlockingQueue blocking a thread?

Select the correct answer

question mark

What is BlockingQueue useful for in multithreaded applications?

Select the correct answer

Var allt tydligt?

Hur kan vi förbättra det?

Tack för dina kommentarer!

Avsnitt 2. Kapitel 3

Fråga AI

expand

Fråga AI

ChatGPT

Fråga vad du vill eller prova någon av de föreslagna frågorna för att starta vårt samtal

We use cookies to make your experience better!
some-alt