1. Цель
В предыдущей статье о загрузке S3 мы рассмотрели, как мы можем использовать общие API-интерфейсы Blob из jclouds для загрузки контента в S3. В этой статье мы будем использовать специфический асинхронный API S3 от jclouds для загрузки контента и использовать функцию многокомпонентной загрузки, предоставляемую S3 .
2. Подготовка
2.1. Настройте пользовательский API
Первая часть процесса загрузки — создание API jclouds — это настраиваемый API для Amazon S3:
public AWSS3AsyncClient s3AsyncClient() {
String identity = ...
String credentials = ...
BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
credentials(identity, credentials).buildView(BlobStoreContext.class);
RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
return providerContext.getAsyncApi();
}
2.2. Определение количества частей контента
Amazon S3 имеет ограничение в 5 МБ для каждой загружаемой части. Таким образом, первое, что нам нужно сделать, это определить правильное количество частей, на которые мы можем разделить наш контент, чтобы у нас не было частей меньше этого предела в 5 МБ:
public static int getMaximumNumberOfParts(byte[] byteArray) {
int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024
if (numberOfParts== 0) {
return 1;
}
return numberOfParts;
}
2.3. Разбивка контента на части
Мы собирались разбить массив байтов на заданное количество частей:
public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
int fullSize = byteArray.length;
long dimensionOfPart = fullSize / maxNumberOfParts;
for (int i = 0; i < maxNumberOfParts; i++) {
int previousSplitPoint = (int) (dimensionOfPart * i);
int splitPoint = (int) (dimensionOfPart * (i + 1));
if (i == (maxNumberOfParts - 1)) {
splitPoint = fullSize;
}
byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
parts.add(partBytes);
}
return parts;
}
Мы собираемся протестировать логику разбиения массива байтов на части — мы собираемся сгенерировать несколько байтов, разделить массив байтов, снова собрать его вместе с помощью Guava и убедиться , что мы получаем исходный код:
@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
byte[] byteArray = randomByteData(16);
int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);
assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,
equalTo(byteArray.length));
byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
assertThat(byteArray, equalTo(unmultiplexed));
}
Для генерации данных мы просто используем поддержку от Random
:
byte[] randomByteData(int mb) {
byte[] randomBytes = new byte[mb * 1024 * 1024];
new Random().nextBytes(randomBytes);
return randomBytes;
}
2.4. Создание полезных нагрузок
Теперь, когда мы определили правильное количество частей для нашего контента и нам удалось разбить контент на части, нам нужно сгенерировать объекты Payload для jclouds API:
public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
List<Payload> payloads = Lists.newArrayList();
for (byte[] filePart : fileParts) {
byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
Payload partPayload = Payloads.newByteArrayPayload(filePart);
partPayload.getContentMetadata().setContentLength((long) filePart.length);
partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
payloads.add(partPayload);
}
return payloads;
}
3. Загрузить
Процесс загрузки представляет собой гибкий многоэтапный процесс, а это означает:
- загрузку можно начать до получения всех данных — данные можно загружать по мере их поступления
- данные загружаются порциями — если одна из этих операций не удалась, их можно просто получить
- чанки можно загружать параллельно — это может значительно увеличить скорость загрузки, особенно в случае больших файлов
3.1. Запуск операции загрузки
Первым шагом в операции загрузки является запуск процесса . Этот запрос к S3 должен содержать стандартные заголовки HTTP — в частности, необходимо вычислить заголовок Content
— MD5 .
Мы собирались использовать поддержку хэш-функции Guava здесь:
Hashing.md5().hashBytes(byteArray).asBytes();
Это хэш md5 всего массива байтов, а не его частей.
Чтобы инициировать загрузку и для всех дальнейших взаимодействий с S3, мы будем использовать AWSS3AsyncClient — асинхронный API, который мы создали ранее:
ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();
Ключ
— это дескриптор, назначенный объекту — это должен быть уникальный идентификатор, указанный клиентом.
Также обратите внимание, что, несмотря на то, что мы используем асинхронную версию API, мы блокируем результат этой операции — это потому, что нам понадобится результат инициализации, чтобы двигаться вперед.
Результатом операции является идентификатор загрузки , возвращаемый S3. Он будет идентифицировать загрузку на протяжении всего ее жизненного цикла и будет присутствовать во всех последующих операциях загрузки.
3.2. Загрузка частей
Следующим шагом является загрузка частей . Наша цель здесь — отправлять эти запросы параллельно , так как операция загрузки частей представляет собой основную часть процесса загрузки:
List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
ListenableFuture<String> future = s3AsyncApi.uploadPart(
container, key, partNumber + 1, uploadId, payloads.get(partNumber));
ongoingOperations.add(future);
}
Номера деталей должны быть непрерывными, но порядок отправки запросов не имеет значения.
После того, как все запросы на загрузку частей отправлены, нам нужно дождаться их ответов , чтобы мы могли собрать индивидуальное значение ETag для каждой части:
Function<ListenableFuture<String>, String> getEtagFromOp =
new Function<ListenableFuture<String>, String>() {
public String apply(ListenableFuture<String> ongoingOperation) {
try {
return ongoingOperation.get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);
Если по какой-либо причине одна из операций загрузки части завершается неудачно, операцию можно повторять до тех пор, пока она не завершится успешно. Приведенная выше логика не содержит механизма повторных попыток, но его создание должно быть достаточно простым.
3.3. Завершение операции загрузки
Последним этапом процесса загрузки является завершение многокомпонентной операции . S3 API требует загрузки ответов из предыдущих частей в виде карты
, которую теперь мы можем легко создать из списка ETag, полученного выше:
Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
parts.put(i + 1, etagsOfParts.get(i));
}
И, наконец, отправьте полный запрос:
s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();
Это вернет окончательный ETag готового объекта и завершит весь процесс загрузки.
4. Вывод
В этой статье мы создали многокомпонентную, полностью параллельную операцию загрузки в S3, используя пользовательский API jclouds S3. Эта операция готова к использованию как есть, но ее можно улучшить несколькими способами.
Во- первых, вокруг операций загрузки следует добавить логику повторных попыток , чтобы лучше справляться со сбоями.
Далее, для действительно больших файлов, даже несмотря на то, что механизм отправляет все составные запросы на загрузку параллельно, механизм регулирования должен по-прежнему ограничивать количество отправляемых параллельных запросов. Это нужно как для того, чтобы пропускная способность не стала узким местом, так и для того, чтобы убедиться, что сам Amazon не помечает процесс загрузки как превышение разрешенного лимита запросов в секунду — Guava RateLimiter потенциально может очень хорошо подходить для этого.