English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Stream Redis

Redis Stream - это новый тип данных, добавленный в Redis 5.0 версии.

Redis Stream主要用于消息队列(MQ,Message Queue),Redis himself имеет Redis подписку и публикацию (pub/sub) для реализации функции очереди сообщений, но у него есть недостаток: сообщения не могут быть сохранены持久化,если происходит разрыв сети или отказ Redis, сообщения будут утеряны.

Проще говоря, модель подписки и публикации (pub/sub) позволяет распространять сообщения, но не может записывать историю сообщений.

Redis Stream предоставляет функции持久化和 репликации основного и резервного копирования сообщений, что позволяет любому клиенту получить данные в любое время и запомнить положение каждого клиента, а также гарантирует, что сообщения не будут потеряны.

Структура Redis Stream представлена следующим образом, у нее есть список сообщений, который串起所有加入的消息, у каждого сообщения есть уникальный ID и соответствующее содержимое:

У каждого потока есть уникальное имя, это и есть ключ Redis, который автоматически создается при первом использовании команды xadd для добавления сообщения.

Анализ上图:

  • Consumer Group Группа потребителей, создается командой XGROUP CREATE, в одной группе потребителей может быть несколько потребителей (Consumer).

  • last_delivered_id Курсор, у каждой группы потребителей есть курсор last_delivered_id, который перемещается вперед при чтении каждого сообщения.

  • pending_ids Параметр состояния потребителя (Consumer), который используется для поддержания не подтвержденных идентификаторов клиентов. pending_ids записывает сообщения, которые уже были прочитаны клиентом, но еще не ack (Acknowledge character: символ подтверждения).

Команды, связанные с очередью сообщений:

  • XADD - Добавление сообщений в конец

  • XTRIM - Обрезка потока, ограничение длины

  • XDEL - Удаление сообщений

  • XLEN - Получение количества элементов в потоке, то есть длины сообщений

  • XRANGE - Получение списка сообщений, автоматически фильтрующих удаленные сообщения

  • XREVRANGE - Получение списка сообщений в обратном порядке, ID от большего к меньшему

  • redis> - Получение списка сообщений блокирующим или неблокирующим способом

Команды, связанные с группой потребителей:

  • XGROUP CREATE - Создание группы потребителей

  • XREADGROUP ГРУППА - Чтение сообщений из группы потребителей

  • XACK - Маркировка сообщения как "обработанное"

  • XGROUP SETID - Установка нового последнего идентификатора сообщения для группы потребителей

  • XGROUP DELCONSUMER - Удаление потребителя

  • XGROUP DESTROY - Удаление группы потребителей

  • XPENDING - Вывод информации о сообщениях, ожидающих обработки

  • XCLAIM - Перенос собственности сообщения

  • XINFO - Просмотр информации о потоке и группе потребителей;

  • XINFO GROUPS - Вывод информации о группе потребителей;

  • XINFO STREAM - Вывод информации о потоке

XADD

Использование XADD для добавления сообщений в очередь; если указанная очередь не существует, она будет создана. Грамматика формата XADD:

XADD ключ ID поле значение [поле значение ...]
  • key : Название очереди, если она не существует, она будет создана

  • ID ID сообщения, мы используем * для обозначения того, что он генерируется redis, может быть настроен по умолчанию, но необходимо обеспечить его возрастание.

  • field value Запись.

redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
4) "Adichie"

XTRIM

Использование XTRIM для обрезки потока, ограничение длины, формат синтаксиса:

XTRIM key MAXLEN [~] count
  • key :队列名称

  • MAXLEN Длина

  • count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379> 
4) "Adichie"

XDEL

Использование XDEL для удаления сообщений, формат синтаксиса:

XDEL key ID [ID ...]
  • key:队列名称

  • ID ID сообщения

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

XLEN

使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:

XLEN key
  • key:队列名称

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
4) "Adichie"

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key Время ожидания в миллисекундах, если не установлено, то не блокирующий режим

  • start :开始值, - 表示最小值

  • end :结束值, + 表示最大值

  • count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
"1601372731459-2"
"1601372577812-1"
"1601372731459-3"
redis> XLEN writers
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
4) "Adichie"

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key Время ожидания в миллисекундах, если не установлено, то не блокирующий режим

  • end :结束值, + 表示最大值

  • start :开始值, - 表示最小值

  • count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
   2) 1) "name"
      1) 1) "1601372731459-3"
      3) "surname"
      2) "Ngozi"
4) "Adichie"

redis>

XREAD

Использование XREAD для получения списка сообщений в阻塞 или не блокирующем режиме, формат синтаксиса:
  • count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

  • milliseconds Количество

  • key Время ожидания в миллисекундах, если не установлено, то не блокирующий режим

  • id ID сообщения

# Чтение двух сообщений из начала Потока
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

Использование XGROUP CREATE для создания группы потребителей, формат синтаксиса:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key : Название очереди, если она не существует, она будет создана

  • groupname : Название группы.

  • $ : Обозначает потребление с конца, принимает только новые сообщения, текущие сообщения в Stream будут полностью пропущены.

Начать потребление с начала:

XGROUP СОЗДАТЬ mystream consumer-group-name 0-0

Начать потребление с конца:

XGROUP СОЗДАТЬ mystream consumer-group-name $

XREADGROUP ГРУППА

Использование XREADGROUP ГРУППА для чтения сообщений из группы потребителей, формат синтаксиса:

XREADGROUP ГРУППА group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group : Название группы потребителей

  • consumer : Имя потребителя.

  • count : Количество чтений.

  • milliseconds : Миллисекунды блокировки.

  • key : Название очереди.

  • ID : ID сообщения.

XREADGROUP ГРУППА consumer-group-name consumer-name COUNT 1 STREAMS mystream >