Перейти к основному содержимому

Асинхронные пакетные операции в Couchbase

· 4 мин. чтения

1. Введение

В этом продолжении нашего руководства по использованию Couchbase в приложении Spring мы исследуем асинхронный характер Couchbase SDK и то, как его можно использовать для выполнения операций сохранения в пакетах, что позволяет нашему приложению достичь оптимального использования ресурсов Couchbase.

1.1. Интерфейс CrudService

Во- первых, мы расширяем наш общий интерфейс CrudService , чтобы включить пакетные операции:

public interface CrudService<T> {
...

List<T> readBulk(Iterable<String> ids);

void createBulk(Iterable<T> items);

void updateBulk(Iterable<T> items);

void deleteBulk(Iterable<String> ids);

boolean exists(String id);
}

1.2. Интерфейс CouchbaseEntity

Мы определяем интерфейс для сущностей, которые мы хотим сохранить:

public interface CouchbaseEntity {

String getId();

void setId(String id);

}

1.3. Класс AbstractCrudService

Затем мы реализуем каждый из этих методов в универсальном абстрактном классе. Этот класс является производным от класса PersonCrudService , который мы использовали в предыдущем руководстве , и начинается следующим образом:

public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
private BucketService bucketService;
private Bucket bucket;
private JsonDocumentConverter<T> converter;

public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
this.bucketService = bucketService;
this.converter = converter;
}

protected void loadBucket() {
bucket = bucketService.getBucket();
}

...
}

2. Асинхронный интерфейс корзины

Couchbase SDK предоставляет интерфейс AsyncBucket для выполнения асинхронных операций. Учитывая экземпляр Bucket , вы можете получить его асинхронную версию с помощью метода async() :

AsyncBucket asyncBucket = bucket.async();

3. Пакетные операции

Для выполнения пакетных операций с использованием интерфейса AsyncBucket мы используем библиотеку RxJava .

3.1. Пакетное чтение

Здесь мы реализуем метод readBulk . Сначала мы используем AsyncBucket и механизм flatMap в RxJava для асинхронного извлечения документов в Observable<JsonDocument> , затем мы используем механизм toBlocking в RxJava для преобразования их в список сущностей:

@Override
public List<T> readBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable<JsonDocument> asyncOperation = Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(String key) {
return asyncBucket.get(key);
}
});

List<T> items = new ArrayList<T>();
try {
asyncOperation.toBlocking()
.forEach(new Action1<JsonDocument>() {
public void call(JsonDocument doc) {
T item = converter.fromDocument(doc);
items.add(item);
}
});
} catch (Exception e) {
logger.error("Error during bulk get", e);
}

return items;
}

3.2. Пакетная вставка

Мы снова используем конструкцию flatMap RxJava для реализации метода createBulk .

Поскольку запросы на массовое изменение создаются быстрее, чем могут быть сгенерированы ответы на них, что иногда приводит к состоянию перегрузки, мы устанавливаем повторную попытку с экспоненциальной задержкой всякий раз, когда встречается BackpressureException :

@Override
public void createBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
if(t.getId() == null) {
t.setId(UUID.randomUUID().toString());
}
JsonDocument doc = converter.toDocument(t);
return asyncBucket.insert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}

3.3. Пакетное обновление

Мы используем аналогичный механизм в методе updateBulk :

@Override
public void updateBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
JsonDocument doc = converter.toDocument(t);
return asyncBucket.upsert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}

3.4. Пакетное удаление

И пишем метод deleteBulk следующим образом:

@Override
public void deleteBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(String key) {
return asyncBucket.remove(key)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}

4. PersonCrudService

Наконец, мы пишем сервис Spring, PersonCrudService , который расширяет наш AbstractCrudService для объекта Person .

Поскольку все взаимодействие с Couchbase реализовано в абстрактном классе, реализация класса сущностей тривиальна, так как нам нужно только убедиться, что все наши зависимости внедрены и наша корзина загружена:

@Service
public class PersonCrudService extends AbstractCrudService<Person> {

@Autowired
public PersonCrudService(
@Qualifier("TutorialBucketService") BucketService bucketService,
PersonDocumentConverter converter) {
super(bucketService, converter);
}

@PostConstruct
private void init() {
loadBucket();
}
}

5. Вывод

Исходный код, показанный в этом руководстве, доступен в проекте github .

Узнать больше о Couchbase Java SDK можно на официальном сайте документации для разработчиков Couchbase .