Пример Java Executors.newVirtualThreadPerTaskExecutor()

Начиная с Java 21, мы можем создавать виртуальные потоки(Project Loom). В отличие от потоков платформы, зависящих от операционной системы, виртуальные потоки управляются JVM. Вот почему можно создать миллион виртуальных потоков, не истощая системные ресурсы.

Мы можем создать виртуальный поток либо в запущенном режиме, либо в остановленном режиме, который может быть запущен в будущем. Когда виртуальные потоки должны быть выполнены позже, использование Executors является хорошей идеей для разделения логики, содержащейся в потоке, и управления его выполнением.

Добавленный в Java 21 метод Executors.newVirtualThreadPerTaskExecutor() возвращает Executor, который создает новый виртуальный поток для каждой переданной ему задачи. Давайте узнаем больше об этом методе и о том, как его использовать.

1. Когда следует использовать Executors.newVirtualThreadPerTaskExecutor()?

Мы знаем, что виртуальные потоки потребляют гораздо меньше памяти по сравнению с собственными потоками. Кроме того, виртуальные потоки управляются JVM, что позволяет лучше использовать ресурсы и снизить накладные расходы, связанные с потоками платформы. По сути, это означает, что создание виртуальных потоков обходится очень дешево, и мы можем создавать их миллионами, не влияя на производительность всего приложения.

Поэтому мудрым решением будет создать новый виртуальный поток для выполнения задачи, а не повторно использовать существующий виртуальный поток. Использование пула потоков в случае виртуальных потоков не приносит никакой выгоды, как мы видим в случае потоков платформы.

В Java мы традиционно используем исполнители для запуска отправленных задач, и эти исполнители используют пулы потоков фиксированного размера, в которых потоки платформы повторно используются для выполнения отправленных задач.

Теперь, когда пулы потоков не приносят никакой пользы виртуальным потокам, мы можем создать столько виртуальных потоков, сколько задач отправлено. Это именно то, что делает метод newVirtualThreadPerTaskExecutor(). Он возвращает Executor, который выполняет каждую отправленную задачу в новом виртуальном потоке.

Executor executor = Executors.newVirtualThreadPerTaskExecutor();

Мы можем получить больше контроля над выполнением задачи, используя ExecutorService вместо Executor. Обратите внимание, что 'ExecutorService extends Executor'.

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

2. Пример исполнителей newVirtualThreadPerTaskExecutor()

В следующем примере мы предоставляем список чисел, вычисляем квадрат числа и печатаем в задаче Runnable. Позже, поскольку задача не ждет никаких внешних ресурсов, имеет смысл выполнить задачу в виртуальном потоке.

List<Integer> numList = Arrays.asList(1, 2, 3, 4, 5);ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();numList.forEach(num ->executor.execute(() -> {System.out.println(STR."Square of \{num} is :: \{square(num)}");}));executor.awaitTermination(2, TimeUnit.SECONDS);executor.shutdown();

В этом примере мы использовали awaitTermination(), поскольку основная программа была завершена до запуска виртуальных потоков, а вывод программы не был выведен на консоль. В производственном приложении инструкция awaitTermination() не требуется.

Программа выводит операторы в случайном порядке при каждом запуске.

Square of 1 is :: 1Square of 2 is :: 4Square of 5 is :: 25Square of 4 is :: 16Square of 3 is :: 9

Аналогично мы можем запустить несколько типов неблокирующих задач в виртуальных потоках, используя метод Executors.newVirtualThreadPerTaskExecutor().

Например, в проекте по веб-скрапингу мы можем использовать виртуальные потоки для одновременного скрапинга нескольких страниц следующим образом:

Executor executor = Executors.newVirtualThreadPerTaskExecutor();List<String> urls = Arrays.asList("https://example.com/page1","https://example.com/page2","https://example.com/page3");urls.forEach(url ->executor.execute(() -> scrapeWebPage(url)));

3. Выполнение асинхронных задач

С виртуальными потоками, асинхронными операциями и агрегацией задач все действительно очень просто. Мы можем вызвать блокирующий get() в Future, не испытывая снижения производительности, вызванного блокировкой системных ресурсов в случае потоков платформы. Аналогично, виртуальные потоки отлично подходят для агрегации результатов нескольких асинхронных задач.

В следующем примере мы можем использовать executor.submit() для выполнения двух действий асинхронно, а затем агрегировать их результаты. Поскольку виртуальные потоки создаются, ставятся в очередь и выполняются на уровне JVM, мы можем спокойно блокировать и ждать результатов, не потребляя чрезмерных ресурсов и не вызывая снижения производительности.

public String invokeAndCombineResults() throws ExecutionException, InterruptedException {try(ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {Future<String> future1 = executor.submit(this::callAPI_1);Future<String> future2 = executor.submit(this::callAPI_2);String result1 = future1.get();String result2 = future2.get();//Do something meaningfullreturn result1 + " " + result2;}}private String callAPI_1() {// Fetch result from an APIreturn "result1";}private String callAPI_2() {// Fetch result from an APIreturn "result2";}

4. Заключение

В этом коротком руководстве по параллелизму Java мы научились создавать Executor или ExecutorService с помощью метода newVirtualThreadPerTaskExecutor(). Мы научились отправлять и выполнять задачи в виртуальных потоках с помощью созданного executor.

Прокрутить вверх