Перейти к основному содержимому

Утилита параллелизма Java с JCTools

· 8 мин. чтения

1. Обзор

В этом руководстве мы познакомимся с библиотекой JCTools (Java Concurrency Tools).

Проще говоря, это предоставляет ряд служебных структур данных, подходящих для работы в многопоточной среде.

2. Неблокирующие алгоритмы

Традиционно многопоточный код, работающий с изменяемым общим состоянием, использует блокировки для обеспечения согласованности данных и публикации (изменения, сделанные одним потоком, видны другому).

Этот подход имеет ряд недостатков:

  • потоки могут быть заблокированы при попытке получить блокировку, не продвигаясь вперед, пока операция другого потока не будет завершена - это эффективно предотвращает параллелизм
  • Чем тяжелее конкуренция за блокировку, тем больше времени JVM тратит на планирование потоков, управление конфликтами и очередями ожидающих потоков и тем меньше реальной работы она выполняет.
  • взаимоблокировки возможны, если задействовано более одной блокировки, и они получены/освобождены в неправильном порядке.
  • возможна опасность инверсии приоритета — поток с высоким приоритетом блокируется в попытке получить блокировку, удерживаемую потоком с низким приоритетом
  • в большинстве случаев используются крупнозернистые блокировки, что сильно вредит параллелизму — мелкозернистая блокировка требует более тщательного проектирования, увеличивает накладные расходы на блокировку и более подвержена ошибкам.

Альтернативой является использование неблокирующего алгоритма, то есть алгоритма, в котором отказ или приостановка любого потока не может вызвать отказ или приостановку другого потока .

Неблокирующий алгоритм является неблокирующим, если хотя бы один из задействованных потоков гарантированно выполняет свою работу в течение произвольного периода времени, т. е. во время обработки не может возникнуть взаимоблокировка.

Кроме того, эти алгоритмы не требуют ожидания, если есть также гарантированный прогресс для каждого потока.

Вот пример неблокирующего стека из превосходной книги Java Concurrency in Practice ; он определяет основное состояние:

public class ConcurrentStack<E> {

AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

private static class Node <E> {
public E item;
public Node<E> next;

// standard constructor
}
}

А также пару методов API:

public void push(E item){
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;

do {
oldHead = top.get();
newHead.next = oldHead;
} while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));

return oldHead.item;
}

Мы видим, что алгоритм использует детализированные инструкции сравнения и замены ( CAS ) и не блокируется (даже если несколько потоков вызывают top.compareAndSet() одновременно, один из них гарантированно будет успешным), но не ждет. бесплатно , так как нет никакой гарантии, что CAS в конечном итоге будет успешным для любого конкретного потока.

3. Зависимость

Во-первых, давайте добавим зависимость JCTools к нашему pom.xml :

<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>2.1.2</version>
</dependency>

Обратите внимание, что последняя доступная версия доступна на Maven Central .

4. Очереди JCTools

Библиотека предлагает ряд очередей для использования в многопоточной среде, т. е. один или несколько потоков записывают в очередь, а один или несколько потоков читают из нее потокобезопасным способом без блокировок.

Общий интерфейс для всех реализаций Queueorg.jctools.queues.MessagePassingQueue .

4.1. Типы очередей

Все очереди можно разделить на категории по их политикам производителя/потребителя:

  • один производитель, один потребитель — такие классы называются с использованием префикса Spsc , например, SpscArrayQueue
  • один производитель, несколько потребителей — используйте префикс Spmc , например, SpmcArrayQueue
  • несколько производителей, один потребитель — используйте префикс Mpsc , например, MpscArrayQueue
  • несколько производителей, несколько потребителей — используйте префикс Mpmc , например, MpmcArrayQueue

Важно отметить, что внутренние проверки политик отсутствуют, т. е. очередь может работать некорректно в случае неправильного использования .

Например, приведенный ниже тест заполняет очередь одного производителя из двух потоков и проходит успешно, даже если потребитель не гарантирует, что увидит данные от разных производителей:

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

4.2. Реализации очереди

Подводя итог приведенным выше классификациям, вот список очередей JCTools:

  • SpscArrayQueue один производитель, один потребитель, использует массив внутри, ограниченная емкость
  • SpscLinkedQueue один производитель, один потребитель, использует связанный список внутри, свободная емкость
  • SpscChunkedArrayQueue один производитель, один потребитель, начинается с начальной емкости и увеличивается до максимальной емкости
  • SpscGrowableArrayQueue один производитель, один потребитель, начинается с начальной емкости и увеличивается до максимальной емкости. Это тот же контракт, что и SpscChunkedArrayQueue , единственное отличие — внутреннее управление чанками. Рекомендуется использовать SpscChunkedArrayQueue, так как он имеет упрощенную реализацию.
  • SpscUnboundedArrayQueue один производитель, один потребитель, использует массив внутри, несвязанная емкость
  • SpmcArrayQueue один производитель, несколько потребителей, внутреннее использование массива, ограниченная емкость
  • MpscArrayQueue несколько производителей, один потребитель, внутреннее использование массива, ограниченная емкость
  • MpscLinkedQueue несколько производителей, один потребитель, внутреннее использование связанного списка, несвязанная емкость
  • MpmcArrayQueue несколько производителей, несколько потребителей, внутреннее использование массива, ограниченная емкость

4.3. Атомные очереди

Все очереди, упомянутые в предыдущем разделе, используют sun.misc.Unsafe . Однако с появлением Java 9 и JEP-260 этот API по умолчанию становится недоступным.

Таким образом, существуют альтернативные очереди, которые используют java.util.concurrent.atomic.AtomicLongFieldUpdater (общедоступный API, менее производительный) вместо sun.misc.Unsafe .

Они генерируются из указанных выше очередей, а между их именами вставлено слово Atomic , например, SpscChunkedAtomicArrayQueue или MpmcAtomicArrayQueue .

Рекомендуется по возможности использовать «обычные» очереди и прибегать к AtomicQueues только в средах, где sun.misc.Unsafe запрещен/неэффективен, например, HotSpot Java9+ и JRockit.

4.4. Вместимость

Все очереди JCTools также могут иметь максимальную емкость или быть несвязанными. Когда очередь заполнена и ограничена емкостью, она перестает принимать новые элементы.

В следующем примере мы:

  • заполнить очередь
  • убедитесь, что после этого он перестанет принимать новые элементы
  • слейте его и убедитесь, что впоследствии можно добавить больше элементов

Обратите внимание, что несколько операторов кода опущены для удобства чтения. Полную реализацию можно найти на GitHub :

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
IntStream.range(0, queue.capacity()).forEach(i -> {
assertThat(queue.offer(i)).isTrue();
});
assertThat(queue.offer(queue.capacity())).isFalse();
startConsuming.countDown();
awakeProducer.await();
assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
IntStream.range(0, 17).boxed().collect(toSet()));

5. Другие структуры данных JCTools

JCTools также предлагает несколько структур данных, отличных от Queue.

Все они перечислены ниже:

  • NonBlockingHashMap — альтернатива ConcurrentHashMap без блокировок с лучшими свойствами масштабирования и, как правило, более низкой стоимостью мутации. Он реализован через sun.misc.Unsafe , поэтому не рекомендуется использовать этот класс в среде HotSpot Java9+ или JRockit.
  • NonBlockingHashMapLong похож на NonBlockingHashMap, но использует примитивные длинные ключи
  • NonBlockingHashSet простая оболочка для NonBlockingHashMap ** ** , такая как java.util.Collections.newSetFromMap() в JDK.
  • NonBlockingIdentityHashMap аналогично NonBlockingHashMap, но сравнивает ключи по идентификатору.
  • NonBlockingSetInt многопоточный набор битовых векторов, реализованный в виде массива примитивов long . Неэффективно работает при бесшумном автобоксе

6. Тестирование производительности

Давайте воспользуемся JMH для сравнения производительности очереди JDK ArrayBlockingQueue и очереди JCTools. JMH — это среда микротестирования с открытым исходным кодом от гуру Sun/Oracle JVM, которая защищает нас от неопределенности алгоритмов оптимизации компилятора/jvm). Пожалуйста, не стесняйтесь, чтобы получить более подробную информацию об этом в этой статье .

Обратите внимание, что в приведенном ниже фрагменте кода отсутствует пара операторов, чтобы улучшить читабельность. Пожалуйста, найдите полный исходный код на GitHub:

public class MpmcBenchmark {

@Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
public volatile String implementation;

public volatile Queue<Long> queue;

@Benchmark
@Group(GROUP_NAME)
@GroupThreads(PRODUCER_THREADS_NUMBER)
public void write(Control control) {
// noinspection StatementWithEmptyBody
while (!control.stopMeasurement && !queue.offer(1L)) {
// intentionally left blank
}
}

@Benchmark
@Group(GROUP_NAME)
@GroupThreads(CONSUMER_THREADS_NUMBER)
public void read(Control control) {
// noinspection StatementWithEmptyBody
while (!control.stopMeasurement && queue.poll() == null) {
// intentionally left blank
}
}
}

Результаты (выдержка для 95-го процентиля, наносекунды на операцию):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

Мы видим, что MpmcArrayQueue работает чуть лучше, чем MpmcAtomicArrayQueue, а ArrayBlockingQueue медленнее в два раза. ``

7. Недостатки использования JCTools

Использование JCTools имеет существенный недостаток — невозможно обеспечить правильное использование библиотечных классов. Например, рассмотрим ситуацию, когда мы начинаем использовать MpscArrayQueue в нашем большом и зрелом проекте (обратите внимание, что должен быть один потребитель).

К сожалению, поскольку проект большой, есть вероятность, что кто-то допустит ошибку программирования или конфигурации, и теперь очередь будет считываться более чем из одного потока. Система вроде бы работает как и раньше, но теперь есть вероятность, что потребители пропустят некоторые сообщения. Это реальная проблема, которая может оказать большое влияние и которую очень сложно отладить.

В идеале должна быть возможность запуска системы с определенным системным свойством, которое заставляет JCTools обеспечивать политику доступа к потокам. Например, в локальных/тестовых/промежуточных средах (но не в рабочей среде) он может быть включен. К сожалению, JCTools не предоставляет такого свойства.

Еще одно соображение заключается в том, что хотя мы и добились того, что JCTools работает значительно быстрее, чем аналог JDK, это не означает, что наше приложение получает такую же скорость, как мы начинаем использовать собственные реализации очередей. Большинство приложений не обмениваются большим количеством объектов между потоками и в основном связаны с вводом-выводом.

8. Заключение

Теперь у нас есть общее представление о классах утилит, предлагаемых JCTools, и мы увидели, насколько хорошо они работают по сравнению с аналогами JDK при большой нагрузке.

В заключение, стоит использовать библиотеку только в том случае, если мы обмениваемся большим количеством объектов между потоками, и даже тогда необходимо быть очень осторожным, чтобы сохранить политику доступа к потокам.

Как всегда, полный исходный код приведенных выше примеров можно найти на GitHub .