Перейти к основному содержимому

Потоковая передача с помощью gRPC в Java

· 7 мин. чтения

1. Обзор

gRPC — это платформа для выполнения межпроцессных удаленных вызовов процедур (RPC). Он следует модели клиент-сервер, обладает высокой производительностью и поддерживает наиболее важные компьютерные языки. Ознакомьтесь с нашей статьей Введение в gRPC , чтобы получить хороший обзор.

В этом руководстве мы сосредоточимся на потоках gRPC. Потоковая передача позволяет мультиплексировать сообщения между серверами и клиентами, создавая очень эффективное и гибкое взаимодействие между процессами .

2. Основы потоковой передачи gRPC

gRPC использует сетевой протокол HTTP/2 для взаимодействия между службами . Одним из ключевых преимуществ HTTP/2 является то, что он поддерживает потоки. Каждый поток может мультиплексировать несколько двунаправленных сообщений, использующих одно соединение.

В gRPC у нас может быть потоковая передача с тремя функциональными типами вызовов:

  1. Серверная потоковая передача RPC: клиент отправляет один запрос на сервер и возвращает несколько сообщений, которые он читает последовательно.
  2. Клиентская потоковая передача RPC: клиент отправляет последовательность сообщений на сервер. Клиент ждет, пока сервер обработает сообщения, и читает возвращенный ответ.
  3. Двунаправленная потоковая передача RPC: клиент и сервер могут отправлять несколько сообщений туда и обратно. Сообщения принимаются в том же порядке, в котором они были отправлены. Однако сервер или клиент могут отвечать на полученные сообщения в том порядке, в котором они выбирают.

Чтобы продемонстрировать, как использовать эти процедурные вызовы, мы напишем пример простого клиент-серверного приложения, которое обменивается информацией о фондовых ценных бумагах.

3. Определение услуги

Мы используем stock_quote.proto для определения интерфейса службы и структуры сообщений полезной нагрузки:

service StockQuoteProvider {

rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}

rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}

rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
string ticker_symbol = 1;
string company_name = 2;
string description = 3;
}
message StockQuote {
double price = 1;
int32 offer_number = 2;
string description = 3;
}

Служба StockQuoteProvider имеет три типа методов, поддерживающих потоковую передачу сообщений. В следующем разделе мы рассмотрим их реализации.

Из сигнатур методов службы мы видим, что клиент запрашивает сервер, отправляя сообщения Stock . Сервер отправляет ответ обратно, используя сообщения StockQuote .

Мы используем protobuf-maven-plugin, определенный в файле pom.xml , для генерации кода Java из IDL-файла stock-quote.proto .

Плагин генерирует заглушки на стороне клиента и код на стороне сервера в каталогах target/generated-sources/protobuf/java и /grpc-java .

Мы собираемся использовать сгенерированный код для реализации нашего сервера и клиента .

4. Реализация сервера

Конструктор StockServer использует сервер gRPC для прослушивания и отправки входящих запросов:

public class StockServer {
private int port;
private io.grpc.Server server;

public StockServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port)
.addService(new StockService())
.build();
}
//...
}

Добавляем StockService в io.grpc.Server . StockService расширяет StockQuoteProviderImplBase , который плагин protobuf сгенерировал из нашего прото-файла. Поэтому StockQuoteProviderImplBase имеет заглушки для трех методов службы потоковой передачи . ``

StockService необходимо переопределить эти методы-заглушки для фактической реализации нашего сервиса .

Далее мы увидим, как это делается для трех случаев потоковой передачи.

4.1. Потоковая передача на стороне сервера

Клиент отправляет один запрос на котировку и получает несколько ответов, в каждом из которых предлагаются разные цены на товар:

@Override
public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver<StockQuote> responseObserver) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
responseObserver.onCompleted();
}

Метод создает StockQuote , извлекает цены и отмечает номер предложения. Для каждого предложения он отправляет сообщение клиенту, вызывая responseObserver::onNext . Он использует reponseObserver::onCompleted , чтобы сообщить, что это сделано с помощью RPC.

4.2. Потоковая передача на стороне клиента

Клиент отправляет несколько акций, а сервер возвращает один StockQuote :

@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
int count;
double price = 0.0;
StringBuffer sb = new StringBuffer();

@Override
public void onNext(Stock stock) {
count++;
price = +fetchStockPriceBid(stock);
sb.append(":")
.append(stock.getTickerSymbol());
}

@Override
public void onCompleted() {
responseObserver.onNext(StockQuote.newBuilder()
.setPrice(price / count)
.setDescription("Statistics-" + sb.toString())
.build());
responseObserver.onCompleted();
}

// handle onError() ...
};
}

Метод получает StreamObserver<StockQuote> в качестве параметра для ответа клиенту. Он возвращает StreamObserver<Stock> , где обрабатывает сообщения запроса клиента.

Возвращенный StreamObserver<Stock> переопределяет onNext() , чтобы получать уведомления каждый раз, когда клиент отправляет запрос.

Метод StreamObserver<Stock>.onCompleted() вызывается, когда клиент завершил отправку всех сообщений. Со всеми полученными сообщениями об акциях мы находим среднее значение выбранных цен акций, создаем StockQuote и вызываем responseObserver::onNext , чтобы доставить результат клиенту.

Наконец, мы переопределяем StreamObserver<Stock>.onError() для обработки аварийных завершений.

4.3. Двунаправленная потоковая передача

Клиент отправляет несколько акций, и сервер возвращает набор цен для каждого запроса :

@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
@Override
public void onNext(Stock request) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}

//handle OnError() ...
};
}

У нас та же сигнатура метода, что и в предыдущем примере. Что изменилось в реализации: мы не ждем, пока клиент отправит все сообщения, прежде чем мы ответим.

В этом случае мы вызываем responseObserver::onNext сразу после получения каждого входящего сообщения и в том же порядке, в котором оно было получено.

Важно отметить, что при необходимости мы могли бы легко изменить порядок ответов.

5. Реализация клиента

Конструктор StockClient берет канал gRPC и создает экземпляры классов-заглушек, сгенерированных подключаемым модулем gRPC Maven:

public class StockClient {
private StockQuoteProviderBlockingStub blockingStub;
private StockQuoteProviderStub nonBlockingStub;

public StockClient(Channel channel) {
blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
}
// ...
}

StockQuoteProviderBlockingStub и StockQuoteProviderStub поддерживают синхронные и асинхронные запросы методов клиента .

Далее мы рассмотрим реализацию клиента для трех потоковых RPC.

5.1. Клиентский RPC с потоковой передачей на стороне сервера

Клиент делает один вызов серверу, запрашивая цену акции, и возвращает список котировок:

public void serverSideStreamingListOfStockPrices() {
Stock request = Stock.newBuilder()
.setTickerSymbol("AU")
.setCompanyName("Austich")
.setDescription("server streaming example")
.build();
Iterator<StockQuote> stockQuotes;
try {
logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
for (int i = 1; stockQuotes.hasNext(); i++) {
StockQuote stockQuote = stockQuotes.next();
logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
}
} catch (StatusRuntimeException e) {
logInfo("RPC failed: {0}", e.getStatus());
}
}

Мы используем blockingStub::serverSideStreamingGetListStock для выполнения синхронного запроса. Мы получаем список StockQuotes с помощью Iterator .

5.2. Клиентский RPC с потоковой передачей на стороне клиента

Клиент отправляет поток Stock на сервер и возвращает StockQuote с некоторой статистикой:

public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote summary) {
logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
}

@Override
public void onCompleted() {
logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
}

// Override OnError ...
};

StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}

Как и в примере с сервером, мы используем StreamObservers для отправки и получения сообщений.

RequestObserver использует неблокирующую заглушку для отправки списка Stock на сервер.

С помощью responseObserver мы получаем StockQuote с некоторой статистикой.

5.3. Клиент RPC с двунаправленной потоковой передачей

Клиент отправляет поток акций и возвращает список цен для каждой акции .

public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote stockQuote) {
logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
}

@Override
public void onCompleted() {
logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
}

//Override onError() ...
};

StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
Thread.sleep(200);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}

Реализация очень похожа на случай потоковой передачи на стороне клиента. Мы отправляем Stock с помощью requestObserver — единственная разница в том, что теперь мы получаем несколько ответов с помощью responseObserver . Ответы отделены от запросов — они могут приходить в любом порядке.

6. Запуск сервера и клиента

После использования Maven для компиляции кода нам просто нужно открыть два командных окна.

Чтобы запустить сервер:

mvn exec:java -Dexec.mainClass=com.foreach.grpc.streaming.StockServer

Чтобы запустить клиент:

mvn exec:java -Dexec.mainClass=com.foreach.grpc.streaming.StockClient

7. Заключение

В этой статье мы увидели, как использовать потоковую передачу в gRPC. Потоковая передача — это мощная функция, которая позволяет клиентам и серверам взаимодействовать, отправляя несколько сообщений по одному соединению. Кроме того, сообщения принимаются в том же порядке, в котором они были отправлены, но любая сторона может читать или писать сообщения в любом порядке по своему усмотрению .

Исходный код примеров можно найти на GitHub .