版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、 深入剖析 Redis 5 Streams数据结构Streams - 消息队列的新选择Redis 5.0 全新的数据类型:streams,官方把它定义为:以更抽象的方式建模日志的数据结构。Redis的streams主要是一个append only的数据结构,至少在概念上它是一种在内存中表示的抽象数据类型,只不过它们实现了更强大的操作,以克服日志文件本身的限制。如果你了解MQ,那么可以把streams当做MQ。如果你还了解kafka,那么甚至可以把streams当做kafka。另外,这个功能有点类似于redis以前的Pub/Sub,但是也有基本的不同:streams支持多个客户端(消费者)等待数
2、据(Linux环境开多个窗口执行XREAD即可模拟),并且每个客户端得到的是完全相同的数据。Pub/Sub是发送忘记的方式,并且不存储任何数据;而streams模式下,所有消息被无限期追加在streams中,除非用于显示执行删除(XDEL)。streams的Consumer Groups也是Pub/Sub无法实现的控制方式。streams数据结构streams数据结构本身非常简单,但是streams依然是Redis到目前为止最复杂的类型,其原因是实现的一些额外的功能:一系列的阻塞操作允许消费者等待生产者加入到streams的新数据。另外还有一个称为Consumer Groups的概念,这个概念
3、最先由kafka提出,Redis有一个类似实现,和kafka的Consumer Groups的目的是一样的:允许一组客户端协调消费相同的信息流!redis源码中定义streams结构的源码如下,由源码可知,stream的核心数据结构是radix tree:typedefstructstreamrax*rax;/*Theradixtreeholdingthestream.*/uint64_tlength;/*Numberofelementsinsidethisstream.*/streamIDlast_id;/*Zeroifthereareyetnoitems.*/rax*cgroups;/*C
4、onsumergroupsdictionary:name-streamCG*/stream;源码参考:/antirez/redis/blob/5.0.0/src/stream.h;至于redis对radix tree的实现,参考源码:/antirez/redis/blob/5.0.0/src/rax.c 和 /antirez/redis/blob/5.0.0/src/rax.h 。网上也有很多radix tree的文章,本篇文章就不做过多的介绍了。下面给出一张从官方源码中的部分截图:radix treeradix treestreams基础为了理解streams的目的,以及如何使用它,我们先忽
5、略掉所有高级特性,只把注意力放在数据结构本身,以及那些操作和访问streams的命令。这基本上也是大多数其他Redis数据类型共有的部分,例如Lists,Sets,Sorted Sets等。然而需要注意的是,Lists也有一个更复杂的阻塞式的API,例如BLPOP,BRPOP等。streams这方便的API也没什么不同,只是更复杂,更强大(更牛逼,哈)!streams命令废话不多说,先上手玩玩这个全新的数据类型。streams这个数据类型对应有如下13个操作命令,所有命令都以X开头:XADD用法:XADD key ID field string field string 正如其名,这个命令就是
6、用来添加的,给streams追加(append,前面提到过:streams主要是一个append only的数据结构)一个新的entry(和Java里的Map类似,Redis里的streams中的数据也称为entry)。key:的含义就是同一类型streams的名称;ID: streams中entry的唯一标识符,如果执行XADD命令时,传入星号(*),那么,ID会自动生成,且自动生成的ID会在执行XADD后返回,默认生成的ID格式为millisecondsTime+sequenceNumber,即当前毫秒级别的时间戳加上一个自增序号值,例如1540013735401-0。并且执行XADD时,
7、不接受少于或等于上一次执行XADD的ID,否则会报错:ERR The ID specified in XADD is equal or smaller than the target stream top item;field&string:接下来就是若干组field string。可以把它理解为表示属性的json中的key-value。例如,某一streams的key命名为userInfo,且某个用户信息为username:afei, password:123456,那么执行XADD命令如下::6379xadduserInfo*usernameafeipassword123456154001
8、4082060-0由于命令中ID字段的值是星号,所以自定生成ID,1540014082060-0就是自动生成的ID。 XADD命令也支持显示指定ID,例如:XADD streamname 0-2 foo bar。时钟回拨需要注意的是,ID的时间戳部分是部署Redis服务器的本地时间,如果发生时钟回拨会怎么样?如果发生始终回拨,生成的ID的时间戳部分就是回拨后的时间,然后加上这个时间的递增序列号。例如当前时间戳1540014082060,然后这时候发生了时钟回拨,且回拨5ms,那么时间戳就是1540014082055。假设以前已经生成了1540014082055-0,1540014082055-
9、1,那么这次由于时钟回拨,生成的ID就是1540014082055-2。所以允许自动生成的ID在发生时钟回拨时少于上次的ID,但是不允许显示指定一个少于上次的ID。XDEL用法:XDEL key ID ID 和XADD相反,这是命令用来从streams中删除若干个entry,并且会返回实际删除数,这个删除数可能和参数ID个数不等,因为某些ID表示的消息可能不存在。执行命令如下,第二个参数ID是不存在的,所以XDEL的返回结果是1::6379XDELuserInfo1540014379642-01540014379642-1(integer)1XLEN用法:XLEN key很好理解,这个命令就是
10、用来返回streams中有多少个entry。执行如下::6379XLENuserInfo(integer)2streams三种查询模式redis提供了三种查询streams数据的模式:范围查询:因为streams的每个entry,其默认生成的ID是基于时间且递增的;监听模式:类比linux中的tailf命令,实时接收新增加到streams中的entry(也有点像一个消息系统,事实上笔者认为它就是借鉴了kafka);消费者组:即Consumer Groups,特殊的监听模式。从一个消费者的角度来看streams,一个streams能被分区到多个处理消息的消费者,对于任意一条消息,同一个消费者组中
11、只有一个消费者可以处理(和kafka的消费者组完全一样)。这样还能够横向扩容消费者,从而提升处理消息的能力,而不需要只让把让一个消费者处理所有消息。接下里分别介绍这三种模式。XRANGE用法:XRANGE key start end COUNT count这个命令属于第1种模式,即基于范围查询。这个命令用来返回streams某个顺序范围下的元素,start参数是更小的ID,end参数是更大的ID。有两个特殊的ID用符号-和+表示,符号-表示最小的ID,符号+表示最大的ID::6379XRANGEuserInfo1540014096298-01540014477236-01)1)15400140
12、96298-02)1)username2)root3)password4)6666662)1)1540014477236-02)1)username2)test3)password4)111111:6379:6379XRANGEuserInfo-+1)1)1540014082060-02)1)username2)afei3)password4)1234562)1)1540014096298-02)1)username2)root3)password4)6666663)1)1540014477236-02)1)username2)test3)password4)1111114)1)1540014
13、493402-02)1)username2)u13)password4)111111XRANGE还能实现遍历某个范围区间的功能,例如我想遍历2018-10-20号新增的用户信息。首先得到2018-10-20 00:00:00对应的时间戳为1539964800000,再得到2018-10-20 23:59:59对应的时间戳为1540051199000,然后执行如下命令::6379XRANGEuserInfo1539964800000-01540051199000-0COUNT51)1)1540014082060-02)1)username2)afei3)password4)123456.5)1)
14、1540014496505-02)1)username2)u23)password4)111111:6379#需要注意的是,接下来再遍历的start参数是上一次遍历结果最大的ID加1,即1540014496505-0加1就是1540014496505-1。:6379XRANGEuserInfo1540014496505-11540051199000-0COUNT51)1)1540014499863-02)1)username2)u33)password4)111111XREVRANGE用法:XREVRANGE key end start COUNT count这个命令也属于第1种模式,且和XR
15、ANGE相反,返回一个逆序范围。end参数是更大的ID,start参数是更小的ID。执行示例如下:XREVRANGEuserInfo1540014477236-01540014096298-0XREAD用法:XREAD COUNT countBLOCK milliseconds STREAMS key key ID ID 很明显,这个命令就是用来实现第2个模式,即监听模式。其作用是返回streams中从来没有读取的,且比参数ID更大的元素。这个命令的使用方式如下::6379XREADCOUNT10BLOCK60000STREAMSuserInfo1540041139268-01)1)userI
16、nfo2)1)1)1540041264182-02)1)u22)p2(9.26s)#1540041264182-0这条消息时通过XADD添加的然后被XREAD监听到的消息。:6379XREADCOUNT2STREAMSuserInfo01)1)userInfo2)1)1)1540014082060-02)1)username2)afei3)password4)1234562)1)1540014096298-02)1)username2)root3)password4)666666#这条命令实现类似XRANGE的功能。:6379XREADBLOCK0STREAMSuserInfo$1)1)use
17、rInfo2)1)1)1540042613437-02)1)u72)p7#说明BLOCK为0表示一致等待知道有新的数据,否则永远不会超时。并且ID的值我们用特殊字符$表示,这个特殊字符表示我们只获取最新添加的消息。此外,XREAD还支持同时监听多个streams,用法如下所示::6379XREADBLOCK0STREAMSuserInfo_01userInfo_02userInfo_03userInfo_04$1)1)userInfo_032)1)1)1540043348287-02)1)u12)p1(3.49s)#监听userInfo_01userInfo_04这4个streams的新的消息
18、。XREAD除了COUNT和BLOCK,没有其他选项了。所有XREAD是一个非常基本的命令。更多高级特性可以往下看接下来要介绍的XREADGROUP。XREADGROUP用法:XREADGROUP GROUP group consumer COUNT countBLOCK milliseconds STREAMS key key ID ID 很明显,这就是第三种模式:消费者组模式。如果你了解kafka的消费者组,那么你就也了解了streams的消费者组。如果不了解也没关系,笔者简单解释一下,假设有三个消费者C1,C2,C3。在streams中总计有7条消息:1, 2, 3, 4, 5, 6,
19、7,那么消费关系如下所示:1-C12-C23-C34-C15-C26-C37-C1消费者组具备如下几个特点:同一个消息不会被投递到一个消费者组下的多个消费者,只可能是一个消费者。同一个消费者组下,每个消费者都是唯一的,通过大小写敏感的名字区分。消费者组中的消费者请求的消息,一定是新的,从来没有投递过的消息。消费一个消息后,需要用命令(XACK)确认,意思是说:这条消息已经给成功处理。正因为如此,当访问streams的历史消息时,每个消费者只能看到投递给它自己的消息。消费者组抽象的想象成如下这个样子:+|consumer_group_name:afeigroup|consumer_group_s
20、tream:somekey|last_delivered_id:1292309234234-92|consumers:|consumer-1withpendingmessages|1292309234234-4|1292309234232-8|consumer-42withpendingmessages|.(andsoforth)|+XACK用法:XACK key group ID ID 这是消费者组相关的另一个重要的命令。标记一个处理中的消息为已被正确处理,如此一来,这条消息就会被从消费者组的pending消息集合中删除,类似MQ中的ack。XGROUP用法:XGROUP CREATE ke
21、y groupname id-or-$SETID key id-or-$ DESTROY key groupnameDELCONSUMER key groupname consumername这也是消费者组的一个重要命令,这个命令用来管理消费者组,例如创建,删除等。XREADGROUP,XACK,XGROUP三种命令构成了消费者组相关的操作命令,下面是消费者组一些操作示例:#创建一个消费者组:6379XGROUPCREATEuserInfoGRP-AFEI$OK#需要注意的是,目前XGROUPCREATE的streams必须是一个存在的streams,否则会报错::6379XGROUPCREA
22、TEuserinfoGRP-AFEI$(error)ERRTheXGROUPsubcommandrequiresthekeytoexist.NotethatforCREATEyoumaywanttousetheMKSTREAMoptiontocreateanemptystreamautomatically.#名为zhangsan的消费者,需要注意的是streams名称userInfo后面的特殊符号表示这个消费者只接收从来没有被投递给其他消费者的消息,即新的消息。当然我们也可以指定具体的ID,例如指定0表示访问所有投递给该消费者的历史消息,指定1540081890919-1表示投递给该消费者且大
23、于这个ID的历史消息::6379XREADGROUPGROUPmygroupzhangsanCOUNT1BLOCK0STREAMSuserInfo#名为lisi的消费者::6379XREADGROUPGROUPmygrouplisiCOUNT1BLOCK0STREAMSuserInfo#接下来分别添加两条信息,一条就会被zhangsan消费,另一条被lisi消费::6379XADDuserInfo*usernameu102102passwordp1021021540081873370-0:6379XADDuserInfo*usernameu102103passwordp102103154008
24、1890919-0#现在消费者lisi有一条消息::6379XREADGROUPGROUPmygrouplisiCOUNT5BLOCK0STREAMSuserInfo01)1)userInfo2)1)1)1540081890919-02)1)username2)u1021033)password4)p102103#然后通过命令ack这条消息::6379XACKuserInfomygroup1540081890919-0(integer)1#再看消费者lisi的pending队列,已经为空::6379XREADGROUPGROUPmygrouplisiCOUNT5BLOCK0STREAMSuse
25、rInfo01)1)userInfo2)(emptylistorset)XPENDING用法:XPENDING key group start end countconsumer返回streams中消费者组的pending消息,即消费者接收到但是还没有ack的消息,用法参考:#查看消费者组下总计最多10条pending消息:6379XPENDINGuserInfomygroup-+101)1)1540083260408-02)zhangsan3)(integer)1835514)(integer)12)1)1540083266293-02)lisi3)(integer)1776664)(int
26、eger)1#查看消费者组下zhangsan这个消费者总计最多10条pending消息:6379XPENDINGuserInfomygroup-+10zhangsan1)1)1540083260408-02)zhangsan3)(integer)1870064)(integer)1XCLAIM用法:XCLAIM key group consumer min-idle-time ID ID IDLE ms TIME ms-unix-timeRETRYCOUNT count FORCEJUSTID作用是改变消费者组中消息的所有权,用法参考::6379XREADGROUPGROUPmygroupzh
27、angsanCOUNT5BLOCK0STREAMSuserInfo01)1)userInfo2)1)1)1540083260408-02)1)username2)u1021063)password4)p102106#zhangsan本来有1条消息,现在将另一条本来属于lisi的消息的所有权转给它::6379XCLAIMuserInfomygroupzhangsan3601540083266293-01)1)1540083266293-02)1)username2)u1021073)password4)p102107#现在zhangsan有两条消息了:6379XREADGROUPGROUPmyg
28、roupzhangsanCOUNT5BLOCK0STREAMSuserInfo01)1)userInfo2)1)1)1540083260408-02)1)username2)u1021063)password4)p1021062)1)1540083266293-02)1)username2)u1021073)password4)p102107XINFO用法:XINFO CONSUMERS key groupnameGROUPS key STREAM keyHELP其作用是得到streams和消费者组的一些信息,使用参考::6379XINFOCONSUMERSuserInfomygroup1)1)name2)lisi3)pending4)(integer)05)idle6)(integer)2010862)1)name2)zhangsan3)pending4)(integer)25)idle6)(integer)701954:6379XINFOSTREAMuserInfo1)length2)(integer)223)radix-tree-keys4)(integer)15)r
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024年度年福建省高校教师资格证之高等教育心理学自我检测试卷A卷附答案
- 2024年度山西省高校教师资格证之高等教育法规通关提分题库及完整答案
- 2024年合成胶粘剂项目投资申请报告代可行性研究报告
- 2024年私人损害赔偿自愿协议
- 高校食品专业实验室安全管理探究
- 新形势下企业经济管理创新思路探究
- 2024年商业楼宇化粪池建造协议范例
- 2024年加工区租赁协议
- 2024年度工程地质勘察协议范本
- 2024届安徽省安大附中高三下学期第一次诊断测试数学试题
- 书屋业务管理及管理知识培训
- 儿科肺炎喘嗽护理查房
- GB/T 16739.1-2023汽车维修业经营业务条件第1部分:汽车整车维修企业
- 储罐施工方案33
- 消毒供应中心技能考核操作评分标准
- 尼古拉伊万诺维奇布哈林
- 混凝土强度自动评定表格
- 大学生心理稿范文800字(优选9篇)-1
- 【教学设计】大猫What's for breakfast
- 2023年重庆市大渡口区春晖路街道阳光社区工作人员考试模拟试题及答案
- 全国各大媒体的报料热线电话号码
评论
0/150
提交评论