Перейти к основному содержимому

Руководство по каналу асинхронных сокетов NIO2

· 6 мин. чтения

1. Обзор

В этой статье мы продемонстрируем, как создать простой сервер и его клиент, используя API канала Java 7 NIO.2.

Мы рассмотрим классы AsynchronousServerSocketChannel и AsynchronousSocketChannel , которые являются ключевыми классами, используемыми при реализации сервера и клиента соответственно.

Если вы новичок в API канала NIO.2, у нас есть вводная статья на этом сайте. Вы можете прочитать его, перейдя по этой ссылке .

Все классы, необходимые для использования API каналов NIO.2, объединены в пакет java.nio.channels :

import java.nio.channels.*;

2. Сервер будущего

Экземпляр AsynchronousServerSocketChannel создается путем вызова статического открытого API для его класса:

AsynchronousServerSocketChannel server
= AsynchronousServerSocketChannel.open();

Вновь созданный асинхронный канал сокета сервера открыт, но еще не привязан, поэтому мы должны привязать его к локальному адресу и дополнительно выбрать порт:

server.bind(new InetSocketAddress("127.0.0.1", 4555));

С таким же успехом мы могли бы передать null, чтобы он использовал локальный адрес и привязывался к произвольному порту:

server.bind(null);

После привязки API-интерфейс accept используется для инициации приема подключений к сокету канала:

Future<AsynchronousSocketChannel> acceptFuture = server.accept();

Как и в случае с асинхронными операциями канала, приведенный выше вызов сразу же возвращается, и выполнение продолжается.

Затем мы можем использовать API get для запроса ответа от объекта Future :

AsynchronousSocketChannel worker = future.get();

Этот вызов будет заблокирован, если необходимо дождаться запроса на подключение от клиента. При желании мы можем указать тайм-аут, если мы не хотим ждать вечно:

AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);

После того, как вышеуказанный вызов вернется и операция будет успешной, мы можем создать цикл, в котором мы прослушиваем входящие сообщения и возвращаем их обратно клиенту.

Давайте создадим метод с именем runServer, в котором мы будем ждать и обрабатывать любые входящие сообщения:

public void runServer() {
clientChannel = acceptResult.get();
if ((clientChannel != null) && (clientChannel.isOpen())) {
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(32);
Future<Integer> readResult = clientChannel.read(buffer);

// perform other computations

readResult.get();

buffer.flip();
Future<Integer> writeResult = clientChannel.write(buffer);

// perform other computations

writeResult.get();
buffer.clear();
}
clientChannel.close();
serverChannel.close();
}
}

Все, что мы делаем внутри цикла, — это создаем буфер для чтения и записи в зависимости от операции.

Затем каждый раз, когда мы выполняем чтение или запись, мы можем продолжать выполнение любого другого кода, и когда мы будем готовы обработать результат, мы вызываем API get() для объекта Future .

Чтобы запустить сервер, мы вызываем его конструктор, а затем метод runServer внутри main :

public static void main(String[] args) {
AsyncEchoServer server = new AsyncEchoServer();
server.runServer();
}

3. Сервер с CompletionHandler

В этом разделе мы увидим, как реализовать тот же сервер, используя подход CompletionHandler , а не подход Future .

Внутри конструктора мы создаем AsynchronousServerSocketChannel и привязываем его к локальному адресу так же, как делали это раньше:

serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
serverChannel.bind(hostAddress);

Далее, все еще внутри конструктора, мы создаем цикл while, в котором мы принимаем любое входящее соединение от клиента. Этот цикл while используется исключительно для предотвращения выхода сервера до установления соединения с клиентом .

Чтобы цикл не работал бесконечно , мы вызываем System.in.read() в его конце, чтобы заблокировать выполнение до тех пор, пока входящее соединение не будет прочитано из стандартного потока ввода:

while (true) {
serverChannel.accept(
null, new CompletionHandler<AsynchronousSocketChannel,Object>() {

@Override
public void completed(
AsynchronousSocketChannel result, Object attachment) {
if (serverChannel.isOpen()){
serverChannel.accept(null, this);
}

clientChannel = result;
if ((clientChannel != null) && (clientChannel.isOpen())) {
ReadWriteHandler handler = new ReadWriteHandler();
ByteBuffer buffer = ByteBuffer.allocate(32);

Map<String, Object> readInfo = new HashMap<>();
readInfo.put("action", "read");
readInfo.put("buffer", buffer);

clientChannel.read(buffer, readInfo, handler);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// process error
}
});
System.in.read();
}

Когда соединение установлено, вызывается завершенный метод обратного вызова в CompletionHandler операции принятия.

Его возвращаемый тип — экземпляр AsynchronousSocketChannel . Если канал сокета сервера все еще открыт, мы снова вызываем API приема , чтобы подготовиться к другому входящему соединению, повторно используя тот же обработчик.

Затем мы назначаем возвращенный канал сокета глобальному экземпляру. Затем мы проверяем, что он не равен нулю и что он открыт, прежде чем выполнять над ним операции.

Точка, в которой мы можем начать операции чтения и записи, находится внутри завершенного API обратного вызова обработчика операции принятия . Этот шаг заменяет предыдущий подход, когда мы опрашивали канал с помощью get API.

Обратите внимание, что сервер больше не будет выходить после установления соединения, если мы не закроем его явным образом.

Обратите также внимание, что мы создали отдельный внутренний класс для обработки операций чтения и записи; Обработчик чтения и записи . Мы увидим, как объект вложения пригодится на этом этапе.

Во-первых, давайте посмотрим на класс ReadWriteHandler :

class ReadWriteHandler implements 
CompletionHandler<Integer, Map<String, Object>> {

@Override
public void completed(
Integer result, Map<String, Object> attachment) {
Map<String, Object> actionInfo = attachment;
String action = (String) actionInfo.get("action");

if ("read".equals(action)) {
ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
buffer.flip();
actionInfo.put("action", "write");

clientChannel.write(buffer, actionInfo, this);
buffer.clear();

} else if ("write".equals(action)) {
ByteBuffer buffer = ByteBuffer.allocate(32);

actionInfo.put("action", "read");
actionInfo.put("buffer", buffer);

clientChannel.read(buffer, actionInfo, this);
}
}

@Override
public void failed(Throwable exc, Map<String, Object> attachment) {
//
}
}

Общий тип нашего вложения в классе ReadWriteHandler — это карта. Нам специально нужно передать через него два важных параметра — тип операции (действие) и буфер.

Далее мы увидим, как используются эти параметры.

Первая операция, которую мы выполняем, — это чтение , так как это эхо-сервер, который реагирует только на клиентские сообщения. Внутри завершенного метода обратного вызова ReadWriteHandler мы извлекаем прикрепленные данные и решаем, что делать соответственно. ``

Если это операция чтения , которая завершилась, мы извлекаем буфер, изменяем параметр действия вложения и сразу же выполняем операцию записи , чтобы отобразить сообщение клиенту.

Если это операция записи , которая только что завершилась, мы снова вызываем API чтения , чтобы подготовить сервер к приему другого входящего сообщения.

4. Клиент

После настройки сервера мы можем настроить клиент, вызвав открытый API в классе AsyncronousSocketChannel . Этот вызов создает новый экземпляр канала клиентского сокета, который мы затем используем для подключения к серверу:

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999)
Future<Void> future = client.connect(hostAddress);

Операция подключения ничего не возвращает в случае успеха. Однако мы по-прежнему можем использовать объект Future для отслеживания состояния асинхронной операции.

Давайте вызовем get API для ожидания соединения:

future.get()

После этого шага мы можем начать отправлять сообщения на сервер и получать для них эхо. Метод sendMessage выглядит следующим образом:

public String sendMessage(String message) {
byte[] byteMsg = new String(message).getBytes();
ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
Future<Integer> writeResult = client.write(buffer);

// do some computation

writeResult.get();
buffer.flip();
Future<Integer> readResult = client.read(buffer);

// do some computation

readResult.get();
String echo = new String(buffer.array()).trim();
buffer.clear();
return echo;
}

5. Тест

Чтобы подтвердить, что наши серверные и клиентские приложения работают в соответствии с ожиданиями, мы можем использовать тест:

@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
String resp1 = client.sendMessage("hello");
String resp2 = client.sendMessage("world");

assertEquals("hello", resp1);
assertEquals("world", resp2);
}

6. Заключение

В этой статье мы рассмотрели API асинхронных каналов сокетов Java NIO.2. Мы смогли пройти через процесс создания сервера и клиента с помощью этих новых API.

Вы можете получить доступ к полному исходному коду этой статьи в проекте Github .