Java Structured Concurrency и StructuredTaskScope

Представленный как функция инкубации в JEP-428(Java 19) и как функция предварительного просмотра в JEP-453( Java 21 ), структурированный параллелизм направлен на упрощение параллельных программ Java путем обработки нескольких потоков/подзадач как единой единицы работы. Эти несколько потоков ответвляются от одного и того же родительского потока, таким образом, обрабатываются как единая единица работы.

Основной принцип структурированного параллелизма заключается в том, что когда задача должна быть решена одновременно, все потоки, необходимые для ее решения, закручиваются и воссоединяются в одном и том же блоке кода. Другими словами, время жизни всех этих потоков привязано к области действия блока, поэтому у нас есть четкие и явные точки входа-выхода для каждого параллельного блока кода.

Рассмотрение всех таких дочерних потоков как единого целого поможет нам управлять всеми потоками как единым целым; таким образом, отмена и обработка ошибок могут выполняться более надежно, что исключает такие распространенные риски, как утечки потоков и задержки отмены.

Обратите внимание, что структурированный параллелизм внутренне использует облегченную реализацию VirtualThread, также являющуюся частью проекта Loom.

1. Проблемы с традиционным/неструктурированным параллелизмом

1.1. Утечки нитей

В традиционном многопоточном программировании(неструктурированный параллелизм), если приложение должно выполнить сложную задачу, оно разбивает программу на несколько меньших и независимых единиц подзадач. Затем приложение отправляет все задачи в ThreadPoolExecutor, как правило, с ExecutorService, который запускает все задачи и подзадачи.

В такой модели программирования все дочерние задачи выполняются одновременно, поэтому каждая из них может быть выполнена или потерпеть неудачу независимо. В API нет поддержки отмены всех связанных подзадач, если одна из них не выполняется. Приложение не контролирует подзадачи и должно ждать их завершения, прежде чем возвращать результат родительской задачи. Такое ожидание является пустой тратой ресурсов и снижает производительность приложения.

В традиционном параллелизме дочерние потоки работают одновременно, и каждый из них может преуспеть или потерпеть неудачу независимо. Нет способа немедленно остановить другие дочерние потоки, если один из них потерпит неудачу.

Например, если задача должна извлечь данные учетной записи и для этого требуется извлечь данные из нескольких источников, таких как данные учетной записи, связанные учетные записи, демографические данные пользователя и т. д., то псевдокод параллельной обработки запросов будет выглядеть следующим образом:

Response fetch(Long id) throws ExecutionException, InterruptedException {Future<AccountDetails> accountDetailsFuture = es.submit(() -> getAccountDetails(id)); //Sub-task 1Future<LinkedAccounts> linkedAccountsFuture = es.submit(() -> fetchLinkedAccounts(id)); //Sub-task 2Future<DemographicData> userDetailsFuture = es.submit(() -> fetchUserDetails(id)); //Sub-task 3AccountDetails accountDetails = accountDetailsFuture.get(); //Result of Sub-task 1LinkedAccounts linkedAccounts = linkedAccountsFuture.get(); //Result of Sub-task 2DemographicData userDetails = userDetailsFuture.get(); //Result of Sub-task 3return new Response(accountDetails, linkedAccounts, userDetails); //Combine results and return the response}

В приведенном выше примере все три потока выполняются независимо.

  • Предположим, что при извлечении связанных учетных записей произошла ошибка, тогда fetch() вернет ответ об ошибке. Но два других потока продолжат работать в фоновом режиме. Это случай утечки потоков.
  • Аналогично, если пользователь отменяет запрос из интерфейса и fetch() прерывается, все три потока продолжат работу в фоновом режиме.

Хотя отмена подзадач возможна программно, простого способа сделать это не существует, и существует вероятность ошибки.

1.2. Несвязанные потоковые дампы и диагностика

В предыдущем примере, если в API fetch() есть ошибка, то трудно анализировать дампы потоков, поскольку они выполняются в 3 разных потоках. Создание связи между информацией в 3 потоках очень сложно, поскольку между этими потоками нет связи на уровне API.

Когда стек вызовов определяет иерархию задач и подзадач, например, при последовательном выполнении методов, мы получаем родительско-дочернюю связь, которая приводит к распространению ошибок.

В идеале, взаимосвязь задач должна отражаться на уровне API, чтобы контролировать выполнение дочерних потоков и отлаживать их при необходимости. Это позволило бы дочернему процессу сообщать о результате или исключении только своему родителю — уникальной задаче, которой принадлежат все подзадачи — которая затем могла бы неявно отменить оставшиеся подзадачи.

2. Введение в структурированный параллелизм

В структурированном многопоточном коде, если задача разделяется на параллельные подзадачи, все они возвращаются в одно и то же место, т. е. в блок кода задачи. Таким образом, время жизни параллельной подзадачи ограничивается этим синтаксическим блоком.

Java Structured Concurrency и StructuredTaskScope0

В этом подходе подзадачи работают от имени задачи, которая ожидает их результатов и отслеживает их на предмет сбоев. Во время выполнения структурированный параллелизм создает древовидную иерархию задач, при этом родственные подзадачи принадлежат одной и той же родительской задаче.

В структурированном параллелизме дочерние потоки — это виртуальные потоки, контролируемые JVM. Используя StructuredTaskScope, JVM дает нам возможность управлять жизненным циклом дочерних потоков.

Это дерево вызовов можно рассматривать как параллельный аналог стека вызовов одного потока с несколькими вызовами методов.

3. Реализация структурированного параллелизма с помощью StructuredTaskScope

StructuredTaskScope — это основной API для структурированного параллелизма, который поддерживает разделение задачи на несколько параллельных подзадач, которые будут выполняться в собственных виртуальных потоках, управлять их жизненными циклами и обеспечивать правильную вложенность параллельных задач.

StructuredTaskScope обеспечивает, чтобы подзадачи были завершены до продолжения основной задачи. Он гарантирует, что время жизни параллельной операции ограничено блоком синтаксиса.

StructuredTaskScope предоставляет два статических внутренних класса, оба из которых расширяют класс StructuredTaskScope. Исходя из наших требований, мы создаем экземпляр одного из этих классов, чтобы начать со структурированного параллелизма:

  • ShutdownOnFailure захватывает первое исключение и отключает область действия задачи. Это помогает собирать результаты всех задач. Если какая-либо подзадача не выполняется, то результаты других незавершенных подзадач больше не нужны.
  • ShutdownOnSuccess захватывает первый результат и отключает область действия задачи, чтобы прервать незавершенные потоки и разбудить основной поток(родительскую задачу). Это помогает собирать результат любой задачи, которая будет завершена первой.

Давайте подробнее рассмотрим обе политики.

4. ShutdownOnFailure: Ожидание завершения всех подзадач

Давайте перепишем предыдущий пример с помощью API StructuredTaskScope. В этом примере мы создаем экземпляр StructuredTaskScope с помощью фабричного метода ShutdownOnFailure(). Это гарантирует немедленное завершение всех задач, если какая-либо из задач или даже родительская задача завершается неудачей.

  • Мы отправляем задачи с помощью scope.fork() вместо использования ExecutorService, который возвращает Supplier вместо объекта Future. Каждая операция fork() запускает новый виртуальный поток и выполняет отправленную подзадачу.
  • Затем scope.join() объединяет задачи в этой области и ждет, пока все задачи не будут завершены.
  • Затем scope.throwIfFailed() распространяет указанное исключение, если какая-либо дочерняя задача завершается неудачей.
  • Наконец, scope.close() закрывает область действия задачи.
public Response getClientById(Long id) {System.out.println("Forking new threads...");try(var scope = new StructuredTaskScope.ShutdownOnFailure()) {Supplier<AccountDetails> accountDetailsFuture = scope.fork(() -> getAccountDetails(id));Supplier<LinkedAccounts> linkedAccountsFuture = scope.fork(() -> fetchLinkedAccounts(id));Supplier<DemographicData> userDetailsFuture = scope.fork(() -> fetchUserDetails(id));System.out.println("Joining all threads...");scope.join(); // Join all subtasksscope.throwIfFailed(WebApplicationException::new); //Handle error when any subtask failsSystem.out.println("Response is received from all workers...");//The subtasks have completed by now so process the resultreturn new Response(accountDetailsFuture.get(),linkedAccountsFuture.get(),userDetailsFuture.get());} catch(InterruptedException e) {throw new RuntimeException(e);}}private DemographicData fetchUserDetails(Long id) throws InterruptedException {Thread.sleep(2000L);System.out.println("Retrieved DemographicData.");return new DemographicData();}private LinkedAccounts fetchLinkedAccounts(Long id) throws InterruptedException {Thread.sleep(3000L);System.out.println("Retrieved LinkedAccounts.");return new LinkedAccounts();}private AccountDetails getAccountDetails(Long id) throws InterruptedException {Thread.sleep(4000L);System.out.println("Retrieved AccountDetails.");return new AccountDetails();}}

Вывод программы:

Forking new threads...Joining all threads...Retrieved DemographicData.Retrieved LinkedAccounts.Retrieved AccountDetails.Response is received from all workers...

Класс StructuredTaskScope реализует интерфейс AutoCloseable, поэтому, если мы используем блок try-with-resources, то close() будет вызван автоматически после завершения выполнения родительского потока.

Это решение устраняет все проблемы неструктурированного параллелизма, отмеченные в традиционном параллелизме.

5. ShutdownOnSuccess: использовать результат первой завершенной задачи

Похожий факторный метод StructuredTaskScope.ShutdownOnSuccess() фиксирует результат первой успешно завершенной подзадачи, а затем отменяет все остальные подзадачи.

Это помогает в ситуациях, когда работу можно выполнить несколькими способами, а нам нужен максимально быстрый результат, и мы отменяем другие методы, чтобы избежать ненужной траты ресурсов.

Вот StructuredTaskScope с политикой завершения при успешном выполнении, которая возвращает результат первой успешной подзадачи:

<T> T runSubTasks(List<Callable<T>> tasks) throws Exception {try(var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {for(var task : tasks) {scope.fork(task);}scope.join();return scope.result();}}

Давайте расширим наш предыдущий пример. Мы можем получить данные пользователя из двух возможных источников, и какой бы источник не вернул результат быстрее, мы используем его и двигаемся дальше. Мы полностью отбрасываем второй поток.

У нас есть два метода, представляющих два источника fetchUserDetails и fetchUserDetailsNew. Второй метод всегда возвращает результат быстрее в нашем случае, поэтому даже если первый метод вызывается в новом потоке, его выполнение немедленно отменяется, когда другой поток завершается.

public DemographicData getClientDetails(Long id) {System.out.println("Forking new threads...");try(var scope = new StructuredTaskScope.ShutdownOnSuccess<>()) {scope.fork(() -> fetchUserDetails(id));scope.fork(() -> fetchUserDetailsNew(id));scope.join();System.out.println("Response is received from a worker...");return(DemographicData) scope.result();} catch(Exception e) {throw new RuntimeException(e);}}private DemographicData fetchUserDetails(Long id) throws InterruptedException {Thread.sleep(2000L);System.out.println("Retrieved DemographicData.");return new DemographicData();}private DemographicData fetchUserDetailsNew(Long id) throws InterruptedException {Thread.sleep(1000L);System.out.println("Retrieved DemographicData from fetchUserDetailsNew.");return new DemographicData();}

Вывод программы:

Forking new threads...Retrieved DemographicData from fetchUserDetailsNew.Response is received from a worker...

6. Выполнение подзадач в течение указанного времени

В структурированном параллелизме мы можем указать крайний срок, в течение которого все задачи должны быть выполнены, используя метод scope.joinUntil().

При указании продолжительности времени с помощью Instant метод joinUntil() ожидает до тех пор, пока:

  • Все потоки, запущенные в области задачи, завершили выполнение.
  • Вызывается метод scope.shutdown()
  • Текущий поток прерван
  • Или срок истек

В следующем примере обе подзадачи должны быть завершены в течение 5 секунд.

try(var scope = new StructuredTaskScope.ShutdownOnFailure()) {Supplier<String> subTask1 = scope.fork(() -> subTask(...));Supplier<String> subTask2 = scope.fork(() -> subTask(...));scope.joinUntil(Instant.now().plusSeconds(5));//process the results}

7. Структурированный параллелизм и виртуальные потоки

В структурированном параллелизме каждый вызов scope.fork() запускает новый поток для выполнения подзадачи, который по умолчанию является виртуальным потоком.

Виртуальные потоки — это управляемые JVM легкие потоки для написания высокопроизводительных параллельных приложений. Поскольку виртуальные потоки недороги по сравнению с традиционными потоками ОС, структурированный параллелизм использует их для разветвления всех новых потоков.

Помимо того, что их много, виртуальные потоки достаточно дешевы, чтобы представлять любую параллельную единицу поведения, даже поведение, которое включает ввод-вывод. За кулисами связь задача-подзадача поддерживается путем связывания каждого виртуального потока с его уникальным владельцем, поэтому он знает свою иерархию, подобно тому, как фрейм в стеке вызовов знает своего уникального вызывающего.

Обратите внимание, что все потоки подзадач гарантированно завершаются после закрытия области действия, и при выходе из блока не остается ни одного потока.

НЕ используйте повторно виртуальные потоки. Всегда создавайте новый виртуальный поток, когда он вам нужен.

8. Заключение

В сочетании с виртуальными потоками структурированный параллелизм обещает долгожданные и крайне необходимые функции Java, которые уже присутствуют в других языках программирования(например, горутины в Go и процессы в Erlang). Это поможет в написании более сложных и параллельных приложений с превосходной надежностью и меньшим количеством утечек потоков.

Такие приложения будет легче отлаживать и профилировать при возникновении ошибок.

Исходный код на Github

Прокрутить вверх