Skip to content

Commit 0c05436

Browse files
committed
Lazyfree: a first implementation of non blocking DEL.
1 parent 712ea72 commit 0c05436

File tree

8 files changed

+341
-53
lines changed

8 files changed

+341
-53
lines changed

src/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ endif
117117

118118
REDIS_SERVER_NAME=redis-server
119119
REDIS_SENTINEL_NAME=redis-sentinel
120-
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o
120+
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o
121121
REDIS_GEOHASH_OBJ=../deps/geohash-int/geohash.o ../deps/geohash-int/geohash_helper.o
122122
REDIS_CLI_NAME=redis-cli
123123
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o

src/db.c

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,6 @@
3333
#include <signal.h>
3434
#include <ctype.h>
3535

36-
void slotToKeyAdd(robj *key);
37-
void slotToKeyDel(robj *key);
38-
void slotToKeyFlush(void);
39-
4036
/*-----------------------------------------------------------------------------
4137
* C-level DB API
4238
*----------------------------------------------------------------------------*/
@@ -184,7 +180,7 @@ robj *dbRandomKey(redisDb *db) {
184180
}
185181

186182
/* Delete a key, value, and associated expiration entry if any, from the DB */
187-
int dbDelete(redisDb *db, robj *key) {
183+
int dbSyncDelete(redisDb *db, robj *key) {
188184
/* Deleting an entry from the expires dict will not free the sds of
189185
* the key, because it is shared with the main dictionary. */
190186
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
@@ -196,6 +192,14 @@ int dbDelete(redisDb *db, robj *key) {
196192
}
197193
}
198194

195+
/* This is a wrapper whose behavior depends on the Redis lazy free
196+
* configuration. Deletes the key synchronously or asynchronously. */
197+
int dbDelete(redisDb *db, robj *key) {
198+
int async = 1; /* TODO: Fixme making this a proper option. */
199+
if (async) return dbAsyncDelete(db,key);
200+
else return dbSyncDelete(db,key);
201+
}
202+
199203
/* Prepare the string object stored at 'key' to be modified destructively
200204
* to implement commands like SETBIT or APPEND.
201205
*
@@ -302,20 +306,31 @@ void flushallCommand(client *c) {
302306
server.dirty++;
303307
}
304308

305-
void delCommand(client *c) {
306-
int deleted = 0, j;
309+
/* This command implements DEL and LAZYDEL. */
310+
void delGenericCommand(client *c, int lazy) {
311+
int numdel = 0, j;
307312

308313
for (j = 1; j < c->argc; j++) {
309314
expireIfNeeded(c->db,c->argv[j]);
310-
if (dbDelete(c->db,c->argv[j])) {
315+
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
316+
dbSyncDelete(c->db,c->argv[j]);
317+
if (deleted) {
311318
signalModifiedKey(c->db,c->argv[j]);
312319
notifyKeyspaceEvent(NOTIFY_GENERIC,
313320
"del",c->argv[j],c->db->id);
314321
server.dirty++;
315-
deleted++;
322+
numdel++;
316323
}
317324
}
318-
addReplyLongLong(c,deleted);
325+
addReplyLongLong(c,numdel);
326+
}
327+
328+
void delCommand(client *c) {
329+
delGenericCommand(c,0);
330+
}
331+
332+
void unlinkCommand(client *c) {
333+
delGenericCommand(c,1);
319334
}
320335

321336
/* EXISTS key1 key2 ... key_N.

src/dict.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ unsigned long dictScan(dict *d,
855855
void *privdata)
856856
{
857857
dictht *t0, *t1;
858-
const dictEntry *de;
858+
const dictEntry *de, *next;
859859
unsigned long m0, m1;
860860

861861
if (dictSize(d) == 0) return 0;
@@ -867,8 +867,9 @@ unsigned long dictScan(dict *d,
867867
/* Emit entries at cursor */
868868
de = t0->table[v & m0];
869869
while (de) {
870+
next = de->next;
870871
fn(privdata, de);
871-
de = de->next;
872+
de = next;
872873
}
873874

874875
} else {
@@ -887,8 +888,9 @@ unsigned long dictScan(dict *d,
887888
/* Emit entries at cursor */
888889
de = t0->table[v & m0];
889890
while (de) {
891+
next = de->next;
890892
fn(privdata, de);
891-
de = de->next;
893+
de = next;
892894
}
893895

894896
/* Iterate over indices in larger table that are the expansion
@@ -897,8 +899,9 @@ unsigned long dictScan(dict *d,
897899
/* Emit entries at cursor */
898900
de = t1->table[v & m1];
899901
while (de) {
902+
next = de->next;
900903
fn(privdata, de);
901-
de = de->next;
904+
de = next;
902905
}
903906

904907
/* Increment bits not covered by the smaller mask */

src/dict.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ typedef struct dict {
7878
void *privdata;
7979
dictht ht[2];
8080
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
81-
int iterators; /* number of iterators currently running */
81+
unsigned long iterators; /* number of iterators currently running */
8282
} dict;
8383

8484
/* If safe is set to 1 this is a safe iterator, that means, you can call

src/lazyfree.c

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
#include "server.h"
2+
3+
/* Initialization of the lazy free engine. Must be called only once at server
4+
* startup. */
5+
void initLazyfreeEngine(void) {
6+
server.lazyfree_dbs = listCreate();
7+
server.lazyfree_obj = listCreate();
8+
server.lazyfree_elements = 0;
9+
}
10+
11+
/* Return the amount of work needed in order to free an object.
12+
* The return value is not always the actual number of allocations the
13+
* object is compoesd of, but a number proportional to it.
14+
*
15+
* For strings the function always returns 1.
16+
*
17+
* For aggregated objects represented by hash tables or other data structures
18+
* the function just returns the number of elements the object is composed of.
19+
*
20+
* Objects composed of single allocations are always reported as having a
21+
* single item even if they are actaully logical composed of multiple
22+
* elements.
23+
*
24+
* For lists the funciton returns the number of elements in the quicklist
25+
* representing the list. */
26+
size_t lazyfreeGetFreeEffort(robj *obj) {
27+
if (obj->type == OBJ_LIST) {
28+
quicklist *ql = obj->ptr;
29+
return ql->len;
30+
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
31+
dict *ht = obj->ptr;
32+
return dictSize(ht);
33+
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
34+
zset *zs = obj->ptr;
35+
return zs->zsl->length;
36+
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
37+
dict *ht = obj->ptr;
38+
return dictSize(ht);
39+
} else {
40+
return 1; /* Everything else is a single allocation. */
41+
}
42+
}
43+
44+
/* This callback is used together with dictScan() in order to free a dict.c
45+
* hash table incrementally. */
46+
void lazyfreeScanCallback(void *privdata, const dictEntry *de) {
47+
dict *ht = privdata;
48+
long saved_iterators = ht->iterators;
49+
ht->iterators = 1; /* Make sure no rehashing happens. */
50+
dictDelete(ht,dictGetKey(de));
51+
ht->iterators = saved_iterators;
52+
}
53+
54+
/* Free some object from the lazy free list. */
55+
#define LAZYFREE_ITER_PER_STEP 100
56+
size_t lazyfreeFastStep(void) {
57+
size_t maxiter = LAZYFREE_ITER_PER_STEP;
58+
size_t workdone = 0;
59+
robj *current = NULL;
60+
61+
while(maxiter--) {
62+
if (current == NULL) {
63+
listNode *ln = listFirst(server.lazyfree_obj);
64+
if (ln == NULL) break; /* Nothing more to free. */
65+
current = ln->value;
66+
}
67+
if ((current->type == OBJ_SET ||
68+
current->type == OBJ_HASH) &&
69+
current->encoding == OBJ_ENCODING_HT)
70+
{
71+
dict *ht = current->ptr;
72+
size_t origsize = dictSize(ht);
73+
ht->iterators = dictScan(ht,ht->iterators,lazyfreeScanCallback,ht);
74+
workdone++; /* We are not sure how many elements we freed, even if
75+
zero, the free list is non empty so we don't return
76+
0 to the caller. */
77+
server.lazyfree_elements -= (origsize - dictSize(ht));
78+
if (dictSize(ht) == 0) {
79+
decrRefCount(current);
80+
listNode *ln = listFirst(server.lazyfree_obj);
81+
listDelNode(server.lazyfree_obj,ln);
82+
current = NULL;
83+
}
84+
} else {
85+
/* Not handled type or encoding. Do a blocking free. */
86+
size_t effort = lazyfreeGetFreeEffort(current);
87+
server.lazyfree_elements -= effort;
88+
workdone += effort;
89+
decrRefCount(current);
90+
listNode *ln = listFirst(server.lazyfree_obj);
91+
listDelNode(server.lazyfree_obj,ln);
92+
current = NULL;
93+
}
94+
}
95+
return workdone;
96+
}
97+
98+
/* Handles slow or fast collection steps. */
99+
size_t lazyfreeStep(int type) {
100+
if (type == LAZYFREE_STEP_FAST) return lazyfreeFastStep();
101+
102+
size_t totalwork = 0;
103+
mstime_t end = mstime()+2;
104+
do {
105+
size_t workdone = lazyfreeFastStep();
106+
if (workdone == 0) break;
107+
totalwork += workdone;
108+
} while(mstime() < end);
109+
return totalwork;
110+
}
111+
112+
/* Delete a key, value, and associated expiration entry if any, from the DB.
113+
* If there are enough allocations to free the value object may be put into
114+
* a lazy free list instead of being freed synchronously. The lazy free list
115+
* will be reclaimed incrementally in a non blocking way. */
116+
#define LAZYFREE_THRESHOLD 64
117+
int dbAsyncDelete(redisDb *db, robj *key) {
118+
/* Deleting an entry from the expires dict will not free the sds of
119+
* the key, because it is shared with the main dictionary. */
120+
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
121+
122+
/* If the value is composed of a few allocations, to free in a lazy way
123+
* is actually just slower... So under a certain limit we just free
124+
* the object synchronously. */
125+
dictEntry *de = dictFind(db->dict,key->ptr);
126+
if (de) {
127+
robj *val = dictGetVal(de);
128+
size_t free_effort = lazyfreeGetFreeEffort(val);
129+
130+
/* If releasing the object is too much work, let's put it into the
131+
* lazy free list. */
132+
if (free_effort > LAZYFREE_THRESHOLD) {
133+
listAddNodeTail(server.lazyfree_obj,val);
134+
server.lazyfree_elements += free_effort;
135+
dictSetVal(db->dict,de,NULL);
136+
}
137+
}
138+
139+
/* Release the key-val pair, or just the key if we set the val
140+
* field to NULL in order to lazy free it later. */
141+
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
142+
if (server.cluster_enabled) slotToKeyDel(key);
143+
return 1;
144+
} else {
145+
return 0;
146+
}
147+
}
148+
149+
/* This is the timer handler we use to incrementally perform collection
150+
* into the lazy free lists. We can't use serverCron since we need a
151+
* very high timer frequency when there are many objects to collect, while
152+
* we lower the frequency to just 1HZ when there is nothing to do.
153+
*
154+
* Since a slow lazy free step will take 1.5 milliseconds and we modulate
155+
* the timer frequency from 1 to 333 HZ in an adaptive way, the CPU
156+
* used is between 0% (nothing in the lazy free list) to 50%.
157+
*
158+
* The frequency is obtained as follows: if the lazy free list is empty
159+
* it is set to 1HZ. If the lazy free has elements the call period starts
160+
* at 20 (50HZ) and is decremented (up to 3 ms = 333HZ) each time the server
161+
* used memory raises between calls of this function. */
162+
int lazyfreeCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
163+
{
164+
UNUSED(eventLoop);
165+
UNUSED(id);
166+
UNUSED(clientData);
167+
168+
static size_t prev_mem;
169+
static int timer_period = 1000; /* Defauls to 1HZ */
170+
static double mem_trend = 0;
171+
size_t mem = zmalloc_used_memory();
172+
173+
/* Compute the memory trend, biased towards thinking memory is raising
174+
* for a few calls every time previous and current memory raise. */
175+
if (prev_mem < mem) mem_trend = 1;
176+
mem_trend *= 0.9; /* Make it slowly forget. */
177+
int mem_is_raising = mem_trend > .1;
178+
179+
/* Free a few items. */
180+
size_t workdone = lazyfreeStep(LAZYFREE_STEP_SLOW);
181+
182+
/* Adjust this timer call frequency according to the current state. */
183+
if (workdone) {
184+
if (timer_period == 1000) timer_period = 20;
185+
if (mem_is_raising && timer_period > 3)
186+
timer_period--; /* Raise call frequency. */
187+
else if (!mem_is_raising && timer_period < 20)
188+
timer_period++; /* Lower call frequency. */
189+
} else {
190+
timer_period = 1000; /* 1 HZ */
191+
}
192+
prev_mem = mem;
193+
#if 0
194+
printf("%llu (%d hz) %s (%f)\n",
195+
(unsigned long long)server.lazyfree_elements,
196+
1000/timer_period,
197+
mem_is_raising ? "RAISING" : "lowering",
198+
mem_trend);
199+
#endif
200+
return timer_period;
201+
}

src/object.c

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,23 @@ robj *createObject(int type, void *ptr) {
4848
return o;
4949
}
5050

51+
/* Set a special refcount in the object to make it "shared":
52+
* incrRefCount and decrRefCount() will test for this special refcount
53+
* and will not touch the object. This way it is free to access shared
54+
* objects such as small integers from different threads without any
55+
* mutex.
56+
*
57+
* A common patter to create shared objects:
58+
*
59+
* robj *myobject = makeObjectShared(createObject(...));
60+
*
61+
*/
62+
robj *makeObjectShared(robj *o) {
63+
serverAssert(o->refcount == 1);
64+
o->refcount = OBJ_SHARED_REFCOUNT;
65+
return o;
66+
}
67+
5168
/* Create a string object with encoding OBJ_ENCODING_RAW, that is a plain
5269
* string object where o->ptr points to a proper sds string. */
5370
robj *createRawStringObject(const char *ptr, size_t len) {
@@ -295,11 +312,10 @@ void freeHashObject(robj *o) {
295312
}
296313

297314
void incrRefCount(robj *o) {
298-
o->refcount++;
315+
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
299316
}
300317

301318
void decrRefCount(robj *o) {
302-
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
303319
if (o->refcount == 1) {
304320
switch(o->type) {
305321
case OBJ_STRING: freeStringObject(o); break;
@@ -311,7 +327,8 @@ void decrRefCount(robj *o) {
311327
}
312328
zfree(o);
313329
} else {
314-
o->refcount--;
330+
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
331+
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
315332
}
316333
}
317334

0 commit comments

Comments
 (0)