Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
1108 lines (924 sloc) 32.9 KB
/*
* (C) 2003 Clemson University and The University of Chicago
*
* Changes by Acxiom Corporation to add setgid support
* Copyright © Acxiom Corporation, 2005.
*
* See COPYING in top-level directory.
*/
/** \file
* \ingroup sysint
*
* PVFS2 system interface routines for creating a new directory.
*/
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "pint-util.h"
#include "ncache.h"
#include "pvfs2-internal.h"
#include "osd-util/osd-util.h"
extern job_context_id pint_client_sm_context;
enum
{
MKDIR_RETRY = 180,
MKDIR_SKIP_EATTR = 181,
OSD_MKDIR_COLL_MSGPAIR = 2001,
CREATE_COLLECTION = 2002,
SKIP_COLLECTION_CREATE = 2003,
OSD_MKDIR_MSGPAIR = 2004
};
static int mkdir_msg_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int mkdir_crdirent_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int mkdir_delete_handle_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int create_collection_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
%%
machine pvfs2_client_mkdir_sm
{
state init
{
run mkdir_init;
default => parent_getattr;
}
state parent_getattr
{
jump pvfs2_client_getattr_sm;
success => parent_getattr_inspect;
default => cleanup;
}
state parent_getattr_inspect
{
run mkdir_parent_getattr_inspect;
success => mkdir_msg_setup_msgpair;
default => cleanup;
}
state mkdir_msg_setup_msgpair
{
run mkdir_msg_setup_msgpair;
OSD_MKDIR_MSGPAIR => mkdir_msg_xfer_osd_msgpair;
OSD_MKDIR_COLL_MSGPAIR => create_collection;
success => mkdir_msg_xfer_msgpair;
default => mkdir_msg_failure;
}
state mkdir_msg_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => create_collection;
default => mkdir_msg_failure;
}
state mkdir_msg_xfer_osd_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => create_collection;
default => mkdir_msg_failure;
}
state mkdir_msg_failure
{
run mkdir_msg_failure;
default => cleanup;
}
state create_collection
{
run create_collection_setup_msgpair;
success => create_collection_xfer_msgpair;
SKIP_COLLECTION_CREATE => mkdir_seteattr_setup_msgpair;
default => mkdir_crdirent_failure;
}
state create_collection_xfer_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => mkdir_seteattr_setup_msgpair;
default => mkdir_crdirent_failure;
}
state mkdir_seteattr_setup_msgpair
{
run mkdir_seteattr_setup_msgpair;
MKDIR_SKIP_EATTR => mkdir_crdirent_setup_msgpair;
success => mkdir_seteattr_xfer_msgpair;
default => mkdir_seteattr_failure;
}
state mkdir_seteattr_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => mkdir_crdirent_setup_msgpair;
default => mkdir_seteattr_failure;
}
state mkdir_seteattr_failure
{
run mkdir_seteattr_failure;
default => delete_handle_setup_msgpair;
}
state mkdir_crdirent_setup_msgpair
{
run mkdir_crdirent_setup_msgpair;
success => mkdir_crdirent_xfer_msgpair;
OSD_MKDIR_COLL_MSGPAIR => mkdir_crdirent_osd_msgpair;
default => mkdir_crdirent_failure;
}
state mkdir_crdirent_osd_msgpair
{
jump pvfs2_client_osd_dirops_sm;
success => cleanup;
default => mkdir_crdirent_failure;
}
state mkdir_crdirent_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => cleanup;
default => mkdir_crdirent_failure;
}
state mkdir_crdirent_failure
{
run mkdir_crdirent_failure;
default => delete_handle_setup_msgpair;
}
state delete_handle_setup_msgpair
{
run mkdir_delete_handle_setup_msgpair;
success => delete_handle_xfer_msgpair;
default => cleanup;
}
state delete_handle_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
default => cleanup;
}
state cleanup
{
run mkdir_cleanup;
MKDIR_RETRY => init;
default => terminate;
}
}
%%
/** Initiate creation of a new directory.
*/
PVFS_error PVFS_isys_mkdir(
char *object_name,
PVFS_object_ref parent_ref,
PVFS_sys_attr attr,
const PVFS_credentials *credentials,
PVFS_sysresp_mkdir *resp,
PVFS_sys_op_id *op_id,
PVFS_hint hints,
void *user_ptr)
{
PVFS_error ret = -PVFS_EINVAL;
PINT_smcb *smcb = NULL;
PINT_client_sm *sm_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_mkdir entered\n");
if ((parent_ref.handle == PVFS_HANDLE_NULL) ||
(parent_ref.fs_id == PVFS_FS_ID_NULL) ||
(object_name == NULL) || (resp == NULL))
{
gossip_err("invalid (NULL) required argument\n");
return ret;
}
if ((attr.mask & PVFS_ATTR_SYS_ALL_SETABLE) !=
PVFS_ATTR_SYS_ALL_SETABLE)
{
gossip_lerr("PVFS_isys_mkdir() failure: invalid attributes "
"specified\n");
return ret;
}
if ((strlen(object_name) + 1) > PVFS_REQ_LIMIT_SEGMENT_BYTES)
{
return -PVFS_ENAMETOOLONG;
}
PINT_smcb_alloc(&smcb, PVFS_SYS_MKDIR,
sizeof(struct PINT_client_sm),
client_op_state_get_machine,
client_state_machine_terminate,
pint_client_sm_context);
if (smcb == NULL)
{
return -PVFS_ENOMEM;
}
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_init_msgarray_params(sm_p, parent_ref.fs_id);
PINT_init_sysint_credentials(sm_p->cred_p, credentials);
sm_p->u.mkdir.object_name = object_name;
PVFS_util_copy_sys_attr(&sm_p->u.mkdir.sys_attr, &attr);
sm_p->u.mkdir.mkdir_resp = resp;
sm_p->u.mkdir.stored_error_code = 0;
sm_p->object_ref = parent_ref;
PVFS_hint_copy(hints, &sm_p->hints);
PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &parent_ref.handle);
gossip_debug(GOSSIP_CLIENT_DEBUG, "Creating directory named %s "
"under parent handle %llu on fs %d\n", object_name,
llu(parent_ref.handle), parent_ref.fs_id);
return PINT_client_state_machine_post(
smcb, op_id, user_ptr);
}
/** Create a new directory.
*/
PVFS_error PVFS_sys_mkdir(
char *object_name,
PVFS_object_ref parent_ref,
PVFS_sys_attr attr,
const PVFS_credentials *credentials,
PVFS_sysresp_mkdir *resp,
PVFS_hint hints)
{
PVFS_error ret = -PVFS_EINVAL, error = 0;
PVFS_sys_op_id op_id;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_mkdir entered\n");
ret = PVFS_isys_mkdir(object_name, parent_ref, attr,
credentials, resp, &op_id, hints, NULL);
if (ret)
{
PVFS_perror_gossip("PVFS_isys_mkdir call", ret);
error = ret;
}
else
{
ret = PVFS_sys_wait(op_id, "mkdir", &error);
if (ret)
{
PVFS_perror_gossip("PVFS_sys_wait call", ret);
error = ret;
}
}
PINT_sys_release(op_id);
return error;
}
/****************************************************************/
static PINT_sm_action mkdir_init(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
job_id_t tmp_id;
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: init\n");
assert((js_p->error_code == 0) ||
(js_p->error_code == MKDIR_RETRY) || (js_p->error_code == CREATE_COLLECTION));
PINT_SM_GETATTR_STATE_FILL(
sm_p->getattr,
sm_p->object_ref,
(PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT),
PVFS_TYPE_DIRECTORY,
0);
if (js_p->error_code == MKDIR_RETRY)
{
js_p->error_code = 0;
return job_req_sched_post_timer(
sm_p->msgarray_op.params.retry_delay, smcb, 0, js_p, &tmp_id,
pint_client_sm_context);
}
return SM_ACTION_COMPLETE;
}
static int mkdir_msg_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret;
uint64_t oid;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
PVFS_object_attr attr;
PINT_sm_msgpair_state *msg_p = &sm_p->msgarray_op.msgpair;
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir_msg_comp_fn\n");
if (server_is_osd(msg_p->svr_addr)) {
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
if (command->status == 0) {
/* Stash the newly created meta handle */
ret = osd_command_attr_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: attr_resolve failed", __func__);
return ret;
}
sm_p->u.mkdir.metafile_handle = get_ntohll(command->attr[5].val);
sm_p->u.mkdir.cid = get_ntohll(command->attr[5].val);
}
osd_command_attr_free(command);
return osd_errno_from_status(command->status);
} else {
assert(resp_p->op == PVFS_SERV_MKDIR);
if (resp_p->status != 0)
{
return resp_p->status;
}
/* otherwise, just stash the newly created meta handle */
sm_p->u.mkdir.metafile_handle = resp_p->u.mkdir.handle;
sm_p->u.mkdir.cid = resp_p->u.mkdir.cid;
}
/* also insert entry into attr cache */
PINT_CONVERT_ATTR(&attr, &sm_p->u.mkdir.sys_attr, 0);
PINT_acache_update(sm_p->object_ref, &attr, NULL);
gossip_debug(
GOSSIP_CLIENT_DEBUG, "*** Got newly created dir handle %llu\n",
llu(sm_p->u.mkdir.metafile_handle));
return 0;
}
static int mkdir_crdirent_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir_crdirent_comp_fn\n");
assert(resp_p->op == PVFS_SERV_CRDIRENT);
return resp_p->status;
}
static int mkdir_delete_handle_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir_delete_handle_comp_fn\n");
assert(resp_p->op == PVFS_SERV_REMOVE);
return resp_p->status;
}
static PINT_sm_action mkdir_msg_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PVFS_handle_extent_array meta_handle_extent_array;
PINT_sm_msgpair_state *msg_p = NULL;
struct server_configuration_s *server_config;
server_config = PINT_get_server_config_struct(
sm_p->object_ref.fs_id);
PINT_put_server_config_struct(server_config);
gossip_debug(GOSSIP_CLIENT_DEBUG,
"mkdir state: mkdir_msg_setup_msgpair\n");
gossip_debug(GOSSIP_CLIENT_DEBUG," mkdir: posting mkdir req\n");
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
ret = PINT_cached_config_get_next_meta(
sm_p->object_ref.fs_id,
&msg_p->svr_addr, &meta_handle_extent_array, 1);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
if (server_is_osd(msg_p->svr_addr)) {
if (server_config->coll_object)
{
js_p->error_code = OSD_MKDIR_COLL_MSGPAIR;
return SM_ACTION_COMPLETE;
} else {
int i, numattrs = 6;
struct attribute_list attr[numattrs];
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
/* Set attr type, page and number */
for (i = 0; i < numattrs - 1; i++) {
attr[i].type = ATTR_SET;
attr[i].page = ANY_PG + PVFS_USEROBJECT_ATTR_PG;
attr[i].number = i;
}
/* uid */
attr[0].val = &sm_p->u.mkdir.sys_attr.owner;
attr[0].len = sizeof(PVFS_uid);
/* gid */
attr[1].val = &sm_p->u.mkdir.sys_attr.group;
attr[1].len = sizeof(PVFS_gid);
/* XXX Default to PVFS_PERM_VALID till we can figure out the umask */
sm_p->u.mkdir.sys_attr.perms = 0777;
attr[2].val = &sm_p->u.mkdir.sys_attr.perms;
attr[2].len = sizeof(PVFS_permissions);
/* mask */
sm_p->u.mkdir.sys_attr.mask = PVFS_ATTR_COMMON_UID |
PVFS_ATTR_COMMON_GID |
PVFS_ATTR_COMMON_PERM |
PVFS_ATTR_COMMON_ATIME |
PVFS_ATTR_COMMON_CTIME |
PVFS_ATTR_COMMON_MTIME |
PVFS_ATTR_COMMON_TYPE;
attr[3].val = &sm_p->u.mkdir.sys_attr.mask;
attr[3].len = sizeof(uint32_t);
/* object type */
sm_p->u.mkdir.sys_attr.objtype = PVFS_TYPE_DIRECTORY;
attr[4].val = &sm_p->u.mkdir.sys_attr.objtype;
attr[4].len = sizeof(PVFS_ds_type);
/* retrieve cid, if we have osd md */
attr[5].type = ATTR_GET;
attr[5].page = CUR_CMD_ATTR_PG;
attr[5].number = CCAP_OID;
attr[5].val = NULL;
attr[5].len = CCAP_OID_LEN;
ret = osd_command_set_create(command, PVFS_OSD_META_PID, 0, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_create failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
/* Set/Retrieve the dir. attributes */
ret = osd_command_attr_build(command, attr, numattrs);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
js_p->error_code = OSD_MKDIR_MSGPAIR;
}
} else {
PINT_SERVREQ_MKDIR_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
meta_handle_extent_array,
sm_p->u.mkdir.sys_attr,
sm_p->hints);
js_p->error_code = 0;
}
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = meta_handle_extent_array.extent_array[0].first;
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = mkdir_msg_comp_fn;
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action mkdir_msg_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
sm_p->u.mkdir.stored_error_code = js_p->error_code;
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: mkdir_msg_failure\n");
return SM_ACTION_COMPLETE;
}
static PINT_sm_action create_collection_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PINT_sm_msgpair_state *msg_p = NULL;
struct server_configuration_s *server_config;
PINT_llist *cur = NULL;
struct host_alias_s *cur_alias;
PVFS_BMI_addr_t addr;
int i, numattrs = 7;
uint64_t attrval;
int is_osd = fsid_is_osd(sm_p->object_ref.fs_id);
int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
int server_count = 0;
int randsrv = 0;
int iterator = 0;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"mkdir state: create_collection_setup_msgpair\n");
js_p->error_code = 0;
server_config = PINT_get_server_config_struct(
sm_p->object_ref.fs_id);
PINT_put_server_config_struct(server_config);
if (!is_osd || (is_osd_md && !server_config->coll_object)) {
js_p->error_code = SKIP_COLLECTION_CREATE;
return SM_ACTION_COMPLETE;
}
cur = server_config->host_aliases;
while(cur)
{
cur_alias = PINT_llist_head(cur);
if (!cur_alias)
{
break;
}
cur = PINT_llist_next(cur);
server_count++;
}
randsrv = (rand() % server_count);
cur = server_config->host_aliases;
while (cur)
{
cur_alias = PINT_llist_head(cur);
if (!cur_alias)
{
break;
}
if( (iterator >= randsrv) && (!strncmp(cur_alias->bmi_address, "osd", 3))) {
BMI_addr_lookup(&addr,cur_alias->bmi_address);
break;
}
cur = PINT_llist_next(cur);
iterator++;
}
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
msg_p->svr_addr = addr;
msg_p->comp_fn = create_collection_comp_fn;
struct attribute_list attr[numattrs];
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
if (is_osd_md)
{
/* Set attr type, page and number */
for (i = 0; i < numattrs - 1; i++) {
attr[i].type = ATTR_SET;
attr[i].page = ANY_PG + PVFS_USEROBJECT_ATTR_PG;
attr[i].number = i;
}
/* uid */
attr[0].val = &sm_p->u.mkdir.sys_attr.owner;
attr[0].len = sizeof(PVFS_uid);
/* gid */
attr[1].val = &sm_p->u.mkdir.sys_attr.group;
attr[1].len = sizeof(PVFS_gid);
/* XXX Default to PVFS_PERM_VALID till we can figure out the umask */
sm_p->u.mkdir.sys_attr.perms = 0777;
attr[2].val = &sm_p->u.mkdir.sys_attr.perms;
attr[2].len = sizeof(PVFS_permissions);
/* mask */
sm_p->u.mkdir.sys_attr.mask = PVFS_ATTR_COMMON_UID |
PVFS_ATTR_COMMON_GID |
PVFS_ATTR_COMMON_PERM |
PVFS_ATTR_COMMON_ATIME |
PVFS_ATTR_COMMON_CTIME |
PVFS_ATTR_COMMON_MTIME |
PVFS_ATTR_COMMON_TYPE;
attr[3].val = &sm_p->u.mkdir.sys_attr.mask;
attr[3].len = sizeof(uint32_t);
/* object type */
sm_p->u.mkdir.sys_attr.objtype = PVFS_TYPE_DIRECTORY;
attr[4].val = &sm_p->u.mkdir.sys_attr.objtype;
attr[4].len = sizeof(PVFS_ds_type);
/* make this new collection object a member of enclosing collection */
set_htonll(&attrval, sm_p->object_ref.handle);
attr[5].type = ATTR_SET;
attr[5].page = ANY_PG + USER_COLL_PG;
attr[5].val = &attrval;
attr[5].len = 8;
/* retrieve cid, if we have osd md */
attr[6].type = ATTR_GET;
attr[6].page = CUR_CMD_ATTR_PG;
attr[6].number = CCAP_OID;
attr[6].val = NULL;
attr[6].len = CCAP_OID_LEN;
}
ret = osd_command_set_create_collection(command, (server_config->member_attr ? PVFS_OSD_DATA_PID : PVFS_OSD_META_PID), sm_p->u.mkdir.cid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_create_collection failed",
__func__);
js_p->error_code = ret;
return 1;
}
if (is_osd_md)
{
ret = osd_command_attr_build(command, attr, numattrs);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int create_collection_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
PVFS_error status;
int ret;
int is_osd = fsid_is_osd(sm_p->object_ref.fs_id);
int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
gossip_debug(GOSSIP_CLIENT_DEBUG, "create_collection_comp_fn\n");
if (is_osd) {
status = osd_errno_from_status(
sm_p->msgarray_op.msgarray[index].osd_command.status);
} else {
assert(resp_p->op == PVFS_SERV_CRDIRENT);
status = resp_p->status;
}
if (status != 0)
{
return status;
}
if (is_osd_md)
{
ret = osd_command_attr_resolve(&sm_p->msgarray_op.msgpair.osd_command);
if (ret) {
osd_error_xerrno(ret, "%s: attr_resolve failed", __func__);
return ret;
}
sm_p->u.mkdir.cid = get_ntohll(sm_p->msgarray_op.msgpair.osd_command.attr[6].val);
sm_p->u.mkdir.metafile_handle = sm_p->u.mkdir.cid;
}
sm_p->object_ref.cid = sm_p->u.mkdir.cid;
return 0;
}
static PINT_sm_action mkdir_crdirent_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PINT_sm_msgpair_state *msg_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"mkdir state: crdirent_setup_msgpair\n");
gossip_debug(GOSSIP_CLIENT_DEBUG," mkdir: posting crdirent req\n");
gossip_debug(GOSSIP_CLIENT_DEBUG, "hooking dirent %s (%llu) under "
"parent handle %llu\n", sm_p->u.mkdir.object_name,
llu(sm_p->u.mkdir.metafile_handle),
llu(sm_p->object_ref.handle));
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr, sm_p->object_ref.handle,
sm_p->object_ref.fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
if (server_is_osd(msg_p->svr_addr)) {
/*
* Directory operations for metafile and mdfile. We don't do anything
* here because we'll handle the individual directory operations in a
* different state machine.
*/
js_p->error_code = OSD_MKDIR_COLL_MSGPAIR;
return SM_ACTION_COMPLETE;
} else {
PINT_SERVREQ_CRDIRENT_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->u.mkdir.object_name,
sm_p->u.mkdir.metafile_handle,
sm_p->object_ref.handle,
sm_p->object_ref.fs_id,
sm_p->hints);
js_p->error_code = 0;
}
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = mkdir_crdirent_comp_fn;
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action mkdir_crdirent_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
sm_p->u.mkdir.stored_error_code = js_p->error_code;
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: crdirent_failure\n");
PVFS_perror_gossip("mkdir crdirent failed", js_p->error_code);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action mkdir_delete_handle_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PVFS_BMI_addr_t metafile_server_addr;
PINT_sm_msgpair_state *msg_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: "
"delete_handle_setup_msgpair_array\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
ret = PINT_cached_config_map_to_server(
&metafile_server_addr, sm_p->u.mkdir.metafile_handle,
sm_p->object_ref.fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
PINT_SERVREQ_REMOVE_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
sm_p->u.mkdir.metafile_handle,
sm_p->hints);
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->u.mkdir.metafile_handle;
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = mkdir_delete_handle_comp_fn;
msg_p->svr_addr = metafile_server_addr;
gossip_debug(GOSSIP_CLIENT_DEBUG, " Preparing to remove "
"directory handle %llu\n", llu(msg_p->handle));
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action mkdir_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: cleanup\n");
if(sm_p->u.mkdir.val_array)
{
if((sm_p->getattr.attr.mask & PVFS_ATTR_DIR_HINT) &&
(sm_p->getattr.attr.u.dir.hint.dfile_count > 0))
{
free(sm_p->u.mkdir.val_array[0].buffer);
}
free(sm_p->u.mkdir.val_array);
}
if(sm_p->u.mkdir.key_array)
{
free(sm_p->u.mkdir.key_array);
}
PVFS_util_release_sys_attr(&sm_p->u.mkdir.sys_attr);
PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
sm_p->error_code = (sm_p->u.mkdir.stored_error_code ?
sm_p->u.mkdir.stored_error_code :
js_p->error_code);
if (sm_p->error_code == 0)
{
PVFS_object_ref directory_ref;
directory_ref.handle = sm_p->u.mkdir.metafile_handle;
directory_ref.fs_id = sm_p->object_ref.fs_id;
sm_p->u.mkdir.mkdir_resp->ref.handle = directory_ref.handle;
sm_p->u.mkdir.mkdir_resp->ref.fs_id = directory_ref.fs_id;
/* insert newly created directory handle into the ncache */
PINT_ncache_update((const char*) sm_p->u.mkdir.object_name,
(const PVFS_object_ref*) &directory_ref,
(const PVFS_object_ref*) &(sm_p->object_ref));
}
else if ((PVFS_ERROR_CLASS(-sm_p->error_code) == PVFS_ERROR_BMI) &&
(sm_p->u.mkdir.retry_count < sm_p->msgarray_op.params.retry_limit))
{
sm_p->u.mkdir.stored_error_code = 0;
sm_p->u.mkdir.retry_count++;
gossip_debug(GOSSIP_CLIENT_DEBUG, "Retrying mkdir operation "
"(attempt number %d)\n", sm_p->u.mkdir.retry_count);
js_p->error_code = MKDIR_RETRY;
return SM_ACTION_COMPLETE;
}
else
{
PINT_acache_invalidate(sm_p->object_ref);
PVFS_perror_gossip("mkdir failed with error", sm_p->error_code);
}
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
/** looks at the attributes of the parent directory and decides if it impacts
* the mkdir in any way
*/
static PINT_sm_action mkdir_parent_getattr_inspect(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_object_attr *attr = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: parent_getattr_inspect\n");
attr = &sm_p->getattr.attr;
assert(attr);
gossip_debug(GOSSIP_CLIENT_DEBUG, "parent owner: %d, group: %d, perms: %d\n",
(int)attr->owner, (int)attr->group, (int)attr->perms);
/* do we have a setgid bit? */
if(attr->perms & PVFS_G_SGID)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "parent has setgid bit set.\n");
gossip_debug(GOSSIP_CLIENT_DEBUG, " - modifying requested attr for new file.\n");
sm_p->u.mkdir.sys_attr.group = attr->group;
sm_p->u.mkdir.sys_attr.perms |= PVFS_G_SGID;
/* note that permission checking is left to server even in this case */
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action mkdir_seteattr_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int eattr_count = 0;
int cur_index = 0;
PINT_sm_msgpair_state *msg_p = NULL;
int ret = -PVFS_EINVAL;
/* NOTE: any memory allocated here will be free'd in the cleanup function */
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: seteattr_setup_msgpair\n");
/* don't set any hint attributes if the parent doesn't have them */
if(!(sm_p->getattr.attr.mask & PVFS_ATTR_DIR_HINT))
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: skipping seteattr\n");
js_p->error_code = MKDIR_SKIP_EATTR;
return SM_ACTION_COMPLETE;
}
/* count how many hints we acquired */
if(sm_p->getattr.attr.u.dir.hint.dfile_count > 0)
eattr_count++;
if(sm_p->getattr.attr.u.dir.hint.dist_name != NULL)
eattr_count++;
if(sm_p->getattr.attr.u.dir.hint.dist_params != NULL)
eattr_count++;
if(eattr_count == 0)
{
/* nothing to inherit */
js_p->error_code = MKDIR_SKIP_EATTR;
return SM_ACTION_COMPLETE;
}
sm_p->u.mkdir.key_array = (PVFS_ds_keyval*)calloc(eattr_count,
sizeof(PVFS_ds_keyval));
if(!sm_p->u.mkdir.key_array)
{
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
sm_p->u.mkdir.val_array = (PVFS_ds_keyval*)calloc(eattr_count,
sizeof(PVFS_ds_keyval));
if(!sm_p->u.mkdir.val_array)
{
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
if(sm_p->getattr.attr.u.dir.hint.dfile_count > 0)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir: setting num_dfiles\n");
sm_p->u.mkdir.key_array[cur_index].buffer = "user.pvfs2.num_dfiles";
sm_p->u.mkdir.key_array[cur_index].buffer_sz =
strlen("user.pvfs2.num_dfiles") + 1;
sm_p->u.mkdir.val_array[cur_index].buffer = calloc(1, 16);
if(!sm_p->u.mkdir.val_array[cur_index].buffer)
{
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
#ifdef WIN32
_snprintf((char*)sm_p->u.mkdir.val_array[cur_index].buffer,
16, "%d", sm_p->getattr.attr.u.dir.hint.dfile_count);
#else
snprintf((char*)sm_p->u.mkdir.val_array[cur_index].buffer,
16, "%d", sm_p->getattr.attr.u.dir.hint.dfile_count);
#endif
sm_p->u.mkdir.val_array[cur_index].buffer_sz =
strlen((char*)sm_p->u.mkdir.val_array[cur_index].buffer) + 1;
cur_index++;
}
if(sm_p->getattr.attr.u.dir.hint.dist_name != NULL)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir: setting dist_name\n");
sm_p->u.mkdir.key_array[cur_index].buffer = "user.pvfs2.dist_name";
sm_p->u.mkdir.key_array[cur_index].buffer_sz =
strlen("user.pvfs2.dist_name") + 1;
sm_p->u.mkdir.val_array[cur_index].buffer =
sm_p->getattr.attr.u.dir.hint.dist_name;
sm_p->u.mkdir.val_array[cur_index].buffer_sz =
sm_p->getattr.attr.u.dir.hint.dist_name_len;
cur_index++;
}
if(sm_p->getattr.attr.u.dir.hint.dist_params != NULL)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir: setting dist_params\n");
sm_p->u.mkdir.key_array[cur_index].buffer = "user.pvfs2.dist_params";
sm_p->u.mkdir.key_array[cur_index].buffer_sz =
strlen("user.pvfs2.dist_params") + 1;
sm_p->u.mkdir.val_array[cur_index].buffer =
sm_p->getattr.attr.u.dir.hint.dist_params;
sm_p->u.mkdir.val_array[cur_index].buffer_sz =
sm_p->getattr.attr.u.dir.hint.dist_params_len;
cur_index++;
}
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
PINT_SERVREQ_SETEATTR_FILL(
msg_p->req,
(*sm_p->cred_p),
sm_p->object_ref.fs_id,
sm_p->u.mkdir.metafile_handle,
0,
eattr_count,
sm_p->u.mkdir.key_array,
sm_p->u.mkdir.val_array,
sm_p->hints);
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->u.mkdir.metafile_handle;
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
/* NOTE: no comp_fn needed. */
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr,
msg_p->handle,
msg_p->fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
else
{
js_p->error_code = 0;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action mkdir_seteattr_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
sm_p->u.mkdir.stored_error_code = js_p->error_code;
gossip_debug(GOSSIP_CLIENT_DEBUG, "mkdir state: mkdir_seteattr_failure\n");
PVFS_perror_gossip("mkdir seteattr failed", js_p->error_code);
return SM_ACTION_COMPLETE;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/