Перейти к основному содержимому

Руководство по Java SynchronousQueue

· 4 мин. чтения

1. Обзор

В этой статье мы рассмотрим SynchronousQueue из пакета java.util.concurrent .

Проще говоря, эта реализация позволяет нам обмениваться информацией между потоками потокобезопасным способом.

2. Обзор API

SynchronousQueue поддерживает только две операции: take() и put(), и обе они блокируют .

Например, когда мы хотим добавить элемент в очередь, нам нужно вызвать метод put() . Этот метод будет блокироваться до тех пор, пока какой-либо другой поток не вызовет метод take() , сигнализируя о том, что он готов принять элемент.

Хотя SynchronousQueue имеет интерфейс очереди, мы должны рассматривать его как точку обмена для одного элемента между двумя потоками, в которой один поток передает элемент, а другой поток принимает этот элемент.

3. Реализация передачи обслуживания с использованием общей переменной

Чтобы понять, почему SynchronousQueue может быть настолько полезным, мы реализуем логику, используя общую переменную между двумя потоками, а затем мы перепишем эту логику, используя SynchronousQueue , что сделает наш код намного проще и читабельнее.

Допустим, у нас есть два потока — производитель и потребитель — и когда производитель устанавливает значение общей переменной, мы хотим сообщить об этом факте потоку-потребителю. Затем поток-потребитель извлечет значение из общей переменной.

Мы будем использовать CountDownLatch для координации этих двух потоков, чтобы предотвратить ситуацию, когда потребитель обращается к значению общей переменной, которая еще не была установлена.

Мы определим переменную sharedState и CountDownLatch , которые будут использоваться для координации обработки:

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

Производитель сохранит случайное целое число в переменной sharedState и выполнит метод countDown() для countDownLatch, сигнализируя потребителю, что он может получить значение из sharedState:

Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
sharedState.set(producedElement);
countDownLatch.countDown();
};

Потребитель будет ждать countDownLatch, используя метод await() . Когда производитель сигнализирует, что переменная была установлена, потребитель получит ее из sharedState:

Runnable consumer = () -> {
try {
countDownLatch.await();
Integer consumedElement = sharedState.get();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};

И последнее, но не менее важное: давайте запустим нашу программу:

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);

Он выдаст следующий результат:

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

Мы видим, что это очень много кода для реализации такой простой функциональности, как обмен элементом между двумя потоками. В следующем разделе мы постараемся сделать его лучше.

4. Реализация передачи обслуживания с использованием SynchronousQueue

Давайте теперь реализуем ту же функциональность, что и в предыдущем разделе, но с SynchronousQueue. Это имеет двойной эффект, потому что мы можем использовать его для обмена состояниями между потоками и для координации этого действия, так что нам не нужно использовать ничего, кроме SynchronousQueue.

Во-первых, мы определим очередь:

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

Производитель вызовет метод put() , который будет блокироваться до тех пор, пока какой-либо другой поток не возьмет элемент из очереди:

Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {
queue.put(producedElement);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};

Потребитель просто извлечет этот элемент с помощью метода take() :

Runnable consumer = () -> {
try {
Integer consumedElement = queue.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};

Далее запускаем нашу программу:

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

Он выдаст следующий результат:

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point

Мы видим, что SynchronousQueue используется в качестве точки обмена между потоками, что намного лучше и понятнее, чем в предыдущем примере, в котором использовалось общее состояние вместе с CountDownLatch.

5. Вывод

В этом кратком руководстве мы рассмотрели конструкцию SynchronousQueue . Мы создали программу, которая обменивается данными между двумя потоками, используя общее состояние, а затем переписали эту программу, чтобы использовать конструкцию SynchronousQueue . Это служит точкой обмена, которая координирует поток производителя и потребителя.

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.