Перейти к основному содержимому

Введение в селектор Java NIO

· 11 мин. чтения

1. Обзор

В этой статье мы рассмотрим вводные части компонента Selector Java NIO .

Селектор предоставляет механизм для мониторинга одного или нескольких каналов NIO и распознавания, когда один или несколько становятся доступными для передачи данных.

Таким образом, один поток может использоваться для управления несколькими каналами и, следовательно, несколькими сетевыми подключениями.

2. Зачем использовать селектор?

С помощью селектора мы можем использовать один поток вместо нескольких для управления несколькими каналами. Переключение контекста между потоками дорого обходится операционной системе , и, кроме того, каждый поток занимает память.

Поэтому чем меньше потоков мы используем, тем лучше. Однако важно помнить, что современные операционные системы и центральные процессоры постоянно совершенствуются в многозадачности , поэтому накладные расходы на многопоточность со временем уменьшаются.

Здесь мы будем иметь дело с тем, как мы можем обрабатывать несколько каналов в одном потоке с помощью селектора.

Обратите также внимание, что селекторы не просто помогают вам читать данные; они также могут прослушивать входящие сетевые подключения и записывать данные по медленным каналам.

3. Настройка

Чтобы использовать селектор, нам не нужна никакая специальная настройка. Все классы, которые нам нужны, находятся в основном пакете java.nio , и нам просто нужно импортировать то, что нам нужно.

После этого мы можем зарегистрировать несколько каналов с помощью объекта-селектора. Когда активность ввода-вывода происходит на любом из каналов, селектор уведомляет нас. Вот как мы можем читать из большого количества источников данных в одном потоке.

Любой канал, который мы регистрируем с помощью селектора, должен быть подклассом SelectableChannel . Это особый тип каналов, которые можно перевести в неблокирующий режим.

4. Создание селектора

Селектор может быть создан путем вызова статического открытого метода класса Selector , который будет использовать системный поставщик селектора по умолчанию для создания нового селектора:

Selector selector = Selector.open();

5. Регистрация выбираемых каналов

Чтобы селектор мог отслеживать какие-либо каналы, мы должны зарегистрировать эти каналы в селекторе. Мы делаем это, вызывая метод регистрации выбираемого канала.

Но прежде чем канал будет зарегистрирован с помощью селектора, он должен быть в неблокирующем режиме:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Это означает, что мы не можем использовать FileChannel с селектором, так как их нельзя переключить в неблокирующий режим, как мы это делаем с каналами сокетов.

Первый параметр — это объект Selector , который мы создали ранее, второй параметр определяет набор интересов , то есть какие события нам интересно прослушивать в отслеживаемом канале через селектор.

Мы можем прослушивать четыре разных события, каждое из которых представлено константой в классе SelectionKey :

  • Connect когда клиент пытается подключиться к серверу. Представлен SelectionKey.OP_CONNECT
  • Принять когда сервер принимает соединение от клиента. Представлен SelectionKey.OP_ACCEPT
  • Чтение когда сервер готов читать из канала. Представлен SelectionKey.OP_READ
  • Запись когда сервер готов писать в канал. Представлен SelectionKey.OP_WRITE

Возвращаемый объект SelectionKey представляет собой регистрацию выбираемого канала в селекторе. Мы рассмотрим это далее в следующем разделе.

6. Объект SelectionKey

Как мы видели в предыдущем разделе, когда мы регистрируем канал с помощью селектора, мы получаем объект SelectionKey . Этот объект содержит данные, представляющие регистрацию канала.

Он содержит некоторые важные свойства, которые мы должны хорошо понимать, чтобы иметь возможность использовать селектор на канале. Мы рассмотрим эти свойства в следующих подразделах.

6.1. Набор процентов

Набор интересов определяет набор событий, за которыми должен следить селектор на этом канале. Это целочисленное значение; мы можем получить эту информацию следующим образом.

Во-первых, у нас есть процентный набор, возвращаемый методом интересаOps SelectionKey . Затем у нас есть константа события в SelectionKey , которую мы рассмотрели ранее.

Когда мы И эти два значения, мы получаем логическое значение, которое говорит нам, отслеживается ли событие или нет:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

6.2. Готовый набор

Готовый набор определяет набор событий, к которым готов канал. Это также целочисленное значение; мы можем получить эту информацию следующим образом.

У нас есть готовый набор, возвращенный методом readyOps SelectionKey . Когда мы И это значение с константами событий, как мы сделали в случае набора процентов, мы получаем логическое значение, представляющее, готов ли канал к определенному значению или нет. ``

Другой альтернативный и более короткий способ сделать это — использовать удобные методы SelectionKey для той же цели:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();

6.3. Канал

Доступ к просматриваемому каналу из объекта SelectionKey очень прост. Мы просто вызываем метод канала :

Channel channel = key.channel();

6.4. Селектор

Как и в случае с каналом, очень просто получить объект Selector из объекта SelectionKey :

Selector selector = key.selector();

6.5. Присоединение объектов

Мы можем прикрепить объект к SelectionKey. Иногда мы можем захотеть дать каналу пользовательский идентификатор или прикрепить любой объект Java, который мы можем отслеживать.

Присоединение объектов — удобный способ сделать это. Вот как вы прикрепляете и получаете объекты из SelectionKey :

key.attach(Object);

Object object = key.attachment();

Кроме того, мы можем прикрепить объект во время регистрации канала. Мы добавляем его в качестве третьего параметра в метод регистрации канала, например:

SelectionKey key = channel.register(
selector, SelectionKey.OP_ACCEPT, object);

7. Выбор ключа канала

До сих пор мы рассматривали, как создать селектор, зарегистрировать в нем каналы и проверить свойства объекта SelectionKey , который представляет регистрацию канала в селекторе.

Это только половина процесса, теперь нам предстоит выполнить непрерывный процесс выбора готового набора, который мы рассмотрели ранее. Мы делаем выбор, используя метод выбора селектора, например :

int channels = selector.select();

Этот метод блокируется до тех пор, пока хотя бы один канал не будет готов к операции. Возвращаемое целое число представляет собой количество ключей, чьи каналы готовы к операции.

Далее мы обычно получаем набор выбранных ключей для обработки:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Полученный набор состоит из объектов SelectionKey , каждый ключ представляет собой зарегистрированный канал, который готов к работе.

После этого мы обычно перебираем этот набор и для каждого ключа получаем канал и выполняем над ним любые операции, которые появляются в нашем наборе интересов.

За время существования канала он может быть выбран несколько раз, так как его ключ появляется в наборе готовых для разных событий. Вот почему у нас должен быть непрерывный цикл для захвата и обработки событий канала по мере их возникновения.

8. Полный пример

Чтобы закрепить знания, полученные в предыдущих разделах, мы собираемся построить полный пример клиент-сервер.

Для простоты тестирования нашего кода мы создадим эхо-сервер и эхо-клиент. В такой настройке клиент подключается к серверу и начинает отправлять ему сообщения. Сервер возвращает сообщения, отправленные каждым клиентом.

Когда сервер встречает определенное сообщение, такое как end , он интерпретирует его как конец связи и закрывает соединение с клиентом.

8.1. Сервер

Вот наш код для EchoServer.java :

public class EchoServer {

private static final String POISON_PILL = "POISON_PILL";

public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 5454));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);

while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {

SelectionKey key = iter.next();

if (key.isAcceptable()) {
register(selector, serverSocket);
}

if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}

private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {

SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (new String(buffer.array()).trim().equals(POISON_PILL)) {
client.close();
System.out.println("Not accepting client messages anymore");
}
else {
buffer.flip();
client.write(buffer);
buffer.clear();
}
}

private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {

SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}

public static Process start() throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = EchoServer.class.getCanonicalName();

ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

return builder.start();
}
}

Вот что происходит; мы создаем объект Selector , вызывая статический метод открытия . Затем мы также создаем канал, вызывая его статический метод открытия , в частности, экземпляр ServerSocketChannel .

Это связано с тем, что ServerSocketChannel доступен для выбора и подходит для потокового сокета прослушивания .

Затем мы привязываем его к порту по нашему выбору. Помните, мы говорили ранее, что перед регистрацией выбираемого канала в селекторе мы должны сначала установить его в неблокирующий режим. Затем мы делаем это, а затем регистрируем канал в селекторе.

Экземпляр SelectionKey этого канала на данном этапе нам не нужен , поэтому мы не будем его запоминать.

Java NIO использует модель, ориентированную на буфер, отличную от модели, ориентированной на поток. Таким образом, связь через сокеты обычно осуществляется путем записи в буфер и чтения из него.

Поэтому мы создаем новый ByteBuffer , в который сервер будет записывать и читать. Мы инициализируем его на 256 байт, это просто произвольное значение, в зависимости от того, сколько данных мы планируем передавать туда и обратно.

Наконец, мы выполняем процесс выбора. Мы выбираем готовые каналы, получаем их ключи выбора, перебираем ключи и выполняем операции, для которых готов каждый канал.

Мы делаем это в бесконечном цикле, так как серверы обычно должны продолжать работать независимо от того, есть активность или нет.

Единственная операция, которую может обрабатывать ServerSocketChannel , — это операция ACCEPT . Когда мы принимаем соединение от клиента, мы получаем объект SocketChannel , для которого мы можем выполнять чтение и запись. Мы устанавливаем его в неблокирующий режим и регистрируем для операции READ в селекторе.

Во время одного из последующих выборов этот новый канал станет готовым к чтению. Мы извлекаем его и читаем его содержимое в буфер. Правда, это как эхо-сервер, мы должны записать этот контент обратно клиенту.

Когда мы хотим записать в буфер, из которого мы читали, мы должны вызвать метод flip () .

Наконец, мы устанавливаем буфер в режим записи, вызывая метод flip и просто пишем в него.

Метод start() определен таким образом, что эхо-сервер может быть запущен как отдельный процесс во время модульного тестирования.

8.2. Клиент

Вот наш код для EchoClient.java :

public class EchoClient {
private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;

public static EchoClient start() {
if (instance == null)
instance = new EchoClient();

return instance;
}

public static void stop() throws IOException {
client.close();
buffer = null;
}

private EchoClient() {
try {
client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}

public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;

}
}

Клиент проще сервера.

Мы используем одноэлементный шаблон, чтобы создать его экземпляр внутри статического метода start . Мы вызываем частный конструктор из этого метода.

В приватном конструкторе открываем соединение на том же порту, на который был привязан канал сервера и еще на том же хосте.

Затем мы создаем буфер, в который мы можем писать и из которого мы можем читать.

Наконец, у нас есть метод sendMessage , который считывает и переносит любую строку, которую мы ему передаем, в байтовый буфер, который передается по каналу на сервер.

Затем мы читаем из клиентского канала, чтобы получить сообщение, отправленное сервером. Мы возвращаем это как эхо нашего сообщения.

8.3. Тестирование

Внутри класса с именем EchoTest.java мы собираемся создать тестовый пример, который запускает сервер, отправляет сообщения на сервер и проходит только тогда, когда те же сообщения возвращаются с сервера. В качестве последнего шага тестовый пример останавливает сервер перед завершением.

Теперь мы можем запустить тест:

public class EchoTest {

Process server;
EchoClient client;

@Before
public void setup() throws IOException, InterruptedException {
server = EchoServer.start();
client = EchoClient.start();
}

@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
String resp1 = client.sendMessage("hello");
String resp2 = client.sendMessage("world");
assertEquals("hello", resp1);
assertEquals("world", resp2);
}

@After
public void teardown() throws IOException {
server.destroy();
EchoClient.stop();
}
}

9. Селектор.пробуждение()

Как мы видели ранее, вызов selector.select() блокирует текущий поток до тех пор, пока один из отслеживаемых каналов не станет готовым к работе. Мы можем переопределить это, вызвав selector.wakeup() из другого потока.

В результате блокирующий поток немедленно возвращается, а не продолжает ждать, независимо от того, готов канал или нет .

Мы можем продемонстрировать это с помощью CountDownLatch и отслеживания шагов выполнения кода:

@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel channel = pipe.source();
channel.configureBlocking(false);
channel.register(selector, OP_READ);

List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

CountDownLatch latch = new CountDownLatch(1);

new Thread(() -> {
invocationStepsTracker.add(">> Count down");
latch.countDown();
try {
invocationStepsTracker.add(">> Start select");
selector.select();
invocationStepsTracker.add(">> End select");
} catch (IOException e) {
e.printStackTrace();
}
}).start();

invocationStepsTracker.add(">> Start await");
latch.await();
invocationStepsTracker.add(">> End await");

invocationStepsTracker.add(">> Wakeup thread");
selector.wakeup();
//clean up
channel.close();

assertThat(invocationStepsTracker)
.containsExactly(
">> Start await",
">> Count down",
">> Start select",
">> End await",
">> Wakeup thread",
">> End select"
);
}

В этом примере мы используем класс Pipe Java NIO, чтобы открыть канал для целей тестирования. Мы отслеживаем шаги выполнения кода в потокобезопасном списке. Анализируя эти шаги, мы можем видеть, как selector.wakeup() освобождает поток, заблокированный selector.select() .

10. Заключение

В этой статье мы рассмотрели базовое использование компонента Java NIO Selector.

Полный исходный код и все фрагменты кода для этой статьи доступны в моем проекте на GitHub .