Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
1537 lines (1316 sloc) 49.2 KB
/*
* Copyright © Acxiom Corporation, 2006
*
* See COPYING in top-level directory.
*/
/** \file
* \ingroup sysint
*
* PVFS2 system interface routines for creating files.
*/
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "pvfs2-dist-simple-stripe.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-cached-config.h"
#include "pint-distribution.h"
#include "PINT-reqproto-encode.h"
#include "pint-util.h"
#include "pint-dist-utils.h"
#include "ncache.h"
#include "pvfs2-internal.h"
#include "pvfs2-dist-varstrip.h"
#include "osd-util/osd-util.h"
extern job_context_id pint_client_sm_context;
enum
{
CREATE_RETRY = 170
};
/* completion function prototypes */
static int create_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int create_datafiles_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int create_crdirent_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int create_delete_handles_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
/* misc helper functions */
static PINT_dist* get_default_distribution(PVFS_fs_id fs_id);
enum {
OSD_MSGPAIR = 2001,
OSD_CREATE_MSGPAIR = 2002,
OSD_MDFILE_MSGPAIR = 2003,
OSD_POST_CREATE = 2004
};
%%
machine pvfs2_client_create_sm
{
state init
{
run create_init;
default => parent_getattr;
}
state parent_getattr
{
jump pvfs2_client_getattr_sm;
success => parent_getattr_inspect;
default => cleanup;
}
state parent_getattr_inspect
{
run create_parent_getattr_inspect;
success => create_setup_msgpair;
default => cleanup;
}
state create_setup_msgpair
{
run create_create_setup_msgpair;
OSD_MSGPAIR => create_xfer_osd_msgpair;
OSD_CREATE_MSGPAIR => create_xfer_datafile_msgpair;
OSD_MDFILE_MSGPAIR => datafiles_setup_msgpair_array;
success => create_xfer_msgpair;
default => cleanup;
}
state create_xfer_osd_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => datafiles_setup_msgpair_array;
default => cleanup;
}
state create_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => crdirent_setup_msgpair;
default => cleanup;
}
state create_xfer_datafile_msgpair
{
jump pvfs2_msgpairarray_sm;
success => datafiles_setup_msgpair_array;
default => cleanup;
}
state crdirent_setup_msgpair
{
run create_crdirent_setup_msgpair;
OSD_MSGPAIR => crdirent_osd_msgpair;
success => crdirent_xfer_msgpair;
default => crdirent_failure;
}
state crdirent_osd_msgpair
{
jump pvfs2_client_osd_dirops_sm;
success => cleanup;
default => crdirent_failure;
}
state datafiles_setup_msgpair_array
{
run create_datafiles_setup_msgpair_array;
OSD_MSGPAIR => datafiles_xfer_osd_msgpair_array;
default => cleanup;
}
state datafiles_xfer_osd_msgpair_array
{
jump pvfs2_osd_msgpairarray_sm;
success => crdirent_setup_msgpair;
default => crdirent_failure;
}
state crdirent_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => cleanup;
default => crdirent_failure;
}
state crdirent_failure
{
run create_crdirent_failure;
default => delete_handles_setup_msgpair_array;
}
state delete_handles_setup_msgpair_array
{
run create_delete_handles_setup_msgpair_array;
success => delete_handles_xfer_msgpair_array;
default => cleanup;
}
state delete_handles_xfer_msgpair_array
{
jump pvfs2_msgpairarray_sm;
default => cleanup;
}
state cleanup
{
run create_cleanup;
CREATE_RETRY => init;
default => terminate;
}
}
%%
/** Initiate creation of a file with a specified distribution.
*/
PVFS_error PVFS_isys_create(
char *object_name,
PVFS_object_ref parent_ref,
PVFS_sys_attr attr,
const PVFS_credentials *credentials,
PVFS_sys_dist *dist,
PVFS_sys_layout *layout,
PVFS_sysresp_create *resp,
PVFS_sys_op_id *op_id,
PVFS_hint hints,
void *user_ptr)
{
PVFS_error ret = -PVFS_EINVAL;
PINT_smcb *smcb = NULL;
PINT_client_sm *sm_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_create entered\n");
if ((parent_ref.handle == PVFS_HANDLE_NULL) ||
(parent_ref.fs_id == PVFS_FS_ID_NULL) ||
(object_name == NULL) || (resp == NULL))
{
gossip_err("invalid (NULL) required argument\n");
return ret;
}
if ((attr.mask & PVFS_ATTR_SYS_ALL_SETABLE) != PVFS_ATTR_SYS_ALL_SETABLE)
{
gossip_lerr("PVFS_isys_create() failure: invalid attribute mask: %d, "
"expected SYS_ALL_SETABLE (%d)\n",
attr.mask, PVFS_ATTR_SYS_ALL_SETABLE);
return ret;
}
if ((attr.mask & PVFS_ATTR_SYS_DFILE_COUNT) &&
((attr.dfile_count < 1) ||
(attr.dfile_count > PVFS_REQ_LIMIT_DFILE_COUNT)))
{
gossip_err("Error: invalid number of datafiles (%d) specified "
"in PVFS_sys_create().\n", (int)attr.dfile_count);
return ret;
}
if ((strlen(object_name) + 1) > PVFS_REQ_LIMIT_SEGMENT_BYTES)
{
return -PVFS_ENAMETOOLONG;
}
PINT_smcb_alloc(&smcb, PVFS_SYS_CREATE,
sizeof(struct PINT_client_sm),
client_op_state_get_machine,
client_state_machine_terminate,
pint_client_sm_context);
if (smcb == NULL)
{
return -PVFS_ENOMEM;
}
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_init_msgarray_params(sm_p, parent_ref.fs_id);
PINT_init_sysint_credentials(sm_p->cred_p, credentials);
sm_p->u.create.object_name = object_name;
sm_p->u.create.create_resp = resp;
PINT_CONVERT_ATTR(&sm_p->u.create.attr, &attr, PVFS_ATTR_META_ALL);
/* save the original attribute passed in. since create does it's own
* retries we need the original attribute available on retries */
PINT_copy_object_attr(&(sm_p->u.create.store_attr), &(sm_p->u.create.attr));
sm_p->u.create.stored_error_code = 0;
sm_p->u.create.retry_count = 0;
PVFS_hint_copy(hints, &sm_p->hints);
PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &parent_ref.handle);
sm_p->parent_ref = parent_ref;
if(attr.mask & PVFS_ATTR_SYS_DFILE_COUNT)
{
sm_p->u.create.user_requested_num_data_files = attr.dfile_count;
}
/* copy layout to sm struct */
if(layout)
{
/* make sure it is a supported layout */
switch(layout->algorithm)
{
/* these are valid */
case PVFS_SYS_LAYOUT_NONE:
case PVFS_SYS_LAYOUT_LOCAL:
case PVFS_SYS_LAYOUT_ROUND_ROBIN:
case PVFS_SYS_LAYOUT_RANDOM:
case PVFS_SYS_LAYOUT_LIST:
break;
/* anything else is not */
default:
return(-PVFS_EINVAL);
}
sm_p->u.create.layout.algorithm = layout->algorithm;
if(layout->algorithm == PVFS_SYS_LAYOUT_LIST)
{
sm_p->u.create.layout.server_list.count = layout->server_list.count;
sm_p->u.create.layout.server_list.servers =
malloc(layout->server_list.count * sizeof(PVFS_BMI_addr_t));
if(!sm_p->u.create.layout.server_list.servers)
{
return -PVFS_ENOMEM;
}
memcpy(sm_p->u.create.layout.server_list.servers,
layout->server_list.servers,
layout->server_list.count * sizeof(PVFS_BMI_addr_t));
}
}
else
{
sm_p->u.create.layout.algorithm = PVFS_SYS_LAYOUT_ROUND_ROBIN;
}
sm_p->object_ref = parent_ref;
/* If the user specifies a distribution use that
else, use the default distribution */
if (dist)
{
if (!dist->name)
{
PINT_smcb_free(smcb);
return -PVFS_EINVAL;
}
sm_p->u.create.dist = PINT_dist_create(dist->name);
if (!sm_p->u.create.dist)
{
PINT_smcb_free(smcb);
return -PVFS_ENOMEM;
}
sm_p->u.create.dist->params = dist->params;
}
else
{
/* Get the default distribution */
sm_p->u.create.dist =
get_default_distribution(sm_p->parent_ref.fs_id);
if (!sm_p->u.create.dist)
{
PINT_smcb_free(smcb);
return -PVFS_ENOMEM;
}
}
gossip_debug(
GOSSIP_CLIENT_DEBUG, "Creating file %s under %llu, %d\n",
object_name, llu(parent_ref.handle), parent_ref.fs_id);
return PINT_client_state_machine_post(
smcb, op_id, user_ptr);
}
/** Create a file with a specified distribution.
*/
PVFS_error PVFS_sys_create(
char *object_name, /**< name of the file to create */
PVFS_object_ref parent_ref, /**< handle of the parent dir */
PVFS_sys_attr attr, /**< attributes of new file */
const PVFS_credentials *credentials,/**< identity of the caller */
PVFS_sys_dist *dist, /**< distribution of new file */
PVFS_sysresp_create *resp, /**< response from the request */
PVFS_sys_layout *layout, /**< selection of servers to hold file */
PVFS_hint hints) /**< user supplied PVFS hints */
{
PVFS_error ret = -PVFS_EINVAL, error = 0;
PVFS_sys_op_id op_id;
int ios_num;
struct server_configuration_s *server_config;
server_config = PINT_get_server_config_struct(
parent_ref.fs_id);
PINT_cached_config_get_num_io(parent_ref.fs_id, &ios_num);
if(server_config->energysaving)
{
if(server_config->econumnodes >= ios_num)
{
server_config->econumnodes = 0;
server_config->energysaving = 0;
}
}
PINT_put_server_config_struct(server_config);
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_create entered\n");
ret = PVFS_isys_create(object_name, parent_ref, attr, credentials,
dist, layout, resp, &op_id, hints, NULL);
if (ret)
{
PVFS_perror_gossip("PVFS_isys_create call", ret);
error = ret;
}
else
{
ret = PVFS_sys_wait(op_id, "create", &error);
if (ret)
{
PVFS_perror_gossip("PVFS_sys_wait call", ret);
error = ret;
}
}
PINT_sys_release(op_id);
return error;
}
/****************************************************************/
static PINT_sm_action create_init(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
job_id_t tmp_id;
assert((js_p->error_code == 0) ||
(js_p->error_code == CREATE_RETRY));
PINT_SM_GETATTR_STATE_FILL(
sm_p->getattr,
sm_p->object_ref,
PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT,
PVFS_TYPE_DIRECTORY,
0);
if (js_p->error_code == CREATE_RETRY)
{
js_p->error_code = 0;
/* call for getattr_acache_lookup (next state) */
PINT_SM_GETATTR_STATE_FILL(
sm_p->getattr,
sm_p->object_ref,
PVFS_ATTR_COMMON_ALL|PVFS_ATTR_DIR_HINT,
PVFS_TYPE_DIRECTORY,
0);
return job_req_sched_post_timer(
sm_p->msgarray_op.params.retry_delay, smcb, 0, js_p, &tmp_id,
pint_client_sm_context);
}
return SM_ACTION_COMPLETE;
}
static int create_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
struct server_configuration_s *server_config;
server_config = PINT_get_server_config_struct(
sm_p->object_ref.fs_id);
PINT_put_server_config_struct(server_config);
int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
int is_osd_meta = fsid_is_osd_meta(sm_p->object_ref.fs_id);
gossip_debug(GOSSIP_CLIENT_DEBUG, "create_create_comp_fn\n");
if (is_osd_meta) {
int ret;
uint64_t oid;
PVFS_error status;
status = osd_errno_from_status(
sm_p->msgarray_op.msgpair.osd_command.status);
if (status != 0) {
return status;
}
/* otherwise, just stash the newly created meta handle */
ret = osd_command_attr_resolve(&sm_p->msgarray_op.msgpair.osd_command);
if (ret) {
osd_error_xerrno(ret, "%s: attr_resolve failed", __func__);
}
oid = get_ntohll(sm_p->msgarray_op.msgpair.osd_command.attr[0].val);
sm_p->u.create.metafile_handle = oid;
sm_p->u.create.create_resp->ref.handle = oid;
sm_p->u.create.create_resp->ref.fs_id = sm_p->object_ref.fs_id;
osd_command_attr_free(&sm_p->msgarray_op.msgpair.osd_command);
} else {
assert(resp_p->op == PVFS_SERV_CREATE);
if (resp_p->status != 0)
{
return resp_p->status;
}
/* otherwise, just stash the newly created meta handle */
sm_p->u.create.datafile_count = resp_p->u.create.datafile_count;
sm_p->u.create.datafile_handles = malloc(
sizeof(*sm_p->u.create.datafile_handles) *
sm_p->u.create.datafile_count);
if(!sm_p->u.create.datafile_handles)
{
return -PVFS_ENOMEM;
}
memcpy(sm_p->u.create.datafile_handles,
resp_p->u.create.datafile_handles,
(sizeof(*sm_p->u.create.datafile_handles) *
resp_p->u.create.datafile_count));
if (is_osd_md && server_config->post_create)
sm_p->u.create.metafile_handle = sm_p->u.create.datafile_handles[0];
else
sm_p->u.create.metafile_handle = resp_p->u.create.metafile_handle;
sm_p->u.create.stuffed = resp_p->u.create.stuffed;
}
gossip_debug(
GOSSIP_CLIENT_DEBUG, "*** Got newly created handle %llu\n",
llu(sm_p->u.create.metafile_handle));
return 0;
}
static int create_datafiles_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int ret;
PVFS_error status;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
int is_osd = fsid_is_osd(sm_p->object_ref.fs_id);
int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
gossip_debug(GOSSIP_CLIENT_DEBUG,
"create_datafiles_comp_fn[%d]\n",index);
if (is_osd) {
status = osd_errno_from_status(
sm_p->msgarray_op.msgarray[index].osd_command.status);
} else {
assert(resp_p->op == PVFS_SERV_CREATE);
status = resp_p->status;
}
if (status != 0)
{
gossip_err("%s: Failed to create data handle at server: %s "
"for file: %s\n",
__func__,
BMI_addr_rev_lookup(
sm_p->u.create.data_server_addrs[index]),
sm_p->u.create.object_name);
PVFS_perror_gossip("Creation failure", status);
return status;
}
/* allocate memory for the data handles if we haven't already */
if (sm_p->u.create.datafile_handles == NULL)
{
sm_p->u.create.datafile_handles = (PVFS_handle *)malloc(
sm_p->u.create.num_data_files * sizeof(PVFS_handle));
if (sm_p->u.create.datafile_handles == NULL)
{
gossip_err("create: Failed to allocate data handle array\n");
return -PVFS_ENOMEM;
}
memset(sm_p->u.create.datafile_handles, 0,
sm_p->u.create.num_data_files * sizeof(PVFS_handle));
}
/* otherwise, just stash the newly created data file handle */
if (is_osd) {
uint64_t oid;
PVFS_handle_extent_array *ea
= &sm_p->u.create.io_handle_extent_array[index];
PVFS_handle first = ea->extent_array[0].first;
PVFS_handle last = ea->extent_array[0].last;
ret = osd_command_attr_resolve(&sm_p->msgarray_op.msgarray[index].osd_command);
if (ret)
osd_error_xerrno(ret, "%s: attr_resolve failed", __func__);
if (sm_p->u.create.datafile_handles[index])
oid = sm_p->u.create.datafile_handles[index];
else {
oid = get_ntohll(sm_p->msgarray_op.msgarray[index].osd_command.attr[6].val);
sm_p->u.create.datafile_handles[index] = oid;
}
/*if (oid < first || oid > last)
gossip_err("%s: OSD-assigned oid %llu out of range %llu-%llu\n",
__func__, llu(oid), llu(first), llu(last));*/
/* The first datafile also happens to be the metafile */
if (is_osd_md && index == 0) {
sm_p->u.create.metafile_handle = oid;
sm_p->u.create.create_resp->ref.handle = oid;
sm_p->u.create.create_resp->ref.fs_id = sm_p->object_ref.fs_id;
}
sm_p->u.create.datafile_count = 1;
osd_command_attr_free(&sm_p->msgarray_op.msgarray[index].osd_command);
}
gossip_debug(GOSSIP_CLIENT_DEBUG, "Datafile handle %d is %llu\n",
index, llu(sm_p->u.create.datafile_handles[index]));
return ret;
}
static int create_crdirent_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "create_crdirent_comp_fn\n");
assert(resp_p->op == PVFS_SERV_CRDIRENT);
return resp_p->status;
}
static PINT_sm_action create_create_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PVFS_handle_extent_array meta_handle_extent_array;
PINT_sm_msgpair_state *msg_p = NULL;
int server_type;
struct server_configuration_s *server_config;
server_config = PINT_get_server_config_struct(
sm_p->object_ref.fs_id);
PINT_put_server_config_struct(server_config);
/*
* Check whether we're storing file metadata on a dedicated OSD MDS
* or as attributes of the first datafile.
*/
int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
int is_osd_meta = fsid_is_osd_meta(sm_p->object_ref.fs_id);
int is_osd = fsid_is_osd(sm_p->object_ref.fs_id);
gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: "
"dspace_create_setup_msgpair\n");
gossip_debug(GOSSIP_CLIENT_DEBUG," create: posting create req\n");
if (is_osd_md) {
/*
* OK, so osd_type == OSD_MDFILE which means that we're storing the
* file metadata as attributes of the first datafile. There is nothing
* else to be done here so we'll go ahead and skip the
* dspace_create_xfer state and jump directly to the
* datafiles_setup_msgpair_array state.
*/
js_p->error_code = OSD_MDFILE_MSGPAIR;
return SM_ACTION_COMPLETE;
}
/* reset the attributes to what got passed in to the sysint call. the retry
* path comes through here so we'll want to reset it after each try.
* force the mask to all meta attributes. */
if( sm_p->u.create.retry_count > 0 )
{
PINT_copy_object_attr(&(sm_p->u.create.attr),
&(sm_p->u.create.store_attr));
sm_p->u.create.attr.mask |= PVFS_ATTR_META_ALL;
}
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
ret = PINT_cached_config_get_next_meta(
sm_p->object_ref.fs_id, &msg_p->svr_addr, &meta_handle_extent_array, 0);
if(ret != 0)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
/* resolve and print selected server only if gossip debugging enabled */
if(gossip_debug_enabled(GOSSIP_CLIENT_DEBUG))
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"PVFS_isys_create() selected meta server: %s\n",
PINT_cached_config_map_addr(sm_p->object_ref.fs_id,
msg_p->svr_addr,
&server_type));
}
if (is_osd_meta) {
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
struct attribute_list attr = { ATTR_GET, CUR_CMD_ATTR_PG,
CCAP_OID, NULL, CCAP_OID_LEN };
ret = osd_command_set_create(command, PVFS_OSD_META_PID, 0, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_create failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
ret = osd_command_attr_build(command, &attr, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
js_p->error_code = OSD_MSGPAIR;
} else {
PINT_SERVREQ_CREATE_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
sm_p->u.create.attr,
sm_p->u.create.num_data_files,
server_config->econumnodes,
sm_p->u.create.layout,
sm_p->hints);
js_p->error_code = 0;
if (is_osd && !server_config->post_create)
js_p->error_code = OSD_CREATE_MSGPAIR;
}
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = meta_handle_extent_array.extent_array[0].first;
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
msg_p->comp_fn = create_comp_fn;
msg_p->req.u.create.attr.u.meta.dfile_count = 0;
msg_p->req.u.create.attr.u.meta.dist =
sm_p->u.create.dist;
msg_p->req.u.create.attr.u.meta.dist_size =
PINT_DIST_PACK_SIZE(sm_p->u.create.dist);
msg_p->req.u.create.attr.cid = sm_p->getattr.attr.cid;
sm_p->u.create.attr.cid = sm_p->getattr.attr.cid;
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action create_datafiles_setup_msgpair_array(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
struct server_configuration_s *server_config;
int is_osd;
int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
PINT_sm_msgpair_state *msg_p = NULL;
int num_attr = 8, i;
gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: "
"datafiles_setup_msgpair_array\n");
server_config = PINT_get_server_config_struct(
sm_p->object_ref.fs_id);
PINT_put_server_config_struct(server_config);
is_osd = (server_config->osd_type != OSD_NONE);
js_p->error_code = is_osd ? OSD_MSGPAIR : 0;
/* allocate handle extent array objects */
if (sm_p->u.create.io_handle_extent_array == NULL)
{
sm_p->u.create.io_handle_extent_array = (PVFS_handle_extent_array *)
malloc(sm_p->u.create.num_data_files *
sizeof(PVFS_handle_extent_array));
}
if (!sm_p->u.create.io_handle_extent_array)
{
gossip_err("create: failed to allocate handle_extent_array\n");
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
/* allocate data server bmi address array */
if (sm_p->u.create.data_server_addrs == NULL)
{
sm_p->u.create.data_server_addrs = (PVFS_BMI_addr_t *)malloc(
sm_p->u.create.num_data_files * sizeof(PVFS_BMI_addr_t));
}
if (!sm_p->u.create.data_server_addrs)
{
gossip_err("create: failed to allocate data server addrs\n");
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
ret = PINT_cached_config_map_servers(
sm_p->object_ref.fs_id,
&sm_p->u.create.num_data_files,
&sm_p->u.create.layout,
sm_p->u.create.data_server_addrs,
sm_p->u.create.io_handle_extent_array);
if(ret < 0)
{
gossip_err("create: failed to map the layout to a set of IO servers\n");
js_p->error_code = ret;
return 1;
}
if (ret)
{
gossip_err("Failed to retrieve data server addresses\n");
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
#define CURRENT_COMMAND_PAGE 0xfffffffeUL
#define CURRENT_COMMAND_PAGE_OID 4
if (is_osd) {
uint64_t attrval;
struct osd_command *command = &sm_p->msgarray_op.msgarray[0].osd_command;
if (is_osd_md)
{
/* needed for osd_md */
char *dist_buf;
sm_p->u.create.attr.perms = PVFS_PERM_VALID;
sm_p->u.create.attr.mask = PVFS_ATTR_COMMON_UID |
PVFS_ATTR_COMMON_GID |
PVFS_ATTR_COMMON_PERM |
PVFS_ATTR_COMMON_ATIME |
PVFS_ATTR_COMMON_CTIME |
PVFS_ATTR_COMMON_MTIME |
PVFS_ATTR_META_DIST |
PVFS_ATTR_META_DFILES |
PVFS_ATTR_COMMON_TYPE;
sm_p->u.create.attr.objtype = PVFS_TYPE_METAFILE;
dist_buf = (char *)malloc(PINT_DIST_PACK_SIZE(sm_p->u.create.dist));
if (!dist_buf) {
js_p->error_code = -PVFS_ENOMEM;
return 1;
}
PINT_dist_encode(dist_buf, sm_p->u.create.dist);
struct attribute_list attrs[num_attr];
for (i = 0; i < 6; i++) {
attrs[i].type = ATTR_SET;
attrs[i].page = ANY_PG + PVFS_USEROBJECT_ATTR_PG;
attrs[i].number = i;
}
attrs[0].val = &sm_p->u.create.attr.owner;
attrs[0].len = sizeof(PVFS_uid);
attrs[1].val = &sm_p->u.create.attr.group;
attrs[1].len = sizeof(PVFS_gid);
attrs[2].val = &sm_p->u.create.attr.perms;
attrs[2].len = sizeof(PVFS_permissions);
attrs[3].val = &sm_p->u.create.attr.mask;
attrs[3].len = sizeof(uint32_t);
attrs[4].val = &sm_p->u.create.attr.objtype;
attrs[4].len = sizeof(PVFS_ds_type);
attrs[5].val = dist_buf;
attrs[5].len = PINT_DIST_PACK_SIZE(sm_p->u.create.dist);
if(!sm_p->getattr.attr.cid) {
sm_p->getattr.attr.cid = sm_p->object_ref.handle; /* root directory */
}
set_htonll(&attrval, sm_p->getattr.attr.cid);
attrs[7].type = ATTR_SET;
attrs[7].page = ANY_PG + USER_COLL_PG;
attrs[7].number = 7;
attrs[7].val = &attrval;
attrs[7].len = 8;
if (sm_p->u.create.datafile_handles)
{
attrs[6].type = ATTR_SET;
attrs[6].page = ANY_PG + PVFS_USEROBJECT_ATTR_PG;
attrs[6].number = 6;
attrs[6].val = sm_p->u.create.datafile_handles;
attrs[6].len = sizeof(PVFS_handle) * sm_p->u.create.num_data_files;
ret = osd_command_set_create(command, PVFS_OSD_DATA_PID, sm_p->u.create.datafile_handles[0], 1);
}
else
{
/* retrieve oid */
attrs[6].type = ATTR_GET;
attrs[6].page = CUR_CMD_ATTR_PG;
attrs[6].number = CCAP_OID;
attrs[6].val = NULL;
attrs[6].len = CCAP_OID_LEN;
ret = osd_command_set_create(command, PVFS_OSD_DATA_PID, 0, 1);
}
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_create failed",
__func__);
js_p->error_code = ret;
return 1;
}
ret = osd_command_attr_build(command, attrs, num_attr);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return 1;
}
}
else
{
struct attribute_list attrs = {ATTR_SET, ANY_PG + USER_COLL_PG, 1, &attrval, 8};
if(!sm_p->getattr.attr.cid) {
sm_p->getattr.attr.cid = COLLECTION_OID_LB; /* root directory */
}
set_htonll(&attrval, sm_p->getattr.attr.cid);
ret = osd_command_set_create(command, PVFS_OSD_DATA_PID, sm_p->u.create.datafile_handles[0], 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_create failed",
__func__);
js_p->error_code = ret;
return 1;
}
ret = osd_command_attr_build(command, &attrs, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return 1;
}
}
}
msg_p->fs_id = sm_p->object_ref.fs_id;
if (is_osd_md)
msg_p->handle = sm_p->u.create.io_handle_extent_array[0].extent_array[0].first;
else
msg_p->handle = sm_p->u.create.datafile_handles[0];
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = create_datafiles_comp_fn;
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr, msg_p->handle,
sm_p->object_ref.fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action create_crdirent_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -1;
PINT_sm_msgpair_state *msg_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"create state: crdirent_setup_msgpair\n");
gossip_debug(GOSSIP_CLIENT_DEBUG,
"create: %s: posting crdirent req: parent handle: %llu, "
"name: %s, handle: %llu\n",
__func__,
llu(sm_p->object_ref.handle), sm_p->u.create.object_name,
llu(sm_p->u.create.metafile_handle));
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr, sm_p->object_ref.handle,
sm_p->object_ref.fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
if (server_is_osd(msg_p->svr_addr)) {
/*
* Directory operations for metafile and mdfile. We don't do anything
* here because we'll handle the individual directory operations in
* different states.
*/
js_p->error_code = OSD_MSGPAIR;
return SM_ACTION_COMPLETE;
} else {
PINT_SERVREQ_CRDIRENT_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->u.create.object_name,
sm_p->u.create.metafile_handle,
sm_p->object_ref.handle,
sm_p->object_ref.fs_id,
sm_p->hints);
js_p->error_code = 0;
}
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = create_crdirent_comp_fn;
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action create_crdirent_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: crdirent_failure\n");
sm_p->u.create.stored_error_code = js_p->error_code;
if (sm_p->u.create.stored_error_code == -PVFS_EEXIST)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "crdirent failed: "
"dirent already exists!\n");
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action create_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_object_ref metafile_ref;
PVFS_size tmp_size = 0;
int ret;
gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: cleanup\n");
PINT_free_object_attr(&sm_p->u.create.attr);
if(js_p->error_code == OSD_POST_CREATE)
js_p->error_code = 0;
PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
sm_p->error_code = (sm_p->u.create.stored_error_code ?
sm_p->u.create.stored_error_code :
js_p->error_code);
memset(&metafile_ref, 0, sizeof(metafile_ref));
if (sm_p->error_code == 0)
{
metafile_ref.handle = sm_p->u.create.metafile_handle;
metafile_ref.fs_id = sm_p->object_ref.fs_id;
metafile_ref.cid = sm_p->getattr.attr.cid;
/* fill in outgoing response fields */
sm_p->u.create.create_resp->ref = metafile_ref;
/* insert newly created metafile into the ncache */
PINT_ncache_update((const char*) sm_p->u.create.object_name,
(const PVFS_object_ref*) &metafile_ref,
(const PVFS_object_ref*) &(sm_p->object_ref));
sm_p->u.create.attr.mask |= PVFS_ATTR_META_DFILES;
sm_p->u.create.attr.u.meta.dfile_array =
sm_p->u.create.datafile_handles;
sm_p->u.create.attr.u.meta.dfile_count =
sm_p->u.create.datafile_count;
if(sm_p->u.create.stuffed)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "created stuffed file\n");
sm_p->u.create.attr.u.meta.stuffed_size = 0;
}
else
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "created un-stuffed file\n");
sm_p->u.create.attr.mask |= PVFS_ATTR_META_UNSTUFFED;
}
if(sm_p->u.create.dist)
{
sm_p->u.create.attr.u.meta.dist = sm_p->u.create.dist;
sm_p->u.create.attr.u.meta.dist_size = PINT_DIST_PACK_SIZE(sm_p->u.create.dist);
sm_p->u.create.attr.mask |= PVFS_ATTR_META_DIST;
}
/* we only insert a cache entry if the entire create succeeds,
* set size to 0
*/
/* Also, make sure to clear time masks. The server is responsible
* for setting that.
*/
sm_p->u.create.attr.mask &= (~(PVFS_ATTR_COMMON_MTIME));
sm_p->u.create.attr.mask &= (~(PVFS_ATTR_COMMON_CTIME));
sm_p->u.create.attr.mask &= (~(PVFS_ATTR_COMMON_ATIME));
ret = PINT_acache_update(metafile_ref,
&sm_p->u.create.attr,
&tmp_size);
if(ret < 0)
{
js_p->error_code = ret;
}
}
else if ((PVFS_ERROR_CLASS(-sm_p->error_code) == PVFS_ERROR_BMI) &&
(sm_p->u.create.retry_count < sm_p->msgarray_op.params.retry_limit))
{
sm_p->u.create.stored_error_code = 0;
sm_p->u.create.retry_count++;
gossip_debug(GOSSIP_CLIENT_DEBUG, "Retrying create operation "
"(attempt number %d)\n", sm_p->u.create.retry_count);
js_p->error_code = CREATE_RETRY;
return SM_ACTION_COMPLETE;
}
if(sm_p->u.create.layout.algorithm == PVFS_SYS_LAYOUT_LIST)
{
free(sm_p->u.create.layout.server_list.servers);
sm_p->u.create.layout.server_list.servers = NULL;
}
if(sm_p->u.create.dist)
{
PINT_dist_free(sm_p->u.create.dist);
sm_p->u.create.dist = NULL;
}
if(sm_p->u.create.datafile_handles)
{
free(sm_p->u.create.datafile_handles);
sm_p->u.create.datafile_handles = NULL;
}
PINT_msgpairarray_destroy(&sm_p->msgarray_op);
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
/** looks at the attributes of the parent directory and decides if it impacts
* the file creation in any way
*/
static PINT_sm_action create_parent_getattr_inspect(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_object_attr *attr = NULL;
PINT_dist *current_dist;
int ret = 0;
int num_dfiles_requested = 0;
gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: parent_getattr_inspect\n");
attr = &sm_p->getattr.attr;
assert(attr);
gossip_debug(GOSSIP_CLIENT_DEBUG, "parent owner: %d, group: %d, perms: %d\n",
(int)attr->owner, (int)attr->group, (int)attr->perms);
/* do we have a setgid bit? */
if(attr->perms & PVFS_G_SGID)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "parent has setgid bit set.\n");
gossip_debug(GOSSIP_CLIENT_DEBUG, " - modifying requested attr "
"for new file.\n");
sm_p->u.create.attr.group = attr->group;
/* note that permission checking is left to server even in this case */
}
gossip_debug(GOSSIP_CLIENT_DEBUG, "create_parent_getattr: [%p] "
"dfile_count = %d "
"dist_name_len = %d "
"dist_params_len = %d\n",
attr,
attr->u.dir.hint.dfile_count,
attr->u.dir.hint.dist_name_len,
attr->u.dir.hint.dist_params_len);
current_dist = sm_p->u.create.dist;
/* We have an overriding distribution name for this directory.. honor that */
if (attr->u.dir.hint.dist_name_len > 0)
{
/* switch it only if it is different! */
if (strcmp(attr->u.dir.hint.dist_name, current_dist->dist_name))
{
PINT_dist *new_dist = NULL;
new_dist = PINT_dist_create(attr->u.dir.hint.dist_name);
if (new_dist)
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"Overridding distribution name to %s instead of %s\n",
attr->u.dir.hint.dist_name,
current_dist->dist_name);
PINT_dist_free(current_dist);
sm_p->u.create.dist = new_dist;
current_dist = new_dist;
}
else
{
gossip_debug(
GOSSIP_CLIENT_DEBUG,
"Could not override distribution name with %s instead of %s\n",
attr->u.dir.hint.dist_name,
current_dist->dist_name);
}
}
else {
gossip_debug(
GOSSIP_CLIENT_DEBUG,
"retaining current distribution name %s\n",
current_dist->dist_name);
}
}
/* okay, we might need to override some dist params as well */
if (attr->u.dir.hint.dist_params_len > 0)
{
/* We have a series of comma separated key:val strings */
char **key, **val;
int64_t tmp_val;
int nparams = 0;
if (strncmp(current_dist->dist_name,
PVFS_DIST_VARSTRIP_NAME,
PVFS_DIST_VARSTRIP_NAME_SIZE) == 0)
{
/* varstrip parameters are a special case; we can't use the
* normal split_keyvals function because the : separater is also
* used within paramers that only varstrip can parse
*/
/* look for a "strips:" prefix */
if(strstr(attr->u.dir.hint.dist_params, "strips:")
!= attr->u.dir.hint.dist_params)
{
gossip_err("Error: failed to parse directory hints for varstrip distribution.\n");
js_p->error_code = -PVFS_EINVAL;
return SM_ACTION_COMPLETE;
}
if(current_dist->methods->set_param(current_dist->dist_name,
current_dist->params,
"strips",
&attr->u.dir.hint.dist_params[strlen("strips:")]))
{
gossip_err("Error: failed to set directory hints for varstrip distribution.\n");
js_p->error_code = -PVFS_EINVAL;
return SM_ACTION_COMPLETE;
}
}
/* ignore parse errors! */
/* TODO: why should we ignore parsing errors? */
else if (PINT_split_keyvals(attr->u.dir.hint.dist_params,
&nparams, &key, &val) == 0)
{
int i;
for (i = 0; i < nparams; i++)
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"distribution parameter %s, value %s\n",
key[i], val[i]);
/* NOTE: just as in server-config.c when parsing "Param" and
* "Value" fields, we will assume that all values are 64 bit
* integers. The only difference here is that we scan
* directly into a 64 bit integer, rather than converting
* from the int format that dotconf supports.
*/
ret = sscanf(val[i], SCANF_lld, &tmp_val);
if(ret != 1)
{
gossip_err(
"Error: unsupported type for distribution parameter %s, "
"value %s found in directory hints.\n",
key[i], val[i]);
gossip_err("Error: continuing anyway.\n");
}
else
{
if(current_dist->methods->set_param(current_dist->dist_name,
current_dist->params,
key[i],
&tmp_val))
{
gossip_err(
"Error: could not override hinted distribution "
"parameter %s, value %s found in directory hints\n",
key[i], val[i]);
}
}
free(key[i]);
free(val[i]);
}
free(key);
free(val);
}
}
/* priority for determining user's preference for number of data files:
* 1) count specified in attr's passed into sys_create
* 2) directory hints
* 3) mount options
* 4) system default
* All of the above can be overridden by the distribution itself.
*/
if(sm_p->u.create.user_requested_num_data_files > 0)
{
/* specified by sys_create caller */
num_dfiles_requested = sm_p->u.create.user_requested_num_data_files;
}
else if(attr->u.dir.hint.dfile_count > 0)
{
num_dfiles_requested = attr->u.dir.hint.dfile_count;
}
else
{
/* Check the mount options */
int rc;
struct PVFS_sys_mntent mntent;
rc = PVFS_util_get_mntent_copy(sm_p->object_ref.fs_id, &mntent);
if (0 == rc)
{
num_dfiles_requested = mntent.default_num_dfiles;
PVFS_util_free_mntent(&mntent);
}
}
/* Determine the number of dfiles. Pass in the number requested by the
* client, but will be overridden by default configuration and/or
* distribution if necessary
*/
ret = PINT_cached_config_get_num_dfiles(sm_p->object_ref.fs_id,
sm_p->u.create.dist,
num_dfiles_requested,
&sm_p->u.create.num_data_files);
if(ret < 0)
{
gossip_err("Error: failed to get number of data servers\n");
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
gossip_debug(GOSSIP_CLIENT_DEBUG, "Setting number of datafiles to %d [requested %d]\n",
sm_p->u.create.num_data_files, num_dfiles_requested);
return SM_ACTION_COMPLETE;
}
/**
* Returns the default distribution, or NULL if the distribution could not
* be created. The default distribution is read from the server
* configuration if possible. If the server config does not specify a
* default distribution, simple_stripe will be used.
*/
static PINT_dist* get_default_distribution(PVFS_fs_id fs_id)
{
server_configuration_s* server_config = NULL;
PINT_dist* dist = NULL;
/* Retrieve the server configuration (with mutex) */
server_config = PINT_get_server_config_struct(fs_id);
/* If a default dist is specified in the config, use that
else just create a simple_stripe distribution */
if (NULL != server_config &&
NULL != server_config->default_dist_config.name)
{
dist = PINT_dist_create(server_config->default_dist_config.name);
if (dist)
{
PINT_llist_p iter = server_config->default_dist_config.param_list;
/* Set supplied the distribution parameters */
while (iter)
{
int rc;
distribution_param_configuration* param =PINT_llist_head(iter);
/* If we are at the list end, break
else, set the distribution parameter to the given value */
if (NULL == param)
{
break;
}
else
{
rc = dist->methods->set_param(dist->dist_name,
dist->params,
param->name,
&param->value);
if (0 != rc)
{
gossip_err("Error setting distribution parameter\n"
" dist: %s\n"
" param name: %s\n"
" param value: %lld\n",
dist->dist_name, param->name,
lld(param->value));
}
}
iter = PINT_llist_next(iter);
}
}
else
{
gossip_err("Error creating default distribution: %s\n",
server_config->default_dist_config.name);
}
}
else
{
dist = PINT_dist_create(PVFS_DIST_SIMPLE_STRIPE_NAME);
}
/* Release the server config mutex */
PINT_put_server_config_struct(server_config);
return dist;
}
static int create_delete_handles_comp_fn(void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "create_delete_handles_comp_fn\n");
assert(resp_p->op == PVFS_SERV_REMOVE);
if (resp_p->status != 0)
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"Failed to remove handle number %d\n", index);
}
return resp_p->status;
}
/* delete the newly created meta and data handles */
static PINT_sm_action create_delete_handles_setup_msgpair_array(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL, i = 0;
PINT_sm_msgpair_state *msg_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "create state: "
"delete_handles_setup_msgpair_array\n");
js_p->error_code = 0;
ret = PINT_msgpairarray_init(&sm_p->msgarray_op, (sm_p->u.create.datafile_count+1));
if(ret != 0)
{
gossip_err("Failed to initialize %d msgpairs\n", (sm_p->u.create.datafile_count+1));
js_p->error_code = ret;
return(SM_ACTION_COMPLETE);
}
/*
for the metafile and each datafile, prepare to post a remove
send/recv pair
*/
foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"create: posting data file remove req %d\n",i);
/* arbitrarily handle deletion of the metafile last */
if (i == sm_p->u.create.datafile_count)
{
PINT_SERVREQ_REMOVE_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
sm_p->u.create.metafile_handle,
sm_p->hints);
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->u.create.metafile_handle;
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = create_delete_handles_comp_fn;
gossip_debug(GOSSIP_CLIENT_DEBUG, " Preparing to remove "
"metafile handle %llu\n", llu(msg_p->handle));
}
else
{
PINT_SERVREQ_REMOVE_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
sm_p->u.create.datafile_handles[i],
sm_p->hints);
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->u.create.datafile_handles[i];
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg_p->comp_fn = create_delete_handles_comp_fn;
gossip_debug(GOSSIP_CLIENT_DEBUG, " Preparing to remove "
"datafile handle %llu\n", llu(msg_p->handle));
}
}
ret = PINT_serv_msgpairarray_resolve_addrs(&sm_p->msgarray_op);
if(ret)
{
gossip_err("Error: failed to resolve server addresses.\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/