Перейти к основному содержимому

Фьючерсы Гуавы и ListenableFuture

· 10 мин. чтения

1. Введение

Guava предоставляет нам ListenableFuture с расширенным API по сравнению с Java Future по умолчанию. Давайте посмотрим, как мы можем использовать это в наших интересах.

2. Future , ListenableFuture и Futures

Давайте кратко рассмотрим, что представляют собой эти разные классы и как они связаны друг с другом.

2.1. Будущее

Начиная с Java 5, мы можем использовать java.util.concurrent.Future для представления асинхронных задач.

Будущее позволяет нам получить доступ к результату задачи, которая уже завершена или может быть завершена в будущем, а также поддерживает их отмену .

2.2. ListenableFuture

Одной из недостающих функций при использовании java.util.concurrent.Future является возможность добавления прослушивателей для запуска по завершении, что является общей функцией, предоставляемой большинством популярных асинхронных фреймворков.

Guava решает эту проблему, позволяя нам прикреплять слушателей к его com.google.common.util.concurrent.ListenableFuture .

2.3. Фьючерсы

Guava предоставляет нам удобный класс com.google.common.util.concurrent.Futures , чтобы упростить работу с их ListenableFuture.

Этот класс предоставляет различные способы взаимодействия с ListenableFuture, среди которых поддержка добавления обратных вызовов успеха/неудачи и позволяет нам координировать несколько фьючерсов с агрегацией или преобразованиями.

3. Простое использование

Давайте теперь посмотрим, как мы можем использовать ListenableFuture самыми простыми способами; создание и добавление обратных вызовов.

3.1. Создание ListenableFuture

Самый простой способ получить ListenableFuture — отправить задачу в ListeningExecutorService (так же, как мы использовали бы обычный ExecutorService для получения обычного Future ):

ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);

ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500); // long running task
return 5;
});

Обратите внимание, как мы используем класс MoreExecutors для оформления нашего ExecutorService как ListeningExecutorService. Мы можем обратиться к Реализации пула потоков в Guava , чтобы узнать больше о MoreExecutors .

Если у нас уже есть API, возвращающее Future , и нам нужно преобразовать его в ListenableFuture , это легко сделать `, инициализировав его конкретную реализацию ListenableFutureTask:`

// old api
public FutureTask<String> fetchConfigTask(String configKey) {
return new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}

// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
return ListenableFutureTask.create(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}

Мы должны знать, что эти задачи не будут выполняться, пока мы не отправим их исполнителю. Непосредственное взаимодействие с ListenableFutureTask не является распространенным явлением и выполняется только в редких случаях (например, реализация нашего собственного ExecutorService ). Обратитесь к AbstractListeningExecutorService от Guava для практического использования.

Мы также можем использовать com.google.common.util.concurrent.SettableFuture , если наша асинхронная задача не может использовать ListeningExecutorService или предоставленные служебные методы Futures , и нам нужно установить будущее значение вручную. Для более сложного использования мы также можем рассмотреть com.google.common.util.concurrent.AbstractFuture.

3.2. Добавление слушателей/обратных вызовов

Один из способов добавить слушателя в ListenableFuture — зарегистрировать обратный вызов с помощью Futures.addCallback(), предоставляя нам доступ к результату или исключению в случае успеха или неудачи:

Executor listeningExecutor = Executors.newSingleThreadExecutor();

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
// do on success
}

@Override
public void onFailure(Throwable t) {
// do on failure
}
}, listeningExecutor);

Мы также можем добавить слушателя, добавив его непосредственно в ListenableFuture. Обратите внимание, что этот прослушиватель будет запущен, когда будущее завершится успешно или в исключительном порядке. Также обратите внимание, что у нас нет доступа к результату асинхронной задачи:

Executor listeningExecutor = Executors.newSingleThreadExecutor();

int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);

4. Комплексное использование

Давайте теперь посмотрим, как мы можем использовать эти фьючерсы в более сложных сценариях.

4.1. Веер

Иногда нам может понадобиться вызвать несколько асинхронных задач и собрать их результаты, что обычно называется операцией разветвления.

Гуава предоставляет нам два способа сделать это. Однако мы должны быть осторожны при выборе правильного метода в зависимости от наших требований. Предположим, нам нужно согласовать следующие асинхронные задачи:

ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");

Одним из способов объединения нескольких фьючерсов является использование метода Futures.allAsList() . Это позволяет нам собирать результаты всех фьючерсов, если все они успешны, в порядке предоставленных фьючерсов. Если одно из этих фьючерсов терпит неудачу, то весь результат — неудачное будущее:

ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// do on all futures success
}

@Override
public void onFailure(Throwable t) {
// handle on at least one failure
}
}, someExecutor);

Если нам нужно собрать результаты всех асинхронных задач, независимо от того, завершились они неудачно или нет, мы можем использовать Futures.successfulAsList() . Это вернет список, результаты которого будут иметь тот же порядок, что и задачи, переданные в аргумент, а невыполненным задачам будут присвоены нулевые значения в соответствующих позициях в списке:

ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// handle results. If task2 failed, then configResults.get(1) == null
}

@Override
public void onFailure(Throwable t) {
// handle failure
}
}, listeningExecutor);

Мы должны быть осторожны в приведенном выше использовании: если будущая задача обычно возвращает null в случае успеха, она будет неотличима от невыполненной задачи (которая также устанавливает результат как null ).

4.2. Fan-In с объединителями

Если у нас есть требование координировать несколько фьючерсов, которые возвращают разные результаты, приведенного выше решения может быть недостаточно. В этом случае мы можем использовать варианты объединения операций разветвления для координации этого сочетания фьючерсов.

Подобно простым операциям разветвления, Guava предоставляет нам два варианта; один выполняется успешно, когда все задачи завершаются успешно, а другой завершается успешно, даже если некоторые задачи завершаются с ошибкой с использованием методов Futures.whenAllSucceed() и Futures.whenAllComplete() соответственно.

Давайте посмотрим, как мы можем использовать Futures.whenAllSucceed() для объединения различных типов результатов из нескольких фьючерсов:

ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
.call(() -> {
int cartId = Futures.getDone(cartIdTask);
String customerName = Futures.getDone(customerNameTask);
List<String> cartItems = Futures.getDone(cartItemsTask);
return new CartInfo(cartId, customerName, cartItems);
}, someExecutor);

Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
@Override
public void onSuccess(@Nullable CartInfo result) {
//handle on all success and combination success
}

@Override
public void onFailure(Throwable t) {
//handle on either task fail or combination failed
}
}, listeningExecService);

Если нам нужно допустить сбой некоторых задач, мы можем использовать Futures.whenAllComplete() . Хотя семантика в основном аналогична приведенной выше, мы должны знать, что неудавшиеся фьючерсы вызовут исключение ExecutionException , когда для них вызывается Futures.getDone() .

4.3. Преобразования

Иногда нам нужно преобразовать результат в будущее, когда-то успешный. Guava предоставляет нам два способа сделать это с помощью Futures.transform() и Futures.lazyTransform() .

Давайте посмотрим, как мы можем использовать Futures.transform() для преобразования результата будущего. Это можно использовать, если вычисление преобразования не является тяжелым:

ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

Function<List<String>, Integer> itemCountFunc = cartItems -> {
assertNotNull(cartItems);
return cartItems.size();
};

ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);

Мы также можем использовать Futures.lazyTransform() для применения функции преобразования к java.util.concurrent.Future. Мы должны иметь в виду, что эта опция возвращает не ListenableFuture , а обычный java.util.concurrent.Future , и что функция преобразования применяется каждый раз, когда get() вызывается для результирующего будущего.

4.4. Цепочка фьючерсов

Мы можем столкнуться с ситуациями, когда наши фьючерсы должны вызывать другие фьючерсы. В таких случаях Guava предоставляет нам варианты async() для безопасного связывания этих фьючерсов для выполнения одного за другим.

Давайте посмотрим, как мы можем использовать Futures.submitAsync() для вызова future из отправленного Callable :

AsyncCallable<String> asyncConfigTask = () -> {
ListenableFuture<String> configTask = service.fetchConfig("config.a");
TimeUnit.MILLISECONDS.sleep(500); //some long running task
return configTask;
};

ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);

Если нам нужна настоящая цепочка, когда результат одного будущего используется для вычисления другого будущего, мы можем использовать Futures.transformAsync() :

ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
TimeUnit.MILLISECONDS.sleep(500); // some long running task
return generatePasswordTask;
};

ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);

Guava также предоставляет нам Futures.scheduleAsync() и Futures.catchAsync() для отправки запланированной задачи и предоставления резервных задач при устранении ошибок соответственно. Хотя они предназначены для разных сценариев, мы не будем их обсуждать, поскольку они аналогичны другим вызовам async() .

5. Что можно и чего нельзя делать при использовании

Давайте теперь рассмотрим некоторые распространенные ловушки, с которыми мы можем столкнуться при работе с фьючерсами, и способы их избежать.

5.1. Работающие и слушающие исполнители

Важно понимать разницу между работающим исполнителем и прослушивающим исполнителем при использовании фьючерсов Guava. Например, допустим, у нас есть асинхронная задача для получения конфигураций:

public ListenableFuture<String> fetchConfig(String configKey) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}

Допустим также, что мы хотим прикрепить слушателя к вышеуказанному будущему:

ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);

Обратите внимание, что здесь lExecService — это исполнитель, который запускает нашу асинхронную задачу, а listenExecutor — это исполнитель, для которого вызывается наш слушатель.

Как показано выше, мы всегда должны рассматривать возможность разделения этих двух исполнителей, чтобы избежать сценариев, в которых наши слушатели и рабочие конкурируют за одни и те же ресурсы пула потоков. Совместное использование одного и того же исполнителя может привести к тому, что наши ресурсоемкие задачи будут тормозить выполнение прослушивателя. Или плохо написанный тяжеловесный слушатель в конечном итоге блокирует наши важные тяжелые задачи.

5.2. Будьте осторожны с directExecutor()

Хотя мы можем использовать MoreExecutors.directExecutor() и MoreExecutors.newDirectExecutorService() в модульном тестировании, чтобы упростить обработку асинхронных исполнений, мы должны быть осторожны, используя их в рабочем коде.

Когда мы получаем исполнителей из вышеуказанных методов, любые задачи, которые мы передаем им, будь то тяжеловесы или слушатели, будут выполняться в текущем потоке. Это может быть опасно, если текущий контекст выполнения требует высокой пропускной способности.

Например, использование directExecutor и отправка ему тяжелой задачи в потоке пользовательского интерфейса автоматически заблокирует наш поток пользовательского интерфейса.

Мы также можем столкнуться со сценарием, в котором наш слушатель `замедляет работу всех других наших слушателей (даже тех, которые не связаны с directExecutor ). Это связано с тем, что Guava выполняет все прослушиватели в цикле while в соответствующих исполнителях, но directExecutor заставит прослушиватель работать в том же потоке, что и цикл while` .

5.3. Вложенные фьючерсы — это плохо

При работе со связанными фьючерсами мы должны быть осторожны, чтобы не вызывать один из другого фьючерса таким образом, чтобы он создавал вложенные фьючерсы:

public ListenableFuture<String> generatePassword(String username) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return username + "123";
});
}

String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
final String username = firstName.replaceAll("[^a-zA-Z]+", "")
.concat("@service.com");
return generatePassword(username);
});

Если мы когда-нибудь увидим код, в котором ListenableFuture<ListenableFuture<V>>, то мы должны знать, что это плохо написанное будущее , потому что есть шанс, что отмена и завершение внешнего будущего могут состязаться, и отмена может не распространяться на внутреннее будущее.

Если мы видим приведенный выше сценарий, мы всегда должны использовать варианты Futures.async() для безопасного развертывания этих связанных фьючерсов в связанном виде.

5.4. Будьте осторожны с JdkFutureAdapters.listenInPoolThread()

Guava рекомендует, чтобы лучший способ использовать его ListenableFuture — преобразовать весь наш код, использующий Future , в ListenableFuture.

Если это преобразование невозможно в некоторых сценариях, Guava предоставляет нам адаптеры для этого с помощью переопределений JdkFutureAdapters.listenInPoolThread() . Хотя это может показаться полезным, Guava предупреждает нас, что это тяжеловесные адаптеры, и их следует по возможности избегать.

6. Заключение

В этой статье мы увидели, как мы можем использовать ListenableFuture от Guava, чтобы обогатить наше использование фьючерсов, и как использовать Futures API, чтобы упростить работу с этими фьючерсами.

Мы также увидели некоторые распространенные ошибки, которые мы можем допустить при работе с этими фьючерсами и предоставленными исполнителями.

Как всегда, полный исходный код с нашими примерами доступен на GitHub .