Научитесь использовать Java ThreadPoolExecutor в сочетании с BlockingQueue.
1. Создание ThreadPoolExecutor
ThreadPoolExecutor — это тип ExecutorService, который выполняет каждую отправленную задачу, используя один из потоков из пула потоков. Этот класс предоставляет множество гибких способов создания пула потоков в различных контекстах.
1.1 Конструкторы
Следующие конструкторы можно использовать для создания экземпляра исполнителя пула потоков на основе наших требований.
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
Аргументы конструктора:
- corePoolSize – количество потоков, которые следует хранить в пуле, даже если они простаивают.
- maximumPoolSize – максимальное количество потоков, разрешенных в пуле.
- keepAliveTime – когда количество потоков превышает ядро, это максимальное время, в течение которого бездействующий поток будет ожидать новую задачу.
- unit – единица времени для аргумента keepAliveTime.
- workQueue – очередь, используемая для хранения задач Runnable перед их выполнением.
- threadFactory – необязательная фабрика, используемая при создании исполнителем нового потока.
- обработчик – обработчик отклоненного выполнения задачи.
1.2. Пользовательский ThreadPoolExecutor
Даже без расширения ThreadPoolExecutor мы можем использовать его очень эффективно. Но мы упустим некоторые чрезвычайно полезные функции с точки зрения управления потоком выполнения.
Например, класс ThreadPoolExecutor предоставляет два превосходных метода, которые я настоятельно рекомендую переопределить. Эти методы обеспечивают очень хорошую обработку жизненного цикла выполнения Runnable, который должен быть выполнен.
- доВыполнения()
- afterExecute()
импорт java.util.concurrent.BlockingQueue;импорт java.util.concurrent.ThreadPoolExecutor;импортировать java.util.concurrent.TimeUnit;открытый класс CustomThreadPoolExecutor расширяет ThreadPoolExecutor {public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,долго keepAliveTime, единица измерения TimeUnit,BlockingQueue<Выполняемый> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}@Переопределитьзащищенная пустота beforeExecute(Поток t, Выполняемый r) {super.beforeExecute(t, r);System.out.println("Выполнить логику beforeExecute()");}@Переопределитьзащищенная пустота afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);если(т != ноль) {System.out.println("Выполнить логику обработчика исключений");}System.out.println("Выполнить логику afterExecute()");}}
2. Создание очереди блокировки
BlockingQueue похож на другую реализацию Queue с дополнительными возможностями. Любая попытка извлечь что-либо из нее может считаться безопасной, поскольку она не вернется пустой. Потребительский поток автоматически будет ждать, пока BlockingQueue не будет заполнена какими-либо данными. После заполнения поток начнет потреблять ресурс.
BlockingQueue может использоваться для передачи и удержания задач, которые будут выполнены пулом потоков. Блокирующие очереди помогают во многих отношениях:
- Если запущено меньше потоков, чем corePoolSize, Исполнитель всегда предпочитает добавить новый поток, а не ставить его в очередь.
- Если запущено corePoolSize или более потоков, Исполнитель всегда предпочитает поставить запрос в очередь, а не добавлять новый поток.
- Если запрос не может быть поставлен в очередь, создается новый поток, если только он не превышает maximumPoolSize; в этом случае задача будет отклонена.
2.1 Стратегии очередей
ThreadPoolExecutor поддерживает различные виды блокирующих очередей. Каждая очередь обеспечивает различное поведение при обработке задач.
2.1.1 Прямые передачи
Этого можно достичь с помощью SynchronousQueue, не имеющей внутренней емкости. Мы не можем вставить задачу(используя любой метод), если другой поток не пытается ее принять.
При использовании синхронной очереди, когда мы пытаемся поставить задачу в очередь, то это не удастся, если нет потоков, доступных для ее немедленного выполнения. Если поток все еще не достиг максимального размера потока, то будет создан новый поток. В противном случае задача будет немедленно отклонена.
2.1.2 Неограниченные очереди
Неограниченная очередь(например, LinkedBlockingQueue ) заставляет новые отправленные задачи ждать в очереди, когда все(corePoolSize) потоки заняты. Поскольку задачи могут ждать неограниченное время, исполнителю не нужно создавать новые потоки. Поэтому maximumPoolSize не имеет никакого эффекта, если используется эта очередь.
Этот стиль очередей может быть полезен, когда на сервер поступает внезапный всплеск запросов. Хотя это может привести к проблемам с памятью, если запросы продолжают поступать быстрее, чем обрабатываются.
2.1.3 Ограниченные очереди
Ограниченные очереди(например, ArrayBlockingQueue ) помогают управлять ресурсами гораздо лучше. Они предоставляют механизмы для управления количеством потоков, а также задач в очередях, чтобы предотвратить истощение ресурсов.
Для различных сценариев мы можем протестировать индивидуальные размеры пула и размеры очереди и, наконец, использовать то, что лучше всего подходит для нашего варианта использования.
- Использование больших очередей и небольших пулов минимизирует накладные расходы системы, но приводит к низкой пропускной способности.
- Использование небольших очередей и больших пулов также перегружает ЦП, что также может привести к низкой пропускной способности.
- Поэтому важно найти правильный баланс между размером очереди и размером пула.
2.2 Обработка отклоненных задач
Могут быть ситуации, когда отправленные задачи не могут быть выполнены службой исполнителя и, таким образом, отклоняются. Отклонение задачи может произойти, когда больше нет доступных потоков или слотов очереди, поскольку их границы были превышены, или исполнитель был остановлен.
ThreadPoolExecutor предоставляет следующие 4 встроенных обработчика для обработки этих отклоненных задач. Мы также можем создать свой собственный обработчик.
- AbortPolicy : Это политика по умолчанию. Она заставляет исполнителя выдавать исключение RejectedExecutionException.
- CallerRunsPolicy : Эта политика запускает отклоненную задачу непосредственно в вызывающем потоке метода execute. Если исполнитель был остановлен, задача будет отменена.
- DiscardOldestPolicy : Эта политика отбрасывает самый старый необработанный запрос и затем повторяет попытку выполнения. Если исполнитель был остановлен, задача будет отброшена.
- DiscardPolicy: эта политика автоматически отменяет отклоненную задачу.
- Пользовательская политика: мы можем реализовать интерфейс RejectedExecutionHandler и предоставить собственную логику для обработки отклоненных задач.
3. Использование ThreadPoolExecutor с BlockingQueue
Для демонстрации использования ThreadPoolExecutor с BlockingQueue мы создали одну задачу DemoTask. Эта задача ничего не делает. Она просто ждет 500 мс, а затем завершается.
публичный класс DemoTask реализует Runnable {Имя частной строки = null;public DemoTask(имя строки) {это.имя = имя;}публичная строка getName() {вернуть это.имя;}@Переопределитьpublic void run() {пытаться {Поток.сон(500);} поймать(InterruptedException e) {e.printStackTrace();}System.out.println("Выполнение: " + name);}}
Теперь предположим, что у нас всего 100 задач. Мы хотим запустить их, используя в идеале 10, а максимум 20 потоков.
import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class DemoExecutor {public static void main(String[] args) throws InterruptedException {BlockingQueue<Runnable> blockingQueue =new LinkedBlockingQueue<Runnable>();CustomThreadPoolExecutor executor =new CustomThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS,blockingQueue, new ThreadPoolExecutor.AbortPolicy());// Let start all core threads initiallyexecutor.prestartAllCoreThreads();for(int i = 1; i <= 100; i++) {blockingQueue.offer(new DemoTask("Task " + i));}executor.shutdown();executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);}}
Выполните приведенный выше код, и вы увидите, как все задачи будут выполнены одна за другой.