Перейти к основному содержимому

Обработка ошибок в gRPC

· 7 мин. чтения

1. Обзор

gRPC — это платформа для выполнения межпроцессных удаленных вызовов процедур (RPC). Он обладает высокой производительностью и может работать в любой среде.

В этом руководстве мы сосредоточимся на обработке ошибок gRPC с помощью Java. gRPC имеет очень низкую задержку и высокую пропускную способность, поэтому он идеально подходит для использования в сложных средах, таких как микросервисные архитектуры. В этих системах очень важно иметь хорошее представление о состоянии, производительности и сбоях различных компонентов сети. Таким образом, хорошая реализация обработки ошибок имеет решающее значение для достижения предыдущих целей.

2. Основы обработки ошибок в gRPC

Ошибки в gRPC являются сущностями первого класса, т . е. каждый вызов в gRPC является либо сообщением полезной нагрузки, либо сообщением об ошибке состояния .

Ошибки закодированы в сообщениях о состоянии и реализованы на всех поддерживаемых языках .

Как правило, мы не должны включать ошибки в полезную нагрузку ответа. С этой целью всегда используйте StreamObserver:: OnError, который внутренне добавляет ошибку состояния в конечные заголовки. Единственным исключением, как мы увидим ниже, является работа с потоками.

Все клиентские или серверные библиотеки gRPC поддерживают официальную модель ошибок gRPC . Java инкапсулирует эту модель ошибки в класс io.grpc.Status . Для этого класса требуется стандартный код состояния ошибки и необязательное строковое сообщение об ошибке для предоставления дополнительной информации . Преимущество этой модели ошибок в том, что она поддерживается независимо от используемой кодировки данных (протокольные буферы, REST и т. д.). Однако он довольно ограничен, поскольку мы не можем включать сведения об ошибке в статус. ``

Если ваше приложение gRPC реализует буферы протоколов для кодирования данных, вы можете использовать более богатую модель ошибок для API Google . Класс com.google.rpc.Status инкапсулирует эту модель ошибки. Этот класс предоставляет значения com.google.rpc.Code , сообщение об ошибке и дополнительные сведения об ошибке , которые добавляются в виде сообщений protobuf . Кроме того, мы можем использовать предопределенный набор сообщений об ошибках protobuf , определенных в error_details.proto , которые охватывают наиболее распространенные случаи. В пакете com.google.rpc есть классы: RetryInfo , DebugInfo , QuotaFailure , ErrorInfo , PrecondicionFailure , BadRequest , RequestInfo , ResourceInfo и Help , которые инкапсулируют все сообщения об ошибках в error_details.proto .

В дополнение к двум моделям ошибок мы можем определить собственные сообщения об ошибках, которые можно добавить в виде пар ключ-значение в метаданные RPC .

Мы собираемся написать очень простое приложение, чтобы показать, как использовать эти модели ошибок со службой ценообразования, где клиент отправляет названия товаров, а сервер предоставляет значения цен.

3. Унарные вызовы RPC

Начнем с рассмотрения следующего интерфейса сервиса, определенного в product_price.proto :

service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
}

message Commodity {
string access_token = 1;
string commodity_name = 2;
}

message CommodityQuote {
string commodity_name = 1;
string producer_name = 2;
double price = 3;
}

message ErrorResponse {
string commodity_name = 1;
string access_token = 2;
string expected_token = 3;
string expected_value = 4;
}

Вход сервиса – товарное сообщение. В запросе клиент должен предоставить access_token и product_name .

Сервер синхронно отвечает CommodityQuote , в котором указаны имя_товара , имя_производителя и соответствующая цена товара .

Для наглядности мы также определяем пользовательский ErrorResponse . Это пример пользовательского сообщения об ошибке, которое мы отправим клиенту в виде метаданных.

3.1. Ответ с использованием io.grpc.Status

В сервисном вызове сервера мы проверяем запрос на допустимый товар :

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {

if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {

Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
ErrorResponse errorResponse = ErrorResponse.newBuilder()
.setCommodityName(request.getCommodityName())
.setAccessToken(request.getAccessToken())
.setExpectedValue("Only Commodity1, Commodity2 are supported")
.build();
Metadata metadata = new Metadata();
metadata.put(errorResponseKey, errorResponse);
responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
.asRuntimeException(metadata));
}
// ...
}

В этом простом примере мы возвращаем ошибку, если товар не существует в хэш- таблице productLookupBasePrice . ``

Во- первых, мы создаем собственный ErrorResponse и создаем пару ключ-значение, которую мы добавляем к метаданным в metadata.put(errorResponseKey, errorResponse) .

Мы используем io.grpc.Status для указания статуса ошибки. Функция responseObserver::onError принимает Throwable в качестве параметра, поэтому мы используем asRuntimeException(metadata) для преобразования Status в Throwable . asRuntimeException может дополнительно принимать параметр метаданных (в нашем случае пару ключ-значение ErrorResponse ), который добавляется к трейлерам сообщения.

Если клиент сделает недопустимый запрос, он вернет исключение:

@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {

Commodity request = Commodity.newBuilder()
.setAccessToken("123validToken")
.setCommodityName("Commodity5")
.build();

StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));

assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
Metadata metadata = Status.trailersFromThrowable(thrown);
ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
assertEquals("Commodity5",errorResponse.getCommodityName());
assertEquals("123validToken", errorResponse.getAccessToken());
assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}

Вызов blockingStub::getBestCommodityPrice вызывает исключение StatusRuntimeExeption , поскольку в запросе содержится недопустимое имя товара.

Мы используем Status::trailerFromThrowable для доступа к метаданным. ProtoUtils::keyForProto дает нам ключ метаданных ErrorResponse .

3.2. Ответ с использованием com.google.rpc.Status

Рассмотрим следующий пример кода сервера:

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
// ...
if (request.getAccessToken().equals("123validToken") == false) {

com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.foreach.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
// ...
}

В реализации getBestCommodityPrice возвращает ошибку, если в запросе отсутствует допустимый токен.

Кроме того, мы устанавливаем код состояния, сообщение и детали com.google.rpc.Status .

В этом примере мы используем предопределенный com.google.rpc.ErrorInfo вместо нашего пользовательского ErrorDetails (хотя при необходимости мы могли бы использовать оба). Мы сериализуем ErrorInfo с помощью Any::pack() .

Класс StatusProto::toStatusRuntimeException преобразует com.google.rpc.Status в Throwable .

В принципе, мы могли бы также добавить другие сообщения, определенные в error_details.proto , для дальнейшей настройки ответа.

Реализация клиента проста:

@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {

Commodity request = Commodity.newBuilder()
.setAccessToken("invalidToken")
.setCommodityName("Commodity1")
.build();

StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class,
() -> blockingStub.getBestCommodityPrice(request));
com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
assertNotNull(status);
assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
assertEquals("The access token not found", status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
assertEquals("Invalid Token", errorInfo.getReason());
assertEquals("com.foreach.grpc.errorhandling", errorInfo.getDomain());
assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
}
}
}

StatusProto.fromThrowable — это служебный метод для получения com.google.rpc.Status непосредственно из исключения.

Из status::getDetailsList мы получаем детали com.google.rpc.ErrorInfo .

4. Ошибки с потоками gRPC

Потоки gRPC позволяют серверам и клиентам отправлять несколько сообщений в одном вызове RPC.

С точки зрения распространения ошибок подход, который мы использовали до сих пор, недействителен с потоками gRPC . Причина в том, что onError() должен быть последним методом, вызываемым в RPC , потому что после этого вызова инфраструктура разрывает связь между клиентом и сервером.

Когда мы используем потоки, это нежелательное поведение. Вместо этого мы хотим оставить соединение открытым для ответа на другие сообщения, которые могут прийти через RPC .

Хорошим решением этой проблемы является добавление ошибки в само сообщение , как показано в файле product_price.proto :

service CommodityPriceProvider {

rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}

rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}

message Commodity {
string access_token = 1;
string commodity_name = 2;
}

message StreamingCommodityQuote{
oneof message{
CommodityQuote comodity_quote = 1;
google.rpc.Status status = 2;
}
}

Функция bidirectionalListOfPrices возвращает StreamingCommodityQuote . В этом сообщении есть ключевое слово oneof , которое указывает, что оно может использовать либо CommodityQuote , либо google.rpc.Status .

В следующем примере, если клиент отправляет недопустимый токен, сервер добавляет ошибку состояния в текст ответа:

public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {

return new StreamObserver<Commodity>() {
@Override
public void onNext(Commodity request) {

if (request.getAccessToken().equals("123validToken") == false) {

com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.foreach.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
.setStatus(status)
.build();
responseObserver.onNext(streamingCommodityQuote);
}
// ...
}
}
}

Код создает экземпляр com.google.rpc.Status и добавляет его в ответное сообщение StreamingCommodityQuote . Он не вызывает onError(), поэтому фреймворк не прерывает соединение с клиентом.

Посмотрим на реализацию клиента:

public void onNext(StreamingCommodityQuote streamingCommodityQuote) {

switch (streamingCommodityQuote.getMessageCase()) {
case COMODITY_QUOTE:
CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
break;
case STATUS:
com.google.rpc.Status status = streamingCommodityQuote.getStatus();
logger.info("Status code:" + Code.forNumber(status.getCode()));
logger.info("Status message:" + status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo;
try {
errorInfo = any.unpack(ErrorInfo.class);
logger.info("Reason:" + errorInfo.getReason());
logger.info("Domain:" + errorInfo.getDomain());
logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage());
}
}
}
break;
// ...
}
}

Клиент получает возвращенное сообщение в onNext(StreamingCommodityQuote) и использует оператор switch , чтобы отличить CommodityQuote от com.google.rpc.Status .

5. Вывод

В этом руководстве мы показали, как реализовать обработку ошибок в gRPC для унарных и потоковых вызовов RPC .

gRPC — отличная платформа для удаленной связи в распределенных системах. В этих системах важно иметь очень надежную реализацию обработки ошибок, чтобы помочь контролировать систему. Это еще более важно в сложных архитектурах, таких как микросервисы.

Исходный код примеров можно найти на GitHub .