Параллелизм Java позволяет запускать несколько подзадач задачи в отдельных потоках. Иногда необходимо дождаться, пока все потоки завершат свое выполнение. В этом уроке мы изучим несколько способов заставить текущий поток ждать завершения других потоков.
1. Использование ExecutorService и Future.get()
Java ExecutorService(или ThreadPoolExecutor ) помогает выполнять задачи Runnable или Callable асинхронно. Его метод submit() возвращает объект Future, который мы можем использовать для отмены выполнения и/или ожидания завершения.
В следующем примере у нас есть демонстрационная задача Runnable. Каждая задача завершается за случайное время от 0 до 1 секунды.
публичный класс DemoRunnable реализует Runnable {частный Integer jobNum;public DemoRunnable(целочисленный индекс) {this.jobNum = индекс;}@SneakyThrows@Переопределитьpublic void run() {Thread.sleep(new Random(0).nextLong(1000));System.out.println("DemoRunnable завершен для индекса: " + jobNum);}}
Мы отправляем 10 задач в службу исполнителя. Затем мы вызываем метод Future.get() для каждого объекта Future, полученного после отправки задачи исполнителю. Future.get() ожидает, если необходимо, завершения задачи, а затем извлекает ее результат.
ExecutorService executor = Executors.newFixedThreadPool(5);List<Future<?>> futures = new ArrayList<>();for(int i = 1; i <= 10; i++) {Future<?> f = executor.submit(new DemoRunnable(i));futures.add(f);}System.out.println("###### All tasks are submitted.");for(Future<?> f : futures) {f.get();}System.out.println("###### All tasks are completed.");
###### Все задания отправлены.DemoRunnable завершен для индекса: 3DemoRunnable завершен для индекса: 4DemoRunnable завершен для индекса: 1DemoRunnable завершен для индекса: 5DemoRunnable завершен для индекса: 2DemoRunnable завершен для индекса: 6DemoRunnable завершен для индекса: 10DemoRunnable завершен для индекса: 7DemoRunnable завершен для индекса: 9DemoRunnable завершен для индекса: 8###### Все задачи выполнены.
Обратите внимание, что ожидание может закончиться раньше при следующих условиях:
- задача отменена
- выполнение задачи вызвало исключение
- возникло исключение InterruptedException, т.е. текущий поток был прерван во время ожидания.
В таком случае нам следует реализовать собственную логику обработки исключения.
2. Использование ExecutorService shutdown() и awaitTermination()
Метод awaitTermination() блокируется до тех пор, пока все задачи не завершат выполнение после запроса shutdown() на службе исполнителя. Подобно Future.get(), он может разблокироваться раньше, если произойдет тайм-аут или текущий поток будет прерван.
Метод shutdown() закрывает исполнителя, поэтому новые задачи не могут быть отправлены, но выполнение ранее отправленных задач продолжается.
Следующий метод имеет полную логику ожидания завершения всех задач в течение 1 минуты. После этого служба исполнителя будет принудительно остановлена с помощью метода shutdownNow().
void shutdownAndAwaitTermination(ExecutorService executorService) {executorService.shutdown();try {if(!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch(InterruptedException ie) {executorService.shutdownNow();Thread.currentThread().interrupt();}}
Мы можем использовать этот метод следующим образом:
ExecutorService executor = Executors.newFixedThreadPool(5);for(int i = 1; i <= 10; i++) {executor.submit(new DemoRunnable(i));}System.out.println("###### All tasks are submitted.");shutdownAndAwaitTermination(executor);System.out.println("###### All tasks are completed.");
3. Использование ExecutorService invokeAll()
Этот подход можно рассматривать как комбинацию двух предыдущих подходов. Он принимает задачи как коллекцию и возвращает список объектов Future для извлечения выходных данных при необходимости. Кроме того, он использует логику shutdown и awaits для ожидания завершения задач.
В следующем примере мы используем класс DemoCallable, который очень похож на DemoRunnable, за исключением того, что он возвращает целочисленное значение.
ExecutorService executor = Executors.newFixedThreadPool(10);List<DemoCallable> tasks = Arrays.asList(new DemoCallable(1), new DemoCallable(2),new DemoCallable(3), new DemoCallable(4),new DemoCallable(5), new DemoCallable(6),new DemoCallable(7), new DemoCallable(8),new DemoCallable(9), new DemoCallable(10));System.out.println("###### Submitting all tasks.");List<Future<Integer>> listOfFutures = executor.invokeAll(tasks);shutdownAndAwaitTermination(executor);System.out.println("###### All tasks are completed.");
Обратите внимание, что listOfFutures сохраняет выходные данные задач в том же порядке, в котором мы отправили задачи службе-исполнителю.
for(Future f : listOfFutures) {System.out.print(f.get() + " "); //Prints 1 2 3 4 5 6 7 8 9 10}
4. Использование CountDownLatch
Класс CountDownLatch позволяет потоку Java ожидать, пока набор потоков(ожидающих защелку) завершит свои задачи.
CountDownLatch работает, имея счетчик, инициализированный числом потоков, который уменьшается каждый раз, когда поток завершает свое выполнение. Когда счетчик достигает нуля, это означает, что все потоки завершили свое выполнение, и основной поток, ожидающий защелки, возобновляет выполнение.
В следующем примере основной поток ожидает завершения 3 заданных служб, прежде чем сообщить о конечном состоянии системы. Мы можем прочитать весь пример в примере CountDownLatch.
CountDownLatch latch = new CountDownLatch(3);List<BaseHealthChecker> services = new ArrayList<>();services.add(new NetworkHealthChecker(latch));services.add(new CacheHealthChecker(latch));services.add(new DatabaseHealthChecker(latch));Executor executor = Executors.newFixedThreadPool(services.size());for(final BaseHealthChecker s : services) {executor.execute(s);}//Now wait till all health checks are completelatch.await();
5. Заключение
В этом уроке мы научились заставлять поток приложения ждать завершения других потоков. Мы научились использовать методы ExecutorService и класс CountDownLatch.