blog
blog copied to clipboard
Redis 中的 quicklist 实现
Redis 中使用 quicklist 作为列表的底层实现之一,quicklist 是 Redis 3.2 版本引入的,在 Redis 3.2 之前,列表的底层实现有 ziplist 和 adlist (A generic doubly linked list) 两种。
// 3.2/src/adlist.h - A generic doubly linked list implementation
typedef struct list {
listNode *head;
listNode *tail;
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned long len;
} list;
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
} listNode;
adlist 是一个通用的双向链表实现,list 结构为链表提供表头指针 head、表尾指针 tail,以及链表节点长度 len。链表的每个节点都有指向前一个节点、下一个节点的指针。
Redis 3.2 之后的版本 quicklist 作为列表的唯一底层实现,源码注释是这样描述 quciklist 的,A doubly linked list of ziplists。顾名思义,quicklist 是在双向链表的基础上,链表中的每个节点是一个 ziplist 结构。
// 5.0/src/quicklist.h
/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
...
} quicklist;
quicklist 结构和 adlist 有点类似,head 和 tail 分别是指向链表的头尾节点的指针,len 也是表示链表节点长度,不一样的是,count 用来表示链表的元素个数,在 adlist 中,len 数等同于 count 数,而 quicklist 中,每个 quicklistNode 节点都有对应的一个 ziplist 结构。
// 5.0/src/quicklist.h
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
* We use bit fields keep the quicklistNode at 32 bytes.
* count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
* encoding: 2 bits, RAW=1, LZF=2.
* container: 2 bits, NONE=1, ZIPLIST=2.
* recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
* attempted_compress: 1 bit, boolean, used for verifying during testing.
* extra: 10 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
quicklistNode 除了前后节点指针和指向节点数据的 zl 指针,还有下面几个元数据:
- sz (32 byte): ziplist 字节大小。
- count (16 bit) : ziplist 中的元素个数。
- encoding (2 bit):container 的编码方式,1 表示原生的,2 表示使用 LZF 压缩。
- container (2 bit) : 表示 zl 指向的容器类型,1 代表 none,2 代表 ziplist 结构。
- recompress (1 bit) : 是否之前被压缩过。
- attempted_compress (1 bit) : 测试用。
- extra (10 bit) : 保留位。
从 quicklistNode 结构上看,虽然 container 支持不同类型,但目前 zl 基本上是一个 ziplist 结构。

// 5.0/src/quicklist.h
typedef struct quicklist {
...
int fill : 16; /* fill factor for individual nodes */
unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;
quicklist 结构里面有两个 16 bit 的位域,fill 和 compress。
- fill 表示每个节点 ziplist 的填充因子,用来指明 ziplist 允许的大小或节点个数。
- compress 表示链表末尾不压缩的节点个数。
当 fill 是整数的时候表示 ziplist 最多的节点个数,负数表示 ziplist 最大字节大小。
- -1 表示 ziplist 最大为 4 KB
- -2 表示 ziplist 最大为 8 KB
- -3 表示 ziplist 最大为 16 KB
- -4 表示 ziplist 最大为 32 KB
- -5 表示 ziplist 最大为 64 KB
// src/quicklist.c
/* Optimization levels for size-based filling */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
const int fill) {
if (fill >= 0)
return 0;
size_t offset = (-fill) - 1;
if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
if (sz <= optimization_level[offset]) {
return 1;
} else {
return 0;
}
} else {
return 0;
}
}
quicklistLZF 结构用来表示一个使用 LZF 压缩的 zl 数据。
/* quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
* 'sz' is byte length of 'compressed' field.
* 'compressed' is LZF data with total (compressed) length 'sz'
* NOTE: uncompressed length is stored in quicklistNode->sz.
* When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;
创建
// 5.0/src/quicklist.c
/* Create a new quicklist.
* Free with quicklistRelease(). */
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;
quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL;
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
return quicklist;
}
添加
// 5.0/src/quicklist.c
/* Wrapper to allow argument-based switching between HEAD/TAIL pop */
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where) {
if (where == QUICKLIST_HEAD) {
quicklistPushHead(quicklist, value, sz);
} else if (where == QUICKLIST_TAIL) {
quicklistPushTail(quicklist, value, sz);
}
}
quicklist 支持向头部或尾部 push 新的数据。
// 5.0/src/quicklist.c
/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
_quicklistNodeAllowInsert 判断是否允许往 quicklistNode 中的 ziplist 继续插入数据。
- 允许插入,就调用 ziplistPush 往 head 节点的 ziplist 头部插入。
- 否则就新增一个 quicklistNode 节点,插入到 quicklist 头部。
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;
int ziplist_overhead;
/* size of previous offset */
if (sz < 254)
ziplist_overhead = 1;
else
ziplist_overhead = 5;
/* size of forward offset */
if (sz < 64)
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;
/* new_sz overestimates if 'sz' encodes to an integer type */
unsigned int new_sz = node->sz + sz + ziplist_overhead;
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
else if (!sizeMeetsSafetyLimit(new_sz))
return 0;
else if ((int)node->count < fill)
return 1;
else
return 0;
}
/* Maximum size in bytes of any multi-element ziplist.
* Larger values will live in their own isolated ziplists. */
#define SIZE_SAFETY_LIMIT 8192
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)
_quicklistNodeAllowInsert 先计算插入数据所需要的头部大小 ziplist_overhead,分别由后一个节点的 prevlen 和当前节点的 encoding 大小组成。
新的 ziplist 字节大小 node->sz + sz + ziplist_overhead,这边其实忽略了 ziplist 连锁更新带来的影响。
下面是几种是否被允许插入新增数据的判断。
- 对于填充因子小于 0,判断是否超过单个 ziplist 大小限制;
- 校验单个 ziplist 是否超过 8KB,主要防止 fill 为正数时单个 ziplist 内存过大;
- 校验 ziplist 节点个数是否超过限制。
_quicklistInsertNodeBefore 用来在 quicklist 中的某一个节点前插入一个新的节点。
/* Wrappers for node inserting around existing node. */
REDIS_STATIC void _quicklistInsertNodeBefore(quicklist *quicklist,
quicklistNode *old_node,
quicklistNode *new_node) {
__quicklistInsertNode(quicklist, old_node, new_node, 0);
}
/* Insert 'new_node' after 'old_node' if 'after' is 1.
* Insert 'new_node' before 'old_node' if 'after' is 0.
* Note: 'new_node' is *always* uncompressed, so if we assign it to
* head or tail, we do not need to uncompress it. */
REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
quicklistNode *old_node,
quicklistNode *new_node, int after) {
if (after) {
new_node->prev = old_node;
if (old_node) {
new_node->next = old_node->next;
if (old_node->next)
old_node->next->prev = new_node;
old_node->next = new_node;
}
if (quicklist->tail == old_node)
quicklist->tail = new_node;
} else {
new_node->next = old_node;
if (old_node) {
new_node->prev = old_node->prev;
if (old_node->prev)
old_node->prev->next = new_node;
old_node->prev = new_node;
}
if (quicklist->head == old_node)
quicklist->head = new_node;
}
/* If this insert creates the only element so far, initialize head/tail. */
if (quicklist->len == 0) {
quicklist->head = quicklist->tail = new_node;
}
if (old_node)
quicklistCompress(quicklist, old_node);
quicklist->len++;
}