Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
1293 lines (1082 sloc) 41 KB
/*
* State machine to handle OSD directory operations.
*
* Copyright (C) 2007 OSC PVFS-OSD Team <pvfs-osd@osc.edu>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "pint-util.h"
#include "ncache.h"
#include "pvfs2-internal.h"
#include "osd-util/osd-util.h"
extern job_context_id pint_client_sm_context;
enum {
OSD_DIROPS_RETRY = 191,
OSD_PVFS_REMOVE = 192,
OSD_PVFS_LOCK_FAILED = 193,
OSD_PVFS_ATOMIC_FAILED = 194
};
/* OSD directory completion function prototypes */
static int osd_dirops_attr1_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int osd_dirops_attr1_remove_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int osd_dirops_attr4_lock_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int osd_dirops_attr4_remove_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int osd_dirops_attr4_lookup_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int osd_dirops_attr4_insert_remove_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int osd_dirops_attr4_unlock_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
%%
nested machine pvfs2_client_osd_dirops_sm
{
state osd_dirops_init
{
run osd_dirops_init;
OSD_DIR_ATTR1 => osd_dirops_attr1;
OSD_DIR_ATTR4 => osd_dirops_attr4;
default => return;
}
state osd_dirops_attr1
{
jump pvfs2_client_osd_dirops_attr1_sm;
default => return;
}
state osd_dirops_attr4
{
jump pvfs2_client_osd_dirops_attr4_sm;
default => return;
}
}
nested machine pvfs2_client_osd_dirops_attr1_sm
{
state osd_dirops_attr1_init
{
run osd_dirops_attr1_init;
OSD_PVFS_REMOVE => osd_dirops_attr1_remove_setup_msgpair;
default => osd_dirops_attr1_setup_msgpair;
}
state osd_dirops_attr1_remove_setup_msgpair
{
run osd_dirops_attr1_remove_setup_msgpair;
success => osd_dirops_attr1_remove_xfer_msgpair;
default => osd_dirops_attr1_failure;
}
state osd_dirops_attr1_remove_xfer_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => osd_dirops_attr1_setup_msgpair;
default => osd_dirops_attr1_failure;
}
state osd_dirops_attr1_setup_msgpair
{
run osd_dirops_attr1_setup_msgpair;
success => osd_dirops_attr1_xfer_msgpair;
default => osd_dirops_attr1_failure;
}
state osd_dirops_attr1_xfer_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => osd_dirops_attr1_cleanup;
OSD_PVFS_ATOMIC_FAILED => osd_dirops_attr1_init;
default => osd_dirops_attr1_failure;
}
state osd_dirops_attr1_failure
{
run osd_dirops_attr1_failure;
default => osd_dirops_attr1_cleanup;
}
state osd_dirops_attr1_cleanup
{
run osd_dirops_attr1_cleanup;
OSD_DIROPS_RETRY => osd_dirops_attr1_init;
default => return;
}
}
nested machine pvfs2_client_osd_dirops_attr4_sm
{
state osd_dirops_attr4_init
{
run osd_dirops_attr4_init;
default => osd_dirops_attr4_setup_lock_msgpair;
}
state osd_dirops_attr4_setup_lock_msgpair
{
run osd_dirops_attr4_setup_lock_msgpair;
success => osd_dirops_attr4_xfer_lock_msgpair;
default => osd_dirops_attr4_failure;
}
state osd_dirops_attr4_xfer_lock_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
OSD_PVFS_REMOVE => osd_dirops_attr4_remove_setup_msgpair;
OSD_PVFS_LOCK_FAILED => osd_dirops_attr4_init;
success => osd_dirops_attr4_setup_lookup_msgpair;
default => osd_dirops_attr4_failure;
}
state osd_dirops_attr4_remove_setup_msgpair
{
run osd_dirops_attr4_remove_setup_msgpair;
success => osd_dirops_attr4_remove_xfer_msgpair;
default => osd_dirops_attr4_failure;
}
state osd_dirops_attr4_remove_xfer_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => osd_dirops_attr4_setup_lookup_msgpair;
default => osd_dirops_attr4_failure;
}
state osd_dirops_attr4_setup_lookup_msgpair
{
run osd_dirops_attr4_setup_lookup_msgpair;
success => osd_dirops_attr4_xfer_lookup_msgpair;
default => osd_dirops_attr4_setup_unlock_msgpair;
}
state osd_dirops_attr4_xfer_lookup_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => osd_dirops_attr4_setup_insert_remove_msgpair;
default => osd_dirops_attr4_setup_unlock_msgpair;
}
state osd_dirops_attr4_setup_insert_remove_msgpair
{
run osd_dirops_attr4_setup_insert_remove_msgpair;
success => osd_dirops_attr4_xfer_insert_remove_msgpair;
default => osd_dirops_attr4_setup_unlock_msgpair;
}
state osd_dirops_attr4_xfer_insert_remove_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => osd_dirops_attr4_setup_unlock_msgpair;
default => osd_dirops_attr4_setup_unlock_msgpair;
}
state osd_dirops_attr4_setup_unlock_msgpair
{
run osd_dirops_attr4_setup_unlock_msgpair;
success => osd_dirops_attr4_xfer_unlock_msgpair;
default => osd_dirops_attr4_failure;
}
state osd_dirops_attr4_xfer_unlock_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => osd_dirops_attr4_cleanup;
default => osd_dirops_attr4_failure;
}
state osd_dirops_attr4_failure
{
run osd_dirops_attr4_failure;
default => osd_dirops_attr4_cleanup;
}
state osd_dirops_attr4_cleanup
{
run osd_dirops_attr4_cleanup;
OSD_DIROPS_RETRY => osd_dirops_attr4_init;
default => return;
}
}
%%
static PINT_sm_action osd_dirops_init(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops state: init\n");
if (smcb->op == PVFS_SYS_REMOVE) {
js_p->error_code = fsid_dir_switch(sm_p->parent_ref.fs_id);
} else {
js_p->error_code = fsid_dir_switch(sm_p->object_ref.fs_id);
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action osd_dirops_attr1_init(
struct PINT_smcb *smcb, job_status_s *js_p)
{
osd_debug("osd_dirops_attr1_init\n");
job_id_t tmp_id;
struct PINT_client_sm *sm_p;
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr1 state: init\n");
js_p->error_code = (smcb->op == PVFS_SYS_REMOVE) ? OSD_PVFS_REMOVE : 0;
if (js_p->error_code == OSD_DIROPS_RETRY) {
return job_req_sched_post_timer(
sm_p->msgarray_op.params.retry_delay, smcb, 0, js_p, &tmp_id,
pint_client_sm_context);
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action osd_dirops_attr1_remove_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
int ret = 0;
uint64_t oid;
char *object_name = NULL;
PINT_sm_msgpair_state *msg_p = NULL;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct attribute_list attr;
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr1: osd_dirops_attr1_remove_setup_msgpair\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
oid = sm_p->parent_ref.handle;
ret = osd_command_set_get_attributes(command, PVFS_OSD_META_PID, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_get_attributes failed",
__func__);
js_p->error_code = ret;
return 1;
}
object_name = sm_p->u.remove.object_name;
attr.type = ATTR_GET;
attr.page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr.number = jenkins_one_at_a_time_hash((uint8_t *)object_name,
strlen(object_name) + 1);
attr.len = 1024;
ret = osd_command_attr_build(command, &attr, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed", __func__);
js_p->error_code = ret;
return 1;
}
msg_p->fs_id = sm_p->parent_ref.fs_id;
msg_p->handle = sm_p->parent_ref.handle;
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = osd_dirops_attr1_remove_comp_fn;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret) {
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
return SM_ACTION_COMPLETE;
}
static int osd_dirops_attr1_remove_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret;
PVFS_error status = 0;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr1_remove_comp_fn\n");
status = osd_errno_from_status(sm_p->msgarray_op.msgpair.osd_command.status);
if (status != 0) {
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_command_set_get_attributes failed %d\n", index);
return status;
}
ret = osd_command_attr_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_resolve failed", __func__);
return ret;
}
if (command->attr->outlen == 0) {
status = -ENOENT;
} else {
sm_p->object_ref.handle = get_ntohll(command->attr->val);
sm_p->object_ref.fs_id = sm_p->parent_ref.fs_id;
}
osd_command_attr_free(&sm_p->msgarray_op.msgpair.osd_command);
return status;
}
static PINT_sm_action osd_dirops_attr1_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
osd_debug("attr1_setup_msgpair\n");
int ret = 0;
uint64_t oid;
uint8_t *dirent = NULL;
char *object_name = NULL;
PINT_sm_msgpair_state *msg_p = NULL;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct attribute_list attr[3];
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr1: osd_dirops_attr1_setup_msgpair\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
if (smcb->op == PVFS_SYS_CREATE) {
/* insert dirent */
attr[1].len = 8 + strlen(sm_p->u.create.object_name) + 1;
dirent = malloc(attr[1].len);
if (!dirent) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
set_htonll(dirent, sm_p->u.create.metafile_handle);
strcpy((char *)&dirent[8], sm_p->u.create.object_name);
/* For a create operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.create.object_name;
/* cmp */
attr[0].type = ATTR_SET;
attr[0].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[0].number = jenkins_one_at_a_time_hash((uint8_t *)object_name,
strlen(object_name) + 1);
attr[0].val = NULL;
attr[0].len = 0;
/* swp */
attr[1].type = ATTR_SET;
attr[1].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[1].number = attr[0].number;
attr[1].val = dirent;
/* result */
/* XXX: this should not be NULL. In the case that the attr already
* existed (like there already was a file /pvfs/x), then the CAS
* will return that original value, not empty. Sufficient to
* grab the first 8 bytes of the result, though, to make sure
* that the handles are the same, like the lock 0/1 case.
*/
attr[2].type = ATTR_RESULT;
attr[2].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[2].number = attr[0].number;
attr[2].val = NULL;
attr[2].len = 0;
} else if (smcb->op == PVFS_SYS_MKDIR) {
/* insert dirent */
attr[1].len = 8 + strlen(sm_p->u.mkdir.object_name) + 1;
dirent = malloc(attr[1].len);
if (!dirent) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
set_htonll(dirent, sm_p->u.mkdir.metafile_handle);
strcpy((char *)&dirent[8], sm_p->u.mkdir.object_name);
/* For a mkdir operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.mkdir.object_name;
osd_debug("msg_p->handle: %d\n", msg_p->handle);
osd_debug("object_name: %s\n", object_name);
/* cmp */
attr[0].type = ATTR_SET;
attr[0].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[0].number = jenkins_one_at_a_time_hash((uint8_t *)object_name,
strlen(object_name) + 1);
attr[0].val = NULL;
attr[0].len = 0;
/* swp */
attr[1].type = ATTR_SET;
attr[1].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[1].number = attr[0].number;
attr[1].val = dirent;
/* result */
attr[2].type = ATTR_RESULT;
attr[2].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[2].number = attr[0].number;
attr[2].val = NULL;
attr[2].len = 0;
} else if (smcb->op == PVFS_SYS_REMOVE) {
/* For a remove operation, parent_ref refers to the dir. object */
oid = sm_p->parent_ref.handle;
msg_p->fs_id = sm_p->parent_ref.fs_id;
msg_p->handle = sm_p->parent_ref.handle;
object_name = sm_p->u.remove.object_name;
/* cmp */
attr[0].type = ATTR_SET;
attr[0].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[0].number = jenkins_one_at_a_time_hash((uint8_t *)object_name,
strlen(object_name) + 1);
attr[0].len = 8 + strlen(sm_p->u.remove.object_name) + 1;
dirent = malloc(attr[0].len);
if (!dirent) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
set_htonll(dirent, sm_p->object_ref.handle);
strcpy((char *)&dirent[8], sm_p->u.remove.object_name);
attr[0].val = dirent;
/* swp */
attr[1].type = ATTR_SET;
attr[1].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[1].number = attr[0].number;
attr[1].val = NULL;
attr[1].len = 0;
/* result */
attr[2].type = ATTR_RESULT;
attr[2].page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr[2].number = attr[0].number;
attr[2].val = dirent;
attr[2].len = attr[0].len;
} else {
return 1;
}
ret = osd_command_set_gen_cas(command, PVFS_OSD_META_PID, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_gen_cas failed", __func__);
js_p->error_code = ret;
return 1;
}
ret = osd_command_attr_build(command, attr, 3);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed", __func__);
js_p->error_code = ret;
return 1;
}
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = osd_dirops_attr1_comp_fn;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret) {
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int osd_dirops_attr1_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
osd_debug("attr1_comp_fn\n");
int ret = 0;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr1_comp_fn\n");
ret = osd_errno_from_status(sm_p->msgarray_op.msgpair.osd_command.status);
if (ret != 0) {
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_command_set_gen_cas failed %d\n", index);
goto out;
}
ret = osd_command_attr_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_resolve failed", __func__);
goto out;
}
if (smcb->op == PVFS_SYS_CREATE || smcb->op == PVFS_SYS_MKDIR) {
/* insert dirent operation */
if (command->attr[2].val == NULL) {
osd_debug("successful\n");
/* successful insert operation */
ret = 0;
goto out;
} else {
/*
* insert operation failed. either the name is already being
* used or we have a hash collision.
*/
gossip_debug(GOSSIP_CLIENT_DEBUG, "Atomic CAS: insert failed\n");
ret = 1;
goto out;
}
} else {
/* remove dirent operation */
assert(command->attr[0].val && command->attr[2].val);
if (get_ntohll(command->attr[0].val) ==
get_ntohll(command->attr[2].val)) {
/* successful remove operation */
ret = 0;
goto out;
} else {
/* remove operation failed */
gossip_debug(GOSSIP_CLIENT_DEBUG, "Atomic CAS: remove failed\n");
ret = 1;
goto out;
}
}
out:
/* free data alloced for the create, if this was an insert */
if (sm_p->msgarray_op.msgpair.osd_command.attr->len) {
free(sm_p->msgarray_op.msgpair.osd_command.attr->val);
}
osd_command_attr_free(&sm_p->msgarray_op.msgpair.osd_command);
return ret;
}
static PINT_sm_action osd_dirops_attr1_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p;
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr1 state: cleanup\n");
/* cleanup code goes here */
return SM_ACTION_COMPLETE;
}
static PINT_sm_action osd_dirops_attr1_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
sm_p->u.mkdir.stored_error_code = js_p->error_code;
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr1 state: failure\n");
/* failure code goes here */
return SM_ACTION_COMPLETE;
}
static PINT_sm_action osd_dirops_attr4_init(
struct PINT_smcb *smcb, job_status_s *js_p)
{
job_id_t tmp_id;
struct PINT_client_sm *sm_p;
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr4 state: init\n");
js_p->error_code = 0;
if (js_p->error_code == OSD_DIROPS_RETRY) {
return job_req_sched_post_timer(
sm_p->msgarray_op.params.retry_delay, smcb, 0, js_p, &tmp_id,
pint_client_sm_context);
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action osd_dirops_attr4_setup_lock_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
int ret = 0;
uint64_t oid;
char *object_name = NULL;
uint64_t *outbuf, *inbuf;
PINT_sm_msgpair_state *msg_p = NULL;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4: osd_dirops_attr4_setup_lock_msgpair\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
/*
* Lock directory. A directory is represented by a single object. The
* directory entries are attributes on that object. object_ref or
* parent_ref here is the parent directory (i.e. OSD object) that will own
* the newly created entry (or that does own the one to be removed). We
* lock entries in it.
*/
if (smcb->op == PVFS_SYS_CREATE) {
/* For a create operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.create.object_name;
} else if (smcb->op == PVFS_SYS_MKDIR) {
/* For a mkdir operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.mkdir.object_name;
} else if (smcb->op == PVFS_SYS_REMOVE) {
/* For a remove operation, parent_ref refers to the dir. object */
oid = sm_p->parent_ref.handle;
msg_p->fs_id = sm_p->parent_ref.fs_id;
msg_p->handle = sm_p->parent_ref.handle;
object_name = sm_p->u.remove.object_name;
} else {
return 1;
}
ret = osd_command_set_cas(command, PVFS_OSD_META_PID, oid, 8, 0);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_cas failed", __func__);
js_p->error_code = ret;
return 1;
}
command->outlen = 2 * sizeof(uint64_t);
outbuf = malloc(command->outlen);
if (!outbuf) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
memset(outbuf, 0, command->outlen);
command->outdata = outbuf;
command->inlen_alloc = sizeof(uint64_t);
inbuf = malloc(command->inlen_alloc);
if (!inbuf) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
memset(inbuf, 0, command->inlen_alloc);
command->indata = inbuf;
set_htonll((uint8_t *)&outbuf[0], 0); /* cmp */
set_htonll((uint8_t *)&outbuf[1], 1); /* swp */
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = osd_dirops_attr4_lock_comp_fn;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret) {
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int osd_dirops_attr4_lock_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret = 0;
PVFS_error status = 0;
PINT_smcb *smcb = v_p;
uint8_t *inbuf, *outbuf;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr4_lock_comp_fn\n");
status = osd_errno_from_status(sm_p->msgarray_op.msgpair.osd_command.status);
if (status != 0) {
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_command_set_cas failed %d\n", index);
return status;
}
/* osd_command.indata should contain the previous value of the lock */
inbuf = &(sm_p->msgarray_op.msgpair.osd_command.indata);
outbuf = &(sm_p->msgarray_op.msgpair.osd_command.outdata);
assert(inbuf);
assert(outbuf);
if (get_ntohll(&inbuf[0]) != get_ntohll(&outbuf[0])) {
/* lock failed */
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4_lock_comp_fn: lock failed\n");
ret = OSD_PVFS_LOCK_FAILED;
goto out;
}
/* In case its a REMOVE operation we need to jump to a new state */
if (smcb->op == PVFS_SYS_REMOVE) {
ret = OSD_PVFS_REMOVE;
}
out:
/* free the 16 bytes alloced for the CAS operation above */
free((void *)sm_p->msgarray_op.msgpair.osd_command.indata);
free((void *)sm_p->msgarray_op.msgpair.osd_command.outdata);
return ret;
}
static PINT_sm_action osd_dirops_attr4_remove_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
int ret = 0;
uint64_t oid;
char *object_name = NULL;
PINT_sm_msgpair_state *msg_p = NULL;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct attribute_list attr;
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4: osd_dirops_attr4_remove_setup_msgpair\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
oid = sm_p->parent_ref.handle;
ret = osd_command_set_get_attributes(command, PVFS_OSD_META_PID, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_get_attributes failed",
__func__);
js_p->error_code = ret;
return 1;
}
object_name = sm_p->u.remove.object_name;
attr.type = ATTR_GET;
attr.page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr.number = jenkins_one_at_a_time_hash((uint8_t *)object_name,
strlen(object_name) + 1);
attr.len = 1024;
ret = osd_command_attr_build(command, &attr, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed", __func__);
js_p->error_code = ret;
return 1;
}
msg_p->fs_id = sm_p->parent_ref.fs_id;
msg_p->handle = sm_p->parent_ref.handle;
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = osd_dirops_attr4_remove_comp_fn;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret) {
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int osd_dirops_attr4_remove_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret;
PVFS_error status = 0;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr4_remove_comp_fn\n");
status = osd_errno_from_status(sm_p->msgarray_op.msgpair.osd_command.status);
if (status != 0) {
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_command_set_get_attributes failed %d\n", index);
return status;
}
ret = osd_command_attr_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_resolve failed", __func__);
return ret;
}
sm_p->object_ref.handle = get_ntohll(command->attr->val);
sm_p->object_ref.fs_id = sm_p->parent_ref.fs_id;
osd_command_attr_free(&sm_p->msgarray_op.msgpair.osd_command);
return status;
}
static PINT_sm_action osd_dirops_attr4_setup_lookup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
int ret = 0;
uint64_t oid;
char *object_name = NULL;
PINT_sm_msgpair_state *msg_p = NULL;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct attribute_list attr;
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4: osd_dirops_attr4_setup_lookup_msgpair\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
if (smcb->op == PVFS_SYS_CREATE) {
/* For a create operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.create.object_name;
} else if (smcb->op == PVFS_SYS_MKDIR) {
/* For a mkdir operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.mkdir.object_name;
} else if (smcb->op == PVFS_SYS_REMOVE) {
/* For a remove operation, parent_ref refers to the dir. object */
oid = sm_p->parent_ref.handle;
msg_p->fs_id = sm_p->parent_ref.fs_id;
msg_p->handle = sm_p->parent_ref.handle;
object_name = sm_p->u.remove.object_name;
} else {
return 1;
}
ret = osd_command_set_get_attributes(command, PVFS_OSD_META_PID, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_get_attributes failed",
__func__);
js_p->error_code = ret;
return 1;
}
/* Perform lookup */
attr.type = ATTR_GET;
attr.page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr.number = jenkins_one_at_a_time_hash((uint8_t *)object_name,
strlen(object_name) + 1);
attr.len = 1024;
ret = osd_command_attr_build(command, &attr, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed", __func__);
js_p->error_code = ret;
return 1;
}
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = osd_dirops_attr4_lookup_comp_fn;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret) {
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int osd_dirops_attr4_lookup_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret = 0;
PVFS_error status = 0;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr4_lookup_comp_fn\n");
status = osd_errno_from_status(sm_p->msgarray_op.msgpair.osd_command.status);
if (status != 0) {
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_command_set_get_attributes failed %d\n", index);
return status;
}
ret = osd_command_attr_resolve(&sm_p->msgarray_op.msgpair.osd_command);
if (ret) {
osd_error_xerrno(ret, "%s: attr_resolve failed", __func__);
goto out;
}
/*
* If the operation is either PVFS_SYS_CREATE or PVFS_SYS_MKDIR and the
* outlen contains a positive value, then the dirent already exists:
* return 1.
*/
if ((smcb->op == PVFS_SYS_CREATE || smcb->op == PVFS_SYS_MKDIR) &&
sm_p->msgarray_op.msgpair.osd_command.attr->val) {
ret = 1;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4_lookup_comp_fn: dirent exists\n");
}
out:
osd_command_attr_free(&sm_p->msgarray_op.msgpair.osd_command);
return ret;
}
static PINT_sm_action osd_dirops_attr4_setup_insert_remove_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
int ret = -1;
uint64_t oid;
uint8_t *dirent = NULL;
char *object_name = NULL;
PINT_sm_msgpair_state *msg_p = NULL;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct attribute_list attr;
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4: osd_dirops_attr4_setup_insert_remove_msgpair\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
/* Insert dirent as attribute of directory object */
attr.type = ATTR_SET;
attr.page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
if (smcb->op == PVFS_SYS_CREATE) {
/* insert dirent */
attr.len = 8 + strlen(sm_p->u.create.object_name) + 1;
dirent = malloc(attr.len);
if (!dirent) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
set_htonll(dirent, sm_p->u.create.metafile_handle);
strcpy((char *)&dirent[8], sm_p->u.create.object_name);
attr.val = dirent;
/* For a create operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.create.object_name;
} else if (smcb->op == PVFS_SYS_MKDIR) {
/* insert dirent */
attr.len = 8 + strlen(sm_p->u.mkdir.object_name) + 1;
dirent = malloc(attr.len);
if (!dirent) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
set_htonll(dirent, sm_p->u.mkdir.metafile_handle);
strcpy((char *)&dirent[8], sm_p->u.mkdir.object_name);
attr.val = dirent;
/* For a mkdir operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
object_name = sm_p->u.mkdir.object_name;
} else if (smcb->op == PVFS_SYS_REMOVE) {
/* remove dirent */
attr.len = 0;
attr.val = NULL;
/* For a remove operation, parent_ref refers to the dir. object */
oid = sm_p->parent_ref.handle;
msg_p->fs_id = sm_p->parent_ref.fs_id;
msg_p->handle = sm_p->parent_ref.handle;
object_name = sm_p->u.remove.object_name;
} else {
return 1;
}
attr.number = jenkins_one_at_a_time_hash((uint8_t *)object_name,
strlen(object_name) + 1);
ret = osd_command_set_set_attributes(command, PVFS_OSD_META_PID, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_set_attributes failed",
__func__);
js_p->error_code = ret;
return 1;
}
ret = osd_command_attr_build(command, &attr, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed", __func__);
js_p->error_code = ret;
return 1;
}
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = osd_dirops_attr4_insert_remove_comp_fn;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret) {
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int osd_dirops_attr4_insert_remove_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
PVFS_error status = 0;
PINT_smcb *smcb = v_p;
int ret = 0;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
gossip_debug(GOSSIP_CLIENT_DEBUG, "%s\n", __func__);
status = osd_errno_from_status(sm_p->msgarray_op.msgpair.osd_command.status);
if (status != 0)
gossip_debug(GOSSIP_CLIENT_DEBUG,
"%s: failed to insert/remove dirent %d\n",
__func__, index);
ret = osd_command_attr_resolve(&sm_p->msgarray_op.msgpair.osd_command);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_resolve failed", __func__);
return ret;
}
/* free data alloced for the create, if this was an insert */
if (sm_p->msgarray_op.msgpair.osd_command.attr->len)
free(sm_p->msgarray_op.msgpair.osd_command.attr->val);
osd_command_attr_free(&sm_p->msgarray_op.msgpair.osd_command);
return status;
}
static PINT_sm_action osd_dirops_attr4_setup_unlock_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
int ret = 0;
uint64_t oid;
uint64_t *outbuf, *inbuf;
PINT_sm_msgpair_state *msg_p = NULL;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4: osd_dirops_attr4_setup_unlock_msgpair\n");
js_p->error_code = 0;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
/*
* Unlock dir. oid should correspond to the directory we're trying to
* unlock.
*/
if (smcb->op == PVFS_SYS_CREATE || smcb->op == PVFS_SYS_MKDIR) {
/* For a create operation, object_ref refers to the dir. object */
oid = sm_p->object_ref.handle;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
} else if (smcb->op == PVFS_SYS_REMOVE) {
/* For a remove operation, parent_ref refers to the dir. object */
oid = sm_p->parent_ref.handle;
msg_p->fs_id = sm_p->parent_ref.fs_id;
msg_p->handle = sm_p->parent_ref.handle;
} else {
return 1;
}
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
ret = osd_command_set_cas(command, PVFS_OSD_META_PID, oid, 8, 0);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_cas failed", __func__);
js_p->error_code = ret;
return 1;
}
command->outlen = 2 * sizeof(uint64_t);
outbuf = malloc(command->outlen);
if (!outbuf) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
memset(outbuf, 0, command->outlen);
command->outdata = outbuf;
command->inlen_alloc = sizeof(uint64_t);
inbuf = malloc(command->inlen_alloc);
if (!inbuf) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
memset(inbuf, 0, command->inlen_alloc);
command->indata = inbuf;
set_htonll((uint8_t *)&outbuf[0], 1); /* cmp */
set_htonll((uint8_t *)&outbuf[1], 0); /* swp */
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = osd_dirops_attr4_unlock_comp_fn;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret) {
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int osd_dirops_attr4_unlock_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret = 0;
PINT_smcb *smcb = v_p;
uint8_t *inbuf, *outbuf;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "%s\n", __func__);
ret = osd_errno_from_status(sm_p->msgarray_op.msgpair.osd_command.status);
if (ret != 0) {
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_command_set_cas failed %d\n", index);
}
/* osd_command.indata should contain the previous value of the lock */
inbuf = &(sm_p->msgarray_op.msgpair.osd_command.indata);
outbuf = &(sm_p->msgarray_op.msgpair.osd_command.outdata);
assert(inbuf);
assert(outbuf);
if (get_ntohll(&inbuf[0]) == get_ntohll(&outbuf[0])) {
/* unlock successful */
ret = 0;
goto out;
} else {
/* unlock failed */
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_dirops_attr4_unlock_comp_fn: unlock failed\n");
ret = 1;
goto out;
}
out:
/* free the 16 bytes alloced for the CAS operation above */
free((void *)sm_p->msgarray_op.msgpair.osd_command.indata);
free((void *)sm_p->msgarray_op.msgpair.osd_command.outdata);
return ret;
}
static PINT_sm_action osd_dirops_attr4_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p;
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr4 state: cleanup\n");
/* cleanup code goes here */
return SM_ACTION_COMPLETE;
}
static PINT_sm_action osd_dirops_attr4_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
sm_p->u.mkdir.stored_error_code = js_p->error_code;
gossip_debug(GOSSIP_CLIENT_DEBUG, "osd_dirops_attr4 state: failure\n");
/* failure code goes here */
return SM_ACTION_COMPLETE;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/