English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
Redis Stream - это новый тип данных, добавленный в Redis 5.0 версии.
Redis Stream主要用于消息队列(MQ,Message Queue),Redis himself имеет Redis подписку и публикацию (pub/sub) для реализации функции очереди сообщений, но у него есть недостаток: сообщения не могут быть сохранены持久化,если происходит разрыв сети или отказ Redis, сообщения будут утеряны.
Проще говоря, модель подписки и публикации (pub/sub) позволяет распространять сообщения, но не может записывать историю сообщений.
Redis Stream предоставляет функции持久化和 репликации основного и резервного копирования сообщений, что позволяет любому клиенту получить данные в любое время и запомнить положение каждого клиента, а также гарантирует, что сообщения не будут потеряны.
Структура Redis Stream представлена следующим образом, у нее есть список сообщений, который串起所有加入的消息, у каждого сообщения есть уникальный ID и соответствующее содержимое:
У каждого потока есть уникальное имя, это и есть ключ Redis, который автоматически создается при первом использовании команды xadd для добавления сообщения.
Анализ上图:
Consumer Group Группа потребителей, создается командой XGROUP CREATE, в одной группе потребителей может быть несколько потребителей (Consumer).
last_delivered_id Курсор, у каждой группы потребителей есть курсор last_delivered_id, который перемещается вперед при чтении каждого сообщения.
pending_ids Параметр состояния потребителя (Consumer), который используется для поддержания не подтвержденных идентификаторов клиентов. pending_ids записывает сообщения, которые уже были прочитаны клиентом, но еще не ack (Acknowledge character: символ подтверждения).
Команды, связанные с очередью сообщений:
XADD - Добавление сообщений в конец
XTRIM - Обрезка потока, ограничение длины
XDEL - Удаление сообщений
XLEN - Получение количества элементов в потоке, то есть длины сообщений
XRANGE - Получение списка сообщений, автоматически фильтрующих удаленные сообщения
XREVRANGE - Получение списка сообщений в обратном порядке, ID от большего к меньшему
redis> - Получение списка сообщений блокирующим или неблокирующим способом
Команды, связанные с группой потребителей:
XGROUP CREATE - Создание группы потребителей
XREADGROUP ГРУППА - Чтение сообщений из группы потребителей
XACK - Маркировка сообщения как "обработанное"
XGROUP SETID - Установка нового последнего идентификатора сообщения для группы потребителей
XGROUP DELCONSUMER - Удаление потребителя
XGROUP DESTROY - Удаление группы потребителей
XPENDING - Вывод информации о сообщениях, ожидающих обработки
XCLAIM - Перенос собственности сообщения
XINFO - Просмотр информации о потоке и группе потребителей;
XINFO GROUPS - Вывод информации о группе потребителей;
XINFO STREAM - Вывод информации о потоке
Использование XADD для добавления сообщений в очередь; если указанная очередь не существует, она будет создана. Грамматика формата XADD:
XADD ключ ID поле значение [поле значение ...]
key : Название очереди, если она не существует, она будет создана
ID ID сообщения, мы используем * для обозначения того, что он генерируется redis, может быть настроен по умолчанию, но необходимо обеспечить его возрастание.
field value Запись.
redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1601372323627-1" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3" 4) "Adichie"
Использование XTRIM для обрезки потока, ограничение длины, формат синтаксиса:
XTRIM key MAXLEN [~] count
key :队列名称
MAXLEN Длина
count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (integer) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) "1601372434568-0" 2) 1) "field1" 2) "A" 3) "field2" 4) "B" 5) "field3" 6) "C" 7) "field4" 8) "D" 127.0.0.1:6379> 4) "Adichie"
Использование XDEL для удаления сообщений, формат синтаксиса:
XDEL key ID [ID ...]
key:队列名称
ID ID сообщения
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) "1" 2) 1) 1538561701744-0 2) 1) "c" 2) "3"
使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:
XLEN key
key:队列名称
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 4) "Adichie"
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XRANGE key start end [COUNT count]
key Время ожидания в миллисекундах, если не установлено, то не блокирующий режим
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" "1601372731459-2" "1601372577812-1" "1601372731459-3" redis> XLEN writers redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" 4) "Adichie"
使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XREVRANGE key end start [COUNT count]
key Время ожидания в миллисекундах, если не установлено, то не блокирующий режим
end :结束值, + 表示最大值
start :开始值, - 表示最小值
count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 2) 1) "name" 1) 1) "1601372731459-3" 3) "surname" 2) "Ngozi" 4) "Adichie"
XREAD
Использование XREAD для получения списка сообщений в阻塞 или не блокирующем режиме, формат синтаксиса:
count XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
milliseconds Количество
key Время ожидания в миллисекундах, если не установлено, то не блокирующий режим
id ID сообщения
# Чтение двух сообщений из начала Потока > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) "1532" 3) "event-id" 4) "5" 5) "user-id" 6) "7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) "812" 3) "event-id" 4) "9" 5) "user-id" 6) "388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
Использование XGROUP CREATE для создания группы потребителей, формат синтаксиса:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key : Название очереди, если она не существует, она будет создана
groupname : Название группы.
$ : Обозначает потребление с конца, принимает только новые сообщения, текущие сообщения в Stream будут полностью пропущены.
Начать потребление с начала:
XGROUP СОЗДАТЬ mystream consumer-group-name 0-0
Начать потребление с конца:
XGROUP СОЗДАТЬ mystream consumer-group-name $
Использование XREADGROUP ГРУППА для чтения сообщений из группы потребителей, формат синтаксиса:
XREADGROUP ГРУППА group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group : Название группы потребителей
consumer : Имя потребителя.
count : Количество чтений.
milliseconds : Миллисекунды блокировки.
key : Название очереди.
ID : ID сообщения.
XREADGROUP ГРУППА consumer-group-name consumer-name COUNT 1 STREAMS mystream >