1. Обзор
gRPC — это платформа для выполнения межпроцессных удаленных вызовов процедур (RPC). Он следует модели клиент-сервер, обладает высокой производительностью и поддерживает наиболее важные компьютерные языки. Ознакомьтесь с нашей статьей Введение в gRPC , чтобы получить хороший обзор.
В этом руководстве мы сосредоточимся на потоках gRPC. Потоковая передача позволяет мультиплексировать сообщения между серверами и клиентами, создавая очень эффективное и гибкое взаимодействие между процессами .
2. Основы потоковой передачи gRPC
gRPC использует сетевой протокол HTTP/2 для взаимодействия между службами . Одним из ключевых преимуществ HTTP/2 является то, что он поддерживает потоки. Каждый поток может мультиплексировать несколько двунаправленных сообщений, использующих одно соединение.
В gRPC у нас может быть потоковая передача с тремя функциональными типами вызовов:
- Серверная потоковая передача RPC: клиент отправляет один запрос на сервер и возвращает несколько сообщений, которые он читает последовательно.
- Клиентская потоковая передача RPC: клиент отправляет последовательность сообщений на сервер. Клиент ждет, пока сервер обработает сообщения, и читает возвращенный ответ.
- Двунаправленная потоковая передача RPC: клиент и сервер могут отправлять несколько сообщений туда и обратно. Сообщения принимаются в том же порядке, в котором они были отправлены. Однако сервер или клиент могут отвечать на полученные сообщения в том порядке, в котором они выбирают.
Чтобы продемонстрировать, как использовать эти процедурные вызовы, мы напишем пример простого клиент-серверного приложения, которое обменивается информацией о фондовых ценных бумагах.
3. Определение услуги
Мы используем stock_quote.proto
для определения интерфейса службы и структуры сообщений полезной нагрузки:
service StockQuoteProvider {
rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}
rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
string ticker_symbol = 1;
string company_name = 2;
string description = 3;
}
message StockQuote {
double price = 1;
int32 offer_number = 2;
string description = 3;
}
Служба StockQuoteProvider
имеет три типа методов, поддерживающих потоковую передачу сообщений. В следующем разделе мы рассмотрим их реализации.
Из сигнатур методов службы мы видим, что клиент запрашивает сервер, отправляя сообщения Stock
. Сервер отправляет ответ обратно, используя сообщения StockQuote
.
Мы используем protobuf-maven-plugin,
определенный в файле pom.xml
, для генерации кода Java из IDL-файла stock-quote.proto
.
Плагин генерирует заглушки на стороне клиента и код на стороне сервера в каталогах target/generated-sources/protobuf/java
и /grpc-java
.
Мы собираемся использовать сгенерированный код для реализации нашего сервера и клиента .
4. Реализация сервера
Конструктор StockServer использует
сервер
gRPC для прослушивания и отправки входящих запросов:
public class StockServer {
private int port;
private io.grpc.Server server;
public StockServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port)
.addService(new StockService())
.build();
}
//...
}
Добавляем
StockService
в io.grpc.Server . StockService
расширяет StockQuoteProviderImplBase
, который плагин protobuf
сгенерировал из нашего прото-файла. Поэтому StockQuoteProviderImplBase
имеет заглушки для трех методов службы потоковой передачи .
``
StockService
необходимо переопределить эти методы-заглушки для фактической реализации нашего сервиса .
Далее мы увидим, как это делается для трех случаев потоковой передачи.
4.1. Потоковая передача на стороне сервера
Клиент отправляет один запрос на котировку и получает несколько ответов, в каждом из которых предлагаются разные цены на товар:
@Override
public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver<StockQuote> responseObserver) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
responseObserver.onCompleted();
}
Метод создает StockQuote
, извлекает цены и отмечает номер предложения. Для каждого предложения он отправляет сообщение клиенту, вызывая responseObserver::onNext
. Он использует reponseObserver::onCompleted
, чтобы сообщить, что это сделано с помощью RPC.
4.2. Потоковая передача на стороне клиента
Клиент отправляет несколько акций, а сервер возвращает один StockQuote
:
@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
int count;
double price = 0.0;
StringBuffer sb = new StringBuffer();
@Override
public void onNext(Stock stock) {
count++;
price = +fetchStockPriceBid(stock);
sb.append(":")
.append(stock.getTickerSymbol());
}
@Override
public void onCompleted() {
responseObserver.onNext(StockQuote.newBuilder()
.setPrice(price / count)
.setDescription("Statistics-" + sb.toString())
.build());
responseObserver.onCompleted();
}
// handle onError() ...
};
}
Метод получает StreamObserver<StockQuote>
в качестве параметра для ответа клиенту. Он возвращает StreamObserver<Stock>
, где обрабатывает сообщения запроса клиента.
Возвращенный StreamObserver<Stock>
переопределяет onNext()
, чтобы получать уведомления каждый раз, когда клиент отправляет запрос.
Метод StreamObserver<Stock>.onCompleted()
вызывается, когда клиент завершил отправку всех сообщений. Со всеми полученными сообщениями об акциях мы находим среднее значение выбранных цен акций, создаем StockQuote
и
вызываем responseObserver::onNext
, чтобы доставить результат клиенту.
Наконец, мы переопределяем StreamObserver<Stock>.onError()
для обработки аварийных завершений.
4.3. Двунаправленная потоковая передача
Клиент отправляет несколько акций, и сервер возвращает набор цен для каждого запроса :
@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
@Override
public void onNext(Stock request) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
//handle OnError() ...
};
}
У нас та же сигнатура метода, что и в предыдущем примере. Что изменилось в реализации: мы не ждем, пока клиент отправит все сообщения, прежде чем мы ответим.
В этом случае мы вызываем responseObserver::onNext
сразу после получения каждого входящего сообщения и в том же порядке, в котором оно было получено.
Важно отметить, что при необходимости мы могли бы легко изменить порядок ответов.
5. Реализация клиента
Конструктор StockClient
берет канал gRPC и создает экземпляры классов-заглушек, сгенерированных подключаемым модулем gRPC Maven:
public class StockClient {
private StockQuoteProviderBlockingStub blockingStub;
private StockQuoteProviderStub nonBlockingStub;
public StockClient(Channel channel) {
blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
}
// ...
}
StockQuoteProviderBlockingStub
и StockQuoteProviderStub
поддерживают синхронные и асинхронные запросы методов клиента .
Далее мы рассмотрим реализацию клиента для трех потоковых RPC.
5.1. Клиентский RPC с потоковой передачей на стороне сервера
Клиент делает один вызов серверу, запрашивая цену акции, и возвращает список котировок:
public void serverSideStreamingListOfStockPrices() {
Stock request = Stock.newBuilder()
.setTickerSymbol("AU")
.setCompanyName("Austich")
.setDescription("server streaming example")
.build();
Iterator<StockQuote> stockQuotes;
try {
logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
for (int i = 1; stockQuotes.hasNext(); i++) {
StockQuote stockQuote = stockQuotes.next();
logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
}
} catch (StatusRuntimeException e) {
logInfo("RPC failed: {0}", e.getStatus());
}
}
Мы используем blockingStub::serverSideStreamingGetListStock
для выполнения синхронного запроса. Мы получаем список StockQuotes
с помощью Iterator
.
5.2. Клиентский RPC с потоковой передачей на стороне клиента
Клиент отправляет поток Stock
на сервер и возвращает StockQuote
с некоторой статистикой:
public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote summary) {
logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
}
// Override OnError ...
};
StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}
Как и в примере с сервером, мы используем StreamObservers
для отправки и получения сообщений.
RequestObserver использует неблокирующую
заглушку для отправки списка Stock
на сервер.
С помощью responseObserver
мы получаем StockQuote
с некоторой статистикой.
5.3. Клиент RPC с двунаправленной потоковой передачей
Клиент отправляет поток акций
и возвращает список цен для каждой акции
.
public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote stockQuote) {
logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
}
//Override onError() ...
};
StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
Thread.sleep(200);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}
Реализация очень похожа на случай потоковой передачи на стороне клиента. Мы отправляем Stock
с помощью requestObserver
— единственная разница в том, что теперь мы получаем несколько ответов с помощью responseObserver
. Ответы отделены от запросов — они могут приходить в любом порядке.
6. Запуск сервера и клиента
После использования Maven для компиляции кода нам просто нужно открыть два командных окна.
Чтобы запустить сервер:
mvn exec:java -Dexec.mainClass=com.foreach.grpc.streaming.StockServer
Чтобы запустить клиент:
mvn exec:java -Dexec.mainClass=com.foreach.grpc.streaming.StockClient
7. Заключение
В этой статье мы увидели, как использовать потоковую передачу в gRPC. Потоковая передача — это мощная функция, которая позволяет клиентам и серверам взаимодействовать, отправляя несколько сообщений по одному соединению. Кроме того, сообщения принимаются в том же порядке, в котором они были отправлены, но любая сторона может читать или писать сообщения в любом порядке по своему усмотрению .
Исходный код примеров можно найти на GitHub .