1. Обзор
gRPC — это платформа для выполнения межпроцессных удаленных вызовов процедур (RPC). Он обладает высокой производительностью и может работать в любой среде.
В этом руководстве мы сосредоточимся на обработке ошибок gRPC с помощью Java. gRPC имеет очень низкую задержку и высокую пропускную способность, поэтому он идеально подходит для использования в сложных средах, таких как микросервисные архитектуры. В этих системах очень важно иметь хорошее представление о состоянии, производительности и сбоях различных компонентов сети. Таким образом, хорошая реализация обработки ошибок имеет решающее значение для достижения предыдущих целей.
2. Основы обработки ошибок в gRPC
Ошибки в gRPC являются сущностями первого класса, т . е. каждый вызов в gRPC является либо сообщением полезной нагрузки, либо сообщением об ошибке состояния .
Ошибки закодированы в сообщениях о состоянии и реализованы на всех поддерживаемых языках .
Как правило, мы не должны включать ошибки в полезную нагрузку ответа. С этой целью всегда используйте StreamObserver::
OnError,
который внутренне добавляет ошибку состояния в конечные заголовки. Единственным исключением, как мы увидим ниже, является работа с потоками.
Все клиентские или серверные библиотеки gRPC поддерживают официальную модель ошибок gRPC . Java инкапсулирует эту модель ошибки в класс io.grpc.Status
. Для этого класса требуется стандартный код состояния ошибки и необязательное строковое сообщение об ошибке для предоставления дополнительной информации . Преимущество этой модели ошибок в том, что она поддерживается независимо от используемой кодировки данных (протокольные буферы, REST и т. д.). Однако он довольно ограничен, поскольку мы не можем включать сведения об ошибке в статус. ``
Если ваше приложение gRPC реализует буферы протоколов для кодирования данных, вы можете использовать более богатую модель ошибок для API Google . Класс com.google.rpc.Status
инкапсулирует эту модель ошибки. Этот класс предоставляет значения com.google.rpc.Code
, сообщение об ошибке и дополнительные сведения об ошибке , которые добавляются в виде сообщений protobuf
. Кроме того, мы можем использовать предопределенный набор сообщений об ошибках protobuf , определенных в
error_details.proto
, которые охватывают наиболее распространенные случаи. В пакете com.google.rpc
есть классы: RetryInfo
, DebugInfo
, QuotaFailure
, ErrorInfo
, PrecondicionFailure
, BadRequest
, RequestInfo
, ResourceInfo
и Help
, которые инкапсулируют все сообщения об ошибках в error_details.proto
.
В дополнение к двум моделям ошибок мы можем определить собственные сообщения об ошибках, которые можно добавить в виде пар ключ-значение в метаданные RPC .
Мы собираемся написать очень простое приложение, чтобы показать, как использовать эти модели ошибок со службой ценообразования, где клиент отправляет названия товаров, а сервер предоставляет значения цен.
3. Унарные вызовы RPC
Начнем с рассмотрения следующего интерфейса сервиса, определенного в product_price.proto
:
service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
}
message Commodity {
string access_token = 1;
string commodity_name = 2;
}
message CommodityQuote {
string commodity_name = 1;
string producer_name = 2;
double price = 3;
}
message ErrorResponse {
string commodity_name = 1;
string access_token = 2;
string expected_token = 3;
string expected_value = 4;
}
Вход сервиса – товарное
сообщение. В запросе клиент должен предоставить access_token
и product_name
.
Сервер синхронно отвечает CommodityQuote
, в котором указаны имя_товара
, имя_производителя
и соответствующая цена
товара
.
Для наглядности мы также определяем пользовательский ErrorResponse
. Это пример пользовательского сообщения об ошибке, которое мы отправим клиенту в виде метаданных.
3.1. Ответ с использованием io.grpc.Status
В сервисном вызове сервера мы проверяем запрос на допустимый товар
:
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
ErrorResponse errorResponse = ErrorResponse.newBuilder()
.setCommodityName(request.getCommodityName())
.setAccessToken(request.getAccessToken())
.setExpectedValue("Only Commodity1, Commodity2 are supported")
.build();
Metadata metadata = new Metadata();
metadata.put(errorResponseKey, errorResponse);
responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
.asRuntimeException(metadata));
}
// ...
}
В этом простом примере мы возвращаем ошибку, если товар
не существует в хэш- таблице
productLookupBasePrice
. ``
Во- первых, мы создаем собственный ErrorResponse
и создаем пару ключ-значение, которую мы добавляем к метаданным в metadata.put(errorResponseKey, errorResponse)
.
Мы используем io.grpc.Status
для указания статуса ошибки. Функция responseObserver::onError
принимает Throwable
в качестве параметра, поэтому мы используем asRuntimeException(metadata)
для преобразования Status
в Throwable
. asRuntimeException
может дополнительно принимать параметр метаданных (в нашем случае пару ключ-значение ErrorResponse
), который добавляется к трейлерам сообщения.
Если клиент сделает недопустимый запрос, он вернет исключение:
@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("123validToken")
.setCommodityName("Commodity5")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));
assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
Metadata metadata = Status.trailersFromThrowable(thrown);
ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
assertEquals("Commodity5",errorResponse.getCommodityName());
assertEquals("123validToken", errorResponse.getAccessToken());
assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}
Вызов blockingStub::getBestCommodityPrice
вызывает исключение StatusRuntimeExeption
, поскольку в запросе содержится недопустимое имя товара.
Мы используем Status::trailerFromThrowable
для доступа к метаданным. ProtoUtils::keyForProto
дает нам ключ метаданных ErrorResponse
.
3.2. Ответ с использованием com.google.rpc.Status
Рассмотрим следующий пример кода сервера:
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
// ...
if (request.getAccessToken().equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.foreach.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
// ...
}
В реализации getBestCommodityPrice
возвращает ошибку, если в запросе отсутствует допустимый токен.
Кроме того, мы устанавливаем код состояния, сообщение и детали com.google.rpc.Status
.
В этом примере мы используем предопределенный com.google.rpc.ErrorInfo
вместо нашего пользовательского ErrorDetails
(хотя при необходимости мы могли бы использовать оба). Мы сериализуем ErrorInfo
с помощью Any::pack()
.
Класс StatusProto::toStatusRuntimeException
преобразует com.google.rpc.Status
в Throwable
.
В принципе, мы могли бы также добавить другие сообщения, определенные в error_details.proto
, для дальнейшей настройки ответа.
Реализация клиента проста:
@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("invalidToken")
.setCommodityName("Commodity1")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class,
() -> blockingStub.getBestCommodityPrice(request));
com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
assertNotNull(status);
assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
assertEquals("The access token not found", status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
assertEquals("Invalid Token", errorInfo.getReason());
assertEquals("com.foreach.grpc.errorhandling", errorInfo.getDomain());
assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
}
}
}
StatusProto.fromThrowable
— это служебный метод для получения com.google.rpc.Status
непосредственно из исключения.
Из status::getDetailsList
мы получаем детали com.google.rpc.ErrorInfo .
4. Ошибки с потоками gRPC
Потоки gRPC позволяют серверам и клиентам отправлять несколько сообщений в одном вызове RPC.
С точки зрения распространения ошибок подход, который мы использовали до сих пор, недействителен с потоками gRPC . Причина в том, что onError()
должен быть последним методом, вызываемым в RPC , потому что после этого вызова инфраструктура разрывает связь между клиентом и сервером.
Когда мы используем потоки, это нежелательное поведение. Вместо этого мы хотим оставить соединение открытым для ответа на другие сообщения, которые могут прийти через RPC .
Хорошим решением этой проблемы является добавление ошибки в само сообщение , как показано в файле product_price.proto
:
service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}
message Commodity {
string access_token = 1;
string commodity_name = 2;
}
message StreamingCommodityQuote{
oneof message{
CommodityQuote comodity_quote = 1;
google.rpc.Status status = 2;
}
}
Функция bidirectionalListOfPrices
возвращает StreamingCommodityQuote
. В этом сообщении есть ключевое слово oneof
, которое указывает, что оно может использовать либо CommodityQuote
, либо google.rpc.Status
.
В следующем примере, если клиент отправляет недопустимый токен, сервер добавляет ошибку состояния в текст ответа:
public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {
return new StreamObserver<Commodity>() {
@Override
public void onNext(Commodity request) {
if (request.getAccessToken().equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.foreach.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
.setStatus(status)
.build();
responseObserver.onNext(streamingCommodityQuote);
}
// ...
}
}
}
Код создает экземпляр com.google.rpc.Status
и добавляет его в ответное сообщение StreamingCommodityQuote
. Он не вызывает onError(),
поэтому фреймворк не прерывает соединение с клиентом.
Посмотрим на реализацию клиента:
public void onNext(StreamingCommodityQuote streamingCommodityQuote) {
switch (streamingCommodityQuote.getMessageCase()) {
case COMODITY_QUOTE:
CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
break;
case STATUS:
com.google.rpc.Status status = streamingCommodityQuote.getStatus();
logger.info("Status code:" + Code.forNumber(status.getCode()));
logger.info("Status message:" + status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo;
try {
errorInfo = any.unpack(ErrorInfo.class);
logger.info("Reason:" + errorInfo.getReason());
logger.info("Domain:" + errorInfo.getDomain());
logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage());
}
}
}
break;
// ...
}
}
Клиент получает возвращенное сообщение в onNext(StreamingCommodityQuote)
и использует оператор switch
, чтобы отличить CommodityQuote
от com.google.rpc.Status
.
5. Вывод
В этом руководстве мы показали, как реализовать обработку ошибок в gRPC для унарных и потоковых вызовов RPC .
gRPC — отличная платформа для удаленной связи в распределенных системах. В этих системах важно иметь очень надежную реализацию обработки ошибок, чтобы помочь контролировать систему. Это еще более важно в сложных архитектурах, таких как микросервисы.
Исходный код примеров можно найти на GitHub .