Java SynchronousQueue с примером

Java SynchronousQueue — это особый тип BlockingQueue без внутренней емкости, который в основном используется для обмена данными между двумя потоками, аналогично методу производитель-потребитель. В этой статье рассматриваются основы SynchronousQueue и его основные функции с практическими примерами.

1. Введение

Как мы знаем, очередь — это линейная структура данных, которая хранит элементы в порядке FIFO(First In, First Out). Каждая очередь имеет два конца: передний и задний. Новые элементы добавляются в конец и удаляются из начала. Интерфейс очереди Java и его классы реализации, такие как TransferQueue, BlockingQueue, LinkedList и PriorityQueue, используют структуру данных очереди.

SynchronousQueue — это особый вид BlockingQueue.

  • Он имеет нулевую или нулевую внутреннюю емкость и будет заблокирован до тех пор, пока другой поток не будет готов принять элемент на другом конце.
  • Он в основном используется для обмена данными между потоками потокобезопасным способом. Он синхронен в том смысле, что он передает данные синхронно другим потокам и ждет, пока потоки возьмут данные, вместо того, чтобы просто поместить данные в очередь и вернуть то, что делают другие очереди. Это означает, что каждая операция вставки должна ждать соответствующей операции удаления другого потока и наоборот.
  • Элементы NULL не допускаются; добавление значения NULL вызовет исключение NullPointerException.
  • Мы не можем выполнить итерацию по SynchronousQueue, поскольку итерировать нечего.
  • Мы не можем просмотреть SynchronousQueue, поскольку элемент присутствует только тогда, когда мы пытаемся его удалить, и поэтому peek() всегда возвращает null.

Поскольку SynchronousQueue не имеет емкости, он действует как пустая коллекция, а другие методы очереди будут демонстрировать особое поведение для SynchronousQueue.

  • isEmpty(): всегда возвращает true.
  • iterator(): возвращает пустой итератор, в котором hasNext() всегда возвращает false.
  • peek(): всегда возвращает null.
  • size(): всегда возвращает ноль.

Мы можем создать объект SynchronousQueue, используя следующие конструкторы:

  • SynchronousQueue(): создает экземпляр с несправедливой политикой доступа, т.е. если несколько потоков ожидают, им будет предоставлен доступ в неопределенном порядке.
  • SynchronousQueue(boolean fair): создает экземпляр с политикой равноправного доступа, т.е. ожидающим потокам будет предоставлен доступ в порядке FIFO(первым пришел — первым обслужен).
BlockingQueue<String> syncQueue = new SynchronousQueue<>();BlockingQueue<String> syncQueue = new SynchronousQueue<>(true);

2. Работа с SynchronousQueue

SynchronousQueue поддерживает две основные операции put() и take(), и обе они являются блокирующими.

  • put(): добавить элемент в очередь, но поток, вызывающий этот метод, будет блокировать или ждать, пока другой поток получит этот элемент с помощью метода take().
try {syncQueue.put("Element");} catch(InterruptedException ie) {ie.printStackTrace();}
  • take(): Извлечь и удалить элемент из очереди. Поток, вызывающий этот метод, будет ждать, пока другой поток вставит элемент.
try {String element = syncQueue.take();} catch(InterruptedException ie) {ie.printStackTrace();}

3. Решение проблемы «производитель-потребитель» с использованием SynchronousQueue

SynchronousQueue предназначен для синхронной передачи объектов из потоков-производителей в потоки-потребители.

  • Когда поток-производитель помещает объект в очередь, он должен дождаться, пока поток-потребитель его примет.
  • Когда поток-потребитель хочет взять элемент из очереди, он должен дождаться отправки от потока-производителя.

Одним из возможных решений является решение проблемы производителя-потребителя с помощью BlockingQueue, который предоставляет буфер для хранения данных и управляет потоком, блокируя поток производителя, если буфер заполнен, и блокируя поток потребителя, если буфер пуст.

Но с SynchronousQueue элементы передаются от производителя к потребителям без удержания в очереди, в отличие от BlockingQueue, в которой элементы помещаются, ожидая, пока их примут потоки потребителей. Поэтому, если мы хотим, чтобы производитель-потребитель работал синхронно вместо асинхронной связи, то мы можем использовать SynchronousQueue для решения этого конкретного варианта использования.

Давайте теперь создадим поток-производитель, публикующий событие в очереди, и поток-потребитель, потребляющий это событие из очереди.

BlockingQueue<String> syncQueue = new SynchronousQueue<>();Runnable producer =() -> {try {String event = "Event-1";syncQueue.put(event);System.out.println("Event Published : " + event);} catch(InterruptedException e) {e.printStackTrace();}};Runnable consumer =() -> {try {String event = syncQueue.take();System.out.println("Event Consumed : " + event);} catch(InterruptedException e) {e.printStackTrace();}};

Запустите оба потока и следите за выводом программы.

// Producer Thread creationThread producerThread = new Thread(producer);// Consumer Thread creationThread consumerThread = new Thread(consumer);// Starting Producer & Consumer ThreadsproducerThread.start();consumerThread.start();

Вывод программы.

 Событие опубликовано: Событие-1Событие потреблено: Событие-1

Если мы обратим внимание на вывод программы, то не увидим никакой разницы с традиционной проблемой производитель-потребитель. Чтобы понять, как работает SynchronousQueue, нам нужно выполнить следующие действия,

  • Просто запустите поток производителя, закомментировав строку consumerThread.start(); Если поток потребителя не запущен, то производитель заблокируется при вызове syncQueue.put(event);, и мы не увидим сообщение «Событие опубликовано: Событие-1» на консоли.
  • Аналогично, просто запустите поток потребителя, закомментировав строку ProducerThread.start(); и увидите, что поток потребителя блокируется в методе take() и не будет ничего потреблять из очереди.

Это происходит из-за особого поведения SynchronousQueue, которое гарантирует, что поток, вставляющий данные, будет блокироваться до тех пор, пока не появится поток, который удалит эти данные, и наоборот.

4. Когда использовать?

SynchronousQueue похож на каналы рандеву и лучше всего подходит для проектов с передачей данных, в которых объект, работающий в одном потоке, должен синхронизироваться с объектом, работающим в другом потоке, чтобы передать некоторую информацию, событие или задачу.

  • Если мы хотим безопасно совместно использовать переменную между двумя потоками, мы можем поместить эту переменную в SynchronousQueue и позволить другому потоку взять ее из очереди.
  • SynchronousQueue также используется в CachedThreadPool для достижения эффекта создания неограниченного количества(Integer.MAX) потоков по мере поступления задач. CachedThreadPool будет иметь coreSize как 0 и maxPoolSize как Integer.MAX с SynchronousQueue.
  • В сценариях на основе производителя и потребителя SynchronousQueue также является выбором, когда имеется достаточно потоков потребителей, поэтому потоку производителя не нужно ставить элементы в очередь.

5. Заключение

Мы узнали о Java SynchronousQueue, его основных функциях и его использовании с практическими примерами. Мы также увидели, как решить проблему Producer-Consumer с его помощью, и различные сценарии, где мы можем использовать его в нашем коде.

Прокрутить вверх