Регулирование скорости отправки задач в Java

1. Введение

В примере BlockingQueue и ThreadPoolExecutor мы научились создавать CustomThreadPoolExecutor, который имел следующие возможности:

  • Задачи отправляются в очередь на блокировку.
  • Исполнитель выбирает задачу из очереди и выполняет ее.
  • Он переопределил методы beforeExecute() и afterExecute() для выполнения предварительных и последующих действий при необходимости.
  • Прикреплен RejectedExecutionHandler для обработки задачи, если она отклонена из-за заполнения очереди.

Наш подход уже был достаточно хорош и способен справиться с большинством практических сценариев. Теперь давайте добавим к нему еще одну концепцию, которая может оказаться полезной в некоторых условиях. Эта концепция касается регулирования отправки задач в очередь.

В этом примере регулирование поможет удерживать количество задач в очереди в пределах лимита, чтобы ни одна задача не была отклонена. По сути, это также устраняет необходимость в RejectedExecutionHandler.

2. Что такое регулирование?

На веб-сервере мы можем настроить максимальное количество одновременных подключений к серверу. Если на сервер поступает больше подключений, чем этот лимит, они должны ждать, пока не освободятся или не будут закрыты другие подключения. Это ограничение можно рассматривать как дросселирование.

Дросселирование — это возможность регулирования скорости ввода для системы, где скорость вывода медленнее, чем скорость ввода. Необходимо предотвратить сбой системы или истощение ресурсов.

3. Регулирование отправки задач с помощью семафора

Мы будем использовать семафор с числом, которое должно быть равно максимальному количеству задач, которым разрешено выполняться одновременно. Таким образом, подход работает следующим образом:

  • Перед выполнением задачи запрашивается блокировка семафора.
  • Если блокировка получена, то выполнение проходит нормально; в противном случае будут повторяться попытки до тех пор, пока блокировка не будет получена.
  • После завершения задачи блокировка семафора снимается.

Блокировка и освобождение Semephore гарантирует, что больше не будет настроенного числа потоков, выполняющих задачи одновременно. Остальные задачи должны ждать в очереди блокировки и повторить попытку через некоторое время.

Наш новый BlockingThreadPoolExecutor с поддержкой регулирования выглядит как следующий класс:

import java.util.concurrent.*;public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {private final Semaphore semaphore;public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);semaphore = new Semaphore(corePoolSize);}@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);}@Overridepublic void execute(final Runnable task) {boolean acquired = false;do {try {semaphore.acquire();acquired = true;} catch(final InterruptedException e) {e.printStackTrace();}} while(!acquired);try {super.execute(task);} catch(final RejectedExecutionException e) {System.out.println("Task Rejected");semaphore.release();return;}semaphore.release();}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);if(t != null) {t.printStackTrace();}}}

Теперь мы можем отправлять неограниченное количество задач в очередь блокировки или исполнителю, и все они будут выполнены без отклонений.

import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;public class BlockingThreadPoolExecutorDemo {public static void main(String[] args) throws InterruptedException {BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10);BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(1, 1, 5000, TimeUnit.MILLISECONDS, blockingQueue);executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.prestartAllCoreThreads();int threadCounter = 0;while(true) {threadCounter++;// Adding threads one by oneSystem.out.println("Adding DemoTask : " + threadCounter);blockingQueue.offer(new DemoTask(Integer.toString(threadCounter)));if(threadCounter == 100)break;}Thread.sleep(1000000);}}

При запуске программы DemoExecutor с использованием BlockingThreadPoolExecutor вы не увидите отклоненных задач, и все задачи будут выполнены успешно.

4. Заключение

В этом уроке мы научились ограничивать скорость отправки и выполнения задач с помощью BlockingQueue, ThreadPoolExecutor и Semaphore.

Мы можем контролировать количество задач, выполняемых в любой момент времени, передавая соответствующий счетчик в конструктор семафора.

Прокрутить вверх