找回密码
 会员注册
查看: 27|回复: 0

Redis源码解析

[复制链接]

2万

主题

0

回帖

7万

积分

超级版主

积分
72287
发表于 2024-10-6 14:24:55 | 显示全部楼层 |阅读模式
作者:蒋弘毅?| 研发后端一、引言作为后端开发,redis是工作中最绕不开的中间件之一,在工作中通常有以下几个常用用途:缓存,可以抗十万级别的qps计数器,如点赞数,pv等分布式锁限流另外丰富的redis数据类型支持了一些扩展功能,如排行榜,消息队列,布隆过滤器,位图等等。而redis的底层实现是十分简单的,核心源码也仅有几万行。本文就带大家来领略,小小的redis是如何实现这些复杂功能的。*注:本文介绍的源码为redis 5.0.14版本 *二、字符串C语言存储字符串的问题:二进制安全C语言中表示字符串结尾的符号是'\0',如果字符串本身就具有'\0'字符,就会被截断,即非二进制安全。计算字符串的长度性能低C语言中有一个计算字符串长度的函数strlen,但这个函数与Java的不一样,需要遍历整个字符串来计算长度,时间复杂度是O(n),如果需要在循环中计算,性能将十分低下。字符串拼接性能低因为C语言字符串不记录长度,对于一个长度n的字符串来说,底层是n+1的字符数组。char a[n+1]如果需要增长字符串,则需要对底层的字符数组进行重分配的操作;接下来由数据结构入手,看看redis是如何解决这几个问题的。2.1数据结构struct sds{ int len; //buf中已占字符数 int free; //buf中空闲字符数 char buf[];}除了保存字符串的指针buf,还需要记录使用空间和空闲的空间。redis老版本也是这样设计的,这样的设计解决了开头的三个问题:计算字符串长度的时候,时间复杂度是O(1)使用len变量得出字符串的长度,而不是’\0‘,保证了二进制安全对于字符串的拼接操作,进行预分配空间,减少内存重分配的次数小字符串空间浪费的问题:在64位系统中,字符串头部的len和free各占四个字节,对于大字符串而言,这个数字还好,但是如果是小字符串呢,比如buf本身只有一个字节,而头部就占了八个字节,肯定不合适。redis新版本就给了一种方案,根据buf字符串的长度不同,使用不同的结构体存储,同时新增一个单字节变量flags,保存不同的类型。但是对于那种只有一个字节长的字符串,如何优化呢?对于那种小字符串,redis中使用一个字节的标志位flags表示 低三位存储类型(type),高五位存储长度(len),而高五位 2^5-1=31 可以存储最多31个字节的字符串。而大于31个字节的其他几种类型字符串,一个字节存不下,就使用两个变量保存已使用空间和总长度(保留flags字段标识类型,新增len字段标记长度)。sdshdr8,sdshdr16,sdshdr32,sdshdr64 结构都是一样的,区别在于存储的变量大小。struct __attribute__ ((__packed__)) sdshdr5 { unsigned char flags; /* 3 lsb of type, and 5 msb of string length */ char buf[];};struct __attribute__ ((__packed__)) sdshdr8 { uint8_t len; //已使用 uint8_t alloc; // 总长度 unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[];};struct __attribute__ ((__packed__)) sdshdr16 { uint16_t len; /* used */ uint16_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[];};struct __attribute__ ((__packed__)) sdshdr32 { uint32_t len; /* used */ uint32_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[];};struct __attribute__ ((__packed__)) sdshdr64 { uint64_t len; /* used */ uint64_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[];};2.2基本操作只介绍扩容操作,其它操作都比较简单,可自行阅读。扩容源码如下:sds sdsMakeRoomFor(sds s, size_t addlen) { struct sdshdr *sh, *newsh; //定义两个 sdshdr 结构体指针 size_t free = sdsavail(s); // 获取 s 目前空闲空间长度 size_t len, newlen; // 前者存储扩展前 sds 字符串长度,后者存储扩展后 sds 字符串长度 if (free >= addlen) return s; // 如果空余空间足够,直接返回 len = sdslen(s); // 获取 s 目前已占用空间的长度 sh = (void*) (s-(sizeof(struct sdshdr))); //结构体指针赋值 newlen = (len+addlen); // 字符串数组 s 最少需要的长度 // 根据新长度,为 s 分配新空间所需的大小 if (newlen free = newlen - len; return newsh->buf;}三、跳跃表跳跃表类似一个多层的链表,首先从最高层开始查找,如果下一个节点的值大于要查找的值或者下一个节点为null,则往下一层查找。通过空间换时间的策略,将时间复杂度控制在O(logn)。3.1一个例子例如查找51这个数:首先从第一层开始查找,找到第二个节点,发现后面为null从第二层查找 查找到第四个节点,发现后面的节点为61,大于当前的数从第三层查找 查找到第六个节点 结束 一共查找四次,比遍历一次少了两次。数据量大的情况下,这个性能会提升的很明显3.2跳跃表结构首先看一下zskiplistNode的数据结构,zskiplistNode表示跳跃表中的一个节点。typedef struct zskiplistNode { sds ele;// 数据 double score; //权重比 struct zskiplistNode *backward; //后退指针,指向当前节点底层 前一个节点 struct zskiplistLevel { struct zskiplistNode *forward; // 指向当前层的前一个节点 unsigned long span; //forward 指向前一个节点的与当前节点的间距 } level[];} zskiplistNode;zskiplist 表示跳跃表:typedef struct zskiplist { struct zskiplistNode *header, *tail; //分别指向头结点和尾结点 unsigned long length; //跳跃表总长度 int level; //跳跃表总高度} zskiplist;其中,头节点是跳跃表的一个特殊节点,它的level数组元素个数为64。头节点在有序集合中不存储任何member和score值,ele值为NULL, score值为0;也不计入跳跃表的总长度。头节点在初始化时,64个元素的forward都指向NULL, span值都为0。3.3基本操作3.3.1 创建跳跃表zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; // 头结点 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl;}简单来说就是创建了头结点,创建了64个level数组。3.3.2 随机层高创建和插入节点的之前,当前节点需要在哪几层出现,是通过计算当前节点的level值, 而level值是redis通过伪随机得出的,层数越高,节点出现的概率越小。int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward & (x->level[i].forward->score level[i].forward->score == score & sdscmp(x->level[i].forward->ele,ele) level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ //调整高度 level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); //插入节点 for (i = 0; i level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x;}3.3.4.删除节点查找节点 (同插入节点)删除节点修改高度void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i level; i++) { if (update[i]->level[i].forward == x) { // update[i].level[i] 的 forward 节点是 x 的情况,需要更新 span 和 forward update[i]->level[i].span += x->level[i].span - 1; update[i]->level[i].forward = x->level[i].forward; } else {// update[i].level[i] 的 forward 节点不是 x 的情况,只需要更新 span update[i]->level[i].span -= 1; } } if (x->level[0].forward) { // 如果 x 不是尾节点,更新 backward 节点 x->level[0].forward->backward = x->backward; } else { // 否则 更新尾节点 zsl->tail = x->backward; } while(zsl->level > 1 & zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; //更新跳跃表 level zsl->length--; // 更新跳跃表长度}3.4.5 跳跃表的应用zset集合插入第一个元素时,会判断下面两种条件:zset-max-ziplist-entries 的值是否等于 0zset-max-ziplist-value 小于要插入元素的字符串长度满足任一条件 Redis 就会采用跳跃表作为底层实现,否则采用压缩列表作为底层实现方式。3.4.6 题外话Q:为什么redis使用跳跃表而不是红黑树呢引用一下原作者的话:There are a few reasons: They are not very memory intensive. It's up to you basically. Changing parameters about the probability of a node to have a given number of levels will make then less memory intensive than btrees.A sorted set is often target of many ZRANGE or ZREVRANGE operations, that is, traversing the skip list as a linked list. With this operation the cache locality of skip lists is at least as good as with other kind of balanced trees.They are simpler to implement, debug, and so forth. For instance thanks to the skip list simplicity I received a patch (already in Redis master) with augmented skip lists implementing ZRANK in O(log(N)). It required little changes to the code.About the Append Only durability & speed, I don't think it is a good idea to optimize Redis at cost of more code and more complexity for a use case that IMHO should be rare for the Redis target (fsync() at every command). Almost no one is using this feature even with ACID SQL databases, as the performance hint is big anyway.About threads: our experience shows that Redis is mostly I/O bound. I'm using threads to serve things from Virtual Memory. The long term solution to exploit all the cores, assuming your link is so fast that you can saturate a single core, is running multiple instances of Redis (no locks, almost fully scalable linearly with number of cores), and using the "Redis Cluster" solution that I plan to develop in the future.简单翻译一下:这并不会浪费太多的空间,并且树的高度可以动态调整的。ZRANGE 和 ZREVRANGE命令,跳表性能比红黑树好红黑树比较复杂...作者懒得实现四、整数集合整数集合(intset)是一个有序的、存储整型数据的结构。conding决定了的element的长度,对应关系如下:4.1基本数据结构typedef struct intset { //编码 uint32_t encoding; //元素个数 uint32_t length; // 柔性数组,根据encoding 决定几个字节表示一个数组 int8_t contents[];} intset;4.2基本操作4.2.1 查询元素uint8_t intsetFind(intset *is, int64_t value) {   uint8_t valenc = _intsetValueEncoding(value); //判断编码方式  //编码方式如果大于当前intset的编码方式,直接返回0。否则调用intsetSearch函数进行查找  return valenc encoding) & intsetSearch(is,value,NULL);static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) { int min = 0, max = intrev32ifbe(is->length)-1, mid = -1; int64_t cur = -1; /*如果intset中没有元素,直接返回0 */if (intrev32ifbe(is->length) == 0) {   if (pos) *pos = 0;  return 0;} else {/* 如果元素大于最大值或者小于最小值,直接返回0 */if (value > _intsetGet(is,max)) {   if (pos) *pos = intrev32ifbe(is->length);    return 0;} else if (value = min) { //二分查找该元素   mid = ((unsigned int)min + (unsigned int)max) >> 1;  cur = _intsetGet(is,mid);if (value > cur) {   min = mid+1;} else if (value ht[1] : &d->ht[0]; /* 是否在进行 rehash 操作中,是则插入至散列表 ht[1] 中,否则插入散列表 ht[0] */ entry = zmalloc(sizeof(*entry)); /* 申请新节点内存 */ entry->next = ht->table[index]; /* 将该节点的 next 指针指向 ht->table[index] 指针指向的位置 */ ht->table[index] = entry; /* 将 ht->table[index] 指针指向该节点 */ ht->used++; dictSetKey(d, entry, key); /* 给新节点存入键信息 */ return entry;}其中查找元素的代码:dictHashKey(d,key), existing)// 根据字典的hash函数得到key的hash值idx = hash & d->ht[table].sizemask; //利用key的hash值与掩码进行与操作(因为与操作的速度比取余快,也就是为什么要存一个掩码)5.2.2 扩容int dictExpand(dict *d, unsigned long size) { if (dictIsRehashing(d) || d->ht[0].used > size) /* 如果此时正在扩容,或者是扩容大小小于 ht[0] 的表大小,则抛错 */ return DICT_ERR; dictht n; /* 新 hash 表 */ unsigned long realsize = _dictNextPower(size); /* 重新计算扩容后的值,必须为 2 的 N 次方幂 */ /* Rehashing to the same table size is not useful. */ if (realsize == d->ht[0].size) return DICT_ERR; /* 重新计算的值如果和原来的 size 相等,则无效 */ /* 分配新 Hash 表,并初始化所有指针为 NULL */ n.size = realsize; n.sizemask = realsize-1; n.table = zcalloc(realsize*sizeof(dictEntry*)); n.used = 0; /* 初始化的情况,而不是进行 rehash 操作,就用 ht[0] 来接收值 */ if (d->ht[0].table == NULL) { d->ht[0] = n; return DICT_OK; } /* 准备第二个 Hash 表,以便执行渐进式哈希操作 */ d->ht[1] = n; /* 扩容后的新内存放入 ht[1] 中 */ d->rehashidx = 0; /* 非默认的 -1,表示需进行 rehash */ return DICT_OK;}redis中的key可能有成千上万,如果一次性扩容,会对性能造成巨大的影响,所以redis使用渐进式扩容,每次执行插入,删除,查找,修改等操作前,都先判断当前字典的rehash操作是否在进行,如果是在进行中,就对当前节点进行rehash操作,只执行一次。除此之外,当服务器空闲时,也会调用incrementallyRehash函数进行批量操作,每次100个节点,大概一毫秒。将rehash操作进行分而治之。渐进式rehash源码:int dictRehash(dict *d, int n) { int empty_visits = n*10; /* 最大访问的空桶的数量,n*10 */ if (!dictIsRehashing(d)) return 0; /* dict 没有正在进行 rehash 时,直接返回 */ while(n-- & d->ht[0].used != 0) { /* n 为最多迁移元素数量 */ dictEntry *de, *nextde; assert(d->ht[0].size > (unsigned long)d->rehashidx); /* 为防止 rehashidx 越界,当 rehashidx 大于 ht[0] 的数组大小时,不继续执行 */ while(d->ht[0].table[d->rehashidx] == NULL) { /* 当 rehashidx 位置的桶为空时,继续向下遍历,直到桶不为空或者达到最大访问空桶的数量 */ d->rehashidx++; if (--empty_visits == 0) return 1; //最大访问空桶数量-1,若减完,则退出 } de = d->ht[0].table[d->rehashidx]; while(de) { // 遍历桶中元素,移动元素至新表 uint64_t h; nextde = de->next; h = dictHashKey(d, de->key) & d->ht[1].sizemask; de->next = d->ht[1].table[h]; // 头插法 d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; } d->ht[0].table[d->rehashidx] = NULL; // ht[0] 对应桶置为空 d->rehashidx++; } if (d->ht[0].used == 0) { // 检查是否已经 rehash 完成 zfree(d->ht[0].table); d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; return 0; } return 1;}5.2.3 查找元素更新和删除操作大同小异,不在赘述。5.3应用场景总长度超过512字节或者单个元素长度大于64的Hash总长度超过512字节或者单个元素长度大于64的set六、压缩列表redis使用字节数据表示压缩列表,尽最大可能节省空间。其中,coding字段表示content的编码,其长度是动态变化的。如下表:encoding字段第1个字节的前2位,可以判断content字段存储的是整数或者字节数组。当content存储的是字节数组时,后续字节标识字节数组的实际长度;当content存储的是整数时,可根据第3、第4位判断整数的具体类型。而当encoding字段标识当前元素存储的是0~12的立即数时,数据直接存储在encoding字段的最后4位,此时没有content字段。举个例子:6.1数据结构因为解码过程比较繁琐,每次解码都需要性能损耗,为此定义了结构体zlentry,用于表示解码后的压缩列表元素:typedef struct zlentry { //previous_entry_length 长度 unsigned int prevrawlensize; /* Bytes used to encode the previous entry len*/ // previous_entry_length unsigned int prevrawlen; /* Previous entry len. */ //encoding 长度 unsigned int lensize; /* Bytes used to encode this entry type/len. For example strings have a 1, 2 or 5 bytes header. Integers always use a single byte.*/ // 内容的长度 unsigned int len; /* Bytes used to represent the actual entry. For strings this is just the string length while for integers it is 1, 2, 3, 4, 8 or 0 (for 4 bit immediate) depending on the number range. */ //首部长度 unsigned int headersize; /* prevrawlensize + lensize. */ //编码 unsigned char encoding; /* Set to ZIP_STR_* or ZIP_INT_* depending on the entry encoding. However for 4 bits immediate integers this can assume a range of values and must be range-checked. */ // 当前元素的首地址 unsigned char *p; /* Pointer to the very start of the entry, that is, this points to prev-entry-len field. */} zlentry;?6.2解码解码分为两步,解码previousentrylength和解码coding解码previousentrylength#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do { ZIP_DECODE_PREVLENSIZE(ptr, prevlensize); // prevlensize=1时,则 ptr 的第一个字节标识上一个节点的长度 if ((prevlensize) == 1) { (prevlen) = (ptr)[0]; } else if ((prevlensize) == 5) { assert(sizeof((prevlen)) == 4); // 如果 prevlensize = 5,取后面 4 个字节作为上一节点的长度 memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); memrev32ifbe(&prevlen); } } while(0);解码coding#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do { // 获取当前的编码类型 ZIP_ENTRY_ENCODING((ptr), (encoding)); // 如果编码类型为字节数组 if ((encoding) numele = 0; rax->numnodes = 1; rax->head = raxNewNode(0,0); if (rax->head == NULL) { rax_free(rax); return NULL; } else { return rax; }}7.3.2.2?查找元素/** * 根据key获取对应的value * @param rax 待查找的rax * @param s 待查找的key * @param len s的长度 * @return */void *raxFind(rax *rax, unsigned char *s, size_t len) { raxNode *h; debugf("### Lookup: %.*s\n", (int)len, s); int splitpos = 0; size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL); if (i != len || (h->iscompr & splitpos != 0) || !h->iskey) return raxNotFound; return raxGetData(h); //返回对应的value}可以看到,主要的代码在raxLowWalk方法中。/** * * @param rax 待查找的rax * @param s 待查找的key * @param len s的长度 * @param stopnode 终止的节点 要么匹配完成,要么没找到。。 * @param plink 父节点指向stopnode的指针的地址 * @param splitpos 压缩节点的匹配位置 * @param ts 记录路径 * @return */static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) { // 从根节点开始查找 raxNode *h = rax->head; raxNode **parentlink = &rax->head; // 当前匹配字符位置 size_t i = 0; /* Position in the string. */ // 当前匹配节点位置 size_t j = 0; /* Position in the node children (or bytes if compressed).*/ // 当前节点有子节点且s字符串没有遍历完 while(h->size & i data; if (h->iscompr) { //压缩节点判断是否完全匹配 for (j = 0; j size & i size) break; } else { /* Even when h->size is large, linear scan provides good * performances compared to other approaches that are in theory * more sounding, like performing a binary search. */ // 非压缩节点 for (j = 0; j size; j++) { if (v[j] == s[i]) break; } // 未在非压缩节点找到字符串 if (j == h->size) break; // 压缩节点可以匹配 i++; } // 记录路径 if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */ raxNode **children = raxNodeFirstChildPtr(h); if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */ // 移动到第j个子节点 memcpy(&h,children+j,sizeof(h)); parentlink = children+j; j = 0; /* If the new node is compressed and we do not iterate again (since i == l) set the split position to 0 to signal this node represents the searched key. */ } debugnode("Lookup stop node is",h); if (stopnode) *stopnode = h; if (plink) *plink = parentlink; if (splitpos & h->iscompr) *splitpos = j; return i;}这里的步骤比较简单:初始化变量从根节点查找直到当前节点无子节点或者s字符串遍历完毕 如果是压缩节点,节点中字符需要和s中的字符完全匹配 如果是非压缩节点,需要找到至少一个与S中字符匹配的字符如果匹配成功,就查找子节点。7.3.2.3?添加元素向rax中添加key-value对有两种方式,覆盖和不覆盖原有key对应的方法分别为:int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) { return raxGenericInsert(rax,s,len,data,old,1);}int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) { return raxGenericInsert(rax,s,len,data,old,0);}两者都是调用同一个方法raxGenericInsert。该接口方法主要分为以下几步:查找key是否存在i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);key存在的情况下,直接更新节点数据? // 如果之前节点没有数据,就分配一个空间 if (!h->iskey || (h->isnull & overwrite)) { h = raxReallocForData(h,data); if (h) memcpy(parentlink,&h,sizeof(h)); } if (h == NULL) { errno = ENOMEM; return 0; } /* Update the existing key if there is already one. */ // 更新数据 if (h->iskey) { if (old) *old = raxGetData(h); if (overwrite) raxSetData(h,data); errno = 0; return 0; /* Element already exists. */ }key不存在的情况下,最后停留在某个压缩节点上;key不存在时,就分为多种情况,这里借用redis源码注解的例子。原来有个rax树,长这样:有以下几种插入情况:插入 ANNIENTARE插入ANNIBALI插入 AGO和第一种类似,只是右边的节点变成了非压缩节点。插入 CIAO插入 ANNI上面列举了五种情况,可以分为两类:第一种是新插入的key是当前节点的一部分,这时我们只需要拆分压缩节点,并设置新的key即可第二种是新插入的key与压缩节点的某个位置不匹配,这时我们需要在拆分后的相应位置的非压缩节点中,插入新key的不匹配字符,之后将新key的剩余部分,插入到这个非压缩节点的子节点中源码中细节较多,不再细讲 感兴趣的同学可以查看。int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void**old, int overwrite)7.3.2.4 添加元素删除节点删除接口用于删除rax中某个key,依旧拿redis源码中注释的例子 原来有这样一个rax,其有两个key,foo和foobar也有以下几种情况:删除foo假设原来的rax是这样的,其有两个key,foobar,footer7.4stream结构stream流就像是一个消息链表,依赖于Rax结构和listpack结构,本节主要介绍消息流的增删查操作。typedef struct stream { // 指向rax树 rax *rax; // 元素个数 uint64_t length; // 指向最后一个消息 streamID last_id; // 消费组 rax *cgroups; } stream;结构如下图所示:7.4.1 基本操作7.4.1.1 添加消息redis提供streamAppendItem 函数,向stream中添加一个新的消息。/** * * @param s 待插入的数据流 * @param argv 消息内容 * @param numfields 消息数量 * @param added_id 消息id * @param use_id 调用方定义的id * @return */int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id)代码比较多就不贴了,画一张流程图描述细节。7.4.1.2 新增消费组消费组也是保存在rax树中,以消费组的名称为key,消费组的streamCG结构为value。treamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { //当前消息流没有消费组,就新建一个 if (s->cgroups == NULL) s->cgroups = raxNew(); // 查找是否有重名消费组,有就直接返回 if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) return NULL; // 新建消费组 streamCG *cg = zmalloc(sizeof(*cg)); cg->pel = raxNew(); cg->consumers = raxNew(); cg->last_id = *id; // 将消费组插入到消费组树中 raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); return cg;}7.4.1.3 新建消费者redis没有提供为消费组中新增消费者的方法。在查询消费者的时候,如果不存在,就会新增。7.4.1.4 删除消息删除消息通过将listpack中消息的标志位设为已删除,并不是真正的删除。如果整个listpack的消息都被删除了,才会从rax中释放该节点。void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { // 当前消息所在的listpack unsigned char *lp = si->lp; int64_t aux; // 标记删除位 int flags = lpGetInteger(si->lp_flags); flags |= STREAM_ITEM_FLAG_DELETED; lp = lpReplaceInteger(lp,&si->lp_flags,flags); //修改有效的消息数量 unsigned char *p = lpFirst(lp); aux = lpGetInteger(p); // 如果只有待删除的消息,就直接释放listpack if (aux == 1) { /* If this is the last element in the listpack, we can remove the whole * node. */ lpFree(lp); raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL); } else { // 更新统计信息 lp = lpReplaceInteger(lp,&p,aux-1); // 查找删除节点p p = lpNext(lp,p); aux = lpGetInteger(p); lp = lpReplaceInteger(lp,&p,aux+1); // 更新listpack指针,可能因为扩容缩容而变化 if (si->lp != lp) raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL); }7.4.1.5.查找消费组主要是利用rax查询接口:streamCG *streamLookupCG(stream *s, sds groupname) { if (s->cgroups == NULL) return NULL; streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname, sdslen(groupname)); return (cg == raxNotFound) ? NULL : cg;}7.4.1.6 查找消费者streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { // 在消费者的rax中查找指定的消费者 streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { if (!create) return NULL; // 如果没有找到,新建一个消费者,插入到消费者rax树中 consumer = zmalloc(sizeof(*consumer)); consumer->name = sdsdup(name); consumer->pel = raxNew(); raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); } consumer->seen_time = mstime(); return consumer;}八、Redis线程模型演进8.1Redis3.0redis内部使用文件事件处理器file event handler,这个文件处理器是单线程的,所以我们经常说的redis是单线程模型。客户端 Socket01 向 Redis 的 Server Socket 请求建立连接,此时 Server Socket 会产生一个 AEREADABLE 事件,IO 多路复用程序监听到 server socket 产生的事件后,将该事件压入队列中。文件事件分派器从队列中获取该事件,交给连接应答处理器。连接应答处理器会创建一个能与客户端通信的 Socket01,并将该 Socket01 的 AEREADABLE 事件与命令请求处理器关联。假设此时客户端发送了一个 set key value 请求,此时 Redis 中的 Socket01 会产生 AEREADABLE 事件,IO 多路复用程序将事件压入队列,此时事件分派器从队列中获取到该事件,由于前面 Socket01 的 AEREADABLE 事件已经与命令请求处理器关联,因此事件分派器将事件交给命令请求处理器来处理。命令请求处理器读取 Scket01 的 set key value 并在自己内存中完成 set key value 的设置。操作完成后,它会将 Scket01 的 AE_WRITABLE 事件与令回复处理器关联。如果此时客户端准备好接收返回结果了,那么 Redis 中的 Socket01 会产生一个 AEWRITABLE 事件,同样压入队列中,事件分派器找到相关联的命令回复处理器,由命令回复处理器对 socket01 输入本次操作的一个结果,比如 ok,之后解除 Socket01 的 AEWRITABLE 事件与命令回复处理器的关联。8.2Redis4.0redis4.0开始引入了多线程,除了主线程,redis有后台线程进行一些边缘的缓慢的操作,比如释放无用连接,rehash迁移等操作。8.3Redis6.0在redis6.0中,真正引入了多线程。九、总结本文介绍了redis基本数据类型以及redis5.0以后的新特性。工作中我们也常常使用redis进行各种逻辑的处理,而了解其源码可以避免踩很多坑。另外,redis底层的设计也有很多值得学习的地方,比如更高效的使用内存和提升运算的时间复杂度,了解这些可以帮助我们在性能优化中有更多的思路。点击“阅读原文”了解更多有赞技术招聘信息
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-11 10:17 , Processed in 1.343263 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表