Перейти к основному содержимому

Введение в Нетти

· 8 мин. чтения

1. Введение

В этой статье мы рассмотрим Netty — асинхронную среду сетевых приложений, управляемую событиями.

Основная цель Netty — построение высокопроизводительных протокольных серверов на основе NIO (или, возможно, NIO.2) с разделением и слабой связью компонентов сети и бизнес-логики. Он может реализовывать широко известный протокол, такой как HTTP, или ваш собственный протокол.

2. Основные концепции

Netty — это неблокирующий фреймворк. Это приводит к высокой пропускной способности по сравнению с блокирующим вводом-выводом. Понимание неблокирующего ввода-вывода имеет решающее значение для понимания основных компонентов Netty и их взаимосвязей.

2.1. Канал

Канал — это основа Java NIO. Он представляет собой открытое соединение, способное выполнять операции ввода-вывода, такие как чтение и запись.

2.2. Будущее

Каждая операция ввода-вывода на канале в Netty является неблокирующей.

Это означает, что каждая операция возвращается сразу после вызова. В стандартной библиотеке Java есть интерфейс Future , но он не удобен для целей Netty — мы можем только спросить Future о завершении операции или заблокировать текущий поток до выполнения операции.

Именно поэтому у Netty есть собственный интерфейс ChannelFuture . Мы можем передать обратный вызов ChannelFuture , который будет вызван после завершения операции.

2.3. События и обработчики

Netty использует парадигму приложений, управляемых событиями, поэтому конвейер обработки данных представляет собой цепочку событий, проходящих через обработчики. События и обработчики могут быть связаны с входящим и исходящим потоком данных. Входящие события могут быть следующими:

  • Активация и деактивация канала
  • Чтение событий операции
  • Исключительные события
  • Пользовательские события

Исходящие события проще и, как правило, связаны с открытием/закрытием соединения и записью/удалением данных.

Приложения Netty состоят из нескольких сетевых событий и событий логики приложений и их обработчиков. Базовыми интерфейсами для обработчиков событий канала являются ChannelHandler и его преемники ChannelOutboundHandler и ChannelInboundHandler .

Netty предоставляет огромную иерархию реализаций ChannelHandler. Стоит отметить адаптеры, которые являются просто пустыми реализациями, например ChannelInboundHandlerAdapter и ChannelOutboundHandlerAdapter . Мы могли бы расширить эти адаптеры, когда нам нужно обработать только подмножество всех событий.

Кроме того, существует множество реализаций конкретных протоколов, таких как HTTP, например , HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. Было бы неплохо ознакомиться с ними в Javadoc Netty.

2.4. Кодеры и декодеры

Поскольку мы работаем с сетевым протоколом, нам необходимо выполнить сериализацию и десериализацию данных. Для этой цели Netty вводит специальные расширения ChannelInboundHandler для декодеров , способных декодировать входящие данные. Базовым классом большинства декодеров является ByteToMessageDecoder.

Для кодирования исходящих данных у Netty есть расширения ChannelOutboundHandler , называемые кодировщиками. MessageToByteEncoder является основой для большинства реализаций кодировщика . Мы можем преобразовать сообщение из последовательности байтов в объект Java и наоборот с помощью кодировщиков и декодеров.

3. Пример серверного приложения

Давайте создадим проект, представляющий простой протокольный сервер, который получает запрос, выполняет расчет и отправляет ответ.

3.1. Зависимости

Прежде всего, нам нужно предоставить зависимость Netty в нашем pom.xml :

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency>

Мы можем найти последнюю версию на Maven Central .

3.2. Модель данных

Класс данных запроса будет иметь следующую структуру:

public class RequestData {
private int intValue;
private String stringValue;

// standard getters and setters
}

Предположим, что сервер получает запрос и возвращает значение intValue , умноженное на 2. Ответ будет иметь единственное значение int:

public class ResponseData {
private int intValue;

// standard getters and setters
}

3.3. Запросить декодер

Теперь нам нужно создать кодеры и декодеры для наших протокольных сообщений.

Стоит отметить, что Netty работает с буфером приема сокета , который представлен не в виде очереди, а просто в виде кучи байтов. Это означает, что наш входящий обработчик может быть вызван, когда полное сообщение не получено сервером.

Мы должны убедиться, что мы получили полное сообщение перед обработкой , и есть много способов сделать это.

Прежде всего, мы можем создать временный ByteBuf и добавлять к нему все входящие байты, пока не получим необходимое количество байтов:

public class SimpleProcessingHandler 
extends ChannelInboundHandlerAdapter {
private ByteBuf tmp;

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("Handler added");
tmp = ctx.alloc().buffer(4);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
System.out.println("Handler removed");
tmp.release();
tmp = null;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
tmp.writeBytes(m);
m.release();
if (tmp.readableBytes() >= 4) {
// request processing
RequestData requestData = new RequestData();
requestData.setIntValue(tmp.readInt());
ResponseData responseData = new ResponseData();
responseData.setIntValue(requestData.getIntValue() * 2);
ChannelFuture future = ctx.writeAndFlush(responseData);
future.addListener(ChannelFutureListener.CLOSE);
}
}
}

Показанный выше пример выглядит немного странно, но помогает нам понять, как работает Netty. Каждый метод нашего обработчика вызывается, когда происходит соответствующее ему событие. Итак, мы инициализируем буфер при добавлении обработчика, заполняем его данными при получении новых байтов и начинаем обрабатывать, когда получаем достаточно данных.

Мы намеренно не использовали stringValue — такое декодирование было бы излишне сложным. Вот почему Netty предоставляет полезные классы декодера, которые являются реализациями ChannelInboundHandler : ByteToMessageDecoder и ReplayingDecoder .

Как мы отмечали выше, мы можем создать конвейер обработки каналов с помощью Netty. Таким образом, мы можем поставить наш декодер в качестве первого обработчика, а обработчик логики обработки может следовать за ним.

Далее показан декодер для RequestData:

public class RequestDecoder extends ReplayingDecoder<RequestData> {

private final Charset charset = Charset.forName("UTF-8");

@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {

RequestData data = new RequestData();
data.setIntValue(in.readInt());
int strLen = in.readInt();
data.setStringValue(
in.readCharSequence(strLen, charset).toString());
out.add(data);
}
}

Идея этого декодера довольно проста. Он использует реализацию ByteBuf , которая генерирует исключение, когда в буфере недостаточно данных для операции чтения.

При обнаружении исключения буфер перематывается на начало, и декодер ожидает новую порцию данных. Декодирование останавливается, когда исходящий список не пуст после выполнения декодирования .

3.4. Кодировщик ответа

Помимо декодирования RequestData нам нужно закодировать сообщение. Эта операция проще, потому что у нас есть полные данные сообщения, когда происходит операция записи.

Мы можем записывать данные в Channel в нашем основном обработчике или мы можем отделить логику и создать обработчик, расширяющий MessageToByteEncoder , который будет перехватывать операцию записи ResponseData :

public class ResponseDataEncoder 
extends MessageToByteEncoder<ResponseData> {

@Override
protected void encode(ChannelHandlerContext ctx,
ResponseData msg, ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
}
}

3.5. Обработка запроса

Так как декодирование и кодирование мы проводили в отдельных обработчиках, нам нужно изменить наш ProcessingHandler :

public class ProcessingHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {

RequestData requestData = (RequestData) msg;
ResponseData responseData = new ResponseData();
responseData.setIntValue(requestData.getIntValue() * 2);
ChannelFuture future = ctx.writeAndFlush(responseData);
future.addListener(ChannelFutureListener.CLOSE);
System.out.println(requestData);
}
}

3.6. Начальная загрузка сервера

Теперь соберем все вместе и запустим наш сервер:

public class NettyServer {

private int port;

// constructor

public static void main(String[] args) throws Exception {

int port = args.length > 0
? Integer.parseInt(args[0]);
: 8080;

new NettyServer(port).run();
}

public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

Подробную информацию о классах, используемых в приведенном выше примере начальной загрузки сервера, можно найти в их Javadoc. Самая интересная часть это строка:

ch.pipeline().addLast(
new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler());

Здесь мы определяем входящие и исходящие обработчики, которые будут обрабатывать запросы и выводить в правильном порядке.

4. Клиентское приложение

Клиент должен выполнять обратное кодирование и декодирование, поэтому нам нужны RequestDataEncoder и ResponseDataDecoder :

public class RequestDataEncoder 
extends MessageToByteEncoder<RequestData> {

private final Charset charset = Charset.forName("UTF-8");

@Override
protected void encode(ChannelHandlerContext ctx,
RequestData msg, ByteBuf out) throws Exception {

out.writeInt(msg.getIntValue());
out.writeInt(msg.getStringValue().length());
out.writeCharSequence(msg.getStringValue(), charset);
}
}
public class ResponseDataDecoder 
extends ReplayingDecoder<ResponseData> {

@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {

ResponseData data = new ResponseData();
data.setIntValue(in.readInt());
out.add(data);
}
}

Также нам нужно определить ClientHandler , который будет отправлять запрос и получать ответ от сервера:

public class ClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {

RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue(
"all work and no play makes jack a dull boy");
ChannelFuture future = ctx.writeAndFlush(msg);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println((ResponseData)msg);
ctx.close();
}
}

Теперь давайте загрузим клиент:

public class NettyClient {
public static void main(String[] args) throws Exception {

String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {

@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(),
new ResponseDataDecoder(), new ClientHandler());
}
});

ChannelFuture f = b.connect(host, port).sync();

f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}

Как мы видим, есть много общих деталей с начальной загрузкой сервера.

Теперь мы можем запустить основной метод клиента и посмотреть на вывод консоли. Как и ожидалось, мы получили ResponseData с intValue равным 246.

5. Вывод

В этой статье у нас было краткое введение в Netty. Мы показали его основные компоненты, такие как Channel и ChannelHandler . Также мы сделали простой сервер неблокирующего протокола и клиент для него.

Как всегда, все образцы кода доступны на GitHub .