1. Обзор
В этой быстрой статье мы рассмотрим класс ConcurrentSkipListMap
из пакета java.util.concurrent
.
Эта конструкция позволяет нам создавать потокобезопасную логику без блокировок. Это идеально для проблем, когда мы хотим сделать неизменяемый снимок данных, пока другие потоки все еще вставляют данные в карту.
С помощью этой конструкции мы будем решать задачу сортировки потока событий и получения моментального снимка событий, произошедших за последние 60 секунд .
2. Логика потоковой сортировки
Допустим, у нас есть поток событий, который постоянно поступает из нескольких потоков. Нам нужно иметь возможность брать события за последние 60 секунд, а также события старше 60 секунд.
Во-первых, давайте определим структуру наших данных события:
public class Event {
private ZonedDateTime eventTime;
private String content;
// standard constructors/getters
}
Мы хотим, чтобы наши события сортировались с помощью поля eventTime
. Чтобы добиться этого с помощью ConcurrentSkipListMap,
нам нужно передать Comparator
в его конструктор при создании его экземпляра:
ConcurrentSkipListMap<ZonedDateTime, String> events
= new ConcurrentSkipListMap<>(
Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));
Мы будем сравнивать все поступившие события, используя их метки времени. Мы используем метод compareLong ()
и передаем функцию извлечения, которая может получить длинную
метку времени из ZonedDateTime.
Когда наши события приходят, нам нужно только добавить их на карту с помощью метода put()
. Обратите внимание, что этот метод не требует какой-либо явной синхронизации:
public void acceptEvent(Event event) {
events.put(event.getEventTime(), event.getContent());
}
ConcurrentSkipListMap будет обрабатывать
сортировку этих событий с помощью компаратора
, который был передан ему в конструкторе.
Наиболее заметные плюсы ConcurrentSkipListMap
— это методы, которые могут сделать неизменяемый снимок своих данных без блокировок. Чтобы получить все события, которые произошли за последнюю минуту, мы можем использовать метод tailMap()
и передать время, из которого мы хотим получить элементы:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}
Он вернет все события за последнюю минуту. Это будет неизменяемый моментальный снимок, и, что наиболее важно, другие потоки записи могут добавлять новые события в ConcurrentSkipListMap
без необходимости выполнять явную блокировку.
Теперь мы можем получить все события, которые произошли позже этой минуты, используя метод headMap()
:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
return events.headMap(ZonedDateTime.now().minusMinutes(1));
}
Это вернет неизменяемый снимок всех событий старше одной минуты. Все вышеперечисленные методы принадлежат классу EventWindowSort
, который мы будем использовать в следующем разделе.
3. Тестирование логики потока сортировки
После того, как мы реализовали нашу логику сортировки с помощью ConcurrentSkipListMap,
теперь мы можем протестировать ее, создав два потока записи, каждый из которых будет отправлять по сто событий:
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;
Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(
new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
);
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}
Каждый поток вызывает метод acceptEvent()
, отправляя события, которые имеют eventTime
с настоящего момента до «сейчас минус сто секунд».
Тем временем мы можем вызвать метод getEventsFromLastMinute()
, который вернет моментальный снимок событий, произошедших в течение одной минуты:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsFromLastMinute();
Количество событий в eventsFromLastMinute
будет меняться при каждом запуске теста в зависимости от скорости, с которой потоки-производители будут отправлять события в EventWindowSort.
Мы можем утверждать, что в возвращенном снимке нет ни одного события старше одной минуты:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventsOlderThanOneMinute, 0);
И что в моментальном снимке больше нуля событий в пределах одной минуты:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventYoungerThanOneMinute > 0);
Наш getEventsFromLastMinute()
использует tailMap()
внизу.
Теперь давайте протестируем метод getEventsOlderThatOneMinute()
, использующий метод headMap()
из ConcurrentSkipListMap:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsOlderThatOneMinute();
На этот раз мы получаем снимок событий старше одной минуты. Можно утверждать, что таких событий больше нуля:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventsOlderThanOneMinute > 0);
И далее, что нет ни одного события, происходящего в последнюю минуту:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventYoungerThanOneMinute, 0);
Самое важное, что следует отметить, это то, что мы можем сделать снимок данных, в то время как другие потоки все еще добавляют новые значения в ConcurrentSkipListMap.
4. Вывод
В этом кратком руководстве мы рассмотрели основы ConcurrentSkipListMap
, а также несколько практических примеров .
Мы использовали высокую производительность ConcurrentSkipListMap
для реализации неблокирующего алгоритма, который может предоставить нам неизменный снимок данных, даже если несколько потоков одновременно обновляют карту.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub ; это проект Maven, поэтому его легко импортировать и запускать как есть.