Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added PHT insert, delete
  • Loading branch information
subenksaha committed Oct 6, 2025
commit 82ef1383a0ccc39ccc418b2053e669649486167c
7 changes: 7 additions & 0 deletions src/api/include/pdc_client_connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -1140,4 +1140,11 @@ void report_avg_server_profiling_rst();
perr_t PDC_Client_transfer_pthread_create();
perr_t PDC_Client_transfer_pthread_terminate();
perr_t PDC_Client_transfer_pthread_cnt_add(int n);

perr_t PDC_Client_create_bucket(char *prefix);
uint32_t PDC_prefix_binary_search(char *prefix, char **target_prefix);
perr_t PDC_metadata_key_add(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont);
perr_t PDC_metadata_key_delete(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont);
perr_t PDC_delete_metadata_key(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont);
hg_return_t metadata_create_bucket_client_rpc_cb(const struct hg_cb_info *callback_info);
#endif /* PDC_CLIENT_CONNECT_H */
286 changes: 251 additions & 35 deletions src/api/pdc_client_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
#include "errno.h"

/* #define TANG_DEBUG 1 */
#define RING_ARC_SIZE 64
#define ROOT_BUCKET "#"

int is_client_debug_g = 0;
pdc_server_selection_t pdc_server_selection_g = PDC_SERVER_DEFAULT;
Expand Down Expand Up @@ -149,6 +149,7 @@ static hg_id_t metadata_update_register_id_g;
static hg_id_t metadata_add_tag_register_id_g;
static hg_id_t metadata_add_kvtag_register_id_g;
static hg_id_t metadata_check_prefix_register_id_g;
static hg_id_t metadata_key_add_register_id_g;
static hg_id_t metadata_del_kvtag_register_id_g;
static hg_id_t metadata_get_kvtag_register_id_g;
static hg_id_t region_lock_register_id_g;
Expand Down Expand Up @@ -1347,6 +1348,8 @@ PDC_Client_mercury_init(hg_class_t **hg_class, hg_context_t **hg_context, int po
metadata_add_tag_register_id_g = PDC_metadata_add_tag_register(*hg_class);
metadata_add_kvtag_register_id_g = PDC_metadata_add_kvtag_register(*hg_class);
metadata_check_prefix_register_id_g = PDC_metadata_check_prefix_register(*hg_class);
metadata_create_bucket_register_id_g = PDC_metadata_create_bucket_register(*hg_class);
metadata_key_add_register_id_g = PDC_metadata_key_add_register(*hg_class);
metadata_del_kvtag_register_id_g = PDC_metadata_del_kvtag_register(*hg_class);
metadata_get_kvtag_register_id_g = PDC_metadata_get_kvtag_register(*hg_class);
region_lock_register_id_g = PDC_region_lock_register(*hg_class);
Expand Down Expand Up @@ -1550,8 +1553,9 @@ PDC_Client_init()
}

if (pdc_client_mpi_rank_g == 0) {
LOG_INFO("Using [%s] as tmp dir, %d clients per server\n", pdc_client_tmp_dir_g,
LOG_INFO("Using [%s] as tmp dir112, %d clients per server\n", pdc_client_tmp_dir_g,
pdc_nclient_per_server_g);
PDC_Client_create_bucket(ROOT_BUCKET);
}

if (mercury_has_init_g) {
Expand Down Expand Up @@ -2046,6 +2050,68 @@ metadata_add_tag_rpc_cb(const struct hg_cb_info *callback_info)
FUNC_LEAVE(ret_value);
}

static hg_return_t
metadata_check_prefix_rpc_cb(const struct hg_cb_info *callback_info)
{
FUNC_ENTER(NULL);

hg_return_t ret_value;
metadata_check_prefix_out_t output;
metadata_check_prefix_out_t *args = (metadata_check_prefix_out_t *)callback_info->arg;
hg_handle_t handle = callback_info->info.forward.handle;

printf("metadata_check_prefix_rpc_cb: callback_info->arg = %p\n", callback_info->arg);
/* Get output from server*/
ret_value = HG_Get_output(handle, &output);
printf("metadata_check_prefix_rpc_cb: after HG_Get_output\n");
if (ret_value != HG_SUCCESS) {
ret_value = -1;
PGOTO_ERROR(HG_OTHER_ERROR, "Error with HG_Get_output");
}
args->found = output.found;
args->server_id = output.server_id;
args->ret = output.ret;
args->leaf = output.leaf;
printf("metadata_check_prefix_rpc_cb: output.found = %d\n", args->found);
printf("metadata_check_prefix_rpc_cb: output.leaf = %d\n", args->leaf);
printf("metadata_check_prefix_rpc_cb: output.ret = %d\n", args->ret);
printf("metadata_check_prefix_rpc_cb: output.server_id = %d\n", args->server_id);

done:
hg_atomic_decr32(&atomic_work_todo_g);
HG_Free_output(handle, &output);

FUNC_LEAVE(ret_value);
}

static hg_return_t
metadata_key_add_rpc_cb(const struct hg_cb_info *callback_info)
{
FUNC_ENTER(NULL);

hg_return_t ret_value;
metadata_key_add_out_t output;
metadata_key_add_out_t *args = (metadata_key_add_out_t *)callback_info->arg;
hg_handle_t handle = callback_info->info.forward.handle;

printf("metadata_key_add_rpc_cb: callback_info->arg = %p\n", callback_info->arg);
/* Get output from server*/
ret_value = HG_Get_output(handle, &output);
printf("metadata_key_add_rpc_cb: after HG_Get_output\n");
if (ret_value != HG_SUCCESS) {
ret_value = -1;
PGOTO_ERROR(HG_OTHER_ERROR, "Error with HG_Get_output");
}
args->ret = output.ret;
printf("metadata_check_prefix_rpc_cb: output.ret = %d\n", args->ret);

done:
hg_atomic_decr32(&atomic_work_todo_g);
HG_Free_output(handle, &output);

FUNC_LEAVE(ret_value);
}

perr_t
PDC_Client_add_tag(pdcid_t obj_id, const char *tag)
{
Expand Down Expand Up @@ -6363,6 +6429,7 @@ PDC_add_kvtag(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)
}

server_id = PDC_get_server_by_obj_id(meta_id, pdc_server_num_g);
printf("Object ID %" PRIu64 " mapped to server %" PRIu32 "\n", obj_id, server_id);

// Debug statistics for counting number of messages sent to each server.
debug_server_id_count[server_id]++;
Expand Down Expand Up @@ -6409,65 +6476,149 @@ PDC_add_kvtag(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)
*/

static perr_t
PDC_check_prefix(uint32_t server, char *prefix){
PDC_check_prefix(uint32_t server, char *prefix, metadata_check_prefix_out_t *out) {
FUNC_ENTER(NULL);
perr_t ret_value = SUCCEED;
hg_return_t hg_ret = 0;
hg_handle_t metadata_check_prefix_handle;
struct _pdc_client_lookup_args lookup_args;
metadata_check_prefix_in_t in;

}
if (PDC_Client_try_lookup_server(server, 0) != SUCCEED)
PGOTO_ERROR(FAIL, "Error with PDC_Client_try_lookup_server");

uint32_t
PDC_get_server_using_pht(uint32_t key_hash) {
int ring_size = pdc_server_num_g * RING_ARC_SIZE;
uint64_t mapped_hash = key_hash % ring_size;
HG_Create(send_context_g, pdc_server_info_g[server].addr, metadata_check_prefix_register_id_g,
&metadata_check_prefix_handle);

for (int i = 0; i < pdc_server_num_g; i++) {
int node_pos = (i + 1) * ceil(ring_size / (float)pdc_server_num_g);
// Fill input structure
in.prefix = prefix;
printf("Checking prefix client %s on server %d\n", prefix, server);
hg_ret = HG_Forward(metadata_check_prefix_handle, metadata_check_prefix_rpc_cb, out, &in);
if (hg_ret != HG_SUCCESS)
PGOTO_ERROR(FAIL, "Could not start HG_Forward");

if (mapped_hash <= node_pos) {
return i;
}
}
// Wait for response from server
hg_atomic_set32(&atomic_work_todo_g, 1);
PDC_Client_check_response(&send_context_g);

// Wrap around
return 0;
if (out->ret != 1)
LOG_ERROR("Check prefix NOT successful, ret_value = %d\n", out->ret);

done:
HG_Destroy(metadata_check_prefix_handle);

FUNC_LEAVE(ret_value);
}

uint32_t PDC_prefix_binary_search(char *prefix, char **target_prefix, bool is_leaf){
int target_server = -1;
uint32_t PDC_prefix_binary_search(char *prefix, char **target_prefix){
metadata_check_prefix_out_t resp;
uint64_t temp_server_id = -1;
int max = strlen(prefix) - 1;
int mid = 1;
char *slice;


printf("Searching prefix %s\n", prefix);
while (mid <= max){
slice = malloc((mid + 1) * sizeof(char));
strncpy(slice, prefix, mid);
slice[mid] = '\0';
char *prefix_bin = string_to_binary(slice);
uint32_t hash_value = prefix_hash(prefix_bin);
target_server = PDC_get_server_using_pht(hash_value);
//void *resp = PDC_check_if_prefix_exists(target_server, slice);

/** TODO: Check if the prefix bucket is redirected, make another call here */

*target_prefix = malloc((mid + 1) * sizeof(char));
strncpy(*target_prefix, slice, mid);
(*target_prefix)[mid] = '\0';
uint64_t hash_value = prefix_hash(prefix);
temp_server_id = PDC_get_server_using_pht(hash_value);
printf("Server %d is responsible for prefix %s till length %d\n", temp_server_id, slice, mid);
perr_t ret = PDC_check_prefix(temp_server_id, slice, &resp);

printf("Checking prefix %s on server %d: found=%d, leaf=%d\n", slice, temp_server_id, resp.found, resp.leaf);
//TODO: check if redirect to another server

/*** If the prefix exists and a leaf node, return the target server */

if (resp.found == 1 && resp.leaf == 1){
*target_prefix = malloc((mid + 1) * sizeof(char));
strncpy(*target_prefix, slice, mid);
(*target_prefix)[mid] = '\0';
free(slice);
return temp_server_id;
}
else if (resp.found == 1 && resp.leaf == 0){ /** If the prefix exists but not a leaf node, search longer prefix */
mid++;
}
else{ /** If the prefix does not exist, search shorter prefix */
max = mid - 1;
mid = mid / 2;
}

free(slice);
return target_server;
return temp_server_id;
}
return -1; // No valid prefix found
}

perr_t PDC_metadata_key_add(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont) {
FUNC_ENTER(NULL);
char *key_name;
char *target_prefix;
perr_t ret_value = SUCCEED;
hg_return_t hg_ret = 0;
hg_handle_t metadata_key_add_handle;
metadata_key_add_in_t in;
struct _pdc_client_lookup_args lookup_args;

printf("PDC_metadata_key_add: Adding key %s to metadata\n", kvtag->name);
key_name = kvtag->name;
char * key_bin = string_to_binary(key_name);
char *prefix = malloc(strlen(key_bin) + 2);
strncpy(prefix, ROOT_BUCKET, 1);
strncpy(prefix+1, key_bin, strlen(key_bin));
prefix[strlen(key_bin)+1] = '\0';
free(key_bin);

uint32_t server_id = PDC_prefix_binary_search(prefix, &target_prefix);
if (PDC_Client_try_lookup_server(server_id, 0) != SUCCEED)
PGOTO_ERROR(FAIL, "Error with PDC_Client_try_lookup_server");

HG_Create(send_context_g, pdc_server_info_g[server_id].addr, metadata_key_add_register_id_g, &metadata_key_add_handle);

in.key = strdup(kvtag->name);
in.value = kvtag->value;
in.prefix = strdup(target_prefix);
in.size = kvtag->size;

printf("PDC_metadata_key_add: Inserting key %s with prefix %s and value %s to server %d\n", in.key, in.prefix, in.value, server_id);

hg_ret = HG_Forward(metadata_key_add_handle, metadata_key_add_rpc_cb, &lookup_args, &in);
printf("HG_Forward returned %d\n", hg_ret);
if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "Could not start HG_Forward");

// Wait for response from server
hg_atomic_set32(&atomic_work_todo_g, 1);
PDC_Client_check_response(&send_context_g);

free(target_prefix);
free(prefix);
done:
HG_Destroy(metadata_key_add_handle);

FUNC_LEAVE(ret_value);
}

static perr_t
PDC_add_metadata_key(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont) {
char *prefix;
perr_t PDC_metadata_key_delete(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont) {
FUNC_ENTER(NULL);
char *key_name;
char *target_prefix;
perr_t ret_value = SUCCEED;

printf("PDC_metadata_key_delete: Adding key %s to metadata\n", kvtag->name);
key_name = kvtag->name;
prefix = string_to_binary(key_name);
char * key_bin = string_to_binary(key_name);
char *prefix = malloc(strlen(key_bin) + 2);
strncpy(prefix, ROOT_BUCKET, 1);
strncpy(prefix+1, key_bin, strlen(key_bin));
prefix[strlen(key_bin)+1] = '\0';
free(key_bin);
uint32_t server_id = PDC_prefix_binary_search(prefix, &target_prefix);
printf("PDC_metadata_key_delete: Inserting key %s with prefix %s to server %d\n", key_name, target_prefix, server_id);
free(target_prefix);
free(prefix);
FUNC_LEAVE(ret_value);
}

static hg_return_t
Expand Down Expand Up @@ -8520,6 +8671,71 @@ PDC_Client_search_obj_ref_through_dart_mpi(dart_hash_algo_t hash_algo, char *que
*out = dart_out;
FUNC_LEAVE(ret_value);
}

perr_t
PDC_Client_create_bucket(char *prefix){
FUNC_ENTER(NULL);

perr_t ret_value = SUCCEED;
hg_return_t hg_ret = 0;
hg_handle_t metadata_create_bucket_handle;
metadata_create_bucket_in_t in;
struct _pdc_client_lookup_args lookup_args;

uint64_t hash_value = prefix_hash(prefix);
uint32_t server_id = PDC_get_server_using_pht(hash_value);

if (PDC_Client_try_lookup_server(server_id, 0) != SUCCEED)
PGOTO_ERROR(FAIL, "Error with PDC_Client_try_lookup_server");

HG_Create(send_context_g, pdc_server_info_g[server_id].addr, metadata_create_bucket_register_id_g,
&metadata_create_bucket_handle);

// Fill input structure
in.prefix = prefix;

hg_ret = HG_Forward(metadata_create_bucket_handle, metadata_create_bucket_client_rpc_cb, &lookup_args, &in);
if (hg_ret != HG_SUCCESS)
PGOTO_ERROR(FAIL, "Could not start HG_Forward");
// Wait for response from server
hg_atomic_set32(&atomic_work_todo_g, 1);
PDC_Client_check_response(&send_context_g); // BLOCKING CALL

if (lookup_args.ret != 1)
LOG_ERROR("Add create_bucket NOT successful, ret_value = %d\n", lookup_args.ret);
done:
HG_Destroy(metadata_create_bucket_handle);

FUNC_LEAVE(ret_value);
}

hg_return_t
metadata_create_bucket_client_rpc_cb(const struct hg_cb_info *callback_info)
{

FUNC_ENTER(NULL);
hg_return_t ret_value;
hg_handle_t handle = callback_info->info.forward.handle;
struct _pdc_client_lookup_args *client_lookup_args = (struct _pdc_client_lookup_args *)callback_info->arg;
/* Get output from server*/

metadata_create_bucket_out_t output;
ret_value = HG_Get_output(handle, &output);

if (ret_value != HG_SUCCESS) {
client_lookup_args->ret = -1;
PGOTO_ERROR(HG_OTHER_ERROR, "Error with HG_Get_output");
}
printf("metadata_create_bucket_rpc_cb: output.ret = %d\n", output.ret);
client_lookup_args->ret = output.ret;
done:

hg_atomic_decr32(&atomic_work_todo_g);
HG_Free_output(handle, &output);
FUNC_LEAVE(ret_value);
}


#endif

/******************** Collective Object Selection Query Ends *******************************/
Loading