Skip to content

Commit c69c6c8

Browse files
committed
Lazyfree: ability to free whole DBs in background.
1 parent b08c36c commit c69c6c8

File tree

7 files changed

+106
-14
lines changed

7 files changed

+106
-14
lines changed

src/bio.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ struct bio_job {
8585

8686
void *bioProcessBackgroundJobs(void *arg);
8787
void lazyfreeFreeObjectFromBioThread(robj *o);
88+
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
89+
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl);
8890

8991
/* Make sure we have enough stack to perform all the things we do in the
9092
* main thread. */
@@ -187,7 +189,16 @@ void *bioProcessBackgroundJobs(void *arg) {
187189
} else if (type == BIO_AOF_FSYNC) {
188190
aof_fsync((long)job->arg1);
189191
} else if (type == BIO_LAZY_FREE) {
190-
lazyfreeFreeObjectFromBioThread(job->arg1);
192+
/* What we free changes depending on what arguments are set:
193+
* arg1 -> free the object at pointer.
194+
* arg2 & arg3 -> free two dictionaries (a Redis DB).
195+
* only arg3 -> free the skiplist. */
196+
if (job->arg1)
197+
lazyfreeFreeObjectFromBioThread(job->arg1);
198+
else if (job->arg2 && job->arg3)
199+
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
200+
else if (job->arg3)
201+
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
191202
} else {
192203
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
193204
}

src/cluster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ void clusterReset(int hard) {
495495
if (nodeIsSlave(myself)) {
496496
clusterSetNodeAsMaster(myself);
497497
replicationUnsetMaster();
498-
emptyDb(NULL);
498+
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
499499
}
500500

501501
/* Close slots, reset manual failover state. */

src/db.c

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "server.h"
3131
#include "cluster.h"
32+
#include "atomicvar.h"
3233

3334
#include <signal.h>
3435
#include <ctype.h>
@@ -238,16 +239,46 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
238239
return o;
239240
}
240241

241-
long long emptyDb(void(callback)(void*)) {
242-
int j;
242+
/* Remove all keys from all the databases in a Redis server.
243+
* If callback is given the function is called from time to time to
244+
* signal that work is in progress.
245+
*
246+
* The dbnum can be -1 if all teh DBs should be flushed, or the specified
247+
* DB number if we want to flush only a single Redis database number.
248+
*
249+
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
250+
* EMPTYDB_ASYCN if we want the memory to be freed in a different thread
251+
* and the function to return ASAP.
252+
*
253+
* On success the fuction returns the number of keys removed from the
254+
* database(s). Otherwise -1 is returned in the specific case the
255+
* DB number is out of range, and errno is set to EINVAL. */
256+
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
257+
int j, async = (flags & EMPTYDB_ASYNC);
243258
long long removed = 0;
244259

260+
if (dbnum < -1 || dbnum >= server.dbnum) {
261+
errno = EINVAL;
262+
return -1;
263+
}
264+
245265
for (j = 0; j < server.dbnum; j++) {
266+
if (dbnum != 1 && dbnum != j) continue;
246267
removed += dictSize(server.db[j].dict);
247-
dictEmpty(server.db[j].dict,callback);
248-
dictEmpty(server.db[j].expires,callback);
268+
if (async) {
269+
emptyDbAsync(&server.db[j]);
270+
} else {
271+
dictEmpty(server.db[j].dict,callback);
272+
dictEmpty(server.db[j].expires,callback);
273+
}
274+
}
275+
if (server.cluster_enabled) {
276+
if (async) {
277+
slotToKeyFlushAsync();
278+
} else {
279+
slotToKeyFlush();
280+
}
249281
}
250-
if (server.cluster_enabled) slotToKeyFlush();
251282
return removed;
252283
}
253284

@@ -290,7 +321,7 @@ void flushdbCommand(client *c) {
290321

291322
void flushallCommand(client *c) {
292323
signalFlushedDb(-1);
293-
server.dirty += emptyDb(NULL);
324+
server.dirty += emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
294325
addReply(c,shared.ok);
295326
if (server.rdb_child_pid != -1) {
296327
kill(server.rdb_child_pid,SIGUSR1);

src/debug.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,15 +271,15 @@ void debugCommand(client *c) {
271271
addReply(c,shared.err);
272272
return;
273273
}
274-
emptyDb(NULL);
274+
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
275275
if (rdbLoad(server.rdb_filename) != C_OK) {
276276
addReplyError(c,"Error trying to load the RDB dump");
277277
return;
278278
}
279279
serverLog(LL_WARNING,"DB reloaded by DEBUG RELOAD");
280280
addReply(c,shared.ok);
281281
} else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
282-
emptyDb(NULL);
282+
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
283283
if (loadAppendOnlyFile(server.aof_filename) != C_OK) {
284284
addReply(c,shared.err);
285285
return;

src/lazyfree.c

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "server.h"
22
#include "bio.h"
33
#include "atomicvar.h"
4+
#include "cluster.h"
45

56
static size_t lazyfree_objects = 0;
67
pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -75,9 +76,51 @@ int dbAsyncDelete(redisDb *db, robj *key) {
7576
}
7677
}
7778

78-
/* Implementation of function to release a single object called from the
79-
* lazyfree thread from bio.c. */
79+
/* Empty a Redis DB asynchronously. What the function does actually is to
80+
* create a new empty set of hash tables and scheduling the old ones for
81+
* lazy freeing. */
82+
void emptyDbAsync(redisDb *db) {
83+
dict *oldht1 = db->dict, *oldht2 = db->expires;
84+
db->dict = dictCreate(&dbDictType,NULL);
85+
db->expires = dictCreate(&keyptrDictType,NULL);
86+
atomicIncr(lazyfree_objects,dictSize(oldht1),
87+
&lazyfree_objects_mutex);
88+
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
89+
}
90+
91+
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
92+
* and scheduiling the old for lazy freeing. */
93+
void slotToKeyFlushAsync(void) {
94+
zskiplist *oldsl = server.cluster->slots_to_keys;
95+
server.cluster->slots_to_keys = zslCreate();
96+
atomicIncr(lazyfree_objects,oldsl->length,
97+
&lazyfree_objects_mutex);
98+
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,oldsl);
99+
}
100+
101+
/* Release objects from the lazyfree thread. It's just decrRefCount()
102+
* updating the count of objects to release. */
80103
void lazyfreeFreeObjectFromBioThread(robj *o) {
81104
decrRefCount(o);
82105
atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
83106
}
107+
108+
/* Release a database from the lazyfree thread. The 'db' pointer is the
109+
* database which was substitutied with a fresh one in the main thread
110+
* when the database was logically deleted. 'sl' is a skiplist used by
111+
* Redis Cluster in order to take the hash slots -> keys mapping. This
112+
* may be NULL if Redis Cluster is disabled. */
113+
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
114+
size_t numkeys = dictSize(ht1);
115+
dictRelease(ht1);
116+
dictRelease(ht2);
117+
atomicDecr(lazyfree_objects,numkeys,&lazyfree_objects_mutex);
118+
}
119+
120+
/* Release the skiplist mapping Redis Cluster keys to slots in the
121+
* lazyfree thread. */
122+
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl) {
123+
size_t len = sl->length;
124+
zslFree(sl);
125+
atomicDecr(lazyfree_objects,len,&lazyfree_objects_mutex);
126+
}

src/replication.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
11111111
}
11121112
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
11131113
signalFlushedDb(-1);
1114-
emptyDb(replicationEmptyDbCallback);
1114+
emptyDb(-1,EMPTYDB_NO_FLAGS,replicationEmptyDbCallback);
11151115
/* Before loading the DB into memory we need to delete the readable
11161116
* handler, otherwise it will get called recursively since
11171117
* rdbLoad() will call the event loop to process events from time to

src/server.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,7 @@ extern dictType shaScriptObjectDictType;
10501050
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
10511051
extern dictType hashDictType;
10521052
extern dictType replScriptCacheDictType;
1053+
extern dictType keyptrDictType;
10531054

10541055
/*-----------------------------------------------------------------------------
10551056
* Functions prototypes
@@ -1384,7 +1385,11 @@ robj *dbRandomKey(redisDb *db);
13841385
int dbSyncDelete(redisDb *db, robj *key);
13851386
int dbDelete(redisDb *db, robj *key);
13861387
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
1387-
long long emptyDb(void(callback)(void*));
1388+
1389+
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
1390+
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
1391+
long long emptyDb(int dbnum, int flags, void(callback)(void*));
1392+
13881393
int selectDb(client *c, int id);
13891394
void signalModifiedKey(redisDb *db, robj *key);
13901395
void signalFlushedDb(int dbid);
@@ -1407,6 +1412,8 @@ void slotToKeyFlush(void);
14071412
#define LAZYFREE_STEP_OOM 2 /* Free a few elements at any cost if there
14081413
is something to free: we are out of memory */
14091414
int dbAsyncDelete(redisDb *db, robj *key);
1415+
void emptyDbAsync(redisDb *db);
1416+
void slotToKeyFlushAsync(void);
14101417

14111418
/* API to get key arguments from commands */
14121419
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);

0 commit comments

Comments
 (0)