Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
1649 lines (1446 sloc) 55 KB
/*
* (C) 2003 Clemson University and The University of Chicago
*
* See COPYING in top-level directory.
*/
/** \file
* \ingroup sysint
*
* PVFS2 system interface routines for obtaining attributes of an object
* (file or directory).
*/
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-util.h"
#include "pvfs2-util.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "pvfs2-internal.h"
#include "pvfs2-types-debug.h"
#include "osd-util/osd-util.h"
#include "src/common/misc/extent-utils.h"
/* pvfs2_client_getattr_sm
*
* The sm_p->msgpair structure is used to get the attributes of the
* object itself. We convert the original attribute mask (in
* sm_p->u.getattr.attrmask) to ask for datafile and distribution info
* if the user asked for file size (PVFS_ATTR_SYS_SIZE). This allows
* us to obtain this information (if the object turns out to be a
* metafile) so that we can later look up the datafile sizes and
* calculate the overall file size.
*
* The sm_p->msgpairarray is used to get datafile sizes, if it turns
* out that we need them. This space will also need to be freed, if
* we grab these sizes.
*/
#ifdef WIN32
static struct profiler getattr_prof;
#else
static struct profiler getattr_prof __attribute__((unused));
#endif
extern job_context_id pint_client_sm_context;
enum
{
GETATTR_ACACHE_MISS = 1,
GETATTR_NEED_DATAFILE_SIZES = 2,
GETATTR_IO_RETRY = 3,
OSD_MSGPAIR = 2001
};
/* completion function prototypes */
static int getattr_object_getattr_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
static int getattr_datafile_getattr_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
%%
nested machine pvfs2_client_datafile_getattr_sizes_sm
{
state datafile_getattr_setup_msgpairarray
{
run getattr_datafile_getattr_setup_msgpairarray;
OSD_MSGPAIR => datafile_getattr_xfer_osd_msgpairarray;
success => datafile_getattr_xfer_msgpairarray;
default => datafile_getattr_cleanup;
}
state datafile_getattr_xfer_osd_msgpairarray
{
jump pvfs2_osd_msgpairarray_sm;
default => datafile_getattr_cleanup;
}
state datafile_getattr_xfer_msgpairarray
{
jump pvfs2_msgpairarray_sm;
default => datafile_getattr_retry;
}
state datafile_getattr_retry
{
run getattr_datafile_getattr_retry;
GETATTR_IO_RETRY => datafile_getattr_xfer_msgpairarray;
default => datafile_getattr_cleanup;
}
state datafile_getattr_cleanup
{
run getattr_datafile_getattr_cleanup;
default => return;
}
}
nested machine pvfs2_client_getattr_sm
{
state acache_lookup
{
run getattr_acache_lookup;
GETATTR_ACACHE_MISS => object_getattr_setup_msgpair;
GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
default => cleanup;
}
state object_getattr_setup_msgpair
{
run getattr_object_getattr_setup_msgpair;
OSD_MSGPAIR => object_getattr_xfer_osd_msgpair;
success => object_getattr_xfer_msgpair;
default => cleanup;
}
state object_getattr_xfer_osd_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => acache_insert;
GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
default => object_getattr_failure;
}
state object_getattr_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => acache_insert;
GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
default => object_getattr_failure;
}
state acache_insert
{
run getattr_acache_insert;
default => cleanup;
}
state object_getattr_failure
{
run getattr_object_getattr_failure;
default => cleanup;
}
state datafile_get_sizes
{
jump pvfs2_client_datafile_getattr_sizes_sm;
success => acache_insert;
default => cleanup;
}
state cleanup
{
run getattr_cleanup;
default => return;
}
}
machine pvfs2_client_sysint_getattr_sm
{
state dowork
{
jump pvfs2_client_getattr_sm;
default => set_sys_response;
}
state set_sys_response
{
run getattr_set_sys_response;
default => terminate;
}
}
%%
/** Initiate retrieval of object attributes.
*/
PVFS_error PVFS_isys_getattr(
PVFS_object_ref ref,
uint32_t attrmask,
const PVFS_credentials *credentials,
PVFS_sysresp_getattr *resp_p,
PVFS_sys_op_id *op_id,
PVFS_hint hints,
void *user_ptr)
{
PVFS_error ret = -PVFS_EINVAL;
PINT_smcb *smcb = NULL;
PINT_client_sm *sm_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_getattr entered\n");
if ((ref.handle == PVFS_HANDLE_NULL) ||
(ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
{
gossip_err("invalid (NULL) required argument\n");
return ret;
}
if (attrmask & ~(PVFS_ATTR_SYS_ALL))
{
gossip_err("invalid attrmask\n");
return ret;
}
PINT_smcb_alloc(&smcb, PVFS_SYS_GETATTR,
sizeof(struct PINT_client_sm),
client_op_state_get_machine,
client_state_machine_terminate,
pint_client_sm_context);
if (smcb == NULL)
{
return -PVFS_ENOMEM;
}
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_init_msgarray_params(sm_p, ref.fs_id);
PINT_init_sysint_credentials(sm_p->cred_p, credentials);
sm_p->error_code = 0;
sm_p->object_ref = ref;
sm_p->u.getattr.getattr_resp_p = resp_p;
PVFS_hint_copy(hints, &sm_p->hints);
PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle)
,&ref.handle);
PINT_SM_GETATTR_STATE_FILL(
sm_p->getattr,
ref,
PVFS_util_sys_to_object_attr_mask(
attrmask),
PVFS_TYPE_NONE,
0);
return PINT_client_state_machine_post(
smcb, op_id, user_ptr);
}
/** Retrieve object attributes.
*/
PVFS_error PVFS_sys_getattr(
PVFS_object_ref ref,
uint32_t attrmask,
const PVFS_credentials *credentials,
PVFS_sysresp_getattr *resp_p,
PVFS_hint hints)
{
PVFS_error ret, error;
PVFS_sys_op_id op_id;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_getattr entered\n");
ret = PVFS_isys_getattr(ref, attrmask, credentials,
resp_p, &op_id, hints, NULL);
if (ret)
{
PVFS_perror_gossip("PVFS_isys_getattr call", ret);
return ret;
}
if(op_id != -1)
{
/* did not complete immediately, so we wait */
ret = PVFS_sys_wait(op_id, "getattr", &error);
if (ret)
{
PVFS_perror_gossip("PVFS_sys_wait call", ret);
}
if(error)
{
ret = error;
}
PINT_sys_release(op_id);
}
return ret;
}
/**
* getattr_acache_lookup
* @ingroup client_sm_getattr
*
* This function is invoked as the first state action of the
* getattr-dowork state machine. It performs a lookup into the
* attribute cache for the attribute in question, and returns
* result codes for a hit or a miss.
*
* @param smcb This must be a valid client state machine handle, with
* the @ref getattr field containing valid values for the
* fsid/handle of the desired attribute (in object_ref), as well as the
* requested attribute mask (req_attrmask).
*
* @param js_p Contains the return code to be set by this function in the
* @ref error_code field. This determines the next state action to jump
* to in the getattr-dowork state machine. Possible values for js_p->error_code
* are:
* <ul>
* <li><b>GETATTR_ACACHE_MISS</b> - The requested attribute was not found
* in the attribute cache, or the attribute was found, but the attribute's
* mask did not include values required by the requested mask (req_attrmask).
* </li>
* <li><b>GETATTR_NEED_DATAFILE_SIZES</b> - The requested attribute was found
* in the attribute cache and the mask was sufficient, but the data file size
* was requested for this handle, so we need to get that next.
* </li>
* <li><b>default</b> - The requested attribute was found in the attribute
* cache and its mask values were sufficient.
* </li>
*
* @return This function should always return 1 unless an error occurred
* within the internals of the state machine.
*/
static PINT_sm_action getattr_acache_lookup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
uint32_t trimmed_mask = 0;
int missing_attrs;
PVFS_object_ref object_ref;
struct server_configuration_s *server_config;
int ret = -1;
int attr_status = -1;
int size_status = -1;
js_p->error_code = 0;
object_ref = sm_p->getattr.object_ref;
server_config = PINT_get_server_config_struct(
sm_p->object_ref.fs_id);
PINT_put_server_config_struct(server_config);
assert(object_ref.handle != PVFS_HANDLE_NULL);
assert(object_ref.fs_id != PVFS_FS_ID_NULL);
gossip_debug(GOSSIP_ACACHE_DEBUG, "%s: handle %llu fsid %d\n",
__func__, llu(object_ref.handle), object_ref.fs_id);
/* The sys attr mask request is converted to object
* attr mask values for comparison with the cached
*/
if(sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
{
sm_p->getattr.req_attrmask |= PVFS_ATTR_META_ALL;
}
if(sm_p->getattr.flags & PINT_SM_GETATTR_BYPASS_CACHE)
{
gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: forced acache miss: "
" [%llu]\n",
llu(object_ref.handle));
js_p->error_code = GETATTR_ACACHE_MISS;
return SM_ACTION_COMPLETE;
}
ret = PINT_acache_get_cached_entry(object_ref,
&sm_p->getattr.attr,
&attr_status,
&sm_p->getattr.size,
&size_status);
if(ret < 0 || attr_status < 0)
{
gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: clean acache miss: "
" [%llu]\n",
llu(object_ref.handle));
js_p->error_code = GETATTR_ACACHE_MISS;
return SM_ACTION_COMPLETE;
}
/* acache hit, check results */
/* The sys attr mask request is converted to object
* attr mask values for comparison with the cached
* entry
*/
trimmed_mask = sm_p->getattr.req_attrmask;
gossip_debug(GOSSIP_GETATTR_DEBUG,"request attrmask:\n");
PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.req_attrmask);
gossip_debug(GOSSIP_GETATTR_DEBUG,"trimmed attrmask:\n");
PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,trimmed_mask);
gossip_debug(GOSSIP_GETATTR_DEBUG,"returned attrmask:\n");
PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.attr.mask);
/* the trimmed mask is used for making sure that we're only
* checking attr bits that make sense for the object type
* since the caller may have requested all attributes in
* the case where it doesn't know what type of object we're
* doing the getattr against.
*/
if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE)
{
trimmed_mask &= (PVFS_ATTR_META_ALL |
PVFS_ATTR_META_UNSTUFFED |
PVFS_ATTR_DATA_SIZE |
PVFS_ATTR_COMMON_ALL);
}
else if (sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK)
{
trimmed_mask &= (PVFS_ATTR_SYMLNK_ALL | PVFS_ATTR_COMMON_ALL);
}
else if (sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY)
{
trimmed_mask &= (PVFS_ATTR_COMMON_ALL | PVFS_ATTR_DIR_ALL);
}
/* trimmed_mask contains the list of attributes
* requested for a particular object,
* while sm_p->getattr.attr.mask contains
* the list of attributes cached for that object.
* The cached attributes can be used if requested
* is less than cached, i.e. all the attributes
* we need are already cached. So we need to do
* a bitwise comparison of requested <= cached.
*
* xor of the two masks gives us the bits that are different,
* and-ing that result with the requested mask gives us the
* bits in the requested mask but not in the cached mask.
*/
missing_attrs = ((trimmed_mask ^ sm_p->getattr.attr.mask) &
trimmed_mask);
gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask BEFORE:\n");
PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
if ( missing_attrs & PVFS_ATTR_META_MIRROR_DFILES )
{
/*Mirroring is optional, so remove mirror-dfiles*/
missing_attrs &= ~PVFS_ATTR_META_MIRROR_DFILES;
}
gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask AFTER:\n");
PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
if((missing_attrs == PVFS_ATTR_DATA_SIZE && size_status == 0) ||
(missing_attrs == 0))
{
/* nothing's missing, a hit! */
gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit "
"[%llu]\n", llu(object_ref.handle));
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
/* check if the only thing missing is the file size, then
* we don't need to do the object getattr operation, we only
* need to do the datafile getattr operation, so we return
* the GETATTR_NEED_DATAFILE_SIZES error code which will make
* the getattr-dowork state machine jump to the datafile getattr
* operation state.
*/
if(missing_attrs == PVFS_ATTR_DATA_SIZE)
{
if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
{
/* We are missing the size, and we don't know for sure if the
* file has been unstuffed. In this case, act as though we
* missed on all atributes so that we can get fresh stuffed size
* or datafile information as needed.
*/
}
else
{
/* if the file size is requested but the distribution info
* isn't and it hasn't been cached, then we need to
* get that first.
*/
js_p->error_code = GETATTR_NEED_DATAFILE_SIZES;
gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit, need sizes"
"[%llu]\n", llu(object_ref.handle));
return SM_ACTION_COMPLETE;
}
}
/* we missed */
/* clean out the attributes we got from the cache; this will be
* overwritten when we request updated information from the server
*/
PINT_free_object_attr(&sm_p->getattr.attr);
gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache miss due to mask: "
" [%llu]\n",
llu(object_ref.handle));
js_p->error_code = GETATTR_ACACHE_MISS;
return SM_ACTION_COMPLETE;
}
static PINT_sm_action getattr_object_getattr_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PVFS_object_ref object_ref;
PINT_sm_msgpair_state *msg_p;
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) %s\n", sm_p, __func__);
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
object_ref = sm_p->getattr.object_ref;
assert(object_ref.fs_id != PVFS_FS_ID_NULL);
assert(object_ref.handle != PVFS_HANDLE_NULL);
msg_p->fs_id = object_ref.fs_id;
msg_p->handle = object_ref.handle;
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
msg_p->comp_fn = getattr_object_getattr_comp_fn;
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr, msg_p->handle,
msg_p->fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
if (server_is_osd(msg_p->svr_addr)) {
uint64_t oid, pid;
int i, numattrs = 10;
struct attribute_list attrl[numattrs];
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
int is_osd_md = fsid_is_osd_md(object_ref.fs_id);
/* Set attr type, page and number */
for (i = 0; i < numattrs - 3; i++) {
attrl[i].type = ATTR_GET;
attrl[i].page = ANY_PG + PVFS_USEROBJECT_ATTR_PG;
attrl[i].number = i;
}
/* uid, gid & perms */
attrl[0].len = sizeof(PVFS_uid);
attrl[1].len = sizeof(PVFS_gid);
attrl[2].len = sizeof(PVFS_permissions);
/* mask, object type & dist */
attrl[3].len = sizeof(uint32_t);
attrl[4].len = sizeof(PVFS_ds_type);
attrl[5].len = 1024;
/* dfile_array */
attrl[6].len = 1024;
/* ctime, atime & mtime */
attrl[7].type = ATTR_GET;
attrl[7].page = ANY_PG + USER_TMSTMP_PG;
attrl[7].number = UTSAP_CTIME;
attrl[7].len = UTSAP_CTIME_LEN;
attrl[8].type = ATTR_GET;
attrl[8].page = ANY_PG + USER_TMSTMP_PG;
attrl[8].number = UTSAP_DATA_ATIME;
attrl[8].len = UTSAP_DATA_ATIME_LEN;
attrl[9].type = ATTR_GET;
attrl[9].page = ANY_PG + USER_TMSTMP_PG;
attrl[9].number = UTSAP_DATA_MTIME;
attrl[9].len = UTSAP_DATA_MTIME_LEN;
/*
* XXX: need to know if we're looking at a dir object or
* at a metafile. In the is_osd_md case, for a dir, we must
* do the meta pid. As in all other cases where we do the meta
* pid. This ?: below is just to handle the way we keep metadata
* on the 1st data strip in mdfile. Must override it for
* looking up a directory.
*/
/*
* Could be looking at either a dir object or a metafile. For
* datafile, metafile, both will live in the metadata pid space.
* But for mdfile, dir objcts are in metadata while metafile
* lives on the first data stripe. So query the handle extents
* for the filesystem to see if it is metadata or not.
*/
pid = PVFS_OSD_META_PID; /* datafile, metafile */
oid = object_ref.handle;
if (is_osd_md) {
struct server_configuration_s *config;
struct filesystem_configuration_s *fs;
PINT_llist *l;
int i;
pid = PVFS_OSD_DATA_PID; /* mdfile: assume data first */
config = PINT_get_server_config_struct(object_ref.fs_id);
fs = PINT_config_find_fs_id(config, object_ref.fs_id);
PINT_put_server_config_struct(config);
for (l = fs->meta_handle_ranges; l; l = PINT_llist_next(l)) {
/*
* Could do BMI_addr_rev_lookup and compare rather than
* iterating all metadata extents, but that may be slower,
* depending on the number of md servers.
*/
struct host_handle_mapping_s *hhm = PINT_llist_head(l);
if (!hhm)
break;
PVFS_handle_extent_array *hea = &hhm->handle_extent_array;
for (i=0; i < hea->extent_count; i++)
if (PINT_handle_in_extent(&hea->extent_array[i], oid)) {
pid = PVFS_OSD_META_PID; /* aha, is meta */
goto out;
}
}
}
out:
ret = osd_command_set_get_attributes(command, pid, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_get_attributes failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
/*
* XXX Right now, I'll go ahead and retrieve ALL the attributes from
* the OSD MDS. In the future, I'll look at the attrmask for the
* requested attributes.
*/
ret = osd_command_attr_build(command, attrl, numattrs);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
js_p->error_code = OSD_MSGPAIR;
} else {
/* setup the msgpair to do a getattr operation */
PINT_SERVREQ_GETATTR_FILL(
msg_p->req,
*sm_p->cred_p,
object_ref.fs_id,
object_ref.handle,
sm_p->getattr.req_attrmask,
sm_p->hints);
js_p->error_code = 0;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
/*
copies data from getattr response into the user supplied sys_attr
structure. returns 0 for directories and symlinks, and
GETATTR_NEED_DATAFILE_SIZES for a metafile (when appropriate)
*/
static int getattr_object_getattr_comp_fn(
void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
PVFS_object_attr *attr = NULL;
int ret, status;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
attr = &sm_p->getattr.attr;
if (server_is_osd(sm_p->msgarray_op.msgpair.svr_addr)) {
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
status = osd_errno_from_status(command->status);
if (status != 0)
return status;
ret = osd_command_attr_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_resolve failed",
__func__);
return ret;
}
/* memcpy the attributes from osd_command attr to sm_p attr object */
if (*(PVFS_ds_type*)command->attr[4].val == PVFS_TYPE_DIRECTORY) {
PINT_copy_osd_dir_attr(attr, command);
} else {
PINT_copy_osd_object_attr(attr, command);
}
sm_p->msgarray_op.msgpair.req.u.getattr.attrmask = attr->mask;
osd_command_attr_free(command);
} else {
assert(resp_p->op == PVFS_SERV_GETATTR);
gossip_debug(GOSSIP_GETATTR_DEBUG,
"getattr_object_getattr_comp_fn called\n");
if (resp_p->status != 0)
{
return resp_p->status;
}
/*
* If we've reached the callback for the getattr msgpair tranfer,
* then we can make a copy of the retrieved attribute for later
* caching.
*/
PINT_copy_object_attr(&sm_p->getattr.attr,
&resp_p->u.getattr.attr);
}
/* if the ref_type mask is set to a non-zero value (!PVFS_TYPE_NONE)
* a -PVFS_error will be triggered if the
* attributes received are not one of the the types specified.
* This is useful so that the client can know (in some cases) that it
* can avoid issuing an operation to the server since the server will
* just pass an error back anyway.
*/
if(sm_p->getattr.ref_type &&
sm_p->getattr.ref_type != attr->objtype)
{
int ret;
gossip_debug(GOSSIP_CLIENT_DEBUG, "*** "
"getattr_comp_fn: Object type mismatch.\n Possibly "
"saving network roundtrip by returning an error\n");
if (sm_p->getattr.ref_type == PVFS_TYPE_DIRECTORY)
{
ret = -PVFS_ENOTDIR;
}
else
{
assert(sm_p->getattr.ref_type == PVFS_TYPE_METAFILE);
ret = ((attr->objtype == PVFS_TYPE_DIRECTORY) ?
-PVFS_EISDIR : -PVFS_EBADF);
}
PVFS_perror_gossip("Object Type mismatch error", ret);
return ret;
}
/* do assertion checking of getattr response values, and
* check if file sizes are needed. With NDEBUG defined, this block
* only checks if file sizes are needed.
*/
switch (attr->objtype)
{
case PVFS_TYPE_METAFILE:
if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
PVFS_ATTR_META_DIST)
{
/* if we requested distribution attrs, did the distribution
* get set and is the size valid?
*/
assert(attr->mask & PVFS_ATTR_META_DIST);
assert(attr->u.meta.dist && (attr->u.meta.dist_size > 0));
}
if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
PVFS_ATTR_META_MIRROR_DFILES)
{
if (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
{ assert(attr->u.meta.mirror_dfile_array &&
(attr->u.meta.mirror_copies_count > 0));
gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: Mirror handles and "
"copy count retrieved.\n"
,__func__);
}
else
{
gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: request attribute "
"mask says to get the mirror dfiles, if they exist, \nbut "
"the response attribute mask says they were not retrieved. "
"This is okay.\n"
,__func__);
}
}
if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
PVFS_ATTR_META_DFILES)
{
/* if we requested the datafile handles for the file, did
* the datafile array get populated?
*/
//assert(attr->u.meta.dfile_array &&
// (attr->u.meta.dfile_count > 0));
gossip_debug(GOSSIP_GETATTR_DEBUG,
"getattr_object_getattr_comp_fn: "
"%d datafiles.\n", attr->u.meta.dfile_count);
/* if we need the size, that should be the only time we're
* going to have to do a full data file fetch.
* (that's expensive)
*/
if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
{
/* is the file stuffed? */
if(!(attr->mask & PVFS_ATTR_META_UNSTUFFED))
{
/* we can compute the size without doing any more
* getattr requests
*/
gossip_debug(GOSSIP_GETATTR_DEBUG,
"getattr_object_getattr_comp_fn: "
"detected stuffed file.\n");
return(0);
}
/* if caller asked for the size, then we need
* to jump to the datafile_getattr state, which
* will retrieve the datafile sizes for us.
*/
return GETATTR_NEED_DATAFILE_SIZES;
}
}
return 0;
case PVFS_TYPE_DIRECTORY:
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"getattr comp_fn [%p] "
"dfile_count = %d "
"dist_name_len = %d "
"dist_params_len = %d\n",
attr,
attr->u.dir.hint.dfile_count,
attr->u.dir.hint.dist_name_len,
attr->u.dir.hint.dist_params_len);
return 0;
}
case PVFS_TYPE_SYMLINK:
return 0;
case PVFS_TYPE_DATAFILE:
return 0;
case PVFS_TYPE_DIRDATA:
return 0;
case PVFS_TYPE_INTERNAL:
return 0;
default:
gossip_err("error: getattr_object_getattr_comp_fn: "
"handle refers to invalid object type\n");
}
return -PVFS_EINVAL;
}
static PINT_sm_action getattr_object_getattr_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
#ifdef WIN32
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
#else
struct PINT_client_sm *sm_p __attribute__((unused)) = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
#endif
gossip_debug(GOSSIP_CLIENT_DEBUG
,"(%p) getattr state: getattr_object_getattr_failure\n"
,sm_p);
if ((js_p->error_code != -PVFS_ENOENT) &&
(js_p->error_code != -PVFS_EINVAL))
{
PVFS_perror_gossip("getattr_object_getattr_failure ",
js_p->error_code);
}
return SM_ACTION_COMPLETE;
}
/* NOTE: This nested state machine allocates and stores the results in getattr.size_array. So,
* if you call this state machine directly, do not allocate space prior to calling it to avoid a
* nasty memory leak.
*/
static PINT_sm_action getattr_datafile_getattr_setup_msgpairarray(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_sm_getattr_state *getattr = &(sm_p->getattr);
int ret = -PVFS_EINVAL;
int i = 0;
PVFS_object_attr *attr = NULL;
PINT_sm_msgpair_state *msg_p = NULL;
uint64_t mirror_retry = (sm_p->getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES);
PVFS_handle *handles;
int is_osd = fsid_is_osd(sm_p->object_ref.fs_id);
gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing getattr_datafile_getattr_setup_msgpairarray...\n");
gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: Are we mirroring? %s\n",__func__
,(mirror_retry ? "YES" : "NO"));
gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: attr.mask:0x%08x \tmirror_retry:0x%08x\n"
,__func__
,sm_p->getattr.attr.mask
,(unsigned int)mirror_retry);
js_p->error_code = is_osd ? OSD_MSGPAIR : 0;
attr = &sm_p->getattr.attr;
assert(attr);
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
/*initialize the size_array, which will hold the size of each datahandle*/
PINT_SM_DATAFILE_SIZE_ARRAY_INIT(&getattr->size_array,attr->u.meta.dfile_count);
/* initialize mir_ctx_array: one context for each handle in the file */
getattr->mir_ctx_array = malloc(attr->u.meta.dfile_count *
sizeof(*getattr->mir_ctx_array));
if (!getattr->mir_ctx_array)
{
gossip_lerr("Unable to allocate mirror context array.\n");
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
memset(getattr->mir_ctx_array,0,attr->u.meta.dfile_count *
sizeof(*getattr->mir_ctx_array));
getattr->mir_ctx_count = attr->u.meta.dfile_count;
for (i=0; i<getattr->mir_ctx_count; i++)
{
getattr->mir_ctx_array[i].original_datahandle = attr->u.meta.dfile_array[i];
getattr->mir_ctx_array[i].original_server_nr = i;
}
/* allocate handle array and populate */
handles = malloc(sizeof(PVFS_handle) * attr->u.meta.dfile_count);
if (!handles)
{
gossip_lerr("Unable to allocation local handles array.\n");
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
memset(handles,0,sizeof(PVFS_handle) * attr->u.meta.dfile_count);
memcpy(handles,attr->u.meta.dfile_array,attr->u.meta.dfile_count *
sizeof(PVFS_handle));
/*allocate index-to-server array and populate*/
getattr->index_to_server = malloc(sizeof(uint32_t)*attr->u.meta.dfile_count);
if (!getattr->index_to_server)
{
gossip_lerr("Unable to allocate index-to-server array.\n");
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
memset(getattr->index_to_server,0,sizeof(uint32_t)*attr->u.meta.dfile_count);
for (i=0; i<attr->u.meta.dfile_count; i++)
{
getattr->index_to_server[i] = (uint32_t)i;
}
if (is_osd)
{
/* for each datafile, post a send/recv pair to obtain the size */
for(i = 0; i < attr->u.meta.dfile_count; i++)
{
struct osd_command *command = &sm_p->msgarray_op.msgarray[i].osd_command;
struct attribute_list id = {
.type = ATTR_GET,
.page = ANY_PG + 0x1,
.number = 0x82, /* logical length (not used capacity) */
.len = sizeof(uint64_t),
};
ret = osd_command_set_get_attributes(command, PVFS_OSD_DATA_PID,
attr->u.meta.dfile_array[i]);
if (ret) {
osd_error_xerrno(ret,
"%s: osd_command_set_get_attributes failed",
__func__);
js_p->error_code = ret;
return 1;
}
ret = osd_command_attr_build(command, &id, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return 1;
}
}
} else {
//START_PROFILER(getattr_prof);
PINT_SERVREQ_TREE_GET_FILE_SIZE_FILL(
msg_p->req,
*sm_p->cred_p,
sm_p->getattr.object_ref.fs_id,
0,
attr->u.meta.dfile_count,
handles,
(mirror_retry ? 1 : 0),
sm_p->hints);
}
msg_p->fs_id = sm_p->getattr.object_ref.fs_id;
msg_p->handle = handles[0];
msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
if (ret)
{
gossip_lerr("Unable to map server address for this handle(%llu) and "
"filesystem(%d)\n"
,llu(msg_p->handle)
,msg_p->fs_id);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
/* set retry flag based on mirroring option...if mirroring, we will handle
* retries from this machine; if not, msgpairarray will handle retries.
*/
if (mirror_retry)
{
msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
}
else
{
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static int getattr_datafile_getattr_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index)
{
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
PINT_sm_msgpair_state *msg = &(sm_p->msgarray_op.msgarray[index]);
struct PVFS_servreq_tree_get_file_size *tree =
&(msg->req.u.tree_get_file_size);
int is_osd = fsid_is_osd(sm_p->object_ref.fs_id);
PVFS_error status;
int64_t size;
PINT_sm_getattr_state *getattr = &(sm_p->getattr);
PINT_client_getattr_mirror_ctx *ctx = NULL;
uint32_t server_nr = 0;
int i = 0;
if (is_osd) {
status = osd_errno_from_status(
sm_p->msgarray_op.msgarray[index].osd_command.status);
} else {
if (resp_p->status)
{ /* tree request had a problem */
return resp_p->status;
}
assert(resp_p->op == PVFS_SERV_TREE_GET_FILE_SIZE);
}
if (status != 0)
{
return status;
}
if (is_osd) {
struct osd_command *command = &sm_p->msgarray_op.msgarray[index].osd_command;
int ret = osd_command_attr_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_resolve failed",
__func__);
return ret;
}
if (command->attr->outlen != sizeof(uint64_t)) {
gossip_err("%s: expecting 8 bytes returned, got %u.\n", __func__,
command->attr->outlen);
return ret;
}
size = get_ntohll(command->attr->val);
osd_command_attr_free(command);
} else {
/* if we are mirroring, then we need to check the error code returned from
* each server. If an error is found, mirroring will try to get the size
* from a different server. Below, we are marking which handles completed
* successfully, which tells mirroring NOT to retry them.
*/
if ( getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES)
{
for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
{
if (resp_p->u.tree_get_file_size.error[i] != 0)
{ /* error retrieving size for this handle..we will retry it. */
continue;
}
server_nr = getattr->index_to_server[i];
sm_p->getattr.size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
ctx = &(getattr->mir_ctx_array[server_nr]);
ctx->msg_completed = 1;
/*For completed messages, update the size array with the file size
*just retrieved.
*/
getattr->size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: size[%d]:%lld \thandle:%llu\n"
,__func__
,i
,llu(getattr->size_array[i])
,llu(tree->handle_array[i]));
}/*end for*/
}
else
{
/* if we are NOT mirroring and an error is found for an individual handle,
* then we must invalidate the size array and return an error code.
*/
for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
{
gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d]:%d"
"\tsize[%d]:%d\n"
,__func__
,i
,(int)resp_p->u.tree_get_file_size.error[i]
,i
,(int)resp_p->u.tree_get_file_size.size[i]);
if (resp_p->u.tree_get_file_size.error[i] != 0)
{
gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d] is %d\n"
,__func__
,i
,resp_p->u.tree_get_file_size.error[i]);
memset(getattr->size_array,0,sizeof(*getattr->size_array));
return (resp_p->u.tree_get_file_size.error[i]);
}
getattr->size_array[i] = resp_p->u.tree_get_file_size.size[i];
}/*end for*/
}/*end if*/
}
return(0);
}/*end getattr_datafile_getattr_comp_fn*/
static PINT_sm_action getattr_datafile_getattr_retry(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
PVFS_metafile_attr *meta = &(attr->u.meta);
PINT_sm_getattr_state *getattr = &(sm_p->getattr);
PINT_client_getattr_mirror_ctx *ctx = NULL;
PINT_sm_msgarray_op *mop = &(sm_p->msgarray_op);
PINT_sm_msgpair_state *msg = &(mop->msgarray[0]);
char *enc_req_bytes = NULL;
struct PVFS_servreq_tree_get_file_size *tree =
&(msg->req.u.tree_get_file_size);
uint32_t retry_msg_count = 0;
uint32_t index = 0;
uint32_t copies = 0;
uint32_t server_nr = 0;
int i = 0;
int j = 0;
int k = 0;
int ret = 0;
uint32_t *tmp_server_nr;
PVFS_handle *tmp_handles;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"(%p) getattr state: "
"getattr_datafile_getattr_retry\n", sm_p);
/*We only need to retry if we have mirrors; otherwise, msgpairarray
*has already handled the retries.
*/
if (!(attr->mask & PVFS_ATTR_META_MIRROR_DFILES))
{
/*we are NOT mirroring.*/
return SM_ACTION_COMPLETE;
}
/* How many handles need to be retried? */
for (i=0; i<tree->num_data_files; i++)
{
server_nr = getattr->index_to_server[i];
ctx = &(getattr->mir_ctx_array[server_nr]);
if (ctx->msg_completed == 0)
retry_msg_count++;
}
/* no retries needed */
if (retry_msg_count == 0)
{
return SM_ACTION_COMPLETE;
}
/* do we have any retries available? */
if (getattr->retry_count >= mop->params.retry_limit)
{
/* at this point, we have msgpairs that need to be retried, but we
* we have met our retry limit. so, we must invalidate the size array,
* since we don't have all of the necessary sizes AND return an error.
*/
memset(getattr->size_array,0,sizeof(*getattr->size_array) * getattr->size);
js_p->error_code = (js_p->error_code ? js_p->error_code : -PVFS_ETIME);
gossip_err("%s: Ran out of retries(%d)\n",__func__
,getattr->retry_count);
return SM_ACTION_COMPLETE;
}
/*allocate temporary index-to-server array*/
tmp_server_nr = malloc(sizeof(uint32_t) * retry_msg_count);
if (!tmp_server_nr)
{
gossip_lerr("Unable to allocate temporary index-to-server array.\n");
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
memset(tmp_server_nr,0,sizeof(uint32_t) * retry_msg_count);
/*allocate temporary handle array*/
tmp_handles = malloc(sizeof(PVFS_handle) * retry_msg_count);
if (!tmp_handles)
{
gossip_lerr("Unable to allocate temporary handle array.\n");
js_p->error_code = -PVFS_ENOMEM;
return SM_ACTION_COMPLETE;
}
memset(tmp_handles,0,sizeof(PVFS_handle) * retry_msg_count);
/* okay. let's setup new handles to retry.
*/
for (i=0,j=0; i<tree->num_data_files; i++)
{
server_nr = getattr->index_to_server[i];
ctx = &(getattr->mir_ctx_array[server_nr]);
/* don't process completed messages */
if (ctx->msg_completed)
continue;
/* for incomplete messages, cleanup memory, if necessary */
enc_req_bytes = (char *)&(msg->encoded_req);
for (k=0; k<sizeof(msg->encoded_req); k++)
{
if (enc_req_bytes[k] != '\0')
{
PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
break;
}
}/*end for*/
if (msg->encoded_resp_p)
{
BMI_memfree(msg->svr_addr
,msg->encoded_resp_p
,msg->max_resp_sz
,BMI_RECV);
}
/* Should we use the original datahandle? */
if (ctx->retry_original)
{
ctx->retry_original = 0;
tmp_handles[j] = ctx->original_datahandle;
tmp_server_nr[j] = ctx->original_server_nr;
j++;
continue;
}/*end retry_original*/
/* otherwise, get next mirrored handle. note: if a mirrored handle is
* zero, then this means that the creation of this mirrored object
* failed for its particular server. in this case, get the next valid
* handle. as a last resort, retry the original handle.
*/
copies = ctx->current_copies_count;
for (;copies < meta->mirror_copies_count; copies++)
{
index = (copies*meta->dfile_count) + server_nr;
if (meta->mirror_dfile_array[index] != 0)
{ /* we have found a valid mirrored handle */
tmp_handles[j] = meta->mirror_dfile_array[index];
tmp_server_nr[j] = server_nr;
j++;
break;
}
}
/* if we haven't found a valid mirrored handle, retry the original
* datahandle.
*/
if ( copies == meta->mirror_copies_count )
{
tmp_handles[j] = ctx->original_datahandle;
tmp_server_nr[j] = ctx->original_server_nr;
j++;
ctx->retry_original = 0;
ctx->current_copies_count = 0;
getattr->retry_count++;
continue;
}/*end if we have to use the original*/
/* otherwise, setup for the next retry event */
ctx->current_copies_count++;
if (ctx->current_copies_count == meta->mirror_copies_count)
{
ctx->current_copies_count = 0;
ctx->retry_original = 1;
getattr->retry_count++;
}
}/*end for each handle in the old request*/
/*replace values in old tree request*/
free(tree->handle_array);
tree->handle_array = tmp_handles;
tree->num_data_files = retry_msg_count;
/*replace values in old message request*/
msg->handle = tmp_handles[0];
msg->svr_addr=0;
ret = PINT_cached_config_map_to_server(&msg->svr_addr
,msg->handle
,msg->fs_id);
if (ret)
{
gossip_lerr("Unable to determine server address for handle(%llu) and "
"file system(%d).\n"
,llu(msg->handle)
,msg->fs_id);
js_p->error_code = -PVFS_EINVAL;
return SM_ACTION_COMPLETE;
}
/*save index-to-server array*/
free(getattr->index_to_server);
getattr->index_to_server = tmp_server_nr;
/* Push the msgarray_op and jump to msgpairarray.sm */
PINT_sm_push_frame(smcb,0,mop);
js_p->error_code=GETATTR_IO_RETRY;
return SM_ACTION_COMPLETE;
} /*end datafile_getattr_retry*/
static PINT_sm_action getattr_datafile_getattr_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
//FINISH_PROFILER("getattr", getattr_prof, 1);
PINT_sm_getattr_state *getattr = &(sm_p->getattr);
/* cleanup handle_array created for the one tree request */
PINT_sm_msgpair_state *msg_p = &(sm_p->msgarray_op.msgarray[0]);
if (msg_p->req.u.tree_get_file_size.handle_array)
free(msg_p->req.u.tree_get_file_size.handle_array);
/* cleanup tree request */
PINT_msgpairarray_destroy(&sm_p->msgarray_op);
/* cleanup memory that may have been used for mirrored retries.*/
if (getattr->mir_ctx_array)
free(getattr->mir_ctx_array);
if (getattr->index_to_server)
free(getattr->index_to_server);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action getattr_acache_insert(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_size* tmp_size = NULL;
if( sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE ||
sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY ||
sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK )
{
/* see if we have a size value to cache */
if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE &&
sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
{
if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
{
/* stuffed file case */
sm_p->getattr.size = sm_p->getattr.attr.u.meta.stuffed_size;
tmp_size = &sm_p->getattr.size;
gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr_acache_insert "
"calculated stuffed logical size of %lld\n", lld(*tmp_size));
}
else
{
/* compute size as requested */
assert(sm_p->getattr.attr.u.meta.dist);
assert(sm_p->getattr.attr.u.meta.dist->methods &&
sm_p->getattr.attr.u.meta.dist->methods->logical_file_size);
sm_p->getattr.size =
(sm_p->getattr.attr.u.meta.dist->methods->logical_file_size)(
sm_p->getattr.attr.u.meta.dist->params,
sm_p->getattr.attr.u.meta.dfile_count,
sm_p->getattr.size_array);
tmp_size = &sm_p->getattr.size;
gossip_debug(GOSSIP_GETATTR_DEBUG,"getattr_acache_insert calculated"
" unstuffed logical size of %lld\n"
, lld(*tmp_size));
}
}
PINT_acache_update(sm_p->getattr.object_ref,
&sm_p->getattr.attr,
tmp_size);
gossip_debug(GOSSIP_CLIENT_DEBUG, "trying to add object "
"reference to acache\n");
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action getattr_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_sm_getattr_state *getattr = &(sm_p->getattr);
gossip_debug(GOSSIP_CLIENT_DEBUG,
"(%p) getattr state: getattr_cleanup\n", sm_p);
gossip_debug(GOSSIP_GETATTR_DEBUG
,"%s: js_p->error_code:%d \tgetattr->attr.mask:0x%08x\n"
,__func__
,js_p->error_code
,getattr->attr.mask);
sm_p->error_code = js_p->error_code;
/* cleanup size array; is only allocated if datafile sizes are retrieved */
if (getattr->size_array)
free(getattr->size_array);
/* cleanup getattr when an error occurs */
if (js_p->error_code)
{
if (getattr->attr.mask & PVFS_ATTR_META_DFILES)
{
if (getattr->attr.u.meta.dfile_array)
free(getattr->attr.u.meta.dfile_array);
}
if (getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES)
{
if (getattr->attr.u.meta.mirror_dfile_array)
free(getattr->attr.u.meta.mirror_dfile_array);
}
if (getattr->attr.mask & PVFS_ATTR_META_DIST)
{
PINT_dist_free(getattr->attr.u.meta.dist);
}
}/*end if error*/
return SM_ACTION_COMPLETE;
}
static PINT_sm_action getattr_set_sys_response(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_sysresp_getattr * sysresp = NULL;
PVFS_object_attr *attr = NULL;
if(js_p->error_code != 0)
{
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
attr = &sm_p->getattr.attr;
assert(attr);
/* If we get to this state action,
* the getattr state machine was invoked, so
* we can assume that one of the PVFS_[i]sys_getattr functions
* was called, and the response field must be filled in for the
* user.
*/
sysresp = sm_p->u.getattr.getattr_resp_p;
/*
* if we retrieved a symlink target, copy it for the caller; this
* target path will be handed all the way back up to the caller via
* the PVFS_sys_attr object. The caller of PVFS_[i]sys_getattr
* must free it.
*/
if(attr->objtype == PVFS_TYPE_SYMLINK &&
attr->mask & PVFS_ATTR_SYMLNK_TARGET)
{
assert(attr->u.sym.target_path_len > 0);
assert(attr->u.sym.target_path);
sysresp->attr.link_target = strdup(attr->u.sym.target_path);
if (!sysresp->attr.link_target)
{
js_p->error_code = -PVFS_ENOMEM;
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
}
if(attr->objtype == PVFS_TYPE_METAFILE)
{
/* Copy if there are any special object specific flags */
sysresp->attr.flags = attr->u.meta.hint.flags;
/* special case for when users ask for dfile count */
if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_DFILES)
{
sysresp->attr.dfile_count = attr->u.meta.dfile_count;
}
if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_MIRROR_DFILES)
{
sysresp->attr.mirror_copies_count = attr->u.meta.mirror_copies_count;
}
}
if (attr->objtype == PVFS_TYPE_DIRECTORY)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "dfile_count: %d\n",
attr->u.dir.hint.dfile_count);
gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_len = %d, "
"dist_params_len = %d\n",
attr->u.dir.hint.dist_name_len, attr->u.dir.hint.dist_params_len);
sysresp->attr.dfile_count = attr->u.dir.hint.dfile_count;
/*
* If we retrieved any extended attributes for the directory
* in question, the caller's responsibility to free it up
*/
if (attr->u.dir.hint.dist_name_len > 0 &&
(sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
{
sysresp->attr.dist_name = strdup(attr->u.dir.hint.dist_name);
if (!sysresp->attr.dist_name)
{
js_p->error_code = -PVFS_ENOMEM;
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_hint: %s\n"
, sysresp->attr.dist_name);
}
if (attr->u.dir.hint.dist_params_len > 0 &&
(sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
{
sysresp->attr.dist_params = strdup(attr->u.dir.hint.dist_params);
if (!sysresp->attr.dist_params)
{
free(sysresp->attr.dist_name);
sysresp->attr.dist_name = NULL;
js_p->error_code = -PVFS_ENOMEM;
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_params: %s\n"
, sysresp->attr.dist_params);
}
}
/* copy outgoing sys_attr fields from returned object_attr */
sysresp->attr.owner = attr->owner;
sysresp->attr.group = attr->group;
sysresp->attr.perms = attr->perms;
sysresp->attr.atime = attr->atime;
sysresp->attr.mtime = attr->mtime;
sysresp->attr.ctime = attr->ctime;
sysresp->attr.mask = PVFS_util_object_to_sys_attr_mask(attr->mask);
sysresp->attr.size = 0;
sysresp->attr.objtype = attr->objtype;
sysresp->attr.cid = attr->cid;
if (js_p->error_code == 0)
{
/* convert outgoing attribute mask based on what we got */
sysresp->attr.mask = PVFS_util_object_to_sys_attr_mask(
sm_p->getattr.attr.mask);
if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
{
if( attr->objtype == PVFS_TYPE_DATAFILE )
{
sysresp->attr.size = attr->u.data.size;
}
else
{
sysresp->attr.size = sm_p->getattr.size;
}
sysresp->attr.mask |= PVFS_ATTR_SYS_SIZE;
}
if(attr->mask & PVFS_ATTR_META_DIST)
{
/* we have enough information to set a block size */
sysresp->attr.blksize = attr->u.meta.dist->methods->get_blksize(
attr->u.meta.dist->params);
sysresp->attr.mask |= PVFS_ATTR_SYS_BLKSIZE;
}
/* if this is a symlink, add the link target */
if (sm_p->getattr.req_attrmask & PVFS_ATTR_SYMLNK_TARGET)
{
sysresp->attr.mask |= PVFS_ATTR_SYS_LNK_TARGET;
}
if(sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
{
sysresp->attr.dirent_count = attr->u.dir.dirent_count;
sysresp->attr.mask |= PVFS_ATTR_SYS_DIRENT_COUNT;
}
}
else
{
/* in case of failure, blank out response */
memset(sm_p->u.getattr.getattr_resp_p,
0, sizeof(PVFS_sysresp_getattr));
}
PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/