Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
1346 lines (1155 sloc) 42.1 KB
/*
* (C) 2003 Clemson University and The University of Chicago
*
* See COPYING in top-level directory.
*/
/** \file
* \ingroup sysint
*
* PVFS2 system interface routines for dereferencing path strings.
*/
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-util.h"
#include "ncache.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "pvfs2-internal.h"
#include "osd-util/osd-util.h"
extern job_context_id pint_client_sm_context;
#define GET_CURRENT_CONTEXT(__sm_p) \
(&(__sm_p->u.lookup.contexts[__sm_p->u.lookup.current_context]))
#define GET_SEGMENT_AT(__sm_p, index) \
(&((GET_CURRENT_CONTEXT(__sm_p))->segments[index]))
#define GET_CURRENT_SEGMENT(__sm_p) \
(GET_SEGMENT_AT(__sm_p, (GET_CURRENT_CONTEXT(__sm_p))->current_segment))
enum {
OSD_MSGPAIR = 2001
};
enum
{
LOOKUP_CONTINUE = 2,
LOOKUP_TYPE_DIRECTORY = 3,
LOOKUP_TYPE_METAFILE = 4,
LOOKUP_TYPE_RELATIVE_LN = 5,
LOOKUP_TYPE_ABSOLUTE_LN = 6,
LOOKUP_TYPE_LN_NO_FOLLOW = 7,
};
static int lookup_segment_lookup_comp_fn(
void *v_p, struct PVFS_server_resp *resp_p, int index);
/* helper function prototypes */
static int initialize_context(
struct PINT_client_lookup_sm * lookup_sm,
char *pathname,
PVFS_object_ref ctx_starting_refn);
static void finalize_context(
PINT_client_lookup_sm_ctx *ctx);
%%
machine pvfs2_client_lookup_sm
{
state lookup_segment_start
{
run lookup_segment_start;
success => lookup_segment_query_ncache;
default => lookup_segment_lookup_failure;
}
state lookup_segment_query_ncache
{
run lookup_segment_query_ncache;
success => lookup_segment_verify_attr_present;
default => lookup_segment_setup_msgpair;
}
state lookup_segment_setup_msgpair
{
run lookup_segment_setup_msgpair;
success => lookup_segment_lookup_xfer_msgpair;
OSD_MSGPAIR => lookup_segment_lookup_xfer_osd_msgpair;
default => lookup_segment_lookup_failure;
}
state lookup_segment_lookup_xfer_osd_msgpair
{
jump pvfs2_osd_msgpairarray_sm;
success => lookup_segment_verify_attr_present;
default => lookup_segment_lookup_failure;
}
state lookup_segment_lookup_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => lookup_segment_verify_attr_present;
default => lookup_segment_lookup_failure;
}
state lookup_segment_lookup_failure
{
run lookup_segment_lookup_failure;
default => lookup_cleanup;
}
state lookup_segment_verify_attr_present
{
run lookup_segment_verify_attr_present;
success => lookup_segment_check_attr_type;
default => lookup_segment_getattr;
}
state lookup_segment_getattr
{
jump pvfs2_client_getattr_sm;
success => lookup_segment_check_attr_type;
default => lookup_cleanup;
}
state lookup_segment_check_attr_type
{
run lookup_segment_check_attr_type;
LOOKUP_TYPE_DIRECTORY => lookup_context_check_completion;
LOOKUP_TYPE_METAFILE => lookup_context_check_completion;
LOOKUP_TYPE_LN_NO_FOLLOW => lookup_context_check_completion;
LOOKUP_TYPE_RELATIVE_LN => lookup_segment_handle_relative_link;
LOOKUP_TYPE_ABSOLUTE_LN => lookup_segment_handle_absolute_link;
default => lookup_cleanup;
}
state lookup_context_check_completion
{
run lookup_context_check_completion;
LOOKUP_CONTINUE => lookup_segment_start;
default => lookup_cleanup;
}
state lookup_segment_handle_relative_link
{
run lookup_segment_handle_relative_link;
success => lookup_segment_start;
default => lookup_cleanup;
}
state lookup_segment_handle_absolute_link
{
run lookup_segment_handle_absolute_link;
success => lookup_segment_start;
default => lookup_cleanup;
}
state lookup_cleanup
{
run lookup_cleanup;
default => terminate;
}
}
%%
static int initialize_context(
struct PINT_client_lookup_sm * lookup_sm,
char *pathname,
PVFS_object_ref ctx_starting_refn)
{
int i, ret = -PVFS_EINVAL, pathlen = 0, num_segments = 0;
void *state = NULL;
int cur_seg_index = 0, prev_ctx_index = 0;
char *cur_seg_name = NULL;
char *orig_pathname = NULL, *seg_remaining = NULL, *slash_str = NULL;
PINT_client_lookup_sm_segment *cur_seg = NULL;
int num_consecutive_prev_ctx_dot_dots = 0;
PINT_client_lookup_sm_ctx *ctx = NULL;
PINT_client_lookup_sm_ctx *prev_ctx = NULL;
gossip_debug(GOSSIP_LOOKUP_DEBUG, "initialize_context called\n");
assert(lookup_sm->current_context <= lookup_sm->context_count);
if(lookup_sm->current_context == lookup_sm->context_count)
{
/* we have usee the last available context */
lookup_sm->context_count++;
if(lookup_sm->context_count == 1)
{
/* first context, so allocate one */
lookup_sm->contexts = malloc(sizeof(PINT_client_lookup_sm_ctx));
if(!lookup_sm->contexts)
{
return -PVFS_ENOMEM;
}
}
else
{
/* no the first one, so realloc to get one more */
lookup_sm->contexts = realloc(lookup_sm->contexts,
sizeof(PINT_client_lookup_sm_ctx) *
lookup_sm->context_count);
if(!lookup_sm->contexts)
{
return -PVFS_ENOMEM;
}
}
}
if (!pathname)
{
return ret;
}
pathlen = strlen(pathname);
num_segments = PINT_string_count_segments(pathname);
if ((pathlen == 0) || (num_segments == 0))
{
return ret;
}
if ((pathlen > (PVFS_REQ_LIMIT_PATH_NAME_BYTES - 1)) ||
(num_segments > PVFS_REQ_LIMIT_PATH_SEGMENT_COUNT))
{
gossip_err("Filename %s is too long\n", pathname);
return -PVFS_ENAMETOOLONG;
}
/* initialize new context */
if (lookup_sm->current_context < 0)
{
return ret;
}
ctx = &lookup_sm->contexts[lookup_sm->current_context];
assert(ctx);
prev_ctx_index = (lookup_sm->current_context - 1);
memset(ctx, 0, sizeof(PINT_client_lookup_sm_ctx));
ctx->current_segment = 0;
ctx->total_segments = 0;
ctx->ctx_starting_refn = ctx_starting_refn;
/* initialize all segments within the context */
orig_pathname = strdup(pathname);
gossip_debug(GOSSIP_LOOKUP_DEBUG, " original pathname is: %s\n",
orig_pathname);
while(!PINT_string_next_segment(pathname, &cur_seg_name, &state))
{
/* grab the next segment in the context to fill in */
cur_seg = &(ctx->segments[cur_seg_index]);
assert(cur_seg);
memset(cur_seg, 0, sizeof(PINT_client_lookup_sm_segment));
gossip_debug(GOSSIP_LOOKUP_DEBUG, " cur_seg_name[%d]: %s\n",
cur_seg_index, cur_seg_name);
gossip_debug(GOSSIP_LOOKUP_DEBUG, " pathname is: %s\n",
pathname);
if (strcmp(cur_seg_name,".") == 0)
{
/* reset the count of consecutive dot dot segments */
num_consecutive_prev_ctx_dot_dots = 0;
gossip_debug(GOSSIP_LOOKUP_DEBUG,
" ignoring useless segment\n");
continue;
}
else if (strcmp(cur_seg_name,"..") == 0)
{
/*
if this isn't true, we need to
grab the previous context's previous segment
if this weren't true, we'd normally:
assert(cur_seg_index > 0);
*/
if ((cur_seg_index < 1) ||
(num_consecutive_prev_ctx_dot_dots > 0))
{
PINT_client_lookup_sm_segment *prev_ctx_prev_seg;
gossip_debug(
GOSSIP_LOOKUP_DEBUG, " got a '..' segment that "
"requires attention of the previous context\n");
init_next_prev_segment:
/*
grab the previous context to access the segments
within it, assuming a previous context is available
*/
if (prev_ctx_index < 0)
{
gossip_debug(GOSSIP_LOOKUP_DEBUG, "there are no "
"more previous contexts available: "
"failing lookup\n");
free(orig_pathname);
return -PVFS_ENOENT;
}
prev_ctx = &lookup_sm->contexts[prev_ctx_index];
assert(prev_ctx);
assert(prev_ctx->current_segment > 0);
num_consecutive_prev_ctx_dot_dots++;
gossip_debug(
GOSSIP_LOOKUP_DEBUG, "num consecutive '..' "
"segments requiring the previous segment "
"is now %d\n", num_consecutive_prev_ctx_dot_dots);
/*
further, if we have a number of consecutive '..'
segments, we may need to keep backing up into
the previous contexts' space
*/
if (prev_ctx->current_segment -
num_consecutive_prev_ctx_dot_dots < 0)
{
/* skip to next previous context, if any */
if (prev_ctx_index > -1)
{
/*
bump down dot dot count since it wasn't
used yet if we got here
*/
num_consecutive_prev_ctx_dot_dots--;
prev_ctx_index--;
goto init_next_prev_segment;
}
gossip_debug(
GOSSIP_LOOKUP_DEBUG, "there are no more segments "
"in the previous context: failing lookup\n");
free(orig_pathname);
return -PVFS_ENOENT;
}
prev_ctx_prev_seg = &prev_ctx->segments[
prev_ctx->current_segment -
num_consecutive_prev_ctx_dot_dots];
assert(prev_ctx_prev_seg);
/*
instead of decrementing the seg index and
continuing, we need to replace the last segment
copied from the last context in this case. (so
we drop through to segment init)
*/
ctx_starting_refn =
prev_ctx_prev_seg->seg_starting_refn;
cur_seg_name = prev_ctx_prev_seg->seg_name;
gossip_debug(
GOSSIP_LOOKUP_DEBUG,
"using previous segment: %s\n", cur_seg_name);
cur_seg_index--;
}
else
{
gossip_debug(GOSSIP_LOOKUP_DEBUG,
" got a '..' segment\n");
cur_seg_index--;
continue;
}
}
else
{
/* reset the count of consecutive dot dot segments */
num_consecutive_prev_ctx_dot_dots = 0;
}
/*
fill in the current segment now. the first segment
MUST have the same starting refn as the context
*/
if (cur_seg_index == 0)
{
cur_seg->seg_starting_refn = ctx_starting_refn;
}
if (cur_seg->seg_name)
{
free(cur_seg->seg_name);
}
cur_seg->seg_name = strdup(cur_seg_name);
assert(cur_seg->seg_name);
slash_str = orig_pathname;
for (i = 0; i < cur_seg_index; i++)
{
slash_str = strchr(slash_str, '/');
if (slash_str == NULL)
{
break;
}
slash_str++;
}
/* seg_remaining = strstr(orig_pathname, cur_seg_name); */
seg_remaining = slash_str;
if (seg_remaining)
{
gossip_debug(GOSSIP_LOOKUP_DEBUG,
" *seg_remaining is: %s\n", seg_remaining);
cur_seg->seg_remaining = strdup(seg_remaining);
assert(cur_seg->seg_remaining);
}
else
{
cur_seg->seg_remaining = NULL;
}
cur_seg_index++;
}
free(orig_pathname);
#if 0
/* DEBUGGING ONLY */
{
int i = 0;
gossip_debug(GOSSIP_LOOKUP_DEBUG, "Processed context path is:\n");
for(i = 0; i < cur_seg_index; i++)
{
assert(ctx->segments[i].seg_name);
gossip_debug(GOSSIP_LOOKUP_DEBUG, "/%s",
ctx->segments[i].seg_name);
}
gossip_debug(GOSSIP_LOOKUP_DEBUG, "\n");
}
#endif
ctx->total_segments = cur_seg_index;
assert(ctx->current_segment == 0);
ret = 0;
return ret;
}
static void finalize_context(
PINT_client_lookup_sm_ctx *ctx)
{
int i = 0;
PINT_client_lookup_sm_segment *cur_seg = NULL;
for(i = 0; i < MAX_LOOKUP_SEGMENTS; i++)
{
cur_seg = &ctx->segments[i];
assert(cur_seg);
if (cur_seg->seg_name)
{
gossip_debug(GOSSIP_LOOKUP_DEBUG, "Freeing segment %s\n",
cur_seg->seg_name);
free(cur_seg->seg_name);
cur_seg->seg_name = NULL;
}
if (cur_seg->seg_remaining)
{
gossip_debug(GOSSIP_LOOKUP_DEBUG, "Freeing remaining "
"segment %s\n", cur_seg->seg_remaining);
free(cur_seg->seg_remaining);
cur_seg->seg_remaining = NULL;
}
PINT_free_object_attr(&(cur_seg->seg_attr));
}
}
/****************************************************************/
/** Initiate dereferencing of a path string relative to a parent
* directory.
*/
PVFS_error PVFS_isys_ref_lookup(
PVFS_fs_id fs_id,
char* relative_pathname,
PVFS_object_ref parent,
const PVFS_credentials *credentials,
PVFS_sysresp_lookup *resp,
int32_t follow_link,
PVFS_sys_op_id *op_id,
PVFS_hint hints,
void *user_ptr)
{
PVFS_error ret = -PVFS_EINVAL;
PINT_smcb *smcb = NULL;
PINT_client_sm *sm_p = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_ref_lookup entered\n");
if ((relative_pathname == NULL) || (resp == NULL))
{
return ret;
}
PINT_smcb_alloc(&smcb, PVFS_SYS_LOOKUP,
sizeof(struct PINT_client_sm),
client_op_state_get_machine,
client_state_machine_terminate,
pint_client_sm_context);
if (smcb == NULL)
{
return -PVFS_ENOMEM;
}
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_init_msgarray_params(sm_p, fs_id);
PINT_init_sysint_credentials(sm_p->cred_p, credentials);
sm_p->u.lookup.orig_pathname = relative_pathname;
sm_p->u.lookup.starting_refn = parent;
sm_p->u.lookup.lookup_resp = resp;
sm_p->u.lookup.follow_link = follow_link;
sm_p->u.lookup.current_context = 0;
PVFS_hint_copy(hints, &sm_p->hints);
PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &parent.handle);
ret = initialize_context(&sm_p->u.lookup,
relative_pathname, parent);
if (ret != 0)
{
gossip_err("%s: failed to init context (path = %s)\n",
__func__, relative_pathname);
if (ret == -PVFS_ENOENT && sm_p->u.lookup.lookup_resp->error_path)
{
/* copy out error path */
strncpy(sm_p->u.lookup.lookup_resp->error_path,
relative_pathname,
sm_p->u.lookup.lookup_resp->error_path_size);
}
PVFS_util_release_credentials(sm_p->cred_p);
PINT_smcb_free(smcb);
return ret;
}
gossip_debug(GOSSIP_CLIENT_DEBUG, "lookup got: %s (parent %llu)\n",
relative_pathname, llu(parent.handle));
return PINT_client_state_machine_post(
smcb, op_id, user_ptr);
}
/** Dereference a path string relative to a parent directory.
*/
PVFS_error PVFS_sys_ref_lookup(
PVFS_fs_id fs_id,
char *relative_pathname,
PVFS_object_ref parent,
const PVFS_credentials *credentials,
PVFS_sysresp_lookup *resp,
int32_t follow_link,
PVFS_hint hints)
{
PVFS_error ret, error;
PVFS_sys_op_id op_id;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_ref_lookup entered\n");
ret = PVFS_isys_ref_lookup(
fs_id, relative_pathname, parent, credentials, resp,
follow_link, &op_id, hints, resp);
if (ret)
{
PVFS_perror_gossip("PVFS_isys_ref_lookup call", ret);
return ret;
}
if(op_id != -1)
{
/* did not complete immediately, so we wait */
ret = PVFS_sys_wait(op_id, "lookup", &error);
if (ret)
{
PVFS_perror_gossip("PVFS_sys_wait call", ret);
}
if(error)
{
ret = error;
}
PINT_sys_release(op_id);
}
return ret;
}
/** Dereference a path string relative to the root directory.
*/
PVFS_error PVFS_sys_lookup(
PVFS_fs_id fs_id, char *name,
const PVFS_credentials *credentials,
PVFS_sysresp_lookup *resp,
int32_t follow_link,
PVFS_hint hints)
{
PVFS_error ret = -PVFS_EINVAL;
PVFS_object_ref parent;
if (name && resp)
{
parent.handle = 0;
parent.fs_id = fs_id;
ret = PINT_cached_config_get_root_handle(
parent.fs_id,&parent.handle);
if (ret < 0)
{
return ret;
}
/* special case root handle lookup, since we already know it */
if (strcmp(name, "/") == 0)
{
resp->ref.handle = parent.handle;
resp->ref.fs_id = fs_id;
ret = 0;
}
else
{
/*
strip off leading slash (if any) and lookup the rest
of the path. NOTE: If it's always an error to not
have a leading slash, we should check that above
without doing this function call.
*/
char *path = ((name[0] == '/') ? &name[1] : name);
ret = PVFS_sys_ref_lookup(
fs_id, path, parent, credentials, resp, follow_link, hints);
}
}
return ret;
}
/****************************************************************/
/*
* Reset for next segment lookup.
*/
static PINT_sm_action lookup_segment_start(struct PINT_smcb *smcb,
job_status_s *js_p)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "%s\n", __func__);
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
static int lookup_segment_query_ncache(struct PINT_smcb *smcb,
job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_client_lookup_sm_segment *cur_seg;
PVFS_object_ref parent_ref, object_ref;
int ret;
cur_seg = GET_CURRENT_SEGMENT(sm_p);
gossip_debug(GOSSIP_CLIENT_DEBUG, "%s: segment [%s]\n", __func__,
cur_seg->seg_name);
parent_ref.fs_id = cur_seg->seg_starting_refn.fs_id;
parent_ref.handle = cur_seg->seg_starting_refn.handle;
ret = PINT_ncache_get_cached_entry(cur_seg->seg_name, &object_ref,
&parent_ref);
if (ret == 0)
{
gossip_debug(GOSSIP_NCACHE_DEBUG,
"*** ncache hit on first segment of %s (%llu|%d)\n",
cur_seg->seg_name, llu(object_ref.handle),
object_ref.fs_id);
cur_seg->seg_resolved_refn.handle = object_ref.handle;
cur_seg->seg_resolved_refn.fs_id = object_ref.fs_id;
js_p->error_code = 0; /* hit */
} else {
gossip_debug(GOSSIP_NCACHE_DEBUG,
"*** ncache clean miss on first segment of %s\n",
cur_seg->seg_name);
js_p->error_code = 1; /* miss */
}
return SM_ACTION_COMPLETE;
}
/*
* This is called if the ncache lookup failed. It builds the msgpair
* so that the next state can jump into xfer msgpair.
*/
static PINT_sm_action lookup_segment_setup_msgpair(struct PINT_smcb *smcb,
job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PINT_sm_msgpair_state *msg_p = NULL;
PINT_client_lookup_sm_segment *cur_seg = NULL;
char *seg_to_lookup = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "%s\n", __func__);
/* do a lookup on the current segment of the current context */
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
ret = PINT_cached_config_map_to_server(&msg_p->svr_addr,
cur_seg->seg_starting_refn.handle,
cur_seg->seg_starting_refn.fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
if (server_is_osd(msg_p->svr_addr)) {
uint64_t oid;
struct attribute_list attr;
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
/* Disable multi-segment resolution optimization */
seg_to_lookup = cur_seg->seg_name;
attr.type = ATTR_GET;
attr.page = ANY_PG + PVFS_USEROBJECT_DIR_PG;
attr.number = jenkins_one_at_a_time_hash((uint8_t *)seg_to_lookup,
strlen(seg_to_lookup) + 1);
attr.len = 1024;
oid = cur_seg->seg_starting_refn.handle;
ret = osd_command_set_get_attributes(command, PVFS_OSD_META_PID, oid);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_set_get_attributes failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
ret = osd_command_attr_build(command, &attr, 1);
if (ret) {
osd_error_xerrno(ret, "%s: osd_command_attr_build failed",
__func__);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
js_p->error_code = OSD_MSGPAIR;
} else {
/*
the pvfs2 lookup_path server operation has an optimization that
allows several path components to be resolved at once. this
code here resolves on a segment-by-segment basis, but we can
handle the multi-segment resolution by advancing through all
handles/attrs returned and populating the segments on
completion. the lookup_comp_fn does just this.
one way to disable use of this optimization is to make sure
seg_to_lookup is set to seg_name, and never seg_remaining. that
guarantees we're issuing a lookup on a single path segment
*/
seg_to_lookup = cur_seg->seg_remaining ? cur_seg->seg_remaining :
cur_seg->seg_name;
PINT_SERVREQ_LOOKUP_PATH_FILL(
msg_p->req,
*sm_p->cred_p,
seg_to_lookup,
cur_seg->seg_starting_refn.fs_id,
cur_seg->seg_starting_refn.handle,
PVFS_ATTR_COMMON_ALL,
sm_p->hints);
js_p->error_code = 0;
}
gossip_debug(
GOSSIP_LOOKUP_DEBUG, "Looking up segment %s under handle %llu\n",
seg_to_lookup, llu(cur_seg->seg_starting_refn.handle));
msg_p->fs_id = cur_seg->seg_starting_refn.fs_id;
msg_p->handle = cur_seg->seg_starting_refn.handle;
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
msg_p->comp_fn = lookup_segment_lookup_comp_fn;
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action lookup_segment_lookup_failure(
struct PINT_smcb *smcb, job_status_s *js_p)
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"lookup state: lookup_segment_lookup_failure\n");
return SM_ACTION_COMPLETE;
}
/*
* We get here either if the ncache hit, or the xfer msgpair finished
* successfully. In the ncache hit case, seg_attr will be all zeroes,
* so it will head into getattr, which may hit in its own acache.
*/
static PINT_sm_action lookup_segment_verify_attr_present(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_client_lookup_sm_segment *cur_seg = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"lookup state: lookup_segment_verify_attr_present\n");
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
/*
NOTE: there are two cases here where we need to fetch the attrs.
1) either the attrs are not present at all (i.e. objtype is
uninitialized). this is normal for a distributed meta data
setups (where we got a handle but still need to fetch the attrs)
2) we got a symlink back from the lookup and we need to fetch
the target path. if we got a symlink, we ALWAYS need to fetch
the target path UNLESS:
a) we're on the last segment, and
b) we're on the initial context (context 0), and
c) we were told NOT to follow the link
This second case is handled in check_attr_type method.
*/
if ((cur_seg->seg_attr.objtype == PVFS_TYPE_NONE) ||
(cur_seg->seg_attr.objtype == PVFS_TYPE_SYMLINK))
{
/* if we don't have the attr, direct sm to fetch the attr */
gossip_debug(GOSSIP_LOOKUP_DEBUG, " -- NO attrs for %s\n",
cur_seg->seg_name);
PINT_SM_GETATTR_STATE_FILL(
sm_p->getattr,
cur_seg->seg_resolved_refn,
(PVFS_ATTR_COMMON_ALL | PVFS_ATTR_SYMLNK_ALL),
PVFS_TYPE_NONE,
0);
js_p->error_code = 1;
}
else
{
gossip_debug(GOSSIP_LOOKUP_DEBUG, " -- we have the attrs for %s\n",
cur_seg->seg_name);
/* otherwise, continue to next state */
js_p->error_code = 0;
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action lookup_segment_check_attr_type(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_client_lookup_sm_ctx *cur_ctx = NULL;
PINT_client_lookup_sm_segment *cur_seg = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"lookup state: lookup_segment_check_attr_type\n");
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
if ((cur_seg->seg_attr.objtype == PVFS_TYPE_NONE) ||
(cur_seg->seg_attr.objtype == PVFS_TYPE_SYMLINK)) {
/*
* Got here from getattr. Else cur_seg already populated by
* a combined lookup+attrs from the server. See function
* directly above for why this objtype condition.
*/
PINT_free_object_attr(&(cur_seg->seg_attr));
PINT_copy_object_attr(&(cur_seg->seg_attr),
&(sm_p->getattr.attr));
}
cur_ctx = GET_CURRENT_CONTEXT(sm_p);
assert(cur_ctx);
switch(cur_seg->seg_attr.objtype)
{
case PVFS_TYPE_DIRECTORY:
js_p->error_code = LOOKUP_TYPE_DIRECTORY;
break;
case PVFS_TYPE_METAFILE:
js_p->error_code = LOOKUP_TYPE_METAFILE;
break;
case PVFS_TYPE_SYMLINK:
/*
check if we need to skip the symlink resolution
(in the case that it's the final segment and the
user requested that we do NOT follow the last link)
*/
if ((cur_seg->seg_attr.objtype == PVFS_TYPE_SYMLINK) &&
(cur_ctx->current_segment ==
(cur_ctx->total_segments - 1)) &&
(sm_p->u.lookup.current_context == 0) &&
(sm_p->u.lookup.follow_link ==
PVFS2_LOOKUP_LINK_NO_FOLLOW))
{
gossip_debug(GOSSIP_LOOKUP_DEBUG, "** skipping final "
"symlink resolution due to user request\n");
sm_p->u.lookup.skipped_final_resolution = 1;
js_p->error_code = LOOKUP_TYPE_LN_NO_FOLLOW;
}
else
{
assert(cur_seg->seg_attr.u.sym.target_path);
if (cur_seg->seg_attr.u.sym.target_path[0] == '/')
{
js_p->error_code = LOOKUP_TYPE_ABSOLUTE_LN;
}
else
{
js_p->error_code = LOOKUP_TYPE_RELATIVE_LN;
}
}
break;
default:
gossip_debug(GOSSIP_CLIENT_DEBUG, "cannot resolve object of "
"type %d\n", cur_seg->seg_attr.objtype);
js_p->error_code = -PVFS_EINVAL;
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action lookup_segment_handle_relative_link(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PINT_client_lookup_sm_segment *cur_seg = NULL;
char *relative_symlink_target = NULL;
/*
NOTE: if we have a relative link, we need to essentially
recurse. we're basically doing that iteratively by moving to a
new context and continuing along our completely linear state
machine. the absolute link case is a simplification of this
case. the reason this is tricker is because the relative link
may contain '.' and '..' characters (i.e. foo -> ../oldfoo)
*/
gossip_debug(GOSSIP_CLIENT_DEBUG,
"lookup state: lookup_segment_handle_relative_link\n");
js_p->error_code = 0;
/* grab the current segment that has a relative link to be resolved */
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
relative_symlink_target = cur_seg->seg_attr.u.sym.target_path;
sm_p->u.lookup.current_context++;
/*
and initialize the new context with the information we have so
that it can resolve this absolute link.
the starting refn is initialized as the starting refn of
the segment that resolves to this relative link, since the
link must be relative to that starting parent
*/
ret = initialize_context(&sm_p->u.lookup,
relative_symlink_target,
cur_seg->seg_starting_refn);
if (ret)
{
if (ret == -PVFS_ENOENT && sm_p->u.lookup.lookup_resp->error_path)
{
/* copy out error path */
strncpy(sm_p->u.lookup.lookup_resp->error_path,
relative_symlink_target,
sm_p->u.lookup.lookup_resp->error_path_size);
}
js_p->error_code = ret;
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action lookup_segment_handle_absolute_link(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PINT_client_lookup_sm_segment *cur_seg = NULL;
char absolute_symlink_target[PVFS_NAME_MAX] = {0};
PVFS_object_ref root_refn;
PVFS_fs_id fs_id;
/*
NOTE: if we have an absolute link, we need to essentially
recurse. we're basically doing that iteratively by moving to a
new context and continuing along our completely linear state
machine.
*/
gossip_debug(GOSSIP_CLIENT_DEBUG,
"lookup state: lookup_segment_handle_absolute_link\n");
js_p->error_code = 0;
/* grab the current segment that has an absolute link to be resolved */
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
ret = PVFS_util_resolve(cur_seg->seg_attr.u.sym.target_path,
&fs_id,
absolute_symlink_target,
PVFS_NAME_MAX);
if (ret < 0)
{
gossip_debug(GOSSIP_CLIENT_DEBUG,
"PVFS_util_resolve returned %d on %s\n",
ret, cur_seg->seg_attr.u.sym.target_path);
if (ret == -PVFS_ENOENT && sm_p->u.lookup.lookup_resp->error_path)
{
/* copy out error path */
strncpy(sm_p->u.lookup.lookup_resp->error_path,
cur_seg->seg_attr.u.sym.target_path,
/*absolute_symlink_target,*/
sm_p->u.lookup.lookup_resp->error_path_size);
/* set special error code */
ret = -PVFS_ENOTPVFS;
}
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
sm_p->u.lookup.current_context++;
/*
and initialize the new context with the information we have so
that it can resolve this absolute link.
the starting refn is initialized as the root refn matching the
fs_id that this lookup started with.
*/
root_refn.fs_id = sm_p->u.lookup.starting_refn.fs_id;
ret = PINT_cached_config_get_root_handle(
root_refn.fs_id, &root_refn.handle);
assert(ret == 0);
ret = initialize_context(&sm_p->u.lookup,
absolute_symlink_target + 1,
root_refn);
if (ret)
{
if (ret == -PVFS_ENOENT && sm_p->u.lookup.lookup_resp->error_path)
{
/* copy out error path */
strncpy(sm_p->u.lookup.lookup_resp->error_path,
absolute_symlink_target + 1,
sm_p->u.lookup.lookup_resp->error_path_size);
}
js_p->error_code = ret;
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action lookup_context_check_completion(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_object_ref last_resolved_refn;
PINT_client_lookup_sm_ctx *cur_ctx = NULL;
PINT_client_lookup_sm_segment *cur_seg = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG,
"lookup state: lookup_context_check_completion\n");
cur_ctx = GET_CURRENT_CONTEXT(sm_p);
assert(cur_ctx);
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
last_resolved_refn = cur_seg->seg_resolved_refn;
/*
since we're here, we've completed resolving the previous
segment, so update the context's current_segment to be the next
one
*/
check_for_completion:
cur_ctx->current_segment++;
if (cur_ctx->current_segment == cur_ctx->total_segments)
{
/*
if we've run out of segments, we've completed a context
lookup
*/
cur_ctx->ctx_resolved_refn = last_resolved_refn;
/*
NOTE: we're only done if the current_segment == the total
segments AND we're in context 0 (i.e. the initial context)
*/
if (sm_p->u.lookup.current_context == 0)
{
/* we are done, done at this point */
sm_p->u.lookup.lookup_resp->ref =
cur_ctx->ctx_resolved_refn;
js_p->error_code = 0;
return SM_ACTION_COMPLETE;
}
else
{
/*
otherwise, bump back to the previous segment and update
the state of the segment that we had previously left off
at in that context
*/
sm_p->u.lookup.current_context--;
cur_ctx = GET_CURRENT_CONTEXT(sm_p);
assert(cur_ctx);
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
/*
this segment resolved to be the last
context refn resolved
*/
cur_seg->seg_resolved_refn = last_resolved_refn;
/*
NOTE: once we're done updating the state of the segment
in this context, we should check one more time if we've
reached completion since we just effectively resolved a
new segment
*/
goto check_for_completion;
}
}
else
{
/*
otherwise, we just need to prepare for the next segment
resolution (by filling in the starting refn), now that we
have the previous one. the current_segment has already been
adjusted.
*/
cur_seg = GET_CURRENT_SEGMENT(sm_p);
assert(cur_seg);
cur_seg->seg_starting_refn = last_resolved_refn;
}
js_p->error_code = LOOKUP_CONTINUE;
return SM_ACTION_COMPLETE;
}
static PINT_sm_action lookup_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int i = 0;
gossip_debug(GOSSIP_CLIENT_DEBUG, "lookup state: lookup_cleanup\n");
PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
sm_p->error_code = js_p->error_code;
/* clean up all used memory for this lookup */
for(i = 0; i < sm_p->u.lookup.context_count; i++)
{
finalize_context(&sm_p->u.lookup.contexts[i]);
}
if(sm_p->u.lookup.context_count > 0)
{
free(sm_p->u.lookup.contexts);
}
gossip_debug(GOSSIP_LOOKUP_DEBUG, "All contexts finalized\n");
if (sm_p->error_code)
{
char buf[64] = {0};
PVFS_strerror_r(sm_p->error_code, buf, 64);
gossip_debug(
GOSSIP_LOOKUP_DEBUG, "Lookup failed to "
"resolve %s: %s\n", sm_p->u.lookup.orig_pathname, buf);
}
else
{
gossip_debug(GOSSIP_LOOKUP_DEBUG, "Lookup resolved segment %s "
"to %llu\n", sm_p->u.lookup.orig_pathname,
llu(sm_p->u.lookup.lookup_resp->ref.handle));
}
/* mark operation as complete */
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
static int lookup_segment_lookup_comp_fn(
void *v_p,
struct PVFS_server_resp *resp_p,
int index)
{
int i = 0;
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
PINT_client_lookup_sm_segment *cur_seg = NULL;
int current_seg_index = sm_p->u.lookup.contexts[
sm_p->u.lookup.current_context].current_segment;
PVFS_object_ref last_resolved_refn;
gossip_debug(GOSSIP_CLIENT_DEBUG, "lookup_segment_lookup_comp_fn\n");
if (server_is_osd(sm_p->msgarray_op.msgpair.svr_addr)) {
struct osd_command *command = &sm_p->msgarray_op.msgpair.osd_command;
int status = osd_errno_from_status(command->status);
int ret;
if (status != 0) {
gossip_debug(GOSSIP_CLIENT_DEBUG,
"osd_command_set_get_attributes failed %d\n", index);
return status;
}
ret = osd_command_attr_resolve(command);
if (ret) {
osd_error_xerrno(ret, "%s: attr_resolve failed", __func__);
return ret;
}
/* dirent with this handle not present on server */
if (command->attr->outlen == 0)
return -PVFS_ENOENT;
cur_seg = GET_SEGMENT_AT(sm_p, current_seg_index);
cur_seg->seg_resolved_refn.handle = get_ntohll(command->attr->val);
cur_seg->seg_resolved_refn.fs_id = cur_seg->seg_starting_refn.fs_id;
gossip_debug(GOSSIP_NCACHE_DEBUG, "*** ncache update on %s "
"target (%llu|%d) parent (%llu|%d)\n",
cur_seg->seg_name, llu(cur_seg->seg_resolved_refn.handle),
cur_seg->seg_resolved_refn.fs_id,
llu(cur_seg->seg_starting_refn.handle),
cur_seg->seg_starting_refn.fs_id);
PINT_ncache_update(cur_seg->seg_name, &cur_seg->seg_resolved_refn,
&cur_seg->seg_starting_refn);
osd_command_attr_free(command);
return 0;
}
assert(resp_p->op == PVFS_SERV_LOOKUP_PATH);
if (resp_p->status != 0)
{
return resp_p->status;
}
assert(resp_p->u.lookup_path.handle_count);
gossip_debug(GOSSIP_LOOKUP_DEBUG, "- Resolved %d segments\n",
resp_p->u.lookup_path.handle_count);
memset(&last_resolved_refn, 0, sizeof(PVFS_object_ref));
/*
at this point we have 1 or more resolved handles so we're going
to advance through all of them and populate the segments with
those resolved handles. in addition, we fill in the attribute
information for each segment if the attrs are available here
*/
for(i = 0; i < resp_p->u.lookup_path.handle_count; i++)
{
cur_seg = GET_SEGMENT_AT(sm_p, current_seg_index + i);
assert(cur_seg);
if (i > 0)
{
/*
if we got more than one resolved segment, fill the
current segment with the previously resolved
handle/fs_id
*/
cur_seg->seg_starting_refn = last_resolved_refn;
}
last_resolved_refn.handle = resp_p->u.lookup_path.handle_array[i];
last_resolved_refn.fs_id = cur_seg->seg_starting_refn.fs_id;
cur_seg->seg_resolved_refn = last_resolved_refn;
/*
and store the retrieved attr in the current
segment (if one was returned)
*/
if (i < resp_p->u.lookup_path.attr_count)
{
PINT_free_object_attr(&(cur_seg->seg_attr));
PINT_copy_object_attr(
&(cur_seg->seg_attr),
&(resp_p->u.lookup_path.attr_array[i]));
}
gossip_debug(
GOSSIP_NCACHE_DEBUG, "*** ncache update on %s "
"target (%llu|%d) "
"parent (%llu|%d)\n",
cur_seg->seg_name,
llu(last_resolved_refn.handle),
last_resolved_refn.fs_id,
llu(cur_seg->seg_starting_refn.handle),
cur_seg->seg_starting_refn.fs_id);
PINT_ncache_update(
(const char*) cur_seg->seg_name,
(const PVFS_object_ref*) &(last_resolved_refn),
(const PVFS_object_ref*) &(cur_seg->seg_starting_refn));
}
assert(i == resp_p->u.lookup_path.handle_count);
/*
if we've updated all of the segments that we could, advance
the context's current segment position
*/
GET_CURRENT_CONTEXT(sm_p)->current_segment = current_seg_index + (i - 1);
return 0;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/