Skip to content

Commit 6af7aa0

Browse files
committed
Pause all threads while swapping hash table.
We used to hold a global lock around all modifications to the hash table. Then it was switched to wrapping hash table accesses in a global lock during hash table expansion, set by notifying each worker thread to change lock styles. There was a bug here which caused trylocks to clobber, because the specific item locks were not held during the global lock: https://code.google.com/p/memcached/issues/detail?id=370 The patch previous to this one uses item locks during hash table expansion. Since the item lock table is always smaller than the hash table, an item lock will always cover both its new and old buckets. However, we still need to pause all threads during the pointer swap and setup. This patch pauses all background threads and worker threads, swaps the hash table, then unpauses them. This trades the (possibly significant) slowdown during the hash table copy for a short total hang at the beginning of each expansion. As before, those worried about consistent performance can presize the hash table with `-o hashpower=n`
1 parent d2676b4 commit 6af7aa0

File tree

5 files changed

+73
-71
lines changed

5 files changed

+73
-71
lines changed

assoc.c

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
#include <pthread.h>
2727

2828
static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
29-
29+
static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;
3030

3131
typedef unsigned long int ub4; /* unsigned 4-byte quantities */
3232
typedef unsigned char ub1; /* unsigned 1-byte quantities */
@@ -147,11 +147,6 @@ static void assoc_start_expand(void) {
147147
if (started_expanding)
148148
return;
149149

150-
/*With this condition, we can expanding holding only one item lock,
151-
* and it should always be false*/
152-
if (item_lock_hashpower >= hashpower)
153-
return;
154-
155150
started_expanding = true;
156151
pthread_cond_signal(&maintenance_cond);
157152
}
@@ -209,11 +204,11 @@ int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
209204

210205
static void *assoc_maintenance_thread(void *arg) {
211206

207+
mutex_lock(&maintenance_lock);
212208
while (do_run_maintenance_thread) {
213209
int ii = 0;
214210

215-
/* As there is only one thread process expanding, and we hold the item
216-
* lock, it seems not necessary to hold the cache_lock . */
211+
/* There is only one expansion thread, so no need to global lock. */
217212
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
218213
item *it, *next;
219214
int bucket;
@@ -223,7 +218,6 @@ static void *assoc_maintenance_thread(void *arg) {
223218
* is the lowest N bits of the hv, and the bucket of item_locks is
224219
* also the lowest M bits of hv, and N is greater than M.
225220
* So we can process expanding with only one item_lock. cool! */
226-
/*Get item lock for the slot in old hashtable*/
227221
if ((item_lock = item_trylock(expand_bucket))) {
228222
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
229223
next = it->h_next;
@@ -247,9 +241,7 @@ static void *assoc_maintenance_thread(void *arg) {
247241
}
248242

249243
} else {
250-
/*wait for 100ms. since only one expanding thread, it's not
251-
* necessary to sleep a random value*/
252-
usleep(100*1000);
244+
usleep(10*1000);
253245
}
254246

255247
if (item_lock) {
@@ -260,11 +252,18 @@ static void *assoc_maintenance_thread(void *arg) {
260252

261253
if (!expanding) {
262254
/* We are done expanding.. just wait for next invocation */
263-
mutex_lock(&cache_lock);
264255
started_expanding = false;
265-
pthread_cond_wait(&maintenance_cond, &cache_lock);
256+
pthread_cond_wait(&maintenance_cond, &maintenance_lock);
257+
/* assoc_expand() swaps out the hash table entirely, so we need
258+
* all threads to not hold any references related to the hash
259+
* table while this happens.
260+
* This is instead of a more complex, possibly slower algorithm to
261+
* allow dynamic hash table expansion without causing significant
262+
* wait times.
263+
*/
264+
pause_threads(PAUSE_ALL_THREADS);
266265
assoc_expand();
267-
mutex_unlock(&cache_lock);
266+
pause_threads(RESUME_ALL_THREADS);
268267
}
269268
}
270269
return NULL;
@@ -281,6 +280,7 @@ int start_assoc_maintenance_thread() {
281280
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
282281
}
283282
}
283+
pthread_mutex_init(&maintenance_lock, NULL);
284284
if ((ret = pthread_create(&maintenance_tid, NULL,
285285
assoc_maintenance_thread, NULL)) != 0) {
286286
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
@@ -290,13 +290,12 @@ int start_assoc_maintenance_thread() {
290290
}
291291

292292
void stop_assoc_maintenance_thread() {
293-
mutex_lock(&cache_lock);
293+
mutex_lock(&maintenance_lock);
294294
do_run_maintenance_thread = 0;
295295
pthread_cond_signal(&maintenance_cond);
296-
mutex_unlock(&cache_lock);
296+
mutex_unlock(&maintenance_lock);
297297

298298
/* Wait for the maintenance thread to stop */
299299
pthread_join(maintenance_tid, NULL);
300300
}
301301

302-

items.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,15 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) {
958958
return CRAWLER_OK;
959959
}
960960

961+
/* If we hold this lock, crawler can't wake up or move */
962+
void lru_crawler_pause(void) {
963+
pthread_mutex_lock(&lru_crawler_lock);
964+
}
965+
966+
void lru_crawler_resume(void) {
967+
pthread_mutex_unlock(&lru_crawler_lock);
968+
}
969+
961970
int init_lru_crawler(void) {
962971
if (lru_crawler_initialized == 0) {
963972
if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {

items.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,5 @@ int start_item_crawler_thread(void);
3636
int stop_item_crawler_thread(void);
3737
int init_lru_crawler(void);
3838
enum crawler_result_type lru_crawler_crawl(char *slabs);
39+
void lru_crawler_pause(void);
40+
void lru_crawler_resume(void);

memcached.h

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,11 @@ enum network_transport {
188188
udp_transport
189189
};
190190

191-
enum item_lock_types {
192-
ITEM_LOCK_GRANULAR = 0,
193-
ITEM_LOCK_GLOBAL
191+
enum pause_thread_types {
192+
PAUSE_WORKER_THREADS = 0,
193+
PAUSE_ALL_THREADS,
194+
RESUME_ALL_THREADS,
195+
RESUME_WORKER_THREADS
194196
};
195197

196198
#define IS_UDP(x) (x == udp_transport)
@@ -389,7 +391,6 @@ typedef struct {
389391
struct thread_stats stats; /* Stats generated by this thread */
390392
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
391393
cache_t *suffix_cache; /* suffix cache */
392-
uint8_t item_lock_type; /* use fine-grained or global item lock */
393394
} LIBEVENT_THREAD;
394395

395396
typedef struct {
@@ -574,13 +575,11 @@ void item_stats_sizes(ADD_STAT add_stats, void *c);
574575
void item_unlink(item *it);
575576
void item_update(item *it);
576577

577-
void item_lock_global(void);
578-
void item_unlock_global(void);
579578
void item_lock(uint32_t hv);
580579
void *item_trylock(uint32_t hv);
581580
void item_trylock_unlock(void *arg);
582581
void item_unlock(uint32_t hv);
583-
void switch_item_lock_type(enum item_lock_types type);
582+
void pause_threads(enum pause_thread_types type);
584583
unsigned short refcount_incr(unsigned short *refcount);
585584
unsigned short refcount_decr(unsigned short *refcount);
586585
void STATS_LOCK(void);

thread.c

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
4949
/* Lock for global stats */
5050
static pthread_mutex_t stats_lock;
5151

52+
/* Lock to cause worker threads to hang up after being woken */
53+
static pthread_mutex_t worker_hang_lock;
54+
5255
/* Free list of CQ_ITEM structs */
5356
static CQ_ITEM *cqi_freelist;
5457
static pthread_mutex_t cqi_freelist_lock;
@@ -59,10 +62,6 @@ static uint32_t item_lock_count;
5962
unsigned int item_lock_hashpower;
6063
#define hashsize(n) ((unsigned long int)1<<(n))
6164
#define hashmask(n) (hashsize(n)-1)
62-
/* this lock is temporarily engaged during a hash table expansion */
63-
static pthread_mutex_t item_global_lock;
64-
/* thread-specific variable for deeply finding the item lock type */
65-
static pthread_key_t item_lock_type_key;
6665

6766
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
6867

@@ -112,22 +111,8 @@ unsigned short refcount_decr(unsigned short *refcount) {
112111
#endif
113112
}
114113

115-
/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */
116-
void item_lock_global(void) {
117-
mutex_lock(&item_global_lock);
118-
}
119-
120-
void item_unlock_global(void) {
121-
mutex_unlock(&item_global_lock);
122-
}
123-
124114
void item_lock(uint32_t hv) {
125-
uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
126-
if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
127-
mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
128-
} else {
129-
mutex_lock(&item_global_lock);
130-
}
115+
mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
131116
}
132117

133118
/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a
@@ -150,12 +135,7 @@ void item_trylock_unlock(void *lock) {
150135
}
151136

152137
void item_unlock(uint32_t hv) {
153-
uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
154-
if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
155-
mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
156-
} else {
157-
mutex_unlock(&item_global_lock);
158-
}
138+
mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
159139
}
160140

161141
static void wait_for_thread_registration(int nthreads) {
@@ -169,25 +149,44 @@ static void register_thread_initialized(void) {
169149
init_count++;
170150
pthread_cond_signal(&init_cond);
171151
pthread_mutex_unlock(&init_lock);
152+
/* Force worker threads to pile up if someone wants us to */
153+
pthread_mutex_lock(&worker_hang_lock);
154+
pthread_mutex_unlock(&worker_hang_lock);
172155
}
173156

174-
void switch_item_lock_type(enum item_lock_types type) {
157+
/* Must not be called with any deeper locks held:
158+
* item locks, cache_lock, stats_lock, etc
159+
*/
160+
void pause_threads(enum pause_thread_types type) {
175161
char buf[1];
176162
int i;
177163

164+
buf[0] = 0;
178165
switch (type) {
179-
case ITEM_LOCK_GRANULAR:
180-
buf[0] = 'l';
166+
case PAUSE_ALL_THREADS:
167+
slabs_rebalancer_pause();
168+
lru_crawler_pause();
169+
case PAUSE_WORKER_THREADS:
170+
buf[0] = 'p';
171+
pthread_mutex_lock(&worker_hang_lock);
181172
break;
182-
case ITEM_LOCK_GLOBAL:
183-
buf[0] = 'g';
173+
case RESUME_ALL_THREADS:
174+
slabs_rebalancer_resume();
175+
lru_crawler_resume();
176+
case RESUME_WORKER_THREADS:
177+
pthread_mutex_unlock(&worker_hang_lock);
184178
break;
185179
default:
186180
fprintf(stderr, "Unknown lock type: %d\n", type);
187181
assert(1 == 0);
188182
break;
189183
}
190184

185+
/* Only send a message if we have one. */
186+
if (buf[0] == 0) {
187+
return;
188+
}
189+
191190
pthread_mutex_lock(&init_lock);
192191
init_count = 0;
193192
for (i = 0; i < settings.num_threads; i++) {
@@ -374,13 +373,6 @@ static void *worker_libevent(void *arg) {
374373
* all threads have finished initializing.
375374
*/
376375

377-
/* set an indexable thread-specific memory item for the lock type.
378-
* this could be unnecessary if we pass the conn *c struct through
379-
* all item_lock calls...
380-
*/
381-
me->item_lock_type = ITEM_LOCK_GRANULAR;
382-
pthread_setspecific(item_lock_type_key, &me->item_lock_type);
383-
384376
register_thread_initialized();
385377

386378
event_base_loop(me->base, 0);
@@ -425,13 +417,8 @@ static void thread_libevent_process(int fd, short which, void *arg) {
425417
cqi_free(item);
426418
}
427419
break;
428-
/* we were told to flip the lock type and report in */
429-
case 'l':
430-
me->item_lock_type = ITEM_LOCK_GRANULAR;
431-
register_thread_initialized();
432-
break;
433-
case 'g':
434-
me->item_lock_type = ITEM_LOCK_GLOBAL;
420+
/* we were told to pause and report in */
421+
case 'p':
435422
register_thread_initialized();
436423
break;
437424
}
@@ -784,6 +771,7 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
784771

785772
pthread_mutex_init(&cache_lock, NULL);
786773
pthread_mutex_init(&stats_lock, NULL);
774+
pthread_mutex_init(&worker_hang_lock, NULL);
787775

788776
pthread_mutex_init(&init_lock, NULL);
789777
pthread_cond_init(&init_cond, NULL);
@@ -803,6 +791,13 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
803791
power = 13;
804792
}
805793

794+
if (power >= hashpower) {
795+
fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
796+
fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
797+
fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
798+
exit(1);
799+
}
800+
806801
item_lock_count = hashsize(power);
807802
item_lock_hashpower = power;
808803

@@ -814,8 +809,6 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
814809
for (i = 0; i < item_lock_count; i++) {
815810
pthread_mutex_init(&item_locks[i], NULL);
816811
}
817-
pthread_key_create(&item_lock_type_key, NULL);
818-
pthread_mutex_init(&item_global_lock, NULL);
819812

820813
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
821814
if (! threads) {

0 commit comments

Comments
 (0)