Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
497 lines (414 sloc) 14.2 KB
/*
* (C) 2003 Clemson University and The University of Chicago
*
* See COPYING in top-level directory.
*/
/** \file
* \ingroup sysint
*
* PVFS2 system interface routines for reading entries from a directory.
*/
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "ncache.h"
#include "pint-util.h"
#include "pvfs2-internal.h"
#include "osd-util/osd-util.h"
extern job_context_id pint_client_sm_context;
enum {
OSD_MSGPAIR = 2001
};
static int readdir_msg_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
%%
nested machine pvfs2_client_readdir_sm
{
state init
{
run readdir_init;
default => readdir_getattr;
}
state readdir_getattr
{
jump pvfs2_client_getattr_sm;
success => readdir_msg_setup_msgpair;
default => cleanup;
}
state readdir_msg_setup_msgpair
{
run readdir_msg_setup_msgpair;
OSD_MSGPAIR => readdir_msg_xfer_osd_msgpair;
success => readdir_msg_xfer_msgpair;
default => readdir_msg_failure;
}
state readdir_msg_xfer_osd_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => cleanup;
default => readdir_msg_failure;
}
state readdir_msg_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => cleanup;
default => readdir_msg_failure;
}
state readdir_msg_failure
{
run readdir_msg_failure;
default => cleanup;
}
state cleanup
{
run readdir_cleanup;
default => return;
}
}
machine pvfs2_client_sysint_readdir_sm
{
state dowork
{
jump pvfs2_client_readdir_sm;
default => do_cleanup;
}
state do_cleanup
{
run do_cleanup;
default => terminate;
}
}
%%
/** Initiate reading of entries from a directory.
*
* \param token opaque value used to track position in directory
* when more than one read is required.
* \param pvfs_dirent_incount maximum number of entries to read, if
* available, starting from token.
*/
PVFS_error PVFS_isys_readdir(
PVFS_object_ref ref,
PVFS_ds_position token,
int32_t pvfs_dirent_incount,
const PVFS_credentials *credentials,
PVFS_sysresp_readdir *resp,
PVFS_sys_op_id *op_id,
PVFS_hint hints,
void *user_ptr)
{
PVFS_error ret = -PVFS_EINVAL;
PINT_smcb *smcb = NULL;
PINT_client_sm *sm_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_readdir entered\n");
if ((ref.handle == PVFS_HANDLE_NULL) ||
(ref.fs_id == PVFS_FS_ID_NULL) ||
(resp == NULL))
{
gossip_err("invalid (NULL) required argument\n");
return ret;
}
if (pvfs_dirent_incount > PVFS_REQ_LIMIT_DIRENT_COUNT)
{
gossip_lerr("PVFS_isys_readdir unable to handle request "
"for %d entries.\n", pvfs_dirent_incount);
return ret;
}
PINT_smcb_alloc(&smcb, PVFS_SYS_READDIR,
sizeof(struct PINT_client_sm),
client_op_state_get_machine,
client_state_machine_terminate,
pint_client_sm_context);
if (smcb == NULL)
{
return -PVFS_ENOMEM;
}
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_init_msgarray_params(sm_p, ref.fs_id);
PINT_init_sysint_credentials(sm_p->cred_p, credentials);
sm_p->u.readdir.readdir_resp = resp;
sm_p->object_ref = ref;
PVFS_hint_copy(hints, &sm_p->hints);
PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &ref.handle);
/* point the sm dirent array and outcount to the readdir response field */
sm_p->readdir.dirent_array = &resp->dirent_array;
sm_p->readdir.dirent_outcount = &resp->pvfs_dirent_outcount;
sm_p->readdir.token = &resp->token;
sm_p->readdir.directory_version = &resp->directory_version;
sm_p->readdir.pos_token = sm_p->u.readdir.pos_token = token;
sm_p->readdir.dirent_limit = sm_p->u.readdir.dirent_limit = pvfs_dirent_incount;
gossip_debug(GOSSIP_READDIR_DEBUG, "Doing readdir on handle "
"%llu on fs %d\n", llu(ref.handle), ref.fs_id);
return PINT_client_state_machine_post(
smcb, op_id, user_ptr);
}
/** Read entries from a directory.
*
* \param token opaque value used to track position in directory
* when more than one read is required.
* \param pvfs_dirent_incount maximum number of entries to read, if
* available, starting from token.
*/
PVFS_error PVFS_sys_readdir(
PVFS_object_ref ref,
PVFS_ds_position token,
int32_t pvfs_dirent_incount,
const PVFS_credentials *credentials,
PVFS_sysresp_readdir *resp,
PVFS_hint hints)
{
PVFS_error ret = -PVFS_EINVAL, error = 0;
PVFS_sys_op_id op_id;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_readdir entered\n");
ret = PVFS_isys_readdir(ref, token, pvfs_dirent_incount,
credentials, resp, &op_id, hints, NULL);
if (ret)
{
PVFS_perror_gossip("PVFS_isys_readdir call", ret);
error = ret;
}
else
{
ret = PVFS_sys_wait(op_id, "readdir", &error);
if (ret)
{
PVFS_perror_gossip("PVFS_sys_wait call", ret);
error = ret;
}
}
PINT_sys_release(op_id);
return error;
}
/****************************************************************/
static PINT_sm_action readdir_init(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: init\n");
PINT_SM_GETATTR_STATE_FILL(
sm_p->getattr,
sm_p->object_ref,
PVFS_ATTR_DIR_ALL,
PVFS_TYPE_DIRECTORY,
0);
assert(js_p->error_code == 0);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action readdir_msg_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PINT_sm_msgpair_state *msg_p = NULL;
int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: "
"readdir_msg_setup_msgpair\n");
if (js_p->error_code)
{
return SM_ACTION_COMPLETE;
}
js_p->error_code = 0;
if(!sm_p->getattr.attr.cid) {
sm_p->object_ref.cid = (is_osd_md ? sm_p->object_ref.handle : COLLECTION_OID_LB); /* root directory */
} else {
sm_p->object_ref.cid = sm_p->getattr.attr.cid;
}
gossip_debug(GOSSIP_READDIR_DEBUG," readdir: posting readdir req\n");
gossip_debug(
GOSSIP_READDIR_DEBUG, "%llu|%d | token is %llu | limit is %d\n",
llu(sm_p->object_ref.handle),
sm_p->object_ref.fs_id,
llu(sm_p->readdir.pos_token),
sm_p->readdir.dirent_limit);
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr, sm_p->object_ref.handle,
sm_p->object_ref.fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
if (server_is_osd(msg_p->svr_addr)) {
uint64_t oid;
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
js_p->error_code = OSD_MSGPAIR;
/* Retrieve all the directory entries */
oid = sm_p->object_ref.handle;
/* Retrieve the attributes of the collection or the regular dir object */
/* Retrieved attribute numbers are the hashed name of each directory entry */
/*ret = osd_command_set_list_collection(command, PVFS_OSD_META_PID, oid, 0, 1024, 0, 0);*/
ret = osd_command_set_get_attributes(command, PVFS_OSD_META_PID, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_get_attributes failed",
__func__);
js_p->error_code = ret;
return 1;
}
ret = osd_command_attr_all_build(command, ANY_PG + PVFS_USEROBJECT_DIR_PG);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_all_build failed",
__func__);
js_p->error_code = ret;
return 1;
}
} else {
js_p->error_code = 0;
PINT_SERVREQ_READDIR_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
sm_p->object_ref.handle,
sm_p->u.readdir.pos_token,
sm_p->u.readdir.dirent_limit,
sm_p->hints);
}
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
msg_p->comp_fn = readdir_msg_comp_fn;
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int readdir_msg_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret = 0;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
PINT_sm_msgpair_state *msg_p = &sm_p->msgarray_op.msgpair;
gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir_msg_comp_fn\n");
if (server_is_osd(msg_p->svr_addr)) {
int i = 0, dirent_array_len = 0;
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
ret = osd_errno_from_status(command->status);
if (ret != 0) {
return ret;
}
ret = osd_command_attr_all_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_all_resolve failed",
__func__);
return ret;
}
//ret = osd_command_list_collection_resolve(command);
/*
* XXX readdir.token and readdir.directory_version are used when we
* use multiple passes to retrieve the dirents. For now, we retrieve
* all the dirents in a single pass. So we don't really need token and
* directory_version.
*/
*(sm_p->readdir.token) = 0;
*(sm_p->readdir.directory_version) = 0;
*(sm_p->readdir.dirent_outcount) = command->numattr;
dirent_array_len = command->numattr * sizeof(PVFS_dirent);
/* The dirent_array must be freed by caller */
*(sm_p->readdir.dirent_array) = malloc(dirent_array_len);
assert(*(sm_p->readdir.dirent_array));
/* populate dirent_array */
for (i = 0; i < command->numattr; i++) {
PVFS_dirent *dirent = &(*sm_p->readdir.dirent_array)[i];
if (command->attr[i].outlen < 9) {
gossip_err("%s: short dirent %d/%d len %d\n", __func__,
i, command->numattr, command->attr[i].outlen);
*(sm_p->readdir.dirent_outcount) = 0;
return -EINVAL;
}
dirent->handle = get_ntohll(command->attr[i].val);
memcpy(dirent->d_name, (uint8_t *)command->attr[i].val + 8,
command->attr[i].outlen - 8);
dirent->d_name[command->attr[i].outlen - 8] = '\0';
}
/* free the returned attributes */
osd_command_attr_all_free(command);
} else {
assert(resp_p->op == PVFS_SERV_READDIR);
if (resp_p->status != 0)
{
return resp_p->status;
}
/* convert servresp_readdir response to a sysresp_readdir obj */
*(sm_p->readdir.token) = resp_p->u.readdir.token;
*(sm_p->readdir.directory_version) =
resp_p->u.readdir.directory_version;
*(sm_p->readdir.dirent_outcount) =
resp_p->u.readdir.dirent_count;
if (*(sm_p->readdir.dirent_outcount) > 0)
{
int dirent_array_len =
(sizeof(PVFS_dirent) * *(sm_p->readdir.dirent_outcount));
/* this dirent_array MUST be freed by caller */
*(sm_p->readdir.dirent_array) =
(PVFS_dirent *) malloc(dirent_array_len);
assert(*(sm_p->readdir.dirent_array));
memcpy(*(sm_p->readdir.dirent_array),
resp_p->u.readdir.dirent_array, dirent_array_len);
}
}
gossip_debug(GOSSIP_READDIR_DEBUG, "*** Got %d directory entries "
"[version %lld]\n",
*(sm_p->readdir.dirent_outcount),
lld(*(sm_p->readdir.directory_version)));
return 0;
}
static PINT_sm_action readdir_msg_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"readdir state: readdir_msg_failure\n");
return SM_ACTION_COMPLETE;
}
static PINT_sm_action readdir_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int i = 0;
PVFS_object_ref tmp_ref;
gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: cleanup\n");
if(js_p->error_code == 0)
{
/* insert all handles into the ncache while we have them */
tmp_ref.fs_id = sm_p->object_ref.fs_id;
for(i = 0; i < *(sm_p->readdir.dirent_outcount); i++)
{
tmp_ref.handle = (*(sm_p->readdir.dirent_array))[i].handle;
PINT_ncache_update(
(const char *) (*(sm_p->readdir.dirent_array))[i].d_name,
(const PVFS_object_ref *) &(tmp_ref),
(const PVFS_object_ref *) &(sm_p->object_ref));
}
}
PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
if(js_p->error_code != 0)
{
PINT_acache_invalidate(sm_p->object_ref);
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action do_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "readdir state: do_cleanup\n");
sm_p->error_code = js_p->error_code;
gossip_debug(GOSSIP_READDIR_DEBUG, " final return code is %d\n",
sm_p->error_code);
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/