Параллелизм Java — довольно сложная тема, требующая большого внимания при написании кода приложения, работающего с несколькими потоками, получающими доступ к одному или нескольким общим ресурсам в любой момент времени. В Java 5 были введены некоторые классы, такие как BlockingQueue и Executors, которые убирают часть сложности, предоставляя простые в использовании API.
Программисты, использующие классы параллелизма, будут чувствовать себя намного увереннее, чем программисты, напрямую работающие с синхронизацией с помощью вызовов методов wait(), notify() и notifyAll(). Я также порекомендую использовать эти новые API вместо синхронизации самостоятельно, НО часто нам приходится делать это по разным причинам, например, для поддержки устаревшего кода. Хорошее знание этих методов поможет вам в такой ситуации, когда вы окажетесь.
В этом уроке я обсуждаю цель wait() notify() notifyall() в Java. Мы поймем разницу между wait и notify.
Подробнее: Разница между wait() и sleep() в Java
1. Что такое методы wait(), notify() и notifyAll()?
Класс Object в Java имеет три последних метода, которые позволяют потокам сообщать о заблокированном состоянии ресурса.
ждать()
Он сообщает вызывающему потоку о необходимости снять блокировку и перейти в спящий режим до тех пор, пока какой-либо другой поток не войдет в тот же монитор и не вызовет notify(). Метод wait() снимает блокировку перед ожиданием и повторно устанавливает блокировку перед возвратом из метода wait(). Метод wait() на самом деле тесно интегрирован с блокировкой синхронизации, используя функцию, недоступную напрямую из механизма синхронизации.
Другими словами, мы не можем реализовать метод wait() чисто на Java. Это нативный метод.
Общий синтаксис вызова метода wait() выглядит следующим образом:
synchronized( lockObject ){while( ! condition ){lockObject.wait();}//take the action here;}
уведомить()
Он пробуждает один поток, который вызвал wait() для того же объекта. Следует отметить, что вызов notify() на самом деле не снимает блокировку ресурса. Он сообщает ожидающему потоку, что этот поток может проснуться. Однако блокировка на самом деле не снимается, пока не завершится синхронизированный блок уведомителя.
Таким образом, если уведомитель вызывает notify() для ресурса, но уведомителю все еще необходимо выполнить 10 секунд действий с ресурсом в своем синхронизированном блоке, потоку, который ожидал, придется ждать по крайней мере еще 10 секунд, чтобы уведомитель снял блокировку с объекта, даже если notify() был вызван.
Общий синтаксис вызова метода notify() выглядит следующим образом:
synchronized(lockObject){//establish_the_condition;lockObject.notify();//any additional code if needed}
уведомитьВсе()
Он пробуждает все потоки, которые вызвали wait() для одного и того же объекта. Поток с наивысшим приоритетом будет запущен первым в большинстве ситуаций, хотя это и не гарантировано. В остальном все то же самое, что и у метода notify() выше.
Общий синтаксис вызова метода notify() выглядит следующим образом:
synchronized(lockObject){establish_the_condition;lockObject.notifyAll();}
В общем случае поток, использующий метод wait(), подтверждает, что условие не существует(обычно путем проверки переменной), а затем вызывает метод wait(). Когда другой поток устанавливает условие(обычно путем установки той же переменной), он вызывает метод notify(). Механизм wait-and-notify не определяет, какое конкретное условие/значение переменной. Указать условие, которое должно быть проверено перед вызовом wait() или notify(), должен разработчик.
Давайте напишем небольшую программу, чтобы понять, как следует использовать методы wait(), notify(), notifyall() для получения желаемых результатов.
2. Как использовать методы wait(), notify() и notifyAll()
В этом упражнении мы решим проблему производителя-потребителя, используя методы wait() и notify(). Чтобы программа была простой и чтобы сосредоточиться на использовании методов wait() и notify(), мы задействуем только один поток производителя и один поток потребителя.
Другие возможности программы:
- Поток-производитель создает новый ресурс каждую секунду и помещает его в «taskQueue».
- Потребительскому потоку требуется 1 секунда для обработки потребляемого ресурса из «taskQueue».
- Максимальная емкость taskQueue составляет 5, т.е. в любой момент времени внутри taskQueue может находиться максимум 5 ресурсов.
- Оба потока работают бесконечно.
2.1. Тема производителя
Ниже представлен код для потока производителя, основанный на наших требованиях:
class Producer implements Runnable{private final List<Integer> taskQueue;private final int MAX_CAPACITY;public Producer(List<Integer> sharedQueue, int size){this.taskQueue = sharedQueue;this.MAX_CAPACITY = size;}@Overridepublic void run(){int counter = 0;while(true){try{produce(counter++);}catch(InterruptedException ex){ex.printStackTrace();}}}private void produce(int i) throws InterruptedException{synchronized(taskQueue){while(taskQueue.size() == MAX_CAPACITY){System.out.println("Queue is full " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());taskQueue.wait();}Thread.sleep(1000);taskQueue.add(i);System.out.println("Produced: " + i);taskQueue.notifyAll();}}}
- Здесь код «produce(counter++)» написан внутри бесконечного цикла, так что производитель продолжает производить элементы через равные промежутки времени.
- Мы написали код метода produce(), следуя общим рекомендациям по написанию метода wait(), упомянутым в первом разделе.
- После завершения wait() производитель добавляет элемент в taskQueue и вызывает метод notifyAll(). Поскольку последний раз метод wait() был вызван потоком потребителя(вот почему производитель вышел из состояния ожидания), потребитель получает уведомление.
- Потребительский поток после получения уведомления, если он готов потребить элемент в соответствии с написанной логикой.
- Обратите внимание, что оба потока также используют методы sleep() для имитации задержек при создании и потреблении элементов.
2.2 Потребительская тема
Ниже представлен код для потребительского потока, основанный на наших требованиях:
class Consumer implements Runnable{private final List<Integer> taskQueue;public Consumer(List<Integer> sharedQueue){this.taskQueue = sharedQueue;}@Overridepublic void run(){while(true){try{consume();} catch(InterruptedException ex){ex.printStackTrace();}}}private void consume() throws InterruptedException{synchronized(taskQueue){while(taskQueue.isEmpty()){System.out.println("Queue is empty " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());taskQueue.wait();}Thread.sleep(1000);int i =(Integer) taskQueue.remove(0);System.out.println("Consumed: " + i);taskQueue.notifyAll();}}}
- Здесь код «consum()» был написан внутри бесконечного цикла, так что потребитель продолжает потреблять элементы всякий раз, когда он находит что-то в taskQueue.
- После завершения wait() потребитель удаляет элемент в taskQueue и вызывает метод notifyAll(). Поскольку последний раз метод wait() был вызван потоком производителя(именно поэтому производитель находится в состоянии ожидания), производитель получает уведомление.
- Поток-производитель после получения уведомления, если он готов, производит элемент в соответствии с написанной логикой.
2.3. Тестовый пример производителя-потребителя
Теперь давайте протестируем потоки производителя и потребителя.
public class ProducerConsumerExampleWithWaitAndNotify{public static void main(String[] args){List<Integer> taskQueue = new ArrayList<Integer>();int MAX_CAPACITY = 5;Thread tProducer = new Thread(new Producer(taskQueue, MAX_CAPACITY), "Producer");Thread tConsumer = new Thread(new Consumer(taskQueue), "Consumer");tProducer.start();tConsumer.start();}}
Вывод программы.
Produced: 0Consumed: 0Queue is empty Consumer is waiting , size: 0Produced: 1Produced: 2Consumed: 1Consumed: 2Queue is empty Consumer is waiting , size: 0Produced: 3Produced: 4Consumed: 3Produced: 5Consumed: 4Produced: 6Consumed: 5Consumed: 6Queue is empty Consumer is waiting , size: 0Produced: 7Consumed: 7Queue is empty Consumer is waiting , size: 0
Я предлагаю вам изменить время, затрачиваемое потоками-производителями и потребителями, на разное время и проверить разные результаты в разных сценариях.
3. Вопросы интервью по методам wait(), notify() и notifyAll()
3.1 Что происходит, когда вызывается notify() и ни один поток не ожидает?
В общей практике это не будет иметь место в большинстве сценариев, если эти методы используются правильно. Хотя, если метод notify() вызывается, когда никакой другой поток не ждет, notify() просто возвращается, и уведомление теряется.
Поскольку механизм wait-and-notify не знает условия, о котором он отправляет уведомление, он предполагает, что уведомление остается неуслышанным, если ни один поток не ждет. Поток, который позже выполняет метод wait(), должен ждать, пока не произойдет другое уведомление.
3.2. Может ли возникнуть состояние гонки в период, когда метод wait() освобождает ИЛИ повторно устанавливает блокировку?
Метод wait() тесно интегрирован с механизмом блокировки. Блокировка объекта фактически не освобождается, пока ожидающий поток не окажется в состоянии, в котором он может получать уведомления. Это означает, что блокировка удерживается только тогда, когда состояние потока изменяется таким образом, что он может получать уведомления. Система предотвращает возникновение любых состояний гонки в этом механизме.
Аналогичным образом система гарантирует, что объект полностью удержит блокировку, прежде чем вывести поток из состояния ожидания.
3.3. Если поток получает уведомление, гарантируется ли, что условие установлено правильно?
Просто нет. Перед вызовом метода wait() поток всегда должен проверять условие, удерживая блокировку синхронизации. После возврата из метода wait() поток всегда должен повторно проверять условие, чтобы определить, следует ли ему снова ждать. Это связано с тем, что другой поток также может проверить условие и определить, что ожидание не требуется — обрабатывая допустимые данные, которые были установлены потоком уведомления.
Это распространенный случай, когда в уведомлениях задействовано несколько потоков. В частности, потоки, обрабатывающие данные, можно рассматривать как потребителей; они потребляют данные, произведенные другими потоками. Нет никакой гарантии, что когда потребитель получает уведомление, оно не было обработано другим потребителем.
Таким образом, когда потребитель просыпается, он не может предполагать, что состояние, которого он ждал, все еще действительно. Оно могло быть действительным в прошлом, но состояние могло измениться после вызова метода notify() и до того, как поток потребителя проснулся. Ожидающие потоки должны предоставлять возможность проверки состояния и возврата в состояние ожидания в случае, если уведомление уже было обработано. Вот почему мы всегда помещаем вызовы метода wait() в цикл.
3.4. Что происходит, когда более одного потока ожидают уведомления? Какие потоки на самом деле получают уведомление при вызове метода notify()?
Это зависит от многих факторов.Спецификация Java не определяет, какой поток получает уведомление. Во время выполнения, какой поток фактически получает уведомление, зависит от нескольких факторов, включая реализацию виртуальной машины Java и проблемы планирования и синхронизации во время выполнения программы.
Даже на однопроцессорной платформе невозможно определить, какой из нескольких потоков получает уведомление.
Как и метод notify(), метод notifyAll() не позволяет нам решать, какой поток получит уведомление: они все получат уведомление. Когда все потоки получат уведомление, можно разработать механизм, позволяющий потокам выбирать между собой, какой поток должен продолжить работу, а какой(ие) поток(и) должен(ы) снова вызвать метод wait().
3.5. Действительно ли метод notifyAll() пробуждает все потоки?
И да, и нет. Все ожидающие потоки просыпаются, но им все равно приходится повторно захватывать блокировку объекта. Поэтому потоки не работают параллельно: каждый из них должен ждать освобождения блокировки объекта. Таким образом, только один поток может работать одновременно, и только после того, как поток, вызвавший метод notifyAll(), освободит свою блокировку.
3.6. Зачем вам может понадобиться будить все потоки, если только один из них будет выполняться?
Причин несколько. Например, может быть более одного условия ожидания. Поскольку мы не можем контролировать, какой поток получит уведомление, вполне возможно, что уведомление разбудит поток, который ожидает совершенно другого условия.
Пробуждая все потоки, мы можем спроектировать программу так, чтобы потоки решали между собой, какой поток должен выполняться следующим. Другой вариант может быть, когда производители генерируют данные, которые могут удовлетворить более одного потребителя. Поскольку может быть сложно определить, сколько потребителей могут быть удовлетворены уведомлением, вариантом является уведомление их всех, позволяя потребителям решать это между собой.