1. Обзор
В этом руководстве мы рассмотрим java.util.concurrent.Exchanger<T>.
Это работает как общая точка для двух потоков в Java для обмена объектами между ними.
2. Введение в обменник
Класс Exchanger
в Java можно использовать для совместного использования объектов между двумя потоками типа T.
Класс предоставляет только один перегруженный метод exchange(T t)
.
При вызове exchange
ожидает, пока другой поток в паре также не вызовет его. В этот момент второй поток обнаруживает, что первый поток ожидает с его объектом. Поток обменивается объектами, которые они держат, и сигнализирует об обмене, и теперь они могут вернуться.
Давайте рассмотрим пример, чтобы понять обмен сообщениями между двумя потоками с помощью Exchanger
:
@Test
public void givenThreads_whenMessageExchanged_thenCorrect() {
Exchanger<String> exchanger = new Exchanger<>();
Runnable taskA = () -> {
try {
String message = exchanger.exchange("from A");
assertEquals("from B", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
Runnable taskB = () -> {
try {
String message = exchanger.exchange("from B");
assertEquals("from A", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(
runAsync(taskA), runAsync(taskB)).join();
}
Здесь у нас есть два потока, обменивающихся сообщениями между собой, используя общий обменник. Давайте посмотрим на пример, где мы обмениваем объект из основного потока с новым потоком:
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
Runnable runner = () -> {
try {
String message = exchanger.exchange("from runner");
assertEquals("to runner", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture<Void> result
= CompletableFuture.runAsync(runner);
String msg = exchanger.exchange("to runner");
assertEquals("from runner", msg);
result.join();
}
Обратите внимание, что нам нужно сначала запустить поток
исполнителя, а затем вызвать exchange()
в основном потоке.
Кроме того, обратите внимание, что время ожидания вызова первого потока может истечь, если второй поток не достигает точки обмена вовремя. Время ожидания первого потока можно контролировать с помощью перегруженного обмена (T t, long timeout, TimeUnit timeUnit).
3. Нет обмена данными с ГХ
Exchanger
можно использовать для создания конвейерных шаблонов с передачей данных из одного потока в другой. В этом разделе мы создадим простой стек потоков, непрерывно передающих данные друг другу в виде конвейера.
@Test
public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException {
Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
Exchanger<Queue<String>> writerExchanger = new Exchanger<>();
Runnable reader = () -> {
Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
while (true) {
readerBuffer.add(UUID.randomUUID().toString());
if (readerBuffer.size() >= BUFFER_SIZE) {
readerBuffer = readerExchanger.exchange(readerBuffer);
}
}
};
Runnable processor = () -> {
Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
processorBuffer = readerExchanger.exchange(processorBuffer);
while (true) {
writerBuffer.add(processorBuffer.poll());
if (processorBuffer.isEmpty()) {
processorBuffer = readerExchanger.exchange(processorBuffer);
writerBuffer = writerExchanger.exchange(writerBuffer);
}
}
};
Runnable writer = () -> {
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
writerBuffer = writerExchanger.exchange(writerBuffer);
while (true) {
System.out.println(writerBuffer.poll());
if (writerBuffer.isEmpty()) {
writerBuffer = writerExchanger.exchange(writerBuffer);
}
}
};
CompletableFuture.allOf(
runAsync(reader),
runAsync(processor),
runAsync(writer)).join();
}
Здесь у нас есть три потока: считыватель
, процессор
и писатель
. Вместе они работают как единый конвейер, обмениваясь данными между собой.
ReadExchanger совместно используется потоком
чтения
и процессорным
потоком, а WriteExchanger
совместно используется процессором
и потоком записи .
Обратите внимание, что приведенный здесь пример предназначен только для демонстрации. Мы должны быть осторожны при создании бесконечных циклов с помощью while(true)
. Также, чтобы код оставался читабельным, мы убрали обработку некоторых исключений.
Этот шаблон обмена данными при повторном использовании буфера позволяет уменьшить сборку мусора. Метод обмена возвращает те же экземпляры очереди, поэтому для этих объектов не будет сборщика мусора. В отличие от любой очереди блокировки, обменник не создает никаких узлов или объектов для хранения и обмена данными.
Создание такого конвейера похоже на паттерн Disrupter, с ключевым отличием: паттерн Disrupter поддерживает несколько производителей и потребителей, а обменник может использоваться между парой потребителей и производителей.
4. Вывод
Итак, мы узнали, что такое Exchanger<T>
в Java, как он работает, и увидели, как использовать класс Exchanger .
Кроме того, мы создали конвейер и продемонстрировали обмен данными между потоками без сборщика мусора.
Как всегда, код доступен на GitHub .