Java DelayQueue — это неограниченная блокирующая очередь задержанных элементов, в которой элемент может быть взят только после истечения его задержки. Класс DelayQueue является частью пакета java.util.concurrent.
1. Что такое задержанный элемент?
- Элемент будет считаться задержанным, если он реализует интерфейс java.util.concurrent.Delayed, а его метод getDelay() возвращает нулевое или отрицательное значение, указывающее на то, что задержка уже истекла.
- Чтобы сделать вещи более понятными, мы можем считать, что каждый элемент хранит свою дату/время активации. Как только эта временная метка достигнута, элемент готов к извлечению из очереди. Метод getDelay() возвращает время до активации элемента.
- Обратите внимание, что реализация интерфейса Delayed должна определять метод compareTo(), который обеспечивает порядок, согласованный с его методом getDelay().
- Метод compareTo(Delayed o) обычно не возвращает фактическую временную метку. Он возвращает значение меньше нуля, если объект, выполняющий метод, имеет задержку меньше, чем объект, переданный в качестве параметра, в противном случае — положительное значение больше нуля. Он вернет ноль, если оба объекта имеют одинаковую задержку.
public interface Delayed extends Comparable<Delayed>{/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit** @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);}
2. Создание отложенного элемента
В данном примере мы создали объект события. Каждое событие будет иметь идентификатор события, имя и дату активации(после этого времени оно будет обработано).
class DelayedEvent implements Delayed{private long id;private String name;private LocalDateTime activationDateTime;//Constructor and getter methods@Overridepublic int compareTo(Delayed that){long result = this.getDelay(TimeUnit.NANOSECONDS)- that.getDelay(TimeUnit.NANOSECONDS);if(result < 0) {return -1;} else if(result > 0) {return 1;}return 0;}@Overridepublic long getDelay(TimeUnit unit) {LocalDateTime now = LocalDateTime.now();long diff = now.until(activationDateTime, ChronoUnit.MILLIS);return unit.convert(diff, TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return "DelayedEvent [id=" + id + ", name=" + name + ", activationDateTime=" + activationDateTime + "]";}}
3. Что такое DelayQueue?
- DelayQueue — это неограниченная блокирующая очередь задержанных элементов. Когда потребитель элемента хочет взять элемент из очереди, он может сделать это только после того, как задержка для этого конкретного элемента истечет.
- DelayQueue — это специализированная PriorityQueue, которая упорядочивает элементы на основе времени задержки.
- Головой очереди является элемент, задержка которого истекла раньше всего.
- Если нет элемента, задержка которого еще не истекла, в очереди нет головного элемента, и poll() вернет null.
- Несмотря на то, что неистекшие элементы нельзя удалить с помощью take() или poll(), в остальном они обрабатываются как обычные элементы в очереди, то есть метод size() возвращает количество как просроченных, так и неистекших элементов.
- Эта очередь не допускает нулевых элементов, поскольку их задержка не может быть определена.
4. Пример Java DelayQueue
Для демонстрации DelayQueue я переписал задачу производитель-потребитель с использованием ScheduledExecutorService. В этой программе поток-производитель добавляет события в DelayQueue. Поток-потребитель вызывается периодически и забирает все элементы, у которых истекло время активации, т. е. в прошлом.
4.1. Организатор мероприятия
class DelayedEventProducer implements Runnable{private final DelayQueue<DelayedEvent> queue;private AtomicInteger counter;public DelayedEventProducer(DelayQueue<DelayedEvent> queue, AtomicInteger counter) {this.queue = queue;this.counter = counter;}@Overridepublic void run(){LocalDateTime now = LocalDateTime.now();int id = counter.incrementAndGet();DelayedEvent event = new DelayedEvent(id, "Task-" + id, now);System.out.println("Added to queue :: " + event);queue.add(event);}}
4.2 Потребитель событий
class DelayedEventConsumer implements Runnable{private final DelayQueue<DelayedEvent> queue;public DelayedEventConsumer(DelayQueue<DelayedEvent> queue) {this.queue = queue;}@Overridepublic void run(){List<DelayedEvent> events = new ArrayList<DelayedEvent>();queue.drainTo(events);System.out.println("\nEvent processing start **********\n");events.stream().forEach(System.out::println);System.out.println("\nEvent processing end **********\n");}}
4.3 Основная программа
import java.time.LocalDateTime;import java.time.temporal.ChronoUnit;import java.util.ArrayList;import java.util.List;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;public class Main{public static void main(String[] args) throws InterruptedException, ExecutionException{DelayQueue<DelayedEvent> queue = new DelayQueue<>();AtomicInteger counter = new AtomicInteger();ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);ses.scheduleAtFixedRate(new DelayedEventProducer(queue, counter), 1, 2, TimeUnit.SECONDS);ses.scheduleAtFixedRate(new DelayedEventConsumer(queue), 1, 10, TimeUnit.SECONDS);}}
Вывод программы.
Added to queue :: DelayedEvent [id=1, name=Task-1, activationDateTime=2019-05-27T15:56:33.689]Added to queue :: DelayedEvent [id=2, name=Task-2, activationDateTime=2019-05-27T15:56:35.619]Added to queue :: DelayedEvent [id=3, name=Task-3, activationDateTime=2019-05-27T15:56:37.619]Added to queue :: DelayedEvent [id=4, name=Task-4, activationDateTime=2019-05-27T15:56:39.619]Added to queue :: DelayedEvent [id=5, name=Task-5, activationDateTime=2019-05-27T15:56:41.619]Added to queue :: DelayedEvent [id=6, name=Task-6, activationDateTime=2019-05-27T15:56:43.619]Event processing start **********DelayedEvent [id=1, name=Task-1, activationDateTime=2019-05-27T15:56:33.689]DelayedEvent [id=2, name=Task-2, activationDateTime=2019-05-27T15:56:35.619]DelayedEvent [id=3, name=Task-3, activationDateTime=2019-05-27T15:56:37.619]DelayedEvent [id=4, name=Task-4, activationDateTime=2019-05-27T15:56:39.619]DelayedEvent [id=5, name=Task-5, activationDateTime=2019-05-27T15:56:41.619]DelayedEvent [id=6, name=Task-6, activationDateTime=2019-05-27T15:56:43.619]Event processing end **********Added to queue :: DelayedEvent [id=7, name=Task-7, activationDateTime=2019-05-27T15:56:45.620]Added to queue :: DelayedEvent [id=8, name=Task-8, activationDateTime=2019-05-27T15:56:47.618]Added to queue :: DelayedEvent [id=9, name=Task-9, activationDateTime=2019-05-27T15:56:49.620]Added to queue :: DelayedEvent [id=10, name=Task-10, activationDateTime=2019-05-27T15:56:51.618]Added to queue :: DelayedEvent [id=11, name=Task-11, activationDateTime=2019-05-27T15:56:53.619]Event processing start **********DelayedEvent [id=7, name=Task-7, activationDateTime=2019-05-27T15:56:45.620]DelayedEvent [id=8, name=Task-8, activationDateTime=2019-05-27T15:56:47.618]DelayedEvent [id=9, name=Task-9, activationDateTime=2019-05-27T15:56:49.620]DelayedEvent [id=10, name=Task-10, activationDateTime=2019-05-27T15:56:51.618]DelayedEvent [id=11, name=Task-11, activationDateTime=2019-05-27T15:56:53.619]Event processing end **********
Очевидно, что программа работает так, как задумано.
Задавайте мне вопросы в разделе комментариев, связанные с этим примером программы Java DelayQueue.