Перейти к основному содержимому

AWS S3 с Java — реактивная поддержка

· 14 мин. чтения

1. Введение

AWS предлагает множество сервисов через многочисленные API-интерфейсы, к которым мы можем получить доступ из Java, используя их официальный SDK. Однако до недавнего времени этот SDK не предлагал поддержку реактивных операций и имел лишь ограниченную поддержку асинхронного доступа.

С выпуском AWS SDK для Java 2.0 мы теперь можем использовать эти API в полностью неблокирующем режиме ввода-вывода благодаря принятию стандарта Reactive Streams.

В этом руководстве мы рассмотрим эти новые функции, реализуя простой REST API хранилища BLOB-объектов в Spring Boot, который использует известную службу S3 в качестве серверной части хранилища.

2. Обзор операций AWS S3

Прежде чем погрузиться в реализацию, давайте сделаем краткий обзор того, чего мы хотим здесь достичь. Типичная служба хранилища BLOB-объектов предоставляет операции CRUD, которые использует интерфейсное приложение, чтобы позволить конечному пользователю загружать, перечислять, скачивать и удалять несколько типов содержимого, например аудио, изображения и документы.

Распространенная проблема, с которой приходится сталкиваться традиционным реализациям, заключается в том, как эффективно обрабатывать большие файлы или медленные соединения . В ранних версиях (до сервлета 3.0) все, что могла предложить спецификация JavaEE, — это блокирующий API, поэтому нам требовался поток для каждого параллельного клиента хранилища BLOB-объектов. У этой модели есть недостаток, который требует больше серверных ресурсов (следовательно, больших машин) и делает их более уязвимыми для атак типа DoS:

./1b36b94a5a384593afa2abeab8bdf6b2.png

./1f7d5c379bef4cd5f4bcd8dec50a074e.png

Используя реактивный стек, мы можем сделать наш сервис гораздо менее ресурсоемким при том же количестве клиентов . В реализации реактора используется небольшое количество потоков, которые отправляются в ответ на события завершения ввода-вывода, такие как доступность новых данных для чтения или завершение предыдущей записи .

Это означает, что один и тот же поток продолжает обрабатывать эти события, которые могут исходить от любого из активных клиентских подключений, до тех пор, пока не останется доступной работы. Такой подход значительно сокращает количество переключений контекста — довольно дорогостоящую операцию — и позволяет очень эффективно использовать доступные ресурсы:

./1b36b94a5a384593afa2abeab8bdf6b2.png

3. Настройка проекта

Наш демонстрационный проект представляет собой стандартное приложение Spring Boot WebFlux , которое включает обычные вспомогательные зависимости, такие как Lombok и JUnit.

В дополнение к этим библиотекам нам нужно добавить AWS SDK для зависимостей Java V2:

<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.10.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<artifactId>netty-nio-client</artifactId>
<groupId>software.amazon.awssdk</groupId>
<scope>compile</scope>
</dependency>
</dependencies>

AWS SDK предоставляет спецификацию, определяющую необходимые версии для всех зависимостей, поэтому нам не нужно указывать их в разделе зависимостей нашего файла POM.

Мы добавили клиентскую библиотеку S3, которая принесет с собой другие основные зависимости от SDK. Нам также понадобится клиентская библиотека Netty, поскольку мы будем использовать асинхронные API для взаимодействия с AWS.

Официальная документация AWS содержит более подробную информацию о доступных транспортах.

4. Создание клиента AWS S3

Точкой входа для операций S3 является класс S3AsyncClient , который мы будем использовать для запуска новых вызовов API.

Поскольку нам нужен только один экземпляр этого класса, давайте создадим класс @Configuration с методом @Bean , который его создаст, чтобы мы могли внедрить его туда, где он нам нужен:

@Configuration
@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
public class S3ClientConfiguration {
@Bean
public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props,
AwsCredentialsProvider credentialsProvider) {
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
.writeTimeout(Duration.ZERO)
.maxConcurrency(64)
.build();
S3Configuration serviceConfiguration = S3Configuration.builder()
.checksumValidationEnabled(false)
.chunkedEncodingEnabled(true)
.build();
S3AsyncClientBuilder b = S3AsyncClient.builder().httpClient(httpClient)
.region(s3props.getRegion())
.credentialsProvider(credentialsProvider)
.serviceConfiguration(serviceConfiguration);

if (s3props.getEndpoint() != null) {
b = b.endpointOverride(s3props.getEndpoint());
}
return b.build();
}
}

Для этой демонстрации мы используем минимальный класс @ConfigurationProperties (доступный в нашем репозитории), который содержит следующие части информации, необходимые для доступа к службам S3:

  • регион: допустимый идентификатор региона AWS, например us-east-1 .
  • accessKeyId/secretAccessKey : наш ключ и идентификатор API AWS.
  • конечная точка: необязательный URI, который мы можем использовать для переопределения конечной точки службы S3 по умолчанию. Основной вариант использования — использовать демонстрационный код с альтернативными решениями для хранения данных, которые предлагают S3-совместимый API (примерами являются minio и localstack).
  • Bucket : имя корзины, в которой мы будем хранить загруженные файлы.

Есть несколько моментов, которые стоит упомянуть о коде инициализации клиента. Во-первых, мы отключаем тайм-ауты записи и увеличиваем максимальный параллелизм, чтобы загрузка могла выполняться даже в условиях низкой пропускной способности.

Во-вторых, мы отключаем проверку контрольной суммы и включаем кодирование по частям. Мы делаем это, потому что хотим начать загрузку данных в корзину, как только данные поступят в нашу службу в потоковом режиме.

Наконец, мы не рассматриваем само создание корзины, так как предполагаем, что она уже создана и настроена администратором.

Что касается учетных данных, мы предоставляем настроенный AwsCredentialsProvider , который может восстанавливать учетные данные из свойств Spring. Это открывает возможность вводить эти значения через абстракцию Spring Environment и все ее поддерживаемые реализации PropertySource , такие как Vault или Config Server:

@Bean
public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
if (StringUtils.isBlank(s3props.getAccessKeyId())) {
return DefaultCredentialsProvider.create();
} else {
return () -> {
return AwsBasicCredentials.create(
s3props.getAccessKeyId(),
s3props.getSecretAccessKey());
};
}
}

5. Загрузить обзор службы

Теперь мы реализуем службу загрузки, которая будет доступна по пути /inbox .

POST для этого пути к ресурсу сохранит файл в нашей корзине S3 под случайно сгенерированным ключом. Мы сохраним исходное имя файла в качестве ключа метаданных, чтобы мы могли использовать его для создания соответствующих заголовков загрузки HTTP для браузеров.

Нам нужно обработать два разных сценария: простую загрузку и загрузку из нескольких частей. Давайте продолжим и создадим @RestController и добавим методы для обработки этих сценариев:

@RestController
@RequestMapping("/inbox")
@Slf4j
public class UploadResource {
private final S3AsyncClient s3client;
private final S3ClientConfigurarionProperties s3config;

public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
this.s3client = s3client;
this.s3config = s3config;
}

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(
@RequestHeader HttpHeaders headers,
@RequestBody Flux<ByteBuffer> body) {
// ... see section 6
}

@RequestMapping(
consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
method = {RequestMethod.POST, RequestMethod.PUT})
public Mono<ResponseEntity<UploadResult>> multipartUploadHandler(
@RequestHeader HttpHeaders headers,
@RequestBody Flux<Part> parts ) {
// ... see section 7
}
}

Сигнатуры обработчиков отражают основное различие между обоими случаями: в простом случае тело содержит само содержимое файла, тогда как в многокомпонентном случае оно может состоять из нескольких «частей», каждая из которых соответствует файлу или данным формы.

Для удобства мы будем поддерживать многокомпонентную загрузку с использованием методов POST или PUT. Причина этого в том, что некоторые инструменты ( в частности, cURL ) используют последний вариант по умолчанию при загрузке файлов с параметром -F .

В обоих случаях мы вернем UploadResult , содержащий результат операции и сгенерированные файловые ключи, которые клиент должен использовать для восстановления исходных файлов — подробнее об этом позже!

6. Загрузка одного файла

В этом случае клиенты отправляют контент в простой операции POST с телом запроса, содержащим необработанные данные. Чтобы получить этот контент в реактивном веб-приложении, все, что нам нужно сделать, — это объявить метод @PostMapping , который принимает аргумент Flux<ByteBuffer> .

Потоковая передача этого потока в новый файл S3 в этом случае проста.

Все, что нам нужно, это создать PutObjectRequest со сгенерированным ключом, длиной файла, типом содержимого MIME и передать его методу putObject() в нашем клиенте S3:

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
@RequestBody Flux<ByteBuffer> body) {
// ... some validation code omitted
String fileKey = UUID.randomUUID().toString();
MediaType mediaType = headers.getContentType();

if (mediaType == null) {
mediaType = MediaType.APPLICATION_OCTET_STREAM;
}
CompletableFuture future = s3client
.putObject(PutObjectRequest.builder()
.bucket(s3config.getBucket())
.contentLength(length)
.key(fileKey.toString())
.contentType(mediaType.toString())
.metadata(metadata)
.build(),
AsyncRequestBody.fromPublisher(body));

return Mono.fromFuture(future)
.map((response) -> {
checkResult(response);
return ResponseEntity
.status(HttpStatus.CREATED)
.body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
});
}

Ключевым моментом здесь является то, как мы передаем входящий поток в метод putObject() .

Этот метод ожидает объект AsyncRequestBody , предоставляющий содержимое по запросу. По сути, это обычный издатель с некоторыми дополнительными удобными методами. В нашем случае мы воспользуемся методом fromPublisher () для преобразования нашего Flux в требуемый тип.

Кроме того, мы предполагаем, что клиент отправит HTTP-заголовок Content-Length с правильным значением. Без этой информации вызов не удастся, так как это обязательное поле.

Асинхронные методы в SDK версии 2 всегда возвращают объект CompletableFuture . Мы берем его и адаптируем к Mono , используя фабричный метод fromFuture() . Это сопоставляется с конечным объектом UploadResult .

7. Загрузка нескольких файлов

Обработка загрузки multipart/form-data может показаться простой, особенно при использовании библиотек, которые обрабатывают все детали за нас. Итак, можем ли мы просто использовать предыдущий метод для каждого загруженного файла? Ну да, но за это приходится платить: буферизация.

Чтобы использовать предыдущий метод, нам нужна длина части, но передача файла по частям не всегда включает эту информацию. Один из подходов — сохранить часть во временном файле, а затем отправить его в AWS, но это замедлит общее время загрузки. Это также означает дополнительное хранилище для наших серверов.

В качестве альтернативы здесь мы будем использовать многокомпонентную загрузку AWS . Эта функция позволяет разделить загрузку одного файла на несколько фрагментов, которые мы можем отправлять параллельно и не по порядку.

Шаги следующие, нам нужно отправить:

  • запрос createMultipartUpload — AWS отвечает с помощью uploadId , который мы будем использовать в следующих вызовах.
  • куски файла, содержащие uploadId , порядковый номер и содержимое — AWS отвечает идентификатором ETag для каждой части
  • запрос completeUpload , содержащий uploadId и все полученные ETag

Обратите внимание: мы повторим эти шаги для каждого полученного FilePart !

7.1. Конвейер верхнего уровня

Обработчик multipartUploadHandler в нашем классе @Controller отвечает за обработку, что неудивительно, многокомпонентных загрузок файлов. В этом контексте каждая часть может иметь данные любого типа, идентифицируемые своим MIME-типом. Фреймворк Reactive Web доставляет эти части нашему обработчику в виде потока объектов, реализующих интерфейс Part , который мы будем обрабатывать по очереди:

return parts
.ofType(FilePart.class)
.flatMap((part)-> saveFile(headers, part))
.collect(Collectors.toList())
.map((keys)-> new UploadResult(HttpStatus.CREATED, keys)));

Этот конвейер начинается с фильтрации частей, соответствующих фактическому загруженному файлу, который всегда будет объектом, реализующим интерфейс FilePart . Затем каждая часть передается методу saveFile , который обрабатывает фактическую загрузку одного файла и возвращает сгенерированный ключ файла.

Собираем все ключи в List и, наконец, строим окончательный UploadResult . Мы всегда создаем новый ресурс, поэтому мы вернем более описательный статус CREATED (202) вместо обычного OK.

7.2. Обработка загрузки одного файла

Мы уже описали шаги, необходимые для загрузки файла с использованием многокомпонентного метода AWS. Однако есть одна загвоздка: служба S3 требует, чтобы каждая часть, кроме последней, имела заданный минимальный размер — в настоящее время 5 Мбайт.

Это означает, что мы не можем просто взять полученные фрагменты и сразу отправить их. Вместо этого нам нужно буферизовать их локально, пока мы не достигнем минимального размера или конца данных. Поскольку нам также нужно место для отслеживания количества отправленных частей и полученных в результате результатов CompletedPart , мы создадим простой внутренний класс UploadState для хранения этого состояния:

class UploadState {
String bucket;
String filekey;
String uploadId;
int partCounter;
Map<Integer, CompletedPart> completedParts = new HashMap<>();
int buffered = 0;
// ... getters/setters omitted
UploadState(String bucket, String filekey) {
this.bucket = bucket;
this.filekey = filekey;
}
}

Учитывая необходимые шаги и буферизацию, в итоге реализация может показаться немного пугающей на первый взгляд:

Mono<String> saveFile(HttpHeaders headers,String bucket, FilePart part) {
String filekey = UUID.randomUUID().toString();
Map<String, String> metadata = new HashMap<String, String>();
String filename = part.filename();
if ( filename == null ) {
filename = filekey;
}
metadata.put("filename", filename);
MediaType mt = part.headers().getContentType();
if ( mt == null ) {
mt = MediaType.APPLICATION_OCTET_STREAM;
}
UploadState uploadState = new UploadState(bucket,filekey);
CompletableFuture<CreateMultipartUploadResponse> uploadRequest = s3client
.createMultipartUpload(CreateMultipartUploadRequest.builder()
.contentType(mt.toString())
.key(filekey)
.metadata(metadata)
.bucket(bucket)
.build());

return Mono
.fromFuture(uploadRequest)
.flatMapMany((response) -> {
checkResult(response);
uploadState.uploadId = response.uploadId();
return part.content();
})
.bufferUntil((buffer) -> {
uploadState.buffered += buffer.readableByteCount();
if ( uploadState.buffered >= s3config.getMultipartMinPartSize() ) {
uploadState.buffered = 0;
return true;
} else {
return false;
}
})
.map((buffers) -> concatBuffers(buffers))
.flatMap((buffer) -> uploadPart(uploadState,buffer))
.reduce(uploadState,(state,completedPart) -> {
state.completedParts.put(completedPart.partNumber(), completedPart);
return state;
})
.flatMap((state) -> completeUpload(state))
.map((response) -> {
checkResult(response);
return uploadState.filekey;
});
}

Мы начинаем со сбора некоторых метаданных файла и их использования для создания объекта запроса, необходимого для API-вызова createMultipartUpload() . Этот вызов возвращает CompletableFuture , который является отправной точкой для нашего конвейера потоковой передачи.

Давайте рассмотрим, что делает каждый шаг этого пайплайна:

  • После получения начального результата, который содержит сгенерированный S3 uploadId , мы сохраняем его в объекте состояния загрузки и начинаем потоковую передачу тела файла. Обратите внимание на использование здесь flatMapMany , который превращает Mono в Flux .
  • Мы используем bufferUntil() для накопления необходимого количества байтов. Конвейер в этот момент меняется с потока объектов DataBuffer на поток объектов List < DataBuffer> . ``
  • Преобразуйте каждый List<DataBuffer> в ByteBuffer
  • Отправьте ByteBuffer на S3 (см. следующий раздел) и верните полученное значение CompletedPart ниже по течению .
  • Сократите полученные значения CompletedPart в uploadState.
  • Сигналы S3 о том, что мы завершили загрузку (подробнее об этом позже)
  • Вернуть сгенерированный файловый ключ

7.3. Загрузка частей файла

Еще раз поясним, что под «файловой частью» здесь понимается часть одного файла (например, первые 5 МБ из 100 МБ файла), а не часть сообщения, которая оказывается файлом, как в стрим на высшем уровне!

Конвейер загрузки файлов вызывает метод uploadPart() с двумя аргументами: состояние загрузки и ByteBuffer . Оттуда мы создаем экземпляр UploadPartRequest и используем метод uploadPart() , доступный в нашем S3AsyncClient , для отправки данных:

private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer) {
final int partNumber = ++uploadState.partCounter;
CompletableFuture<UploadPartResponse> request = s3client.uploadPart(UploadPartRequest.builder()
.bucket(uploadState.bucket)
.key(uploadState.filekey)
.partNumber(partNumber)
.uploadId(uploadState.uploadId)
.contentLength((long) buffer.capacity())
.build(),
AsyncRequestBody.fromPublisher(Mono.just(buffer)));

return Mono
.fromFuture(request)
.map((uploadPartResult) -> {
checkResult(uploadPartResult);
return CompletedPart.builder()
.eTag(uploadPartResult.eTag())
.partNumber(partNumber)
.build();
});
}

Здесь мы используем возвращаемое значение из запроса uploadPart() для создания экземпляра CompletedPart . Это тип AWS SDK, который нам понадобится позже при создании окончательного запроса, закрывающего загрузку.

7.4. Завершение загрузки

И последнее, но не менее важное: нам нужно завершить загрузку файла, состоящего из нескольких частей, отправив запрос completeMultipartUpload() на S3. Это довольно просто, учитывая, что конвейер загрузки передает всю необходимую нам информацию в качестве аргументов:

private Mono<CompleteMultipartUploadResponse> completeUpload(UploadState state) {        
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
.parts(state.completedParts.values())
.build();
return Mono.fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(state.bucket)
.uploadId(state.uploadId)
.multipartUpload(multipartUpload)
.key(state.filekey)
.build()));
}

8. Загрузка файлов с AWS

По сравнению с загрузкой из нескольких частей, загрузка объектов из корзины S3 — гораздо более простая задача . В этом случае нам не нужно беспокоиться о кусках или чем-то подобном. SDK API предоставляет метод getObject() , который принимает два аргумента:

  • Объект GetObjectRequest , содержащий запрошенный сегмент и ключ файла .
  • AsyncResponseTransformer , который позволяет нам сопоставить входящий потоковый ответ с чем-то другим .

SDK предоставляет пару реализаций последнего, которые позволяют адаптировать поток к Flux, но, опять же, за свою цену: они буферизуют данные внутри буфера массива . Поскольку эта буферизация приводит к снижению времени отклика клиента нашей демонстрационной службы, мы реализуем собственный адаптер, что, как мы увидим, не имеет большого значения.

8.1. Контроллер загрузки

Наш контроллер загрузки — это стандартный Spring Reactive @RestController с единственным методом @GetMapping , который обрабатывает запросы на загрузку. Мы ожидаем ключ файла через аргумент @PathVariable и вернем асинхронный ResponseEntity с содержимым файла:

@GetMapping(path="/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(s3config.getBucket())
.key(filekey)
.build();

return Mono.fromFuture(s3client.getObject(request,new FluxResponseProvider()))
.map(response -> {
checkResult(response.sdkResponse);
String filename = getMetadataItem(response.sdkResponse,"filename",filekey);
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_TYPE, response.sdkResponse.contentType())
.header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.sdkResponse.contentLength()))
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
.body(response.flux);
});
}

Здесь getMetadataItem() — это просто вспомогательный метод, который ищет заданный ключ метаданных в ответе без учета регистра.

Это важная деталь: S3 возвращает информацию о метаданных, используя специальные заголовки HTTP, но эти заголовки нечувствительны к регистру (см. RFC 7230, раздел 3.2 ) . Это означает, что реализации могут изменить регистр для данного элемента по желанию — и это действительно происходит при использовании MinIO .

8.2. Реализация FluxResponseProvider

Наш FluxReponseProvider должен реализовать интерфейс AsyncResponseTransformer , который имеет всего четыре метода:

  • prepare() , где мы можем выполнить любую необходимую настройку
  • onResponse(), вызывается, когда S3 возвращает статус ответа и метаданные.
  • onStream() вызывается , когда у ответа есть тело, всегда после onResponse()
  • exceptionOccurred () вызывается в случае какой-то ошибки

Задача этого провайдера — обрабатывать эти события и создавать экземпляр FluxResponse , содержащий как предоставленный экземпляр GetObjectResponse , так и тело ответа в виде потока:

class FluxResponseProvider implements AsyncResponseTransformer<GetObjectResponse,FluxResponse> {    
private FluxResponse response;
@Override
public CompletableFuture<FluxResponse> prepare() {
response = new FluxResponse();
return response.cf;
}

@Override
public void onResponse(GetObjectResponse sdkResponse) {
this.response.sdkResponse = sdkResponse;
}

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
response.flux = Flux.from(publisher);
response.cf.complete(response);
}

@Override
public void exceptionOccurred(Throwable error) {
response.cf.completeExceptionally(error);
}
}

Наконец, давайте быстро взглянем на класс FluxResponse :

class FluxResponse {
final CompletableFuture<FluxResponse> cf = new CompletableFuture<>();
GetObjectResponse sdkResponse;
Flux<ByteBuffer> flux;
}

9. Заключение

В этом руководстве мы рассмотрели основы использования реактивных расширений, доступных в библиотеке AWS SDK V2. Здесь мы сосредоточились на сервисе AWS S3, но мы можем распространить те же методы на другие реактивные сервисы, такие как DynamoDB.

Как обычно, весь код доступен на GitHub .