1. Введение
Akka — это библиотека с открытым исходным кодом, которая помогает легко разрабатывать параллельные и распределенные приложения с использованием Java или Scala, используя акторную модель.
В этом уроке мы представим основные функции, такие как определение актеров, как они общаются и как мы можем их убить . В заключительных заметках мы также отметим некоторые рекомендации по работе с Akka.
2. Актерская модель
Актерская модель не нова для компьютерного сообщества. Впервые он был представлен Карлом Эдди Хьюиттом в 1973 году как теоретическая модель для обработки параллельных вычислений.
Его практическая применимость начала проявляться, когда индустрия программного обеспечения начала осознавать подводные камни реализации параллельных и распределенных приложений.
Актер представляет собой независимую вычислительную единицу. Некоторые важные характеристики:
- актор инкапсулирует свое состояние и часть логики приложения
- акторы взаимодействуют только через асинхронные сообщения и никогда через прямые вызовы методов
- у каждого актора есть уникальный адрес и почтовый ящик, в который другие акторы могут доставлять сообщения
- актор будет обрабатывать все сообщения в почтовом ящике в последовательном порядке (реализацией почтового ящика по умолчанию является очередь FIFO)
- система акторов организована в виде древовидной иерархии
- актор может создавать других акторов, может отправлять сообщения любому другому актору и останавливать себя, или любой актор уже создан
2.1. Преимущества
Разработка параллельного приложения сложна, потому что нам нужно иметь дело с синхронизацией, блокировками и общей памятью. Используя акторы Akka, мы можем легко писать асинхронный код без необходимости блокировки и синхронизации.
Одним из преимуществ использования сообщения вместо вызова метода является то, что поток-отправитель не будет блокироваться в ожидании возвращаемого значения при отправке сообщения другому действующему лицу . Принимающий актор ответит с результатом, отправив ответное сообщение отправителю.
Еще одним большим преимуществом использования сообщений является то, что нам не нужно беспокоиться о синхронизации в многопоточной среде. Это связано с тем, что все сообщения обрабатываются последовательно .
Еще одним преимуществом модели акторов Akka является обработка ошибок. Путем организации субъектов в иерархию каждый субъект может уведомить своего родителя о сбое, чтобы он мог действовать соответствующим образом. Родительский актор может принять решение об остановке или перезапуске дочерних акторов.
3. Настройка
Чтобы воспользоваться преимуществами акторов Akka, нам нужно добавить следующую зависимость от Maven Central :
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.11</version>
</dependency>
4. Создание актера
Как уже упоминалось, актеры определяются в иерархической системе. Все акторы, которые имеют общую конфигурацию, будут определены в ActorSystem.
Сейчас мы просто определим ActorSystem
с конфигурацией по умолчанию и пользовательским именем:
ActorSystem system = ActorSystem.create("test-system");
Несмотря на то, что мы еще не создали ни одного актора, в системе уже будет 3 основных актора:
- субъект-хранитель корня, имеющий адрес «/», который, как указано в имени, представляет собой корень иерархии системы акторов.
- актер-хранитель пользователя, имеющий адрес «/user». Это будет родительский элемент для всех актеров, которые мы определяем.
- актор-хранитель системы, имеющий адрес «/system». Это будет родителем для всех субъектов, определенных внутри системы Akka.
Любой актор Akka будет расширять абстрактный класс AbstractActor
и реализовывать метод createReceive()
для обработки входящих сообщений от других акторов:
public class MyActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder().build();
}
}
Это самый простой актер, которого мы можем создать. Он может получать сообщения от других акторов и отбрасывать их, поскольку в ReceiveBuilder не определены соответствующие шаблоны сообщений.
Мы поговорим о сопоставлении шаблонов сообщений позже в этой статье.
Теперь, когда мы создали нашего первого актера, мы должны включить его в ActorSystem
:
ActorRef readingActorRef
= system.actorOf(Props.create(MyActor.class), "my-actor");
4.1. Конфигурация актера
Класс Props
содержит конфигурацию актера. Мы можем настроить такие вещи, как диспетчер, почтовый ящик или конфигурацию развертывания. Этот класс является неизменяемым, поэтому потокобезопасен, поэтому его можно использовать совместно при создании новых акторов.
Настоятельно рекомендуется и считается лучшей практикой определять фабричные методы внутри объекта актора, которые будут обрабатывать создание объекта Props
.
Для примера давайте определим актера, который будет выполнять некоторую обработку текста. Актер получит объект String
, над которым он будет выполнять обработку:
public class ReadingActor extends AbstractActor {
private String text;
public static Props props(String text) {
return Props.create(ReadingActor.class, text);
}
// ...
}
Теперь, чтобы создать экземпляр этого типа актера, мы просто используем фабричный метод props()
для передачи аргумента String конструктору:
ActorRef readingActorRef = system.actorOf(
ReadingActor.props(TEXT), "readingActor");
Теперь, когда мы знаем, как определить актора, давайте посмотрим, как они взаимодействуют внутри системы акторов.
5. Обмен сообщениями с актерами
Для взаимодействия друг с другом акторы могут отправлять и получать сообщения от любого другого актора в системе. Эти сообщения могут быть объектами любого типа с условием, что они неизменяемы .
Лучше всего определять сообщения внутри класса актора. Это помогает писать код, который легко понять и знать, какие сообщения может обрабатывать актор.
5.1. Отправка сообщений
Внутри системы акторов Akka сообщения отправляются с помощью методов:
сказать()
спросить()
вперед()
Когда мы хотим отправить сообщение и не ожидаем ответа, мы можем использовать метод tell()
. Это наиболее эффективный метод с точки зрения производительности:
readingActorRef.tell(new ReadingActor.ReadLines(), ActorRef.noSender());
Первый параметр представляет собой сообщение, которое мы отправляем на адрес актора readActorRef
.
Второй параметр указывает, кто является отправителем. Это полезно, когда актор, получающий сообщение, должен отправить ответ актору, отличному от отправителя (например, родителю отправляющего актора).
Обычно мы можем установить для второго параметра значение null
или ActorRef.noSender()
, потому что мы не ожидаем ответа. Когда нам нужен ответ от актера, мы можем использовать метод ask()
:
CompletableFuture<Object> future = ask(wordCounterActorRef,
new WordCounterActor.CountWords(line), 1000).toCompletableFuture();
При запросе ответа от актора возвращается объект CompletionStage
, поэтому обработка остается неблокирующей.
Очень важным фактом, на который мы должны обратить внимание, является обработка ошибок внутри актора, который будет реагировать. Чтобы вернуть объект Future
, который будет содержать исключение, мы должны отправить сообщение Status.Failure актору
-отправителю.
Это не делается автоматически, когда актор генерирует исключение при обработке сообщения, и вызов ask()
истечет по тайм-ауту, и в журналах не будет видно никаких ссылок на исключение:
@Override
public Receive createReceive() {
return receiveBuilder()
.match(CountWords.class, r -> {
try {
int numberOfWords = countWordsFromLine(r.line);
getSender().tell(numberOfWords, getSelf());
} catch (Exception ex) {
getSender().tell(
new akka.actor.Status.Failure(ex), getSelf());
throw ex;
}
}).build();
}
У нас также есть метод forward()
, похожий на tell()
. Разница в том, что первоначальный отправитель сообщения сохраняется при отправке сообщения, поэтому актор, пересылающий сообщение, действует только как промежуточный актор:
printerActorRef.forward(
new PrinterActor.PrintFinalResult(totalNumberOfWords), getContext());
5.2. Получение сообщений
Каждый актор реализует метод createReceive ()
, который обрабатывает все входящие сообщения. ReceiveBuilder ()
действует как оператор switch, пытаясь сопоставить полученное сообщение с определенным типом сообщений:
public Receive createReceive() {
return receiveBuilder().matchEquals("printit", p -> {
System.out.println("The address of this actor is: " + getSelf());
}).build();
}
При получении сообщение помещается в очередь FIFO, поэтому сообщения обрабатываются последовательно .
6. Убийство актера
Когда мы закончили использовать актор , мы можем остановить его, вызвав метод stop ()
из интерфейса ActorRefFactory
:
system.stop(myActorRef);
Мы можем использовать этот метод для завершения любого дочернего актора или самого актора. Важно отметить, что остановка выполняется асинхронно и что текущая обработка сообщения завершится до того, как актор будет завершен. Входящие сообщения больше не будут приниматься в почтовый ящик актера .
Остановив родительский актор , мы также отправим сигнал уничтожения всем дочерним акторам , которые были им порождены.
Когда нам больше не нужна система акторов, мы можем завершить ее, чтобы освободить все ресурсы и предотвратить утечку памяти:
Future<Terminated> terminateResponse = system.terminate();
Это остановит актеров-хранителей системы, следовательно, всех актеров, определенных в этой системе Akka.
Мы также можем отправить сообщение PoisonPill
любому актеру, которого хотим убить:
myActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
Сообщение PoisonPill
будет получено актером, как и любое другое сообщение, и помещено в очередь. Актер будет обрабатывать все сообщения, пока не дойдет до PoisonPill
. Только после этого актор начнет процесс прекращения.
Еще одним специальным сообщением, используемым для убийства актера, является сообщение Kill
. В отличие от PoisonPill,
актор выдаст исключение ActorKilledException
при обработке этого сообщения:
myActorRef.tell(Kill.getInstance(), ActorRef.noSender());
7. Заключение
В этой статье мы представили основы фреймворка Akka. Мы показали, как определять акторов, как они общаются друг с другом и как их завершать.
Мы закончим некоторыми рекомендациями по работе с Akka:
- используйте
tell()
вместоask()
, когда важна производительность - при использовании
ask()
мы всегда должны обрабатывать исключения, отправляя сообщение обошибке
- актеры не должны иметь общего изменяемого состояния
- актор не должен быть объявлен в другом акторе
- актеры не останавливаются автоматически , когда на них больше не ссылаются. Мы должны явно уничтожить актера, когда он нам больше не нужен, чтобы предотвратить утечку памяти.
- сообщения, используемые акторами , всегда должны быть неизменяемыми
Как всегда, исходный код статьи доступен на GitHub .