1. Обзор
В этой статье мы рассмотрим вводные части компонента Selector
Java NIO .
Селектор предоставляет механизм для мониторинга одного или нескольких каналов NIO и распознавания, когда один или несколько становятся доступными для передачи данных.
Таким образом, один поток может использоваться для управления несколькими каналами и, следовательно, несколькими сетевыми подключениями.
2. Зачем использовать селектор?
С помощью селектора мы можем использовать один поток вместо нескольких для управления несколькими каналами. Переключение контекста между потоками дорого обходится операционной системе , и, кроме того, каждый поток занимает память.
Поэтому чем меньше потоков мы используем, тем лучше. Однако важно помнить, что современные операционные системы и центральные процессоры постоянно совершенствуются в многозадачности , поэтому накладные расходы на многопоточность со временем уменьшаются.
Здесь мы будем иметь дело с тем, как мы можем обрабатывать несколько каналов в одном потоке с помощью селектора.
Обратите также внимание, что селекторы не просто помогают вам читать данные; они также могут прослушивать входящие сетевые подключения и записывать данные по медленным каналам.
3. Настройка
Чтобы использовать селектор, нам не нужна никакая специальная настройка. Все классы, которые нам нужны, находятся в основном пакете java.nio
, и нам просто нужно импортировать то, что нам нужно.
После этого мы можем зарегистрировать несколько каналов с помощью объекта-селектора. Когда активность ввода-вывода происходит на любом из каналов, селектор уведомляет нас. Вот как мы можем читать из большого количества источников данных в одном потоке.
Любой канал, который мы регистрируем с помощью селектора, должен быть подклассом SelectableChannel
. Это особый тип каналов, которые можно перевести в неблокирующий режим.
4. Создание селектора
Селектор может быть создан путем вызова статического открытого
метода класса Selector
, который будет использовать системный поставщик селектора по умолчанию для создания нового селектора:
Selector selector = Selector.open();
5. Регистрация выбираемых каналов
Чтобы селектор мог отслеживать какие-либо каналы, мы должны зарегистрировать эти каналы в селекторе. Мы делаем это, вызывая метод регистрации
выбираемого канала.
Но прежде чем канал будет зарегистрирован с помощью селектора, он должен быть в неблокирующем режиме:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
Это означает, что мы не можем использовать FileChannel
с селектором, так как их нельзя переключить в неблокирующий режим, как мы это делаем с каналами сокетов.
Первый параметр — это объект Selector
, который мы создали ранее, второй параметр определяет набор интересов , то есть какие события нам интересно прослушивать в отслеживаемом канале через селектор.
Мы можем прослушивать четыре разных события, каждое из которых представлено константой в классе SelectionKey
:
Connect
— когда клиент пытается подключиться к серверу. ПредставленSelectionKey.OP_CONNECT
Принять
— когда сервер принимает соединение от клиента. ПредставленSelectionKey.OP_ACCEPT
Чтение
— когда сервер готов читать из канала. ПредставленSelectionKey.OP_READ
Запись
— когда сервер готов писать в канал. ПредставленSelectionKey.OP_WRITE
Возвращаемый объект SelectionKey
представляет собой регистрацию выбираемого канала в селекторе. Мы рассмотрим это далее в следующем разделе.
6. Объект SelectionKey
Как мы видели в предыдущем разделе, когда мы регистрируем канал с помощью селектора, мы получаем объект SelectionKey
. Этот объект содержит данные, представляющие регистрацию канала.
Он содержит некоторые важные свойства, которые мы должны хорошо понимать, чтобы иметь возможность использовать селектор на канале. Мы рассмотрим эти свойства в следующих подразделах.
6.1. Набор процентов
Набор интересов определяет набор событий, за которыми должен следить селектор на этом канале. Это целочисленное значение; мы можем получить эту информацию следующим образом.
Во-первых, у нас есть процентный набор, возвращаемый методом интересаOps
SelectionKey
. Затем у нас есть константа события в SelectionKey
, которую мы рассмотрели ранее.
Когда мы И эти два значения, мы получаем логическое значение, которое говорит нам, отслеживается ли событие или нет:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
6.2. Готовый набор
Готовый набор определяет набор событий, к которым готов канал. Это также целочисленное значение; мы можем получить эту информацию следующим образом.
У нас есть готовый набор, возвращенный методом
readyOps SelectionKey
. Когда мы И это значение с константами событий, как мы сделали в случае набора процентов, мы получаем логическое значение, представляющее, готов ли канал к определенному значению или нет. ``
Другой альтернативный и более короткий способ сделать это — использовать удобные методы SelectionKey для той же цели:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();
6.3. Канал
Доступ к просматриваемому каналу из объекта SelectionKey
очень прост. Мы просто вызываем метод канала
:
Channel channel = key.channel();
6.4. Селектор
Как и в случае с каналом, очень просто получить объект Selector из объекта
SelectionKey
:
Selector selector = key.selector();
6.5. Присоединение объектов
Мы можем прикрепить объект к SelectionKey.
Иногда мы можем захотеть дать каналу пользовательский идентификатор или прикрепить любой объект Java, который мы можем отслеживать.
Присоединение объектов — удобный способ сделать это. Вот как вы прикрепляете и получаете объекты из SelectionKey
:
key.attach(Object);
Object object = key.attachment();
Кроме того, мы можем прикрепить объект во время регистрации канала. Мы добавляем его в качестве третьего параметра в метод регистрации канала, например:
SelectionKey key = channel.register(
selector, SelectionKey.OP_ACCEPT, object);
7. Выбор ключа канала
До сих пор мы рассматривали, как создать селектор, зарегистрировать в нем каналы и проверить свойства объекта SelectionKey
, который представляет регистрацию канала в селекторе.
Это только половина процесса, теперь нам предстоит выполнить непрерывный процесс выбора готового набора, который мы рассмотрели ранее. Мы делаем выбор, используя метод выбора селектора, например :
int channels = selector.select();
Этот метод блокируется до тех пор, пока хотя бы один канал не будет готов к операции. Возвращаемое целое число представляет собой количество ключей, чьи каналы готовы к операции.
Далее мы обычно получаем набор выбранных ключей для обработки:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Полученный набор состоит из объектов SelectionKey
, каждый ключ представляет собой зарегистрированный канал, который готов к работе.
После этого мы обычно перебираем этот набор и для каждого ключа получаем канал и выполняем над ним любые операции, которые появляются в нашем наборе интересов.
За время существования канала он может быть выбран несколько раз, так как его ключ появляется в наборе готовых для разных событий. Вот почему у нас должен быть непрерывный цикл для захвата и обработки событий канала по мере их возникновения.
8. Полный пример
Чтобы закрепить знания, полученные в предыдущих разделах, мы собираемся построить полный пример клиент-сервер.
Для простоты тестирования нашего кода мы создадим эхо-сервер и эхо-клиент. В такой настройке клиент подключается к серверу и начинает отправлять ему сообщения. Сервер возвращает сообщения, отправленные каждым клиентом.
Когда сервер встречает определенное сообщение, такое как end
, он интерпретирует его как конец связи и закрывает соединение с клиентом.
8.1. Сервер
Вот наш код для EchoServer.java
:
public class EchoServer {
private static final String POISON_PILL = "POISON_PILL";
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 5454));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}
private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (new String(buffer.array()).trim().equals(POISON_PILL)) {
client.close();
System.out.println("Not accepting client messages anymore");
}
else {
buffer.flip();
client.write(buffer);
buffer.clear();
}
}
private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
public static Process start() throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = EchoServer.class.getCanonicalName();
ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);
return builder.start();
}
}
Вот что происходит; мы создаем объект Selector
, вызывая статический метод открытия .
Затем мы также создаем канал, вызывая его статический метод открытия , в частности, экземпляр
ServerSocketChannel
.
Это связано с тем, что ServerSocketChannel доступен
для выбора и подходит для потокового сокета прослушивания .
Затем мы привязываем его к порту по нашему выбору. Помните, мы говорили ранее, что перед регистрацией выбираемого канала в селекторе мы должны сначала установить его в неблокирующий режим. Затем мы делаем это, а затем регистрируем канал в селекторе.
Экземпляр SelectionKey
этого канала на данном этапе нам не нужен , поэтому мы не будем его запоминать.
Java NIO использует модель, ориентированную на буфер, отличную от модели, ориентированной на поток. Таким образом, связь через сокеты обычно осуществляется путем записи в буфер и чтения из него.
Поэтому мы создаем новый ByteBuffer
, в который сервер будет записывать и читать. Мы инициализируем его на 256 байт, это просто произвольное значение, в зависимости от того, сколько данных мы планируем передавать туда и обратно.
Наконец, мы выполняем процесс выбора. Мы выбираем готовые каналы, получаем их ключи выбора, перебираем ключи и выполняем операции, для которых готов каждый канал.
Мы делаем это в бесконечном цикле, так как серверы обычно должны продолжать работать независимо от того, есть активность или нет.
Единственная операция, которую может обрабатывать ServerSocketChannel
, — это операция ACCEPT .
Когда мы принимаем соединение от клиента, мы получаем объект SocketChannel
, для которого мы можем выполнять чтение и запись. Мы устанавливаем его в неблокирующий режим и регистрируем для операции READ в селекторе.
Во время одного из последующих выборов этот новый канал станет готовым к чтению. Мы извлекаем его и читаем его содержимое в буфер. Правда, это как эхо-сервер, мы должны записать этот контент обратно клиенту.
Когда мы хотим записать в буфер, из которого мы читали, мы должны вызвать метод flip ()
.
Наконец, мы устанавливаем буфер в режим записи, вызывая метод flip
и просто пишем в него.
Метод start()
определен таким образом, что эхо-сервер может быть запущен как отдельный процесс во время модульного тестирования.
8.2. Клиент
Вот наш код для EchoClient.java
:
public class EchoClient {
private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;
public static EchoClient start() {
if (instance == null)
instance = new EchoClient();
return instance;
}
public static void stop() throws IOException {
client.close();
buffer = null;
}
private EchoClient() {
try {
client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}
public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}
}
Клиент проще сервера.
Мы используем одноэлементный шаблон, чтобы создать его экземпляр внутри статического метода start .
Мы вызываем частный конструктор из этого метода.
В приватном конструкторе открываем соединение на том же порту, на который был привязан канал сервера и еще на том же хосте.
Затем мы создаем буфер, в который мы можем писать и из которого мы можем читать.
Наконец, у нас есть метод sendMessage
, который считывает и переносит любую строку, которую мы ему передаем, в байтовый буфер, который передается по каналу на сервер.
Затем мы читаем из клиентского канала, чтобы получить сообщение, отправленное сервером. Мы возвращаем это как эхо нашего сообщения.
8.3. Тестирование
Внутри класса с именем EchoTest.java
мы собираемся создать тестовый пример, который запускает сервер, отправляет сообщения на сервер и проходит только тогда, когда те же сообщения возвращаются с сервера. В качестве последнего шага тестовый пример останавливает сервер перед завершением.
Теперь мы можем запустить тест:
public class EchoTest {
Process server;
EchoClient client;
@Before
public void setup() throws IOException, InterruptedException {
server = EchoServer.start();
client = EchoClient.start();
}
@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
String resp1 = client.sendMessage("hello");
String resp2 = client.sendMessage("world");
assertEquals("hello", resp1);
assertEquals("world", resp2);
}
@After
public void teardown() throws IOException {
server.destroy();
EchoClient.stop();
}
}
9. Селектор.пробуждение()
Как мы видели ранее, вызов selector.select()
блокирует текущий поток до тех пор, пока один из отслеживаемых каналов не станет готовым к работе. Мы можем переопределить это, вызвав selector.wakeup()
из другого потока.
В результате блокирующий поток немедленно возвращается, а не продолжает ждать, независимо от того, готов канал или нет .
Мы можем продемонстрировать это с помощью CountDownLatch
и отслеживания шагов выполнения кода:
@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel channel = pipe.source();
channel.configureBlocking(false);
channel.register(selector, OP_READ);
List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
invocationStepsTracker.add(">> Count down");
latch.countDown();
try {
invocationStepsTracker.add(">> Start select");
selector.select();
invocationStepsTracker.add(">> End select");
} catch (IOException e) {
e.printStackTrace();
}
}).start();
invocationStepsTracker.add(">> Start await");
latch.await();
invocationStepsTracker.add(">> End await");
invocationStepsTracker.add(">> Wakeup thread");
selector.wakeup();
//clean up
channel.close();
assertThat(invocationStepsTracker)
.containsExactly(
">> Start await",
">> Count down",
">> Start select",
">> End await",
">> Wakeup thread",
">> End select"
);
}
В этом примере мы используем класс Pipe
Java NIO, чтобы открыть канал для целей тестирования. Мы отслеживаем шаги выполнения кода в потокобезопасном списке. Анализируя эти шаги, мы можем видеть, как selector.wakeup()
освобождает поток, заблокированный selector.select()
.
10. Заключение
В этой статье мы рассмотрели базовое использование компонента Java NIO Selector.
Полный исходный код и все фрагменты кода для этой статьи доступны в моем проекте на GitHub .