Зачем нам нужен пул потоков в Java? Ответ в том, что когда мы разрабатываем простое, параллельное приложение в Java, мы создаем некоторые объекты Runnable, а затем создаем соответствующие объекты Thread для их выполнения. Создание потока в Java — дорогостоящая операция. И если вы начнете создавать новый экземпляр потока каждый раз для выполнения задачи, производительность приложения ухудшится.
1. Как работает пул потоков?
Пул потоков — это коллекция предварительно инициализированных потоков. Обычно размер коллекции фиксирован, но это не обязательно. Он облегчает выполнение N-го количества задач с использованием одних и тех же потоков. Если задач больше, чем потоков, то задачи должны ждать в структуре, похожей на очередь( FIFO — First in first out ).
Когда любой поток завершает свое выполнение, он может выбрать новую задачу из очереди и выполнить ее. Когда все задачи завершены, потоки остаются активными и ждут новых задач в пуле потоков.

Наблюдатель продолжает следить за очередью(обычно BlockingQueue ) на предмет появления новых задач. Как только задачи приходят, потоки начинают подбирать задачи и выполнять их снова.
2. Класс ThreadPoolExecutor
Начиная с Java 5, Java concurrency API предоставляет механизм Executor framework. Основными частями являются интерфейс Executor, его подинтерфейс ExecutorService и класс ThreadPoolExecutor, реализующий оба интерфейса.
ThreadPoolExecutor разделяет создание задачи и ее выполнение. С ThreadPoolExecutor нам нужно только реализовать объекты Runnable и отправить их исполнителю. Он отвечает за выполнение, создание экземпляров и запуск задач с необходимыми потоками.
Он выходит за рамки этого и повышает производительность, используя пул потоков. Когда вы отправляете задачу исполнителю, он пытается использовать объединенный поток для выполнения этой задачи, чтобы избежать постоянного создания потоков.
3. Создание ThreadPoolExecutor
Мы можем создать следующие 5 типов исполнителей пула потоков с помощью предварительно созданных методов в интерфейсе java.util.concurrent.Executors.
3.1 Исполнитель пула потоков фиксированного размера
Создает пул потоков, который повторно использует фиксированное количество потоков для выполнения любого количества задач. Если дополнительные задачи отправляются, когда все потоки активны, они будут ждать в очереди, пока поток не станет доступен. Это лучший вариант для большинства реальных случаев использования.
ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newFixedThreadPool(10);
3.2. Исполнитель кэшированного пула потоков
Создает пул потоков, который создает новые потоки по мере необходимости, но повторно использует ранее созданные потоки, когда они доступны. НЕ используйте этот пул потоков, если задачи выполняются долго. Это может привести к падению системы, если количество потоков превысит то, что система может обработать.
ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newCachedThreadPool();
3.3. Запланированный исполнитель пула потоков
Создает пул потоков, который может планировать выполнение команд после заданной задержки или выполнять их периодически.
ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newScheduledThreadPool(10);
3.4. Исполнитель пула с одним потоком
Создает один поток для выполнения всех задач. Используйте его, когда у вас есть только одна задача для выполнения.
ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newSingleThreadExecutor();
3.5. Исполнитель пула потоков, захватывающих работу
Создает пул потоков, который поддерживает достаточно потоков для поддержки заданного уровня параллелизма. Здесь уровень параллелизма означает максимальное количество потоков, которые будут использоваться для выполнения заданной задачи в одной точке на многопроцессорных машинах.
ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newWorkStealingPool(4);
4. Пример ThreadPoolExecutor
4.1 Создание задачи
Давайте создадим задачу, выполнение которой каждый раз будет занимать 2 секунды.
class Task implements Runnable {private final String name;public Task(String name) {this.name = name;}@SneakyThrows@Overridepublic void run() {Thread.sleep(2000l);System.out.println("Task [" + name + "] executed on : " + LocalDateTime.now().toString());}}
4.2 Выполнение задач с помощью Thread Pool Executor
Данная программа создает 5 задач и отправляет их в очередь исполнителя. Исполнитель использует один поток для выполнения всех задач.
import java.time.LocalDateTime;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class ThreadPoolExample {public static void main(String[] args) {ExecutorService executor = Executors.newSingleThreadExecutor();for(int i = 1; i <= 5; i++) {Task task = new Task("Task " + i);executor.execute(task);}shutdownAndAwaitTermination(executor);}static void shutdownAndAwaitTermination(ExecutorService pool) {// Disable new tasks from being submittedpool.shutdown();try {// Wait a while for existing tasks to terminateif(!pool.awaitTermination(60, TimeUnit.SECONDS)) {// Cancel currently executing tasks forcefullypool.shutdownNow();// Wait a while for tasks to respond to being cancelledif(!pool.awaitTermination(60, TimeUnit.SECONDS))System.err.println("Pool did not terminate");}} catch(InterruptedException ex) {//(Re-)Cancel if current thread also interruptedpool.shutdownNow();// Preserve interrupt statusThread.currentThread().interrupt();}}}
Вывод программы:
Task [Task 1] executed on : 2022-08-07T17:05:18.470589200Task [Task 2] executed on : 2022-08-07T17:05:20.482150Task [Task 3] executed on : 2022-08-07T17:05:22.482660Task [Task 4] executed on : 2022-08-07T17:05:24.498243500Task [Task 5] executed on : 2022-08-07T17:05:26.499919700
5. Использование ScheduledThreadPoolExecutor
Фиксированные пулы потоков или кэшированные пулы потоков хороши при выполнении одной уникальной задачи только один раз. Когда вам нужно выполнить задачу, многократно N раз, либо N фиксированное количество раз, либо бесконечно после фиксированной задержки, вам следует использовать ScheduledThreadPoolExecutor.
5.1 Методы составления расписания
ScheduledThreadPoolExecutor предоставляет 4 метода, которые предлагают различные возможности для многократного выполнения задач.
- schedule(команда Runnable, длительная задержка, единица TimeUnit) – создает и выполняет задачу, которая становится доступной после заданной задержки.
- schedule(Callable callable, long delay, TimeUnit unit) – создает и выполняет ScheduledFuture, который становится активным после заданной задержки.
- scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit) – Создает и выполняет периодическое действие, которое включается сначала после заданной начальной задержки, а затем с заданным периодом задержки. Если выполнение какой-либо задачи занимает больше времени, чем ее период, последующие выполнения могут начаться с опозданием, но не будут выполняться одновременно.
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) – Создает и выполняет периодическое действие, которое включается сначала после заданной начальной задержки, а затем с заданным периодом задержки. Независимо от того, сколько времени занимает длительная задача, между двумя выполнениями будет фиксированный промежуток времени задержки.
5.2 Пример ScheduledThreadPoolExecutor
В следующем примере задача будет выполняться периодически, пока не будет отменена. Между временем завершения первой задачи и временем начала второй задачи всегда будет задержка в 10 секунд.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Task task = new Task("App-Task");ScheduledFuture<?> result = executor.scheduleWithFixedDelay(task1, 0, 10, TimeUnit.SECONDS);
6. Реализация пользовательского пула потоков
Хотя Java имеет очень надежную функциональность пула потоков через фреймворк Executor. И это помогло бы, если бы вы не создавали свой собственный пул потоков без executor. Я буду категорически препятствовать любой такой попытке. Но если вы хотите создать его для своего обучения, ниже приведена реализация такого пула потоков в Java.
public class CustomThreadPool {//Thread pool sizeprivate final int poolSize;//Internally pool is an arrayprivate final WorkerThread[] workers;// FIFO orderingprivate final LinkedBlockingQueue<Runnable> queue;public CustomThreadPool(int poolSize) {this.poolSize = poolSize;queue = new LinkedBlockingQueue<Runnable>();workers = new WorkerThread[poolSize];for(int i = 0; i < poolSize; i++) {workers[i] = new WorkerThread();workers[i].start();}}public void execute(Runnable task) {synchronized(queue) {queue.add(task);queue.notify();}}private class WorkerThread extends Thread {public void run() {Runnable task;while(true) {synchronized(queue) {while(queue.isEmpty()) {try {queue.wait();} catch(InterruptedException e) {System.out.println("An error occurred while queue is waiting: " + e.getMessage());}}task =(Runnable) queue.poll();}try {task.run();} catch(RuntimeException e) {System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());}}}}public void shutdown() {System.out.println("Shutting down thread pool");for(int i = 0; i < poolSize; i++) {workers[i] = null;}}}
Выполните ту же задачу, которую мы выполнили с CustomThreadPool и 2 рабочими потоками.
public class CustomThreadPoolExample {public static void main(String[] args) {CustomThreadPool customThreadPool = new CustomThreadPool(2);for(int i = 1; i <= 5; i++) {Task task = new Task("Task " + i);System.out.println("Created : " + task.getName());customThreadPool.execute(task);}}}
Вывод программы. Обратите внимание, что она выполняет две задачи одновременно.
Создано: Задача 1Создано: Задача 2Создано: Задача 3Создано: Задача 4Создано: Задача 5Задача [Задача 2] выполнена: 2022-08-07T17:19:15.846912100Задача [Задача 1] выполнена: 2022-08-07T17:19:15.846912100Задача [Задача 4] выполнена: 2022-08-07T17:19:17.874728800Задача [Задача 3] выполнена: 2022-08-07T17:19:17.874728800Задача [Задача 5] выполнена: 2022-08-07T17:19:19.878018200
Выше представлена очень сырая реализация пула потоков с возможностью множества улучшений. Но все же, вместо того, чтобы совершенствовать код выше, сосредоточьтесь на изучении Java executor framework.
Также обратите внимание, что неправильное объединение или обработка очередей может привести к взаимоблокировкам или перегрузке ресурсов. Вы, безусловно, можете избежать этих проблем с помощью фреймворка Executor, который хорошо протестирован сообществом Java.
7. Заключение
- Класс ThreadPoolExecutor имеет четыре различных конструктора, но из-за их сложности API параллелизма Java предоставляет класс Executors для создания исполнителей и других связанных объектов. Хотя мы можем создать ThreadPoolExecutor напрямую, используя один из его конструкторов, рекомендуется использовать класс Executors.
- Кэшированный пул потоков создает новые потоки, если необходимо выполнить новые задачи, и повторно использует существующие, если они завершили выполнение задачи, которую они выполняли, и которые теперь доступны. Однако кэшированный пул потоков имеет недостаток в виде постоянных лежащих потоков для новых задач, поэтому, если вы отправите слишком много задач этому исполнителю, вы можете перегрузить систему. Это можно преодолеть с помощью фиксированного пула потоков, о котором мы поговорим в следующем уроке.
- Одним из важнейших аспектов класса ThreadPoolExecutor и исполнителей в целом является то, что вы должны явно завершить его. Если вы этого не сделаете, исполнитель продолжит свое выполнение, и программа не завершится. Если у исполнителя нет задач для выполнения, он продолжает ждать новых задач и не завершает свое выполнение. Приложение Java не завершится, пока все его потоки, не являющиеся демонами, не завершат свое выполнение, поэтому ваше приложение никогда не завершится, если вы не завершите исполнителя.
- Чтобы указать исполнителю, что вы хотите завершить его, вы можете использовать метод shutdown() класса ThreadPoolExecutor. Когда исполнитель завершает выполнение всех ожидающих задач, он завершает свое выполнение. После вызова метода shutdown(), если вы попытаетесь отправить исполнителю еще одну задачу, она будет отклонена, и исполнитель выдаст исключение RejectedExecutionException.
- Класс ThreadPoolExecutor предоставляет множество методов для получения информации о своем статусе. В примере мы использовали методы getPoolSize(), getActiveCount() и getCompletedTaskCount() для получения информации о размере пула, количестве потоков и количестве завершенных задач исполнителя. Вы также можете использовать метод getLargestPoolSize(), который возвращает максимальное количество потоков, находящихся в пуле одновременно.
- Класс ThreadPoolExecutor также предоставляет другие методы, связанные с завершением исполнителя. Эти методы:
- shutdownNow(): Этот метод немедленно завершает работу исполнителя. Он не выполняет ожидающие задачи. Он возвращает список со всеми этими ожидающими задачами. Задачи, которые выполняются, когда вы вызываете этот метод, продолжают свое выполнение, но метод не ждет их завершения.
- isTerminated(): этот метод возвращает значение true, если вы вызвали методы shutdown() или shutdownNow() и исполнитель завершает процесс его завершения.
- isShutdown(): этот метод возвращает true, если вы вызвали метод shutdown() исполнителя.
- awaitTermination(long timeout,TimeUnitunit): Этот метод блокирует вызывающий поток до тех пор, пока не завершатся задачи исполнителя или не истечет время ожидания. Класс TimeUnit представляет собой перечисление со следующими константами: ДНИ, ЧАСЫ, МИКРОСЕКУНДЫ и т. д.