Перейти к основному содержимому

Обмен сообщениями PubSub с Spring Data Redis

· 4 мин. чтения

1. Обзор

Во второй статье из серии, посвященной Spring Data Redis, мы рассмотрим очереди сообщений pub/sub.

В Redis издатели не запрограммированы на отправку своих сообщений конкретным подписчикам. Скорее, опубликованные сообщения классифицируются по каналам без знания того, какие (если есть) подписчики могут быть.

Точно так же подписчики проявляют интерес к одной или нескольким темам и получают только те сообщения, которые представляют интерес, не зная, какие (если есть) издатели существуют.

Такое разделение издателей и подписчиков может обеспечить большую масштабируемость и более динамичную топологию сети.

2. Конфигурация Redis

Давайте начнем добавлять конфигурацию, необходимую для очередей сообщений.

Во- первых, мы определим bean-компонент MessageListenerAdapter , который содержит пользовательскую реализацию интерфейса MessageListener с именем RedisMessageSubscriber . Этот компонент действует как подписчик в модели обмена сообщениями pub-sub:

@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}

RedisMessageListenerContainer — это класс, предоставляемый Spring Data Redis, который обеспечивает асинхронное поведение для прослушивателей сообщений Redis. Это вызывается внутри и, согласно документации Spring Data Redis , «обрабатывает низкоуровневые детали прослушивания, преобразования и отправки сообщений».

@Bean
RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer container
= new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory());
container.addMessageListener(messageListener(), topic());
return container;
}

Мы также создадим bean-компонент, используя специально созданный интерфейс MessagePublisher и реализацию RedisMessagePublisher . Таким образом, у нас может быть общий API публикации сообщений, а реализация Redis принимает redisTemplate и тему в качестве аргументов конструктора:

@Bean
MessagePublisher redisPublisher() {
return new RedisMessagePublisher(redisTemplate(), topic());
}

Наконец, настроим тему, в которую издатель будет отправлять сообщения, а подписчик их будет получать:

@Bean
ChannelTopic topic() {
return new ChannelTopic("messageQueue");
}

3. Публикация сообщений

3.1. Определение интерфейса MessagePublisher

Spring Data Redis не предоставляет интерфейс MessagePublisher для распространения сообщений. Мы можем определить собственный интерфейс, который будет использовать redisTemplate в реализации:

public interface MessagePublisher {
void publish(String message);
}

3.2. Реализация RedisMessagePublisher

Наш следующий шаг — предоставить реализацию интерфейса MessagePublisher , добавив сведения о публикации сообщений и используя функции в redisTemplate.

Шаблон содержит очень богатый набор функций для широкого круга операций, из которых convertAndSend способен отправить сообщение в очередь через топик:

public class RedisMessagePublisher implements MessagePublisher {

@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ChannelTopic topic;

public RedisMessagePublisher() {
}

public RedisMessagePublisher(
RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
this.redisTemplate = redisTemplate;
this.topic = topic;
}

public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}

Как видите, реализация издателя проста. Он использует метод convertAndSend () шаблона redisTemplate для форматирования и публикации данного сообщения в настроенной теме.

Тема реализует семантику публикации и подписки: когда сообщение публикуется, оно отправляется всем подписчикам, которые зарегистрированы для прослушивания этой темы.

4. Подписка на сообщения

RedisMessageSubscriber реализует интерфейс MessageListener , предоставляемый Spring Data Redis :

@Service
public class RedisMessageSubscriber implements MessageListener {

public static List<String> messageList = new ArrayList<String>();

public void onMessage(Message message, byte[] pattern) {
messageList.add(message.toString());
System.out.println("Message received: " + message.toString());
}
}

Обратите внимание, что есть второй параметр с именем pattern , который мы не использовали в этом примере. В документации Spring Data Redis указано, что этот параметр представляет «шаблон, соответствующий каналу (если он указан)», но может быть нулевым .

5. Отправка и получение сообщений

Теперь будем все это собирать. Давайте создадим сообщение, а затем опубликуем его с помощью RedisMessagePublisher :

String message = "Message " + UUID.randomUUID();
redisMessagePublisher.publish(message);

Когда мы вызываем publish(message) , контент отправляется в Redis, где он направляется в тему очереди сообщений, определенную в нашем издателе. Потом раздается подписчикам этой темы.

Возможно, вы уже заметили, что RedisMessageSubscriber — это прослушиватель, который регистрируется в очереди для получения сообщений.

По прибытии сообщения срабатывает определенный метод подписчика onMessage() .

В нашем примере мы можем убедиться, что мы получили сообщения, которые были опубликованы, проверив messageList в нашем RedisMessageSubscriber :

RedisMessageSubscriber.messageList.get(0).contains(message)

6. Заключение

В этой статье мы рассмотрели реализацию очереди сообщений публикации/подписки с использованием Spring Data Redis.

Реализацию приведенного выше примера можно найти в проекте GitHub .