@@ -150,7 +150,7 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
150150
151151	zval  * z_params_hosts , * * z_hosts ;
152152	zval  * z_params_prev , * * z_prev ;
153- 	zval  * z_params_funs , * * z_data_pp , * z_fun  =  NULL ;
153+ 	zval  * z_params_funs , * * z_data_pp , * z_fun  =  NULL ,  * z_dist   =   NULL ;
154154	zval  * z_params_index ;
155155	zval  * z_params_autorehash ;
156156	RedisArray  * ra  =  NULL ;
@@ -188,6 +188,16 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
188188		zval_copy_ctor (z_fun );
189189	}
190190
191+ 	/* find distributor */ 
192+ 	MAKE_STD_ZVAL (z_params_funs );
193+ 	array_init (z_params_funs );
194+ 	sapi_module .treat_data (PARSE_STRING , estrdup (INI_STR ("redis.arrays.distributor" )), z_params_funs  TSRMLS_CC );
195+ 	if  (zend_hash_find (Z_ARRVAL_P (z_params_funs ), name , strlen (name ) +  1 , (void  * * ) & z_data_pp ) !=  FAILURE ) {
196+ 		MAKE_STD_ZVAL (z_dist );
197+ 		* z_dist  =  * * z_data_pp ;
198+ 		zval_copy_ctor (z_dist );
199+ 	}
200+ 
191201	/* find index option */ 
192202	MAKE_STD_ZVAL (z_params_index );
193203	array_init (z_params_index );
@@ -209,7 +219,7 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
209219	}
210220
211221	/* create RedisArray object */ 
212- 	ra  =  ra_make_array (hHosts , z_fun , hPrev , b_index  TSRMLS_CC );
222+ 	ra  =  ra_make_array (hHosts , z_fun , z_dist ,  hPrev , b_index  TSRMLS_CC );
213223	ra -> auto_rehash  =  b_autorehash ;
214224
215225	/* cleanup */ 
@@ -228,7 +238,7 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
228238}
229239
230240RedisArray  * 
231- ra_make_array (HashTable  * hosts , zval  * z_fun , HashTable  * hosts_prev , zend_bool  b_index  TSRMLS_DC ) {
241+ ra_make_array (HashTable  * hosts , zval  * z_fun , zval   * z_dist ,  HashTable  * hosts_prev , zend_bool  b_index  TSRMLS_DC ) {
232242
233243	int  count  =  zend_hash_num_elements (hosts );
234244
@@ -238,6 +248,7 @@ ra_make_array(HashTable *hosts, zval *z_fun, HashTable *hosts_prev, zend_bool b_
238248	ra -> redis  =  emalloc (count  *  sizeof (zval * ));
239249	ra -> count  =  count ;
240250	ra -> z_fun  =  NULL ;
251+ 	ra -> z_dist  =  NULL ;
241252	ra -> z_multi_exec  =  NULL ;
242253	ra -> index  =  b_index ;
243254	ra -> auto_rehash  =  0 ;
@@ -248,7 +259,7 @@ ra_make_array(HashTable *hosts, zval *z_fun, HashTable *hosts_prev, zend_bool b_
248259	if (NULL  ==  ra_load_hosts (ra , hosts  TSRMLS_CC )) {
249260		return  NULL ;
250261	}
251- 	ra -> prev  =  hosts_prev  ? ra_make_array (hosts_prev , z_fun , NULL , b_index  TSRMLS_CC ) : NULL ;
262+ 	ra -> prev  =  hosts_prev  ? ra_make_array (hosts_prev , z_fun , z_dist ,  NULL , b_index  TSRMLS_CC ) : NULL ;
252263
253264	/* copy function if provided */ 
254265	if (z_fun ) {
@@ -257,6 +268,13 @@ ra_make_array(HashTable *hosts, zval *z_fun, HashTable *hosts_prev, zend_bool b_
257268		zval_copy_ctor (ra -> z_fun );
258269	}
259270
271+ 	/* copy distributor if provided */ 
272+ 	if (z_dist ) {
273+ 		MAKE_STD_ZVAL (ra -> z_dist );
274+ 		* ra -> z_dist  =  * z_dist ;
275+ 		zval_copy_ctor (ra -> z_dist );
276+ 	}
277+ 
260278	return  ra ;
261279}
262280
@@ -322,6 +340,37 @@ ra_extract_key(RedisArray *ra, const char *key, int key_len, int *out_len TSRMLS
322340	return  out ;
323341}
324342
343+ /* call userland key distributor function */ 
344+ zend_bool 
345+ ra_call_distributor (RedisArray  * ra , const  char  * key , int  key_len , int  * pos  TSRMLS_DC ) {
346+ 
347+ 	char  * error  =  NULL ;
348+ 	zval  z_ret ;
349+ 	zval  * z_argv0 ;
350+ 
351+ 	/* check that we can call the extractor function */ 
352+ 	if (!zend_is_callable_ex (ra -> z_dist , NULL , 0 , NULL , NULL , NULL , & error  TSRMLS_CC )) {
353+ 		php_error_docref (NULL  TSRMLS_CC , E_ERROR , "Could not call distributor function" );
354+ 		return  0 ;
355+ 	}
356+ 	//convert_to_string(ra->z_fun); 
357+ 
358+ 	/* call extraction function */ 
359+ 	MAKE_STD_ZVAL (z_argv0 );
360+ 	ZVAL_STRINGL (z_argv0 , key , key_len , 0 );
361+ 	call_user_function (EG (function_table ), NULL , ra -> z_dist , & z_ret , 1 , & z_argv0  TSRMLS_CC );
362+ 	efree (z_argv0 );
363+ 
364+ 	if (Z_TYPE (z_ret ) !=  IS_LONG ) {
365+ 		zval_dtor (& z_ret );
366+ 		return  0 ;
367+ 	}
368+ 
369+ 	* pos  =  Z_LVAL (z_ret );
370+ 	zval_dtor (& z_ret );
371+ 	return  1 ;
372+ }
373+ 
325374zval  * 
326375ra_find_node (RedisArray  * ra , const  char  * key , int  key_len , int  * out_pos  TSRMLS_DC ) {
327376
@@ -334,12 +383,19 @@ ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos TSRMLS_D
334383	if (!out )
335384		return  NULL ;
336385
337- 	/* hash */ 
338- 	hash  =  crc32 (out , out_len );
339- 	efree (out );
340- 
341- 	/* get position on ring */ 
342- 	pos  =  (int )((((uint64_t )hash ) *  ra -> count ) / 0xffffffff );
386+ 	if (ra -> z_dist ) {
387+ 		if  (!ra_call_distributor (ra , key , key_len , & pos  TSRMLS_CC )) {
388+ 			return  NULL ;
389+ 		}
390+ 	}
391+ 	else  {
392+ 		/* hash */ 
393+ 		hash  =  rcrc32 (out , out_len );
394+ 		efree (out );
395+ 	
396+ 		/* get position on ring */ 
397+ 		pos  =  (int )((((uint64_t )hash ) *  ra -> count ) / 0xffffffff );
398+ 	}
343399	if (out_pos ) * out_pos  =  pos ;
344400
345401	return  ra -> redis [pos ];
@@ -376,13 +432,17 @@ ra_find_key(RedisArray *ra, zval *z_args, const char *cmd, int *key_len) {
376432}
377433
378434void 
379- ra_index_multi (zval  * z_redis  TSRMLS_DC ) {
435+ ra_index_multi (zval  * z_redis ,  long  multi_value   TSRMLS_DC ) {
380436
381437	zval  z_fun_multi , z_ret ;
438+ 	zval  * z_args [1 ];
382439
383440	/* run MULTI */ 
384441	ZVAL_STRING (& z_fun_multi , "MULTI" , 0 );
385- 	call_user_function (& redis_ce -> function_table , & z_redis , & z_fun_multi , & z_ret , 0 , NULL  TSRMLS_CC );
442+ 	MAKE_STD_ZVAL (z_args [0 ]);
443+ 	ZVAL_LONG (z_args [0 ], multi_value );
444+ 	call_user_function (& redis_ce -> function_table , & z_redis , & z_fun_multi , & z_ret , 1 , z_args  TSRMLS_CC );
445+ 	efree (z_args [0 ]);
386446	//zval_dtor(&z_ret); 
387447}
388448
@@ -667,7 +727,7 @@ ra_del_key(const char *key, int key_len, zval *z_from TSRMLS_DC) {
667727	zval  z_fun_del , z_ret , * z_args ;
668728
669729	/* in a transaction */ 
670- 	ra_index_multi (z_from  TSRMLS_CC );
730+ 	ra_index_multi (z_from ,  MULTI  TSRMLS_CC );
671731
672732	/* run DEL on source */ 
673733	MAKE_STD_ZVAL (z_args );
@@ -692,7 +752,8 @@ ra_move_zset(const char *key, int key_len, zval *z_from, zval *z_to TSRMLS_DC) {
692752	char  * val ;
693753	int  val_len , i ;
694754	long  idx ;
695- 
755+ 	int  type ;
756+ 	
696757	/* run ZRANGE key 0 -1 WITHSCORES on source */ 
697758	ZVAL_STRINGL (& z_fun_zrange , "ZRANGE" , 6 , 0 );
698759	for (i  =  0 ; i  <  4 ; ++ i ) {
@@ -729,17 +790,24 @@ ra_move_zset(const char *key, int key_len, zval *z_from, zval *z_to TSRMLS_DC) {
729790			continue ;
730791		}
731792
732- 		zend_hash_get_current_key_ex (h_zset_vals , & val , & val_len , & idx , 0 , NULL );
733- 
734793		/* add score */ 
735794		convert_to_double (* z_score_pp );
736795		MAKE_STD_ZVAL (z_zadd_args [i ]);
737796		ZVAL_DOUBLE (z_zadd_args [i ], Z_DVAL_PP (z_score_pp ));
738797
739798		/* add value */ 
740799		MAKE_STD_ZVAL (z_zadd_args [i + 1 ]);
741- 		ZVAL_STRINGL (z_zadd_args [i + 1 ], val , val_len - 1 , 0 ); /* we have to remove 1 because it is an array key. */ 
742- 
800+ 		switch  (zend_hash_get_current_key_ex (h_zset_vals , & val , & val_len , & idx , 0 , NULL )) {
801+ 			case  HASH_KEY_IS_STRING :
802+ 				ZVAL_STRINGL (z_zadd_args [i + 1 ], val , val_len - 1 , 0 ); /* we have to remove 1 because it is an array key. */ 
803+ 				break ;
804+ 			case  HASH_KEY_IS_LONG :
805+ 				ZVAL_LONG (z_zadd_args [i + 1 ], idx );
806+ 				break ;
807+ 			default :
808+ 				return  -1 ; // Todo: log error 
809+ 				break ;
810+ 		}
743811		i  +=  2 ;
744812	}
745813
@@ -912,7 +980,7 @@ ra_move_key(const char *key, int key_len, zval *z_from, zval *z_to TSRMLS_DC) {
912980	zend_bool  success  =  0 ;
913981
914982	/* open transaction on target server */ 
915- 	ra_index_multi (z_to  TSRMLS_CC );
983+ 	ra_index_multi (z_to ,  MULTI  TSRMLS_CC );
916984
917985	switch (type ) {
918986		case  REDIS_STRING :
0 commit comments