Перейти к основному содержимому

Руководство по ConcurrentSkipListMap

· 4 мин. чтения

Задача: Медиана двух отсортированных массивов

Даны два отсортированных массива размерами n и m. Найдите медиану слияния этих двух массивов.
Временная сложность решения должна быть O(log(m + n)) ...

ANDROMEDA

1. Обзор

В этой быстрой статье мы рассмотрим класс ConcurrentSkipListMap из пакета java.util.concurrent .

Эта конструкция позволяет нам создавать потокобезопасную логику без блокировок. Это идеально для проблем, когда мы хотим сделать неизменяемый снимок данных, пока другие потоки все еще вставляют данные в карту.

С помощью этой конструкции мы будем решать задачу сортировки потока событий и получения моментального снимка событий, произошедших за последние 60 секунд .

2. Логика потоковой сортировки

Допустим, у нас есть поток событий, который постоянно поступает из нескольких потоков. Нам нужно иметь возможность брать события за последние 60 секунд, а также события старше 60 секунд.

Во-первых, давайте определим структуру наших данных события:

public class Event {
private ZonedDateTime eventTime;
private String content;

// standard constructors/getters
}

Мы хотим, чтобы наши события сортировались с помощью поля eventTime . Чтобы добиться этого с помощью ConcurrentSkipListMap, нам нужно передать Comparator в его конструктор при создании его экземпляра:

ConcurrentSkipListMap<ZonedDateTime, String> events
= new ConcurrentSkipListMap<>(
Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

Мы будем сравнивать все поступившие события, используя их метки времени. Мы используем метод compareLong () и передаем функцию извлечения, которая может получить длинную метку времени из ZonedDateTime.

Когда наши события приходят, нам нужно только добавить их на карту с помощью метода put() . Обратите внимание, что этот метод не требует какой-либо явной синхронизации:

public void acceptEvent(Event event) {
events.put(event.getEventTime(), event.getContent());
}

ConcurrentSkipListMap будет обрабатывать сортировку этих событий с помощью компаратора , который был передан ему в конструкторе.

Наиболее заметные плюсы ConcurrentSkipListMap — это методы, которые могут сделать неизменяемый снимок своих данных без блокировок. Чтобы получить все события, которые произошли за последнюю минуту, мы можем использовать метод tailMap() и передать время, из которого мы хотим получить элементы:

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

Он вернет все события за последнюю минуту. Это будет неизменяемый моментальный снимок, и, что наиболее важно, другие потоки записи могут добавлять новые события в ConcurrentSkipListMap без необходимости выполнять явную блокировку.

Теперь мы можем получить все события, которые произошли позже этой минуты, используя метод headMap() :

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

Это вернет неизменяемый снимок всех событий старше одной минуты. Все вышеперечисленные методы принадлежат классу EventWindowSort , который мы будем использовать в следующем разделе.

3. Тестирование логики потока сортировки

После того, как мы реализовали нашу логику сортировки с помощью ConcurrentSkipListMap, теперь мы можем протестировать ее, создав два потока записи, каждый из которых будет отправлять по сто событий:

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(
new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
);

for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}

Каждый поток вызывает метод acceptEvent() , отправляя события, которые имеют eventTime с настоящего момента до «сейчас минус сто секунд».

Тем временем мы можем вызвать метод getEventsFromLastMinute() , который вернет моментальный снимок событий, произошедших в течение одной минуты:

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
= eventWindowSort.getEventsFromLastMinute();

Количество событий в eventsFromLastMinute будет меняться при каждом запуске теста в зависимости от скорости, с которой потоки-производители будут отправлять события в EventWindowSort. Мы можем утверждать, что в возвращенном снимке нет ни одного события старше одной минуты:

long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();

assertEquals(eventsOlderThanOneMinute, 0);

И что в моментальном снимке больше нуля событий в пределах одной минуты:

long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();

assertTrue(eventYoungerThanOneMinute > 0);

Наш getEventsFromLastMinute() использует tailMap() внизу.

Теперь давайте протестируем метод getEventsOlderThatOneMinute() , использующий метод headMap() из ConcurrentSkipListMap:

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
= eventWindowSort.getEventsOlderThatOneMinute();

На этот раз мы получаем снимок событий старше одной минуты. Можно утверждать, что таких событий больше нуля:

long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();

assertTrue(eventsOlderThanOneMinute > 0);

И далее, что нет ни одного события, происходящего в последнюю минуту:

long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();

assertEquals(eventYoungerThanOneMinute, 0);

Самое важное, что следует отметить, это то, что мы можем сделать снимок данных, в то время как другие потоки все еще добавляют новые значения в ConcurrentSkipListMap.

4. Вывод

В этом кратком руководстве мы рассмотрели основы ConcurrentSkipListMap , а также несколько практических примеров .

Мы использовали высокую производительность ConcurrentSkipListMap для реализации неблокирующего алгоритма, который может предоставить нам неизменный снимок данных, даже если несколько потоков одновременно обновляют карту.

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub ; это проект Maven, поэтому его легко импортировать и запускать как есть.