Перейти к основному содержимому

Обзор java.util.concurrent

· 9 мин. чтения

1. Обзор

Пакет java.util.concurrent предоставляет инструменты для создания параллельных приложений.

В этой статье мы сделаем обзор всего пакета.

2. Основные компоненты

java.util.concurrent содержит слишком много функций, чтобы обсуждать их в одной статье . В этой статье мы в основном сосредоточимся на некоторых из наиболее полезных утилит из этого пакета, таких как:

  • Исполнитель
  • ИсполнительСервис
  • ЗапланированныйExecutorService
  • Будущее
  • Защелка
  • циклическийбарьер
  • семафор
  • ThreadFactory
  • Блокировка очереди
  • DelayQueue
  • Замки
  • Фазер

Здесь вы также можете найти множество статей, посвященных отдельным классам.

2.1. Исполнитель

Executor — это интерфейс, представляющий объект, выполняющий поставленные задачи.

Это зависит от конкретной реализации (откуда инициируется вызов), должна ли задача выполняться в новом или текущем потоке. Следовательно, используя этот интерфейс, мы можем отделить поток выполнения задачи от фактического механизма выполнения задачи.

Здесь следует отметить, что Executor строго не требует, чтобы выполнение задачи было асинхронным. В простейшем случае исполнитель может мгновенно вызвать отправленную задачу в вызывающем потоке.

Нам нужно создать вызывающую программу для создания экземпляра исполнителя:

public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}

Теперь мы можем использовать этот вызывающий объект для выполнения задачи.

public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
// task to be performed
});
}

Здесь следует отметить, что если исполнитель не может принять задачу для выполнения, он выдаст RejectedExecutionException .

2.2. ИсполнительСервис

ExecutorService — это комплексное решение для асинхронной обработки. Он управляет очередью в памяти и планирует отправленные задачи в зависимости от доступности потоков.

Чтобы использовать ExecutorService, нам нужно создать один класс Runnable .

public class Task implements Runnable {
@Override
public void run() {
// task details
}
}

Теперь мы можем создать экземпляр ExecutorService и назначить эту задачу. Во время создания нам нужно указать размер пула потоков.

ExecutorService executor = Executors.newFixedThreadPool(10);

Если мы хотим создать однопоточный экземпляр ExecutorService , мы можем использовать newSingleThreadExecutor(ThreadFactory threadFactory) для создания экземпляра.

Как только исполнитель создан, мы можем использовать его для отправки задачи.

public void execute() { 
executor.submit(new Task());
}

Мы также можем создать экземпляр Runnable при отправке задачи.

executor.submit(() -> {
new Task();
});

Он также поставляется с двумя готовыми методами завершения выполнения. Первый — shutdown() ; он ждет, пока все отправленные задачи не закончат выполнение. Другой метод — shutdownNow() , который немедленно завершает все ожидающие/выполняемые задачи .

Существует также другой метод awaitTermination (long timeout, блок TimeUnit), который принудительно блокируется до тех пор, пока все задачи не завершат выполнение после срабатывания события выключения или тайм-аута выполнения, или пока сам поток выполнения не будет прерван.

try {
executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}

2.3. ЗапланированныйExecutorService

ScheduledExecutorService — это интерфейс, аналогичный ExecutorService, но он может периодически выполнять задачи.

Методы Executor и ExecutorService планируются на месте без какой-либо искусственной задержки. Ноль или любое отрицательное значение означает, что запрос должен быть выполнен мгновенно.

Мы можем использовать интерфейс Runnable и Callable для определения задачи.

public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();

Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);

ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);

executorService.shutdown();
}

ScheduledExecutorService также может запланировать задачу после некоторой заданной фиксированной задержки :

executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

Здесь метод scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit ) создает и выполняет периодическое действие, которое вызывается сначала после предоставленной начальной задержки, а затем с заданным периодом до завершения работы экземпляра службы.

Метод scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit ) создает и выполняет периодическое действие, которое вызывается сначала после предоставленной начальной задержки, а затем повторно с заданной задержкой между завершением выполняющегося и вызовом следующий.

2.4. Будущее

Будущее используется для представления результата асинхронной операции. Он поставляется с методами для проверки завершения асинхронной операции, получения вычисленного результата и т. д.

Более того, API отмены (логическое значение mayInterruptIfRunning) отменяет операцию и освобождает исполняемый поток. Если значение mayInterruptIfRunning равно true, поток, выполняющий задачу, будет немедленно завершен.

В противном случае незавершенные задачи будут разрешены для завершения.

Мы можем использовать приведенный ниже фрагмент кода для создания будущего экземпляра:

public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);

Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}

Мы можем использовать следующий фрагмент кода, чтобы проверить, готов ли будущий результат, и получить данные, если вычисление выполнено:

if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

Мы также можем указать время ожидания для данной операции. Если задача занимает больше этого времени, генерируется исключение TimeoutException :

try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}

2.5. Защелка

CountDownLatch (представленный в JDK 5 ) — это служебный класс, который блокирует набор потоков до завершения некоторой операции.

CountDownLatch инициализируется счетчиком (целочисленный тип); этот счетчик уменьшается по мере завершения выполнения зависимых потоков. Но как только счетчик достигает нуля, освобождаются другие потоки.

Подробнее о CountDownLatch можно узнать здесь .

2.6. циклическийбарьер

CyclicBarrier работает почти так же, как CountDownLatch , за исключением того, что мы можем использовать его повторно. В отличие от CountDownLatch , он позволяет нескольким потокам ожидать друг друга, используя метод await() (известный как барьерное условие) перед вызовом последней задачи.

Нам нужно создать экземпляр задачи Runnable , чтобы инициировать условие барьера:

public class Task implements Runnable {

private CyclicBarrier barrier;

public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}

Теперь мы можем вызвать некоторые потоки для гонки за условием барьера:

public void start() {

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});

Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}

Здесь метод isBroken() проверяет, не прервался ли какой-либо из потоков во время выполнения. Мы всегда должны выполнять эту проверку перед выполнением фактического процесса.

2.7. семафор

Семафор используется для блокировки доступа на уровне потока к некоторой части физического или логического ресурса . Семафор содержит набор разрешений ; всякий раз, когда поток пытается войти в критическую секцию, он должен проверить семафор, доступно ли разрешение или нет.

Если разрешение недоступно (через tryAcquire() ), потоку не разрешается переходить в критическую секцию; однако, если разрешение доступно, доступ предоставляется, и счетчик разрешений уменьшается.

Как только исполняющий поток освобождает критическую секцию, счетчик разрешений снова увеличивается (выполняется методом release() ).

Мы можем указать время ожидания для получения доступа с помощью метода tryAcquire(long timeout, TimeUnit unit) .

Мы также можем проверить количество доступных разрешений или количество потоков, ожидающих получения семафора.

Для реализации семафора можно использовать следующий фрагмент кода:

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());

if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}

}

Мы можем реализовать структуру данных, подобную Mutex , используя Semaphore . Подробнее об этом можно узнать здесь.

2.8. ThreadFactory

Как следует из названия, ThreadFactory действует как пул потоков (несуществующих), который создает новый поток по требованию. Это устраняет необходимость в большом количестве шаблонного кода для реализации эффективных механизмов создания потоков.

Мы можем определить ThreadFactory :

public class ForEachThreadFactory implements ThreadFactory {
private int threadId;
private String name;

public ForEachThreadFactory(String name) {
threadId = 1;
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}

Мы можем использовать этот метод newThread(Runnable r) для создания нового потока во время выполнения:

ForEachThreadFactory factory = new ForEachThreadFactory( 
"ForEachThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}

2.9. Блокировка очереди

В асинхронном программировании одним из наиболее распространенных шаблонов интеграции является шаблон производитель-потребитель . Пакет java.util.concurrent поставляется со структурой данных, известной как BlockingQueue , которая может быть очень полезна в этих асинхронных сценариях.

Более подробная информация и рабочий пример по этому поводу доступны здесь .

2.10. DelayQueue

DelayQueue — это блокирующая очередь элементов бесконечного размера, из которой элемент может быть извлечен только в том случае, если время его истечения (известное как задержка, определяемая пользователем) завершено. Следовательно, самый верхний элемент ( head ) будет иметь наибольшую задержку и будет опрашиваться последним.

Более подробная информация и рабочий пример по этому поводу доступны здесь .

2.11. Замки

Неудивительно, что Lock — это утилита для блокировки доступа других потоков к определенному сегменту кода, кроме потока, выполняющего его в данный момент.

Основное различие между блокировкой и синхронизированным блоком заключается в том, что синхронизированный блок полностью содержится в методе; однако мы можем иметь операции lock() и unlock() Lock API в отдельных методах.

Более подробная информация и рабочий пример по этому поводу доступны здесь .

2.12. Фазер

Phaser является более гибким решением, чем CyclicBarrier и CountDownLatch. Он используется в качестве повторно используемого барьера, на котором необходимо ожидать динамического количества потоков, прежде чем продолжить выполнение. Мы можем координировать несколько фаз выполнения, повторно используя экземпляр Phaser для каждой фазы программы.

Более подробная информация и рабочий пример по этому поводу доступны здесь .

3. Заключение

В этой обзорной статье высокого уровня мы сосредоточились на различных утилитах, доступных в пакете java.util.concurrent .

Как всегда, полный исходный код доступен на GitHub .