Java TransferQueue против. LinkedTransferQueue

В Java интерфейс TransferQueue представляет собой параллельную реализацию BlockingQueue с поддержкой «синхронной передачи сообщений» между производителями и потребителями. Ключевой особенностью TransferQueue является метод transfer(), который блокирует, пока сообщение не будет передано потребителю.

1. Интерфейс TransferQueue

TransferQueue — это интерфейс в пакете java.util.concurrent. Это блокирующая очередь, в которой производители могут ожидать получения элементов потребителями.

public interface TransferQueue<E> extends BlockingQueue<E> { ... }

TransferQueue полезен, например, в приложениях передачи сообщений, в которых производители иногда(используя метод transfer()) ожидают получения элементов потребителями, вызывающими take или poll, а в других случаях ставят элементы в очередь(с помощью метода put()), не дожидаясь получения.

Когда производитель обращается к TransferQueue для передачи сообщения и есть потребители, ожидающие приема сообщения, производитель напрямую передает сообщение потребителю.

Если ожидающих потребителей нет, то производитель не будет напрямую отправлять сообщение и возвращать его, а будет ждать, пока какой-либо потребитель не будет доступен для использования сообщения.

2. Класс LinkedTransferQueue

Класс LinkedTransferQueue — это конкретная реализация интерфейса TransferQueue в Java. Это неограниченная, потокобезопасная очередь, основанная на связанных узлах.

Давайте отметим несколько важных моментов относительно LinkedTransferQueue в Java.

  • Это неограниченная очередь на связанных узлах.
  • Эта очередь упорядочивает элементы по принципу FIFO(первым пришел — первым ушел) относительно любого данного производителя.
  • Элементы вставляются в конец и извлекаются из начала очереди.
  • Обеспечивает блокировку операций вставки и извлечения.
  • Не допускается наличие объектов NULL.
  • LinkedTransferQueue является потокобезопасным.
  • Метод size() НЕ является операцией с постоянным временем из-за асинхронной природы, поэтому может выдавать неточные результаты, если эта коллекция изменяется во время обхода.
  • Массовые операции addAll, removeAll, remainAll, containsAll, equals и toArray не гарантированно будут выполнены атомарно. Например, итератор, работающий одновременно с операцией addAll, может просматривать только некоторые из добавленных элементов.

3. Пример Java LinkedTransferQueue

3.1. Неблокируемая вставка и удаление

Следующая программа на Java представляет собой очень простой пример добавления и опроса сообщений из LinkedTransferQueue.

LinkedTransferQueue<Integer> linkedTransferQueue = new LinkedTransferQueue<>();linkedTransferQueue.put(1);System.out.println("Added Message = 1");Integer message = linkedTransferQueue.poll();System.out.println("Recieved Message = " + message);

Вывод программы.

Added Message = 1Recieved Message = 1

3.2 Блокировка вставки и извлечения

Пример Java для помещения и извлечения элементов из LinkedTransferQueue с использованием блокирующих вставок и извлечения.

  • Поток-производитель будет ждать, пока не появится потребитель, готовый взять элемент из очереди.
  • Потребительский поток будет ждать, если очередь пуста. Как только в очереди появляется один элемент, он извлекает его. Только после того, как потребитель принял сообщение, производитель может отправить другое сообщение.
import java.util.Random;import java.util.concurrent.LinkedTransferQueue;import java.util.concurrent.TimeUnit;public class LinkedTransferQueueExample {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<Integer> linkedTransferQueue = new LinkedTransferQueue<>();new Thread(() -> {Random random = new Random(1);try {while(true) {System.out.println("Producer is waiting to transfer message...");Integer message = random.nextInt();boolean added = linkedTransferQueue.tryTransfer(message);if(added) {System.out.println("Producer added the message - " + message);}Thread.sleep(TimeUnit.SECONDS.toMillis(3));}} catch(InterruptedException e) {e.printStackTrace();}}).start();new Thread(() -> {try {while(true) {System.out.println("Consumer is waiting to take message...");Integer message = linkedTransferQueue.take();System.out.println("Consumer recieved the message - " + message);Thread.sleep(TimeUnit.SECONDS.toMillis(3));}} catch(InterruptedException e) {e.printStackTrace();}}).start();}}

Вывод программы.

Producer is waiting to transfer message...Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 431529176Consumer recieved the message - 431529176Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 1761283695Consumer recieved the message - 1761283695Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 1749940626Consumer recieved the message - 1749940626Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 892128508Consumer recieved the message - 892128508Consumer is waiting to take message...Producer is waiting to transfer message...Producer added the message - 155629808Consumer recieved the message - 155629808

Обратите внимание, что в консоли могут быть некоторые операторы печати, где кажется, что потребитель потребил сообщение даже до того, как производитель его создал. Не путайте, это из-за параллельной природы примера. На самом деле, это работает так, как и ожидалось.

4. Создание экземпляра LinkedTransferQueue

Класс LinkedTransferQueue предоставляет различные способы построения очереди в Java.

  • LinkedTransferQueue(): создает изначально пустую LinkedTransferQueue.
  • LinkedTransferQueue(Collection c): создает LinkedTransferQueue, изначально содержащую элементы заданной коллекции, добавленные в порядке обхода итератора коллекции.

5. Методы LinkedTransferQueue

Класс LinkedTransferQueue содержит нижеприведенные важные методы, которые вам следует знать.

  • Object take() : извлекает и удаляет заголовок этой очереди, при необходимости ожидая, пока элемент станет доступным.
  • void transfer(Object o) : передает элемент потребителю, ожидая при необходимости сделать это.
  • boolean tryTransfer(Object o) : немедленно передает элемент ожидающему потребителю, если это возможно.
  • boolean tryTransfer(Object o, long timeout, TimeUnit unit): передает элемент потребителю, если это возможно сделать до истечения времени ожидания.
  • int getWaitingConsumerCount() : возвращает оценку количества потребителей, ожидающих получения элементов через BlockingQueue.take() или синхронизированный опрос.
  • boolean hasWaitingConsumer() : возвращает true, если есть хотя бы один потребитель, ожидающий получения элемента через BlockingQueue.take() или синхронизированный опрос.
  • void put(Object o) : вставляет указанный элемент в конец очереди.</li
  • boolean add(object) : вставляет указанный элемент в конец очереди.
  • boolean offer(object) : вставляет указанный элемент в конец очереди.
  • boolean remove(object) : удаляет один экземпляр указанного элемента из этой очереди, если он присутствует.
  • Object peek() : извлекает, но не удаляет, заголовок этой очереди или возвращает null, если эта очередь пуста.
  • Object poll(): извлекает и удаляет заголовок этой очереди или возвращает null, если эта очередь пуста.
  • Object poll(timeout, timeUnit): извлекает и удаляет заголовок этой очереди, ожидая в течение указанного времени ожидания, если это необходимо для того, чтобы элемент стал доступен.
  • void clear() : удаляет все элементы из очереди.
  • boolean contains(Object o) : возвращает true, если эта очередь содержит указанный элемент.
  • Итератор iterator() : возвращает итератор по элементам в этой очереди в правильной последовательности.
  • int size() : возвращает количество элементов в этой очереди.
  • int drainTo(Collection c) : удаляет все доступные элементы из этой очереди и добавляет их в указанную коллекцию.
  • int drainTo(Collection c, int maxElements) : удаляет не более указанного количества доступных элементов из этой очереди и добавляет их в указанную коллекцию.
  • int remainCapacity() : возвращает количество дополнительных элементов, которые эта очередь может в идеале(при отсутствии ограничений памяти или ресурсов) принять без блокировки.
  • Object[] toArray() : возвращает массив, содержащий все элементы в этой очереди в правильной последовательности.

6. Заключение

В этом руководстве по Java LinkedTransferQueue мы научились использовать класс LinkedTransferQueue, представляющий собой реализацию параллельной блокирующей очереди, в которой производители могут ожидать получения сообщений от потребителей.

Мы также изучили несколько важных методов и конструкторов класса LinkedTransferQueue.

Пишите мне свои вопросы в комментариях.

Ссылки:

Интерфейс TransferQueue Java Docs
Документация Java по классу LinkedTransferQueue

Прокрутить вверх