В Java интерфейс TransferQueue представляет собой параллельную реализацию BlockingQueue с поддержкой «синхронной передачи сообщений» между производителями и потребителями. Ключевой особенностью TransferQueue является метод transfer(), который блокирует, пока сообщение не будет передано потребителю.
1. Интерфейс TransferQueue
TransferQueue — это интерфейс в пакете java.util.concurrent. Это блокирующая очередь, в которой производители могут ожидать получения элементов потребителями.
public interface TransferQueue<E> extends BlockingQueue<E> { ... }
TransferQueue полезен, например, в приложениях передачи сообщений, в которых производители иногда(используя метод transfer()) ожидают получения элементов потребителями, вызывающими take или poll, а в других случаях ставят элементы в очередь(с помощью метода put()), не дожидаясь получения.
Когда производитель обращается к TransferQueue для передачи сообщения и есть потребители, ожидающие приема сообщения, производитель напрямую передает сообщение потребителю.
Если ожидающих потребителей нет, то производитель не будет напрямую отправлять сообщение и возвращать его, а будет ждать, пока какой-либо потребитель не будет доступен для использования сообщения.
2. Класс LinkedTransferQueue
Класс LinkedTransferQueue — это конкретная реализация интерфейса TransferQueue в Java. Это неограниченная, потокобезопасная очередь, основанная на связанных узлах.
Давайте отметим несколько важных моментов относительно LinkedTransferQueue в Java.
- Это неограниченная очередь на связанных узлах.
- Эта очередь упорядочивает элементы по принципу FIFO(первым пришел — первым ушел) относительно любого данного производителя.
- Элементы вставляются в конец и извлекаются из начала очереди.
- Обеспечивает блокировку операций вставки и извлечения.
- Не допускается наличие объектов NULL.
- LinkedTransferQueue является потокобезопасным.
- Метод size() НЕ является операцией с постоянным временем из-за асинхронной природы, поэтому может выдавать неточные результаты, если эта коллекция изменяется во время обхода.
- Массовые операции addAll, removeAll, remainAll, containsAll, equals и toArray не гарантированно будут выполнены атомарно. Например, итератор, работающий одновременно с операцией addAll, может просматривать только некоторые из добавленных элементов.
3. Пример Java LinkedTransferQueue
3.1. Неблокируемая вставка и удаление
Следующая программа на Java представляет собой очень простой пример добавления и опроса сообщений из LinkedTransferQueue.
LinkedTransferQueue<Integer> linkedTransferQueue = new LinkedTransferQueue<>();linkedTransferQueue.put(1);System.out.println("Added Message = 1");Integer message = linkedTransferQueue.poll();System.out.println("Recieved Message = " + message);
Вывод программы.
Added Message = 1Recieved Message = 1
3.2 Блокировка вставки и извлечения
Пример Java для помещения и извлечения элементов из LinkedTransferQueue с использованием блокирующих вставок и извлечения.
- Поток-производитель будет ждать, пока не появится потребитель, готовый взять элемент из очереди.
- Потребительский поток будет ждать, если очередь пуста. Как только в очереди появляется один элемент, он извлекает его. Только после того, как потребитель принял сообщение, производитель может отправить другое сообщение.
import java.util.Random;import java.util.concurrent.LinkedTransferQueue;import java.util.concurrent.TimeUnit;public class LinkedTransferQueueExample {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<Integer> linkedTransferQueue = new LinkedTransferQueue<>();new Thread(() -> {Random random = new Random(1);try {while(true) {System.out.println("Producer is waiting to transfer message...");Integer message = random.nextInt();boolean added = linkedTransferQueue.tryTransfer(message);if(added) {System.out.println("Producer added the message - " + message);}Thread.sleep(TimeUnit.SECONDS.toMillis(3));}} catch(InterruptedException e) {e.printStackTrace();}}).start();new Thread(() -> {try {while(true) {System.out.println("Consumer is waiting to take message...");Integer message = linkedTransferQueue.take();System.out.println("Consumer recieved the message - " + message);Thread.sleep(TimeUnit.SECONDS.toMillis(3));}} catch(InterruptedException e) {e.printStackTrace();}}).start();}}
Вывод программы.
Producer is waiting to transfer message...Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 431529176Consumer recieved the message - 431529176Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 1761283695Consumer recieved the message - 1761283695Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 1749940626Consumer recieved the message - 1749940626Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 892128508Consumer recieved the message - 892128508Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 155629808Consumer recieved the message - 155629808
Обратите внимание, что в консоли могут быть некоторые операторы печати, где кажется, что потребитель потребил сообщение даже до того, как производитель его создал. Не путайте, это из-за параллельной природы примера. На самом деле, это работает так, как и ожидалось.
4. Создание экземпляра LinkedTransferQueue
Класс LinkedTransferQueue предоставляет различные способы построения очереди в Java.
- LinkedTransferQueue(): создает изначально пустую LinkedTransferQueue.
- LinkedTransferQueue(Collection c): создает LinkedTransferQueue, изначально содержащую элементы заданной коллекции, добавленные в порядке обхода итератора коллекции.
5. Методы LinkedTransferQueue
Класс LinkedTransferQueue содержит нижеприведенные важные методы, которые вам следует знать.
- Object take() : извлекает и удаляет заголовок этой очереди, при необходимости ожидая, пока элемент станет доступным.
- void transfer(Object o) : передает элемент потребителю, ожидая при необходимости сделать это.
- boolean tryTransfer(Object o) : немедленно передает элемент ожидающему потребителю, если это возможно.
- boolean tryTransfer(Object o, long timeout, TimeUnit unit): передает элемент потребителю, если это возможно сделать до истечения времени ожидания.
- int getWaitingConsumerCount() : возвращает оценку количества потребителей, ожидающих получения элементов через BlockingQueue.take() или синхронизированный опрос.
- boolean hasWaitingConsumer() : возвращает true, если есть хотя бы один потребитель, ожидающий получения элемента через BlockingQueue.take() или синхронизированный опрос.
- void put(Object o) : вставляет указанный элемент в конец очереди.</li
- boolean add(object) : вставляет указанный элемент в конец очереди.
- boolean offer(object) : вставляет указанный элемент в конец очереди.
- boolean remove(object) : удаляет один экземпляр указанного элемента из этой очереди, если он присутствует.
- Object peek() : извлекает, но не удаляет, заголовок этой очереди или возвращает null, если эта очередь пуста.
- Object poll(): извлекает и удаляет заголовок этой очереди или возвращает null, если эта очередь пуста.
- Object poll(timeout, timeUnit): извлекает и удаляет заголовок этой очереди, ожидая в течение указанного времени ожидания, если это необходимо для того, чтобы элемент стал доступен.
- void clear() : удаляет все элементы из очереди.
- boolean contains(Object o) : возвращает true, если эта очередь содержит указанный элемент.
- Итератор iterator() : возвращает итератор по элементам в этой очереди в правильной последовательности.
- int size() : возвращает количество элементов в этой очереди.
- int drainTo(Collection c) : удаляет все доступные элементы из этой очереди и добавляет их в указанную коллекцию.
- int drainTo(Collection c, int maxElements) : удаляет не более указанного количества доступных элементов из этой очереди и добавляет их в указанную коллекцию.
- int remainCapacity() : возвращает количество дополнительных элементов, которые эта очередь может в идеале(при отсутствии ограничений памяти или ресурсов) принять без блокировки.
- Object[] toArray() : возвращает массив, содержащий все элементы в этой очереди в правильной последовательности.
6. Заключение
В этом руководстве по Java LinkedTransferQueue мы научились использовать класс LinkedTransferQueue, представляющий собой реализацию параллельной блокирующей очереди, в которой производители могут ожидать получения сообщений от потребителей.
Мы также изучили несколько важных методов и конструкторов класса LinkedTransferQueue.
Пишите мне свои вопросы в комментариях.
Ссылки:
Интерфейс TransferQueue Java Docs
Документация Java по классу LinkedTransferQueue