redis7.x源码分析:(3) dict字典
dict字典采用经典hash表数据结构实现,由键值对组成,类似于C++中的unordered_map。两者在代码实现层面存在一些差异,比如gnustl的unordered_map分配的桶数组个数是(质数n),而dict分配的桶数组个数是(2^n);另外,dict对hash值相同的key采用了常规的开链法存储,而unordered_map在采用开链法的前提下,又使用了_M_before_begin将不同桶中的链表串联成了一个大链表,从而将遍历算法复杂度优化为O(n);还有就是,dict为应对服务器性能上的特殊要求,设计成了双hash表的形式,这也使得它在rehash,各种操作存在一些特殊性,我在下面的代码分析中会说到。
dict在redis里面的用途十分广泛,几乎所有的模块都会用到,其中的两大核心用途是:
- 16个数据库空间
- hash和zset类型数据的存储
dict相关结构定义:
// 节点 typedef struct dictEntry { // 任意类型键 void *key; // 存储的值 union { void *val; uint64_t u64; int64_t s64; double d; } v; // 同一个桶中链表的下一个元素 struct dictEntry *next; /* Next entry in the same hash bucket. */ void *metadata[]; /* An arbitrary number of bytes (starting at a * pointer-aligned address) of size as returned * by dictType's dictEntryMetadataBytes(). */ } dictEntry; typedef struct dict dict; // 存储不同类型数据的字典,设置不同的处理函数 typedef struct dictType { uint64_t (*hashFunction)(const void *key); void *(*keyDup)(dict *d, const void *key); void *(*valDup)(dict *d, const void *obj); int (*keyCompare)(dict *d, const void *key1, const void *key2); void (*keyDestructor)(dict *d, void *key); void (*valDestructor)(dict *d, void *obj); int (*expandAllowed)(size_t moreMem, double usedRatio); /* Allow a dictEntry to carry extra caller-defined metadata. The * extra memory is initialized to 0 when a dictEntry is allocated. */ size_t (*dictEntryMetadataBytes)(dict *d); } dictType; #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp)) #define DICTHT_SIZE_MASK(exp) ((exp) == -1 ? 0 : (DICTHT_SIZE(exp))-1) struct dict { // 字典类型,不同的类型有不同的hash函数,dup函数 dictType *type; // 双哈希表指针数组 dictEntry **ht_table[2]; // 存放的节点数 unsigned long ht_used[2]; // rehash索引(哈希表的下标),大于 -1 表示正在rehash long rehashidx; /* rehashing not in progress if rehashidx == -1 */ /* Keep small vars at end for optimal (minimal) struct padding */ // rehash暂停标志 int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ // 表示2的多少次幂,哈希表的大小=2^ht_size_exp signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */ };
hash表的创建比较简单直接略过,先看下 _dictExpand 的实现,它在hash表扩容缩容和创建时都会用到。当添加dictAdd时,存储的节点数 used / size >= 1,就需要调用它扩容。
int _dictExpand(dict *d, unsigned long size, int* malloc_failed) { if (malloc_failed) *malloc_failed = 0; /* the size is invalid if it is smaller than the number of * elements already inside the hash table */ // 正在rehash或者 used / size > 1直接退出 if (dictIsRehashing(d) || d->ht_used[0] > size) return DICT_ERR; /* the new hash table */ dictEntry **new_ht_table; unsigned long new_ht_used; // 获取第一个 2^N > size 的N的大小 signed char new_ht_size_exp = _dictNextExp(size); /* Detect overflows */ // 2^N 作为hash数组的长度, 另外判断分配的大小是否合法 size_t newsize = 1ul<<new_ht_size_exp; if (newsize < size || newsize * sizeof(dictEntry*) < newsize) return DICT_ERR; /* Rehashing to the same table size is not useful. */ if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR; /* Allocate the new hash table and initialize all pointers to NULL */ if (malloc_failed) { new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*)); *malloc_failed = new_ht_table == NULL; if (*malloc_failed) return DICT_ERR; } else new_ht_table = zcalloc(newsize*sizeof(dictEntry*)); new_ht_used = 0; /* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ // 如果是第一次创建hash表,则设置完第一个表后直接退出 if (d->ht_table[0] == NULL) { d->ht_size_exp[0] = new_ht_size_exp; d->ht_used[0] = new_ht_used; d->ht_table[0] = new_ht_table; return DICT_OK; } /* Prepare a second hash table for incremental rehashing */ // 设置第二个表后退出,并且开始rehash d->ht_size_exp[1] = new_ht_size_exp; d->ht_used[1] = new_ht_used; d->ht_table[1] = new_ht_table; d->rehashidx = 0; return DICT_OK; }
// 检查是否需要缩容 int htNeedsResize(dict *dict) { long long size, used; // 哈希表大小 size = dictSlots(dict); // 哈希表已用节点数量 used = dictSize(dict); // 当哈希表的大小大于 > 4 并且用量小于 10%时缩容 return (size && used && size > DICT_HT_INITIAL_SIZE && (used*100/size < REDIS_HT_MINFILL)); }
在执行databasesCron时,如果数据库满足 htNeedsResize 会进行缩容,另外hash和zset类型数据在执行删除操作时,也会判断是否需要缩容。
扩容缩容都会触发开启rehash,最终调用 dictRehash 实现rehash的动作,以下是它的代码:
int dictRehash(dict *d, int n) { // 累计访问多少个空桶后退出rehash int empty_visits = n*10; /* Max number of empty buckets to visit. */ if (!dictIsRehashing(d)) return 0; // 进行n次rehash while(n-- && d->ht_used[0] != 0) { dictEntry *de, *nextde; /* Note that rehashidx can't overflow as we are sure there are more * elements because ht[0].used != 0 */ assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx); while(d->ht_table[0][d->rehashidx] == NULL) { // 遇到空桶, 索引后移 d->rehashidx++; if (--empty_visits == 0) return 1; } // 得到桶中链表第一个节点 de = d->ht_table[0][d->rehashidx]; /* Move all the keys in this bucket from the old to the new hash HT */ // 遍历链表上所有的节点, 添加到hash表2中 while(de) { uint64_t h; nextde = de->next; /* Get the index in the new hash table */ // 计算hash值对应的索引 = hash & (2^exp - 1) h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]); // 添加到hash表2中 de->next = d->ht_table[1][h]; d->ht_table[1][h] = de; d->ht_used[0]--; d->ht_used[1]++; de = nextde; } // 清空hash表1的桶 d->ht_table[0][d->rehashidx] = NULL; d->rehashidx++; } /* Check if we already rehashed the whole table... */ // 如果rehash全部完成了, 则用表2替换表1, 并且释放原来的表1 if (d->ht_used[0] == 0) { zfree(d->ht_table[0]); /* Copy the new ht onto the old one */ d->ht_table[0] = d->ht_table[1]; d->ht_used[0] = d->ht_used[1]; d->ht_size_exp[0] = d->ht_size_exp[1]; _dictReset(d, 1); d->rehashidx = -1; return 0; } /* More to rehash... */ return 1; } // 执行多少毫秒的rehash操作 int dictRehashMilliseconds(dict *d, int ms) { if (d->pauserehash > 0) return 0; long long start = timeInMilliseconds(); int rehashes = 0; // 还没超时的情况下,一次尝试执行100个桶的rehash while(dictRehash(d,100)) { rehashes += 100; if (timeInMilliseconds()-start > ms) break; } return rehashes; }
再分别看一下 dictAddRaw、dictGenericDelete、dictFind的代码:
/* Low level add or find: * This function adds the entry but instead of setting a value returns the * dictEntry structure to the user, that will make sure to fill the value * field as they wish. * * This function is also directly exposed to the user API to be called * mainly in order to store non-pointers inside the hash value, example: * * entry = dictAddRaw(dict,mykey,NULL); * if (entry != NULL) dictSetSignedIntegerVal(entry,1000); * * Return values: * * If key already exists NULL is returned, and "*existing" is populated * with the existing entry if existing is not NULL. * * If key was added, the hash entry is returned to be manipulated by the caller. */ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) { long index; dictEntry *entry; int htidx; // 正在rehash过程中,则执行一次rehash操作(只把老表中一个桶对应链表内的数据节点重新hash到新表的不同桶中) if (dictIsRehashing(d)) _dictRehashStep(d); /* Get the index of the new element, or -1 if * the element already exists. */ // 获取新节点用的数组索引 if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) return NULL; /* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ // 如果在rehash的话,新节点只会加到第二个哈希表中 htidx = dictIsRehashing(d) ? 1 : 0; size_t metasize = dictMetadataSize(d); entry = zmalloc(sizeof(*entry) + metasize); if (metasize > 0) { memset(dictMetadata(entry), 0, metasize); } // 新节点添加到链表头 entry->next = d->ht_table[htidx][index]; d->ht_table[htidx][index] = entry; d->ht_used[htidx]++; /* Set the hash entry fields. */ // 设置新节点的key, 如果设置了 type->KeyDup 函数, 则复制出一个key的副本保存到节点中 dictSetKey(d, entry, key); return entry; } /* Search and remove an element. This is a helper function for * dictDelete() and dictUnlink(), please check the top comment * of those functions. */ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { uint64_t h, idx; dictEntry *he, *prevHe; int table; /* dict is empty */ if (dictSize(d) == 0) return NULL; // 正在rehash过程中,则执行一次rehash操作 if (dictIsRehashing(d)) _dictRehashStep(d); h = dictHashKey(d, key); // 在hash表1和2中查找 for (table = 0; table <= 1; table++) { // 计算索引获取桶中链表节点 idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]); he = d->ht_table[table][idx]; prevHe = NULL; while(he) { // 遍历链表找到对应key的节点并删除 if (key==he->key || dictCompareKeys(d, key, he->key)) { /* Unlink the element from the list */ if (prevHe) prevHe->next = he->next; else d->ht_table[table][idx] = he->next; if (!nofree) { // 释放节点中key/value dictFreeUnlinkedEntry(d, he); } d->ht_used[table]--; return he; } prevHe = he; he = he->next; } if (!dictIsRehashing(d)) break; } return NULL; /* not found */ } dictEntry *dictFind(dict *d, const void *key) { dictEntry *he; uint64_t h, idx, table; if (dictSize(d) == 0) return NULL; /* dict is empty */ // 执行一次rehash if (dictIsRehashing(d)) _dictRehashStep(d); h = dictHashKey(d, key); // 在hash表1和2中查找 for (table = 0; table <= 1; table++) { idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]); he = d->ht_table[table][idx]; while(he) { if (key==he->key || dictCompareKeys(d, key, he->key)) return he; he = he->next; } if (!dictIsRehashing(d)) return NULL; } return NULL; }
dict的rehash并不是一次完成的,这个是基于性能、响应时间上的考虑。会执行rehash有以下几个函数:
- databasesCron函数,它会定时执行,对16个db中的dict和expires进行rehash,每次调用dictRehashMilliseconds执行1ms时间
- dictAddRaw、dictGenericDelete、dictFind、dictGetRandomKey、dictGetSomeKeys函数,它们每次rehash只处理一个桶
最后看下迭代器的定义:
/* If safe is set to 1 this is a safe iterator, that means, you can call * dictAdd, dictFind, and other functions against the dictionary even while * iterating. Otherwise it is a non safe iterator, and only dictNext() * should be called while iterating. */ typedef struct dictIterator { // 迭代器对应的字典 dict *d; // 正在迭代的hash索引 long index; // table表示迭代的表是两张表中的哪一张;safe表示是否是安全迭代器 int table, safe; // entry表示迭代器当前的节点;nextEntry表示下个节点 dictEntry *entry, *nextEntry; /* unsafe iterator fingerprint for misuse detection. */ // 指纹,判断迭代过程中有没执行被禁止的操作(dictNext以外的函数) unsigned long long fingerprint; } dictIterator;
迭代器分为安全和不安全迭代器:
- 安全迭代器,针对字典可以执行 dictAdd 等有可能修改的函数
- 不安全迭代器,针对字典只能执行 dictNext
迭代的实现:
dictEntry *dictNext(dictIterator *iter) { while (1) { if (iter->entry == NULL) { // 第一个迭代或者桶中的链表遍历完了 if (iter->index == -1 && iter->table == 0) { // 安全模式暂停rehash if (iter->safe) dictPauseRehashing(iter->d); else iter->fingerprint = dictFingerprint(iter->d); } // 依次轮询两张hash表,查找非空节点 iter->index++; if (iter->index >= (long) DICTHT_SIZE(iter->d->ht_size_exp[iter->table])) { if (dictIsRehashing(iter->d) && iter->table == 0) { iter->table++; iter->index = 0; } else { break; } } iter->entry = iter->d->ht_table[iter->table][iter->index]; } else { // 取当前节点的下一个节点 iter->entry = iter->nextEntry; } if (iter->entry) { // 如果不为空,返回当前节点,保存下个节点 /* We need to save the 'next' here, the iterator user * may delete the entry we are returning. */ iter->nextEntry = iter->entry->next; return iter->entry; } } return NULL; }