Перейти к основному содержимому

Руководство по ConcurrentMap

· 9 мин. чтения

Задача: Сумма двух

Дано массив целых чисел и целая сумма. Нужно найти индексы двух чисел, сумма которых равна заданной ...

ANDROMEDA

1. Обзор

Карты , естественно, являются одним из самых распространенных стилей коллекции Java.

И, что важно, HashMap не является потокобезопасной реализацией, в то время как Hashtable обеспечивает потокобезопасность за счет синхронизации операций.

Несмотря на то, что Hashtable является потокобезопасным, он не очень эффективен. Другая полностью синхронизированная карта, Collections.synchronizedMap, также не демонстрирует высокой эффективности. Если мы хотим потокобезопасности с высокой пропускной способностью при высоком уровне параллелизма, эти реализации не подходят.

Чтобы решить эту проблему, Java Collections Framework представила ConcurrentMap в Java 1.5 .

Следующие обсуждения основаны на Java 1.8 .

2. Конкурентная карта

ConcurrentMap — это расширение интерфейса Map . Он призван предоставить структуру и рекомендации по решению проблемы согласования пропускной способности с потокобезопасностью.

Переопределяя несколько методов интерфейса по умолчанию, ConcurrentMap дает рекомендации для допустимых реализаций для обеспечения потокобезопасности и согласованности атомарных операций с памятью.

Некоторые реализации по умолчанию переопределены, отключив поддержку нулевого ключа/значения:

  • getOrDefault
  • для каждого
  • заменить все
  • вычислитьифабсент
  • ComputeIfPresent
  • вычислить
  • сливаться

Следующие API также переопределены для поддержки атомарности без реализации интерфейса по умолчанию:

  • поставитьЕслиОтсутствует
  • удалять
  • заменить (ключ, старое значение, новое значение)
  • заменить(ключ, значение)

Остальные действия наследуются напрямую и в основном соответствуют Map .

3. ConcurrentHashMap

ConcurrentHashMap — это готовая к использованию реализация ConcurrentMap .

Для повышения производительности он состоит из массива узлов в виде сегментов таблиц (ранее они были сегментами таблиц до Java 8 ) под капотом и в основном использует операции CAS во время обновления.

Блоки таблицы инициализируются лениво при первой вставке. Каждое ведро можно заблокировать независимо, заблокировав самый первый узел в ведре. Операции чтения не блокируются, а конфликты при обновлении сведены к минимуму.

Требуемое количество сегментов зависит от количества потоков, обращающихся к таблице, так что обновление, выполняемое для каждого сегмента, в большинстве случаев будет не более одного.

До Java 8 количество требуемых «сегментов» зависело от количества потоков, обращающихся к таблице, так что в большинстве случаев выполняемое обновление для каждого сегмента не превышало одного.

Вот почему конструкторы, по сравнению с HashMap , предоставляют дополнительный аргумент concurrencyLevel для управления количеством предполагаемых используемых потоков:

public ConcurrentHashMap(
public ConcurrentHashMap(
int initialCapacity, float loadFactor, int concurrencyLevel)

Два других аргумента: initialCapacity и loadFactor работали точно так же , как и HashMap .

Однако, начиная с Java 8 , конструкторы присутствуют только для обратной совместимости: параметры могут влиять только на начальный размер карты .

3.1. Потокобезопасность

ConcurrentMap гарантирует согласованность памяти при операциях ключ/значение в многопоточной среде.

Действия в потоке до помещения объекта в ConcurrentMap в качестве ключа или значения происходят до действий, следующих за доступом или удалением этого объекта в другом потоке.

Чтобы подтвердить, давайте посмотрим на несовместимый случай с памятью:

@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
Map<String, Integer> map = new HashMap<>();
List<Integer> sumList = parallelSum100(map, 100);

assertNotEquals(1, sumList
.stream()
.distinct()
.count());
long wrongResultCount = sumList
.stream()
.filter(num -> num != 100)
.count();

assertTrue(wrongResultCount > 0);
}

private List<Integer> parallelSum100(Map<String, Integer> map,
int executionTimes) throws InterruptedException {
List<Integer> sumList = new ArrayList<>(1000);
for (int i = 0; i < executionTimes; i++) {
map.put("test", 0);
ExecutorService executorService =
Executors.newFixedThreadPool(4);
for (int j = 0; j < 10; j++) {
executorService.execute(() -> {
for (int k = 0; k < 10; k++)
map.computeIfPresent(
"test",
(key, value) -> value + 1
);
});
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
sumList.add(map.get("test"));
}
return sumList;
}

Для каждого параллельного действия map.computeIfPresent HashMap не обеспечивает согласованного представления того, каким должно быть текущее целочисленное значение, что приводит к противоречивым и нежелательным результатам.

Что касается ConcurrentHashMap , мы можем получить непротиворечивый и правильный результат:

@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect()
throws Exception {
Map<String, Integer> map = new ConcurrentHashMap<>();
List<Integer> sumList = parallelSum100(map, 1000);

assertEquals(1, sumList
.stream()
.distinct()
.count());
long wrongResultCount = sumList
.stream()
.filter(num -> num != 100)
.count();

assertEquals(0, wrongResultCount);
}

3.2. Нулевой ключ/значение

Большинство API , предоставляемых ConcurrentMap , не допускают нулевой ключ или значение, например:

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
concurrentMap.put(null, new Object());
}

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
concurrentMap.put("test", null);
}

Однако для действий calculate* и merge вычисляемое значение может быть null , что указывает на то, что сопоставление ключ-значение удаляется, если оно присутствует, или остается отсутствующим, если оно отсутствовало ранее .

@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
Object oldValue = new Object();
concurrentMap.put("test", oldValue);
concurrentMap.compute("test", (s, o) -> null);

assertNull(concurrentMap.get("test"));
}

3.3. Поддержка потоков

Java 8 также обеспечивает поддержку Stream в ConcurrentHashMap .

В отличие от большинства потоковых методов, массовые (последовательные и параллельные) операции допускают безопасное параллельное изменение. ConcurrentModificationException не будет выброшено, что также относится к его итераторам. Для потоков также добавлено несколько методов forEach* , search и reduce* для поддержки расширенных операций обхода и уменьшения карты.

3.4. Производительность

Под капотом ConcurrentHashMap чем-то похож на HashMap , с доступом к данным и обновлением на основе хеш-таблицы (хотя и более сложной).

И, конечно же, ConcurrentHashMap должен давать гораздо лучшую производительность в большинстве параллельных случаев для извлечения и обновления данных.

Давайте напишем быстрый микротест производительности get и put и сравним его с Hashtable и Collections.synchronizedMap , выполнив обе операции 500 000 раз в 4 потоках.

@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster()
throws Exception {
Map<String, Object> hashtable = new Hashtable<>();
Map<String, Object> synchronizedHashMap =
Collections.synchronizedMap(new HashMap<>());
Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();

long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
long syncHashMapAvgRuntime =
timeElapseForGetPut(synchronizedHashMap);
long concurrentHashMapAvgRuntime =
timeElapseForGetPut(concurrentHashMap);

assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}

private long timeElapseForGetPut(Map<String, Object> map)
throws InterruptedException {
ExecutorService executorService =
Executors.newFixedThreadPool(4);
long startTime = System.nanoTime();
for (int i = 0; i < 4; i++) {
executorService.execute(() -> {
for (int j = 0; j < 500_000; j++) {
int value = ThreadLocalRandom
.current()
.nextInt(10000);
String key = String.valueOf(value);
map.put(key, value);
map.get(key);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
return (System.nanoTime() - startTime) / 500_000;
}

Имейте в виду, что микротесты рассматривают только один сценарий и не всегда хорошо отражают реальную производительность.

При этом в системе OS X со средней системой разработки мы видим средний результат выборки для 100 последовательных запусков (в наносекундах):

Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2

В многопоточной среде, где ожидается, что несколько потоков будут обращаться к общей карте , ConcurrentHashMap явно предпочтительнее.

Однако, когда карта доступна только для одного потока, HashMap может быть лучшим выбором благодаря своей простоте и высокой производительности.

3.5. Подводные камни

Операции извлечения обычно не блокируются в ConcurrentHashMap и могут пересекаться с операциями обновления. Поэтому для лучшей производительности они отражают только результаты самых последних завершенных операций обновления, как указано в официальном Javadoc .

Следует иметь в виду еще несколько фактов:

  • результаты методов агрегатного состояния, включая size , isEmpty и containsValue , обычно полезны только тогда, когда карта не подвергается параллельным обновлениям в других потоках:
@Test
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError()
throws InterruptedException {
Runnable collectMapSizes = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
mapSizes.add(concurrentMap.size());
}
};
Runnable updateMapData = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
concurrentMap.put(String.valueOf(i), i);
}
};
executorService.execute(updateMapData);
executorService.execute(collectMapSizes);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);

assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
assertEquals(MAX_SIZE, concurrentMap.size());
}

Если одновременные обновления находятся под строгим контролем, совокупный статус все равно будет надежным.

Хотя эти методы совокупного состояния не гарантируют точность в режиме реального времени, их может быть достаточно для целей мониторинга или оценки .

Обратите внимание, что использование size() в ConcurrentHashMap должно быть заменено на mappingCount() , так как последний метод возвращает длинный счет, хотя в глубине души они основаны на одной и той же оценке.

  • hashCode имеет значение : обратите внимание, что использование многих ключей с точно таким же hashCode() — это верный способ замедлить производительность любой хеш-таблицы.

Чтобы уменьшить влияние, когда ключи Comparable , ConcurrentHashMap может использовать порядок сравнения ключей, чтобы помочь разорвать связи. Тем не менее, мы должны избегать использования одного и того же hashCode() насколько это возможно.

  • итераторы предназначены для использования только в одном потоке, поскольку они обеспечивают слабую согласованность, а не обход с быстрым сбоем, и они никогда не вызовут исключение ConcurrentModificationException.
  • начальная емкость таблицы по умолчанию равна 16, и она регулируется указанным уровнем параллелизма:
public ConcurrentHashMap(
int initialCapacity, float loadFactor, int concurrencyLevel) {

//...
if (initialCapacity < concurrencyLevel) {
initialCapacity = concurrencyLevel;
}
//...
}
  • предостережение относительно функций переназначения: хотя мы можем выполнять операции переназначения с помощью предоставленных методов вычисления и слияния* , мы должны делать их быстрыми, короткими и простыми и сосредоточиться на текущем отображении, чтобы избежать неожиданной блокировки.
  • ключи в ConcurrentHashMap не отсортированы, поэтому для случаев, когда требуется упорядочение, ConcurrentSkipListMap является подходящим выбором.

4. ConcurrentNavigableMap

Для случаев, когда требуется порядок ключей, мы можем использовать ConcurrentSkipListMap , параллельную версию TreeMap .

В качестве дополнения к ConcurrentMap , ConcurrentNavigableMap поддерживает общий порядок своих ключей (в порядке возрастания по умолчанию) и может одновременно перемещаться. Методы, возвращающие представления карты, переопределяются для совместимости с параллелизмом:

  • подкарта
  • головаКарта
  • хвостКарта
  • подкарта
  • головаКарта
  • хвостКарта
  • по убываниюКарта

Итераторы представлений keySet() и разделители улучшены за счет совместимости со слабой памятью:

  • navigableKeySet
  • набор ключей
  • по убываниюKeySet

5. ConcurrentSkipListMap

Ранее мы рассмотрели интерфейс NavigableMap и его реализацию TreeMap . ConcurrentSkipListMap можно рассматривать как масштабируемую параллельную версию TreeMap .

На практике в Java нет параллельной реализации красно-черного дерева. Параллельный вариант SkipLists реализован в ConcurrentSkipListMap , предоставляя ожидаемую среднюю стоимость log(n) времени для операций containsKey , get , put и remove и их вариантов.

В дополнение к функциям TreeMap операции вставки, удаления, обновления и доступа ключа гарантируются потокобезопасностью. Вот сравнение с TreeMap при одновременной навигации:

@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect()
throws InterruptedException {
NavigableMap<Integer, Integer> skipListMap
= new ConcurrentSkipListMap<>();
int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);

assertEquals(10000 * 4, count);
}

@Test
public void givenTreeMap_whenNavConcurrently_thenCountError()
throws InterruptedException {
NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);

assertNotEquals(10000 * 4, count);
}

private int countMapElementByPollingFirstEntry(
NavigableMap<Integer, Integer> navigableMap,
int elementCount,
int concurrencyLevel) throws InterruptedException {

for (int i = 0; i < elementCount * concurrencyLevel; i++) {
navigableMap.put(i, i);
}

AtomicInteger counter = new AtomicInteger(0);
ExecutorService executorService
= Executors.newFixedThreadPool(concurrencyLevel);
for (int j = 0; j < concurrencyLevel; j++) {
executorService.execute(() -> {
for (int i = 0; i < elementCount; i++) {
if (navigableMap.pollFirstEntry() != null) {
counter.incrementAndGet();
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
return counter.get();
}

Полное объяснение проблем производительности за кулисами выходит за рамки этой статьи. Подробности можно найти в Javadoc ConcurrentSkipListMap , который находится в папке java/util/concurrent в файле src.zip .

6. Заключение

В этой статье мы в основном представили интерфейс ConcurrentMap и функции ConcurrentHashMap , а также рассказали о том, что ConcurrentNavigableMap требует упорядочения ключей.

Полный исходный код всех примеров, использованных в этой статье, можно найти в проекте GitHub .