BlockingQueue drainTo() – Опрос элементов очереди в коллекцию

Научитесь использовать метод BlockingQueue.drainTo() для опорожнения очереди(опрос всех или определенного количества элементов из очереди) в Collection. Опорожнение необходимо в ситуациях, когда несколько потоков-производителей добавляют элементы в BlockingQueue, а поток-потребитель периодически опрашивает несколько элементов из очереди и обрабатывает их вместе.

Одним из таких примеров может быть генератор отчетов Excel. Может быть ExecutorService с несколькими потоками, обрабатывающими записи и помещающими их в очередь блокировки. И поток записи отчетов, периодически опрашивающий очередь и записывающий записи в Excel.

1. Метод BlockingQueue drainTo()

  • Метод drainTo() удаляет все доступные элементы из указанной очереди и добавляет их в указанную коллекцию.
  • Это обеспечивает лучшую производительность, чем опрос всех элементов по одному.
  • Поведение этого метода не определено, если предоставленная коллекция изменяется во время слива.
  • Если коллекция неизменяема, метод выдаст исключение UnsupportedOperationException.
  • Для универсальных коллекций несовместимые типы классов приведут к исключению ClassCastException.

Есть две версии этого метода. Второй метод истощает не более maxElements доступных элементов.

int drainTo(Collection c)int drainTo(Collection c, int maxElements)

2. Демонстрация

Мы создаем две задачи, чтобы продемонстрировать использование метода drainTo(). Чтобы не усложнять, давайте назовем Producer и Consumer. Производители будут продолжать добавлять элементы в очередь блокировки, а потребитель будет сливать элементы, неоднократно, после некоторой задержки.

 публичный класс Producer реализует Runnable {Очередь BlockingQueue;публичный производитель(очередь BlockingQueue){эта.очередь = очередь;}@Переопределить@SneakyThrowspublic void run() {в то время как(истина){Поток.сон(2000);System.out.println("Создано новое сообщение в: " + LocalDateTime.now());queue.offer("Тестовое сообщение");}}}
 публичный класс Consumer реализует Runnable {Очередь BlockingQueue;публичный потребитель(очередь BlockingQueue){эта.очередь = очередь;}@Переопределить@SneakyThrowspublic void run() {в то время как(истина) {Поток.сон(10000);Список<Строка> сообщения = новый ArrayList<>();System.out.println("=============================================");System.out.println("Размер очереди перед сливом: " + queue.size());int messagesCount = queue.drainTo(messages, 20);System.out.println("Размер коллекции: " + messagesCount);//messages.stream().forEach(System.out::println);System.out.println("Размер очереди после слива: " + queue.size());System.out.println("=============================================");}}}

Следующий код создает ExecutorService и запускает два потока-производителя и один поток-потребителя. Поток-потребитель выполняется каждые 10 секунд и удаляет все сообщения из очереди.

public class QueueDrain {public static void main(String[] args) {BlockingQueue<String> queue = new ArrayBlockingQueue(20);ExecutorService executorService = Executors.newFixedThreadPool(3);executorService.submit(new Producer(queue));executorService.submit(new Producer(queue));executorService.submit(new Consumer(queue));}}

Вывод программы следующий. Оба потока-производителя производят по 1 сообщению с интервалом в 2 секунды, таким образом, всего 10 сообщений. Поток-потребитель опрашивает все сообщения с интервалом в 10 секунд.

Produced new message at : 2022-08-10T15:45:58.627532600Produced new message at : 2022-08-10T15:45:58.627532600Produced new message at : 2022-08-10T15:46:00.631044400Produced new message at : 2022-08-10T15:46:00.631044400Produced new message at : 2022-08-10T15:46:02.646342Produced new message at : 2022-08-10T15:46:02.646342Produced new message at : 2022-08-10T15:46:04.647652800Produced new message at : 2022-08-10T15:46:04.647790800=========================================Queue size before draining : 8Collection size : 8Queue size after draining : 0=========================================

3. Заключение

Метод BlockingQueue's drainTo() — это удобный метод для решения проблем производителя-потребителя, где скорость потоков-производителей значительно меньше, чем у потоков-потребителей. В таких случаях потребители могут слить все или определенное количество элементов из очереди и обработать их одновременно.

Исходный код на Github

Прокрутить вверх