Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
661 lines (568 sloc) 21.5 KB
/*
* (C) 2003 Clemson University and The University of Chicago
*
* See COPYING in top-level directory.
*/
/** \file
* \ingroup sysint
*
* PVFS2 system interface routines for reading and writing small files.
*/
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "pint-util.h"
#include "pvfs2-internal.h"
/* The small-io state machine should only be invoked/jumped-to from the
* sys-io state machine. We make this assumption and expect io parameters
* to be initialized already.
*/
/* Per-message completion callback; it is installed as comp_fn on every
 * msgpair built by small_io_setup_msgpairs and is defined after the state
 * machine description below.
 */
static int small_io_completion_fn(void * user_args,
struct PVFS_server_resp * resp_p,
int index);
/* Private state-action result code.  small_io_check_for_retries stores
 * MIRROR_RETRY in js_p->error_code when it has rebuilt the msgpair array,
 * which sends the machine from check_for_retries back to xfer_msgpairs.
 */
enum {
MIRROR_RETRY = 132
};
/* Nested state machine for small I/O:
 *   setup_msgpairs    -> build the request messages
 *   xfer_msgpairs     -> hand them to the msgpairarray machine
 *   check_for_retries -> loop back to xfer_msgpairs on MIRROR_RETRY
 *   cleanup           -> release resources and return to the caller
 * A failure in setup_msgpairs returns to the caller directly, without
 * passing through cleanup.
 */
%%
nested machine pvfs2_client_small_io_sm
{
state setup_msgpairs
{
run small_io_setup_msgpairs;
success => xfer_msgpairs;
default => return; /*setup failed; cleanup state is NOT visited*/
}
state xfer_msgpairs
{
jump pvfs2_msgpairarray_sm;
default => check_for_retries; /*always inspect the outcome, error or not*/
}
state check_for_retries
{
run small_io_check_for_retries;
MIRROR_RETRY => xfer_msgpairs; /*a rebuilt msgpair array is ready to send*/
default => cleanup; /*no mirroring, done, or out of retries*/
}
state cleanup
{
run small_io_cleanup;
default => return;
}
}
%%
/**
 * Small I/O is done in cases where the size of data transferred between
 * client and server is smaller than the maximum unexpected message size
 * accepted by the BMI transport interface in use.  In that case no
 * rendezvous setup messages are needed before sending the actual data
 * (the 'flow'); the data is simply packed into the unexpected message.
 * The sys-io state machine checks for possible 'small i/o' cases and
 * routes to the small i/o state actions when one is found.
 *
 * This state action builds one small-I/O request per datafile taking part
 * in the operation (sm_p->u.io.datafile_count entries) and allocates the
 * zero-filled per-datafile retry context array (one entry per datafile in
 * the file, indexed by server number).
 *
 * For writes, the file/memory requests are processed here so the offset
 * and size arrays of the request are filled in; reads defer processing of
 * the memory request to the completion function.
 *
 * \param smcb  state machine control block; the current frame is the
 *              client sm whose io parameters are already initialized.
 * \param js_p  job status; error_code is 0 on success or a negative PVFS
 *              error on failure.
 *
 * \return SM_ACTION_COMPLETE always.
 *
 * On failure everything allocated here (the msgpair array and the retry
 * context array) is released before returning, because the state machine
 * returns to the caller on a setup error without visiting the cleanup
 * state.
 */
static PINT_sm_action small_io_setup_msgpairs(
    struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    PVFS_object_attr *attr = &sm_p->getattr.attr;
    PINT_sm_msgpair_state *msg_p;
    PVFS_handle datafile_handle;
    uint32_t server_nr;
    int regions = 0;
    int i = 0;
    int ret;

    js_p->error_code = 0;

    /* initialize msgarray. one msgpair for each handle with data. */
    ret = PINT_msgpairarray_init(&sm_p->msgarray_op,
                                 sm_p->u.io.datafile_count);
    if (ret < 0)
    {
        js_p->error_code = ret;
        return SM_ACTION_COMPLETE;
    }

    /* initialize small_io_ctx array. one zeroed context for each handle in
     * the file.
     */
    sm_p->u.io.small_io_ctx = calloc(attr->u.meta.dfile_count,
                                     sizeof(*sm_p->u.io.small_io_ctx));
    if (!sm_p->u.io.small_io_ctx)
    {
        ret = -PVFS_ENOMEM;
        goto error_exit;
    }

    foreach_msgpair(&sm_p->msgarray_op, msg_p, i)
    {
        datafile_handle = attr->u.meta.dfile_array[
            sm_p->u.io.datafile_index_array[i]];

        gossip_debug(GOSSIP_IO_DEBUG, "   small_io_setup_msgpairs: "
                     "handle: %llu\n", llu(datafile_handle));

        if (sm_p->u.io.io_type == PVFS_IO_WRITE)
        {
            PINT_Request_state *file_req_state = NULL;
            PINT_Request_state *mem_req_state = NULL;
            PINT_request_file_data file_data;
            PINT_Request_result result;

            memset(&file_data, 0, sizeof(PINT_request_file_data));
            file_data.server_ct = attr->u.meta.dfile_count;
            file_data.fsize = 0;
            file_data.dist = attr->u.meta.dist;
            file_data.extend_flag = 1;

            result.segmax = IO_MAX_REGIONS;
            result.bytemax = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);

            file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
            mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);

            PINT_REQUEST_STATE_SET_TARGET(file_req_state,
                                          sm_p->u.io.file_req_offset);
            PINT_REQUEST_STATE_SET_FINAL(file_req_state,
                                         sm_p->u.io.file_req_offset +
                                         result.bytemax);

            file_data.server_nr = sm_p->u.io.datafile_index_array[i];

            /* the request processor writes the regions straight into the
             * outgoing request's offset/size arrays
             */
            result.segs = 0;
            result.bytes = 0;
            result.offset_array = msg_p->req.u.small_io.offsets;
            result.size_array = msg_p->req.u.small_io.sizes;
            msg_p->req.u.small_io.buffer = sm_p->u.io.buffer;

            ret = PINT_process_request(
                file_req_state, mem_req_state,
                &file_data,
                &result,
                PINT_CLIENT);

            /* the request states are only needed for this one call */
            PINT_free_request_state(file_req_state);
            PINT_free_request_state(mem_req_state);

            if (ret < 0)
            {
                goto error_exit;
            }

            regions = result.segs;
        }

        /* if this is a write operation, the appropriate offset and size
         * arrays will have been filled in by the request processing above.
         * reads don't require processing of the memory request until
         * the response.
         */
        PINT_SERVREQ_SMALL_IO_FILL(
            msg_p->req,
            *sm_p->cred_p,
            sm_p->object_ref.fs_id,
            datafile_handle,
            sm_p->u.io.io_type,
            sm_p->u.io.datafile_index_array[i],
            attr->u.meta.dfile_count,
            attr->u.meta.dist,
            sm_p->u.io.file_req,
            sm_p->u.io.file_req_offset,
            regions,
            PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
            sm_p->hints);

        msg_p->fs_id = sm_p->object_ref.fs_id;
        msg_p->handle = datafile_handle;

        /* if we are processing a read request and the source file has
         * mirrored handles, then bypass msgpairarray's retry mechanism.
         * SMALL-IO will prepare another set of msgpairs using the mirrors
         * and then retry.
         */
        if (sm_p->u.io.io_type == PVFS_IO_READ &&
            attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
        {
            msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
        }
        else
        {
            msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
        }

        msg_p->comp_fn = small_io_completion_fn;

        ret = PINT_cached_config_map_to_server(
            &msg_p->svr_addr, datafile_handle,
            sm_p->object_ref.fs_id);
        if (ret < 0)
        {
            gossip_lerr("Failed to map data server address\n");
            goto error_exit;
        }

        /* store the original datahandle for later use. */
        server_nr = msg_p->req.u.small_io.server_nr;
        sm_p->u.io.small_io_ctx[server_nr].original_datahandle =
            msg_p->handle;
    }

    js_p->error_code = 0;
    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
    return SM_ACTION_COMPLETE;

error_exit:
    /* the cleanup state is not run after a setup failure, so undo our own
     * allocations here (free(NULL) is a no-op for the ENOMEM case)
     */
    PINT_msgpairarray_destroy(&sm_p->msgarray_op);
    free(sm_p->u.io.small_io_ctx);
    sm_p->u.io.small_io_ctx = NULL;
    js_p->error_code = ret;
    return SM_ACTION_COMPLETE;
}
/**
* We assume that the response buffer hasn't been freed yet (before
* the completion function is called. The msgpairarray.sm doesn't
* free the response buffer until after the completion function is
* called.
*/
static int small_io_completion_fn(void * user_args,
struct PVFS_server_resp * resp_p,
int index)
{
struct PINT_smcb *smcb = (struct PINT_smcb *)user_args;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
PINT_sm_msgarray_op *mop = &(sm_p->msgarray_op);
PINT_sm_msgpair_state *msg_p = &(mop->msgarray[index]);
PINT_client_small_io_ctx *ctx =
&(sm_p->u.io.small_io_ctx[msg_p->req.u.small_io.server_nr]);
uint32_t server_nr = msg_p->req.u.small_io.server_nr;
#ifdef WIN32
PVFS_object_attr *attr = &(sm_p->getattr.attr);
PVFS_metafile_attr *meta = &(attr->u.meta);
#else
PVFS_object_attr *attr __attribute__((unused)) = &(sm_p->getattr.attr);
PVFS_metafile_attr *meta __attribute__((unused)) = &(attr->u.meta);
#endif
int ret = 0;
assert(resp_p->op == PVFS_SERV_SMALL_IO);
if(resp_p->status != 0)
{
return resp_p->status;
}
if(resp_p->u.small_io.io_type == PVFS_IO_READ)
{
PVFS_size sizes[IO_MAX_REGIONS];
PVFS_offset offsets[IO_MAX_REGIONS];
PVFS_object_attr * attr = &sm_p->getattr.attr;
PINT_request_file_data fdata;
PINT_Request_result result;
PINT_Request_state * file_req_state;
PINT_Request_state * mem_req_state;
int i = 0;
int done = 0;
PVFS_size bytes_processed = 0;
memset(&fdata, 0, sizeof(fdata));
if(resp_p->u.small_io.result_size != 0)
{
memset(&fdata, 0, sizeof(PINT_request_file_data));
fdata.server_ct = attr->u.meta.dfile_count;
fdata.server_nr = server_nr;
fdata.dist = attr->u.meta.dist;
fdata.fsize = resp_p->u.small_io.bstream_size;
result.segmax = IO_MAX_REGIONS;
result.bytemax = resp_p->u.small_io.result_size;
result.size_array = sizes;
result.offset_array = offsets;
file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);
PINT_REQUEST_STATE_SET_TARGET(file_req_state,
sm_p->u.io.file_req_offset);
PINT_REQUEST_STATE_SET_FINAL(
file_req_state, sm_p->u.io.file_req_offset +
PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req));
do
{
result.segs = 0;
result.bytes = 0;
ret = PINT_process_request(
file_req_state,
mem_req_state,
&fdata,
&result,
PINT_CLIENT);
if(ret < 0)
{
gossip_err("Failed processing request in small I/O read\n");
return ret;
}
for(i = 0; i < result.segs && !done; ++i)
{
int tmp_size;
char * src_ptr;
char * dest_ptr;
dest_ptr = (char *)sm_p->u.io.buffer + offsets[i];
src_ptr = (char *)resp_p->u.small_io.buffer +
bytes_processed;
if((bytes_processed + sizes[i]) <=
resp_p->u.small_io.result_size)
{
tmp_size = sizes[i];
}
else
{
tmp_size = resp_p->u.small_io.result_size -
bytes_processed;
done = 1;
}
memcpy(dest_ptr, src_ptr, sizes[i]);
bytes_processed += tmp_size;
}
} while(!PINT_REQUEST_DONE(file_req_state) && !done);
if(resp_p->u.small_io.result_size != bytes_processed)
{
gossip_err("size of bytes copied to user buffer "
"(%llu) do not match size of response (%llu)\n",
llu(bytes_processed),
llu(resp_p->u.small_io.result_size));
return -PVFS_EINVAL;
}
PINT_free_request_state(file_req_state);
PINT_free_request_state(mem_req_state);
}
} /*if PVFS_IO_READ*/
sm_p->u.io.dfile_size_array[server_nr] = resp_p->u.small_io.bstream_size;
//sm_p->u.io.dfile_size_array[index] = resp_p->u.small_io.bstream_size;
sm_p->u.io.total_size += resp_p->u.small_io.result_size;
/* Let's SMALL-IO know that the msg completed. */
ctx->msg_completed = 1;
/* To test fail-over with small-io, uncomment the following code. This
* will force each primary handle to have each of its mirrored handles
* tried on a READ io. If you are testing a file with many primary
* handles and/or many copies and don't want to wade through such a large
* test, then don't set the retry-limit. By default, it is normally set
* at 5. You can tweak this value in the pvfs2-fs.conf file, if you
* want.
*/
gossip_debug(GOSSIP_IO_DEBUG,"handle=%llu \toperation=%d \toffset=%ld "
"\taggregate_size=%ld\n",
llu(msg_p->req.u.small_io.handle),
msg_p->req.u.small_io.io_type,
((long int)msg_p->req.u.small_io.file_req_offset),
((long int)msg_p->req.u.small_io.aggregate_size));
/*
if ( (sm_p->u.io.io_type == PVFS_IO_READ)
&& (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
== PVFS_ATTR_META_MIRROR_DFILES
&& (sm_p->u.io.retry_count < mop->params.retry_limit))
{
mop->params.retry_limit = meta->mirror_copies_count;
ctx->msg_completed = 0;
}
*/
return 0;
}
/* State action run after every pass through the msgpairarray machine.
 *
 * For reads of files with mirrored datafiles, it finds the messages whose
 * retry context is not marked completed, points each of them at the next
 * handle to try (the next non-zero mirrored handle, or the original handle
 * once the mirrors are exhausted), and rebuilds the msgpair array so that
 * it holds only those messages.  It then sets js_p->error_code to
 * MIRROR_RETRY so the machine loops back to xfer_msgpairs.
 *
 * For writes, non-mirrored files, when nothing needs a retry, or when
 * retry_count has reached the retry limit, it returns without touching
 * js_p->error_code, so the machine takes the default transition to cleanup
 * with whatever error code was already there.
 *
 * If mapping a handle to a server or allocating the new msgpair array
 * fails, js_p->error_code is set to that failure code.
 *
 * Always returns SM_ACTION_COMPLETE.
 */
static int small_io_check_for_retries( struct PINT_smcb *smcb
, job_status_s *js_p)
{
#ifndef WIN32
gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s..\n",__func__);
#endif
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
PVFS_metafile_attr *meta = &(attr->u.meta);
PINT_client_small_io_ctx *ctx = NULL;
PINT_sm_msgarray_op *mop = &sm_p->msgarray_op;
PINT_sm_msgpair_state *msgarray = mop->msgarray;
PINT_sm_msgpair_state *msg = NULL;
PINT_sm_msgpair_state *new_msg = NULL;
/* scratch msgarray_op that receives the retry messages; it lives on this
 * function's stack
 */
PINT_sm_msgarray_op new_mop = {{0},0,0,{0}};
char *enc_req_bytes = NULL;
uint32_t retry_msg_count = 0;
uint32_t index = 0;
uint32_t copies = 0;
uint32_t server_nr = 0;
int i = 0;
int j = 0;
int k = 0;
int ret = 0;
#ifdef WIN32
gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s..\n",__func__);
#endif
/*if we are processing a write request, then msgpairarray handles retries.
*if we are processing a read and the source file is mirrored, then
*SMALL-IO handles the retries.
*/
if (sm_p->u.io.io_type == PVFS_IO_WRITE ||
((attr->mask & PVFS_ATTR_META_MIRROR_DFILES) !=
PVFS_ATTR_META_MIRROR_DFILES))
{
return SM_ACTION_COMPLETE;
}
/* Do any messages need to be retried? A message needs a retry when its
 * per-datafile context (indexed by server number) was never marked
 * completed by small_io_completion_fn.
 */
for (i=0; i<mop->count; i++)
{
server_nr = msgarray[i].req.u.small_io.server_nr;
ctx = &sm_p->u.io.small_io_ctx[server_nr];
if (!ctx->msg_completed)
retry_msg_count++;
}
/* no retries needed */
if (!retry_msg_count)
{
return SM_ACTION_COMPLETE;
}
/* do we have any retries available? */
if (sm_p->u.io.retry_count >= mop->params.retry_limit)
{
return SM_ACTION_COMPLETE;
}
/* okay. let's setup new msgpairs to retry. we will modify the incomplete
* msg pairs stored in msgarray and then copy them into a new msgarray
* before calling msgpairarray.sm.
*/
for (i=0; i<mop->count; i++)
{
msg = &msgarray[i];
server_nr = msg->req.u.small_io.server_nr;
ctx = &sm_p->u.io.small_io_ctx[server_nr];
/* don't process completed messages */
if (ctx->msg_completed)
continue;
/* for incomplete messages, cleanup memory, if necessary. the encoded
 * request is treated as "in use" if any of its bytes is non-zero.
 */
enc_req_bytes = (char *)&(msg->encoded_req);
for (k=0; k<sizeof(msg->encoded_req); k++)
{
if (enc_req_bytes[k] != '\0')
{
PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
break;
}
}/*end for*/
/* release the receive buffer left over from the failed attempt */
if (msg->encoded_resp_p)
{
BMI_memfree(msg->svr_addr
,msg->encoded_resp_p
,msg->max_resp_sz
,BMI_RECV);
}
/* Should we use the original datahandle? retry_original is set below
 * once every mirror copy has been tried.
 */
if (ctx->retry_original)
{
ctx->retry_original = 0;
msg->handle = ctx->original_datahandle;
msg->req.u.small_io.handle = ctx->original_datahandle;
msg->svr_addr = 0;
ret = PINT_cached_config_map_to_server(&msg->svr_addr
,msg->handle
,msg->fs_id);
if (ret)
{
gossip_lerr("Unable to determine the server address "
"for this handle (%llu)"
,llu(msg->handle));
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
continue;
}/*end retry_original*/
/* get next mirrored handle. note: if a mirrored handle is zero, then
* this means that the creation of this mirrored object failed for its
* particular server. if so, then get the next valid handle. as a
* last resort, retry the original handle.
*
* mirror_dfile_array is indexed as (copy * dfile_count) + server_nr.
*/
copies = ctx->current_copies_count;
for (;copies < meta->mirror_copies_count; copies++)
{
index = (copies*meta->dfile_count) + server_nr;
if (meta->mirror_dfile_array[index] != 0)
{ /* we have found a valid mirrored handle */
msg->handle = meta->mirror_dfile_array[index];
break;
}
}
/* if we haven't found a valid mirrored handle, retry the original
* datahandle. this also uses up one retry (retry_count is incremented)
* and restarts the mirror rotation from copy 0.
*/
if ( copies == meta->mirror_copies_count )
{
msg->handle = ctx->original_datahandle;
ctx->retry_original = 0;
ctx->current_copies_count = 0;
sm_p->u.io.retry_count++;
msg->req.u.small_io.handle = ctx->original_datahandle;
msg->svr_addr = 0;
ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
,msg->handle
,msg->fs_id);
if (ret)
{
gossip_lerr("Unable to determine the server address "
"for this handle (%llu)"
,llu(msg->handle));
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
continue;
}/*end if we have to use the original*/
/* Otherwise, use the discovered mirrored handle */
msg->req.u.small_io.handle = msg->handle;
msg->svr_addr = 0;
ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
,msg->handle
,msg->fs_id);
if (ret)
{
gossip_lerr("Unable to determine the server address "
"for this handle (%llu)"
,llu(msg->handle));
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
/* and setup for the next retry event. once the counter reaches the
 * number of copies, the next retry goes to the original handle and one
 * retry is used up.
 *
 * NOTE(review): the counter is advanced by one from its previous value,
 * not set to copies+1.  If the search above skipped zero handles, the
 * next retry looks like it would start at or before the copy just used
 * and could pick the same mirror again -- confirm whether that is
 * intended.
 */
ctx->current_copies_count++;
if (ctx->current_copies_count == meta->mirror_copies_count)
{
ctx->current_copies_count = 0;
ctx->retry_original = 1;
sm_p->u.io.retry_count++;
}
}/*end for each msgpair*/
/* Now, create a new msgpair array and populate from the above modified
* messages.
*/
ret = PINT_msgpairarray_init(&new_mop,retry_msg_count);
if (ret)
{
gossip_lerr("Unable to initialize msgarray_op:new_op\n");
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
/* populate the new msgarray with the modified messages. only the fields
 * listed below are carried over; everything else in the new message keeps
 * whatever PINT_msgpairarray_init left there.
 */
for (i=0, j=0; i<mop->count && j<new_mop.count; i++)
{
msg = &msgarray[i];
server_nr = msg->req.u.small_io.server_nr;
ctx = &sm_p->u.io.small_io_ctx[server_nr];
/* don't populate with completed messages */
if (ctx->msg_completed)
continue;
new_msg = &new_mop.msgarray[j];
j++;
new_msg->fs_id = msg->fs_id;
new_msg->handle = msg->handle;
new_msg->comp_fn = msg->comp_fn;
new_msg->svr_addr = msg->svr_addr;
new_msg->req = msg->req;
new_msg->enc_type = msg->enc_type;
new_msg->retry_flag = msg->retry_flag;
}/*end for*/
/* Destroy the old msgarray and substitute with the new. Params are left
* in tact.
*
* NOTE(review): new_mop is a local of this function.  If
* PINT_msgpairarray_init points msgarray at the op's embedded msgpair
* member when count is 1, then mop->msgarray would refer to this stack
* frame after we return -- verify against the msgpairarray
* implementation.
*/
PINT_msgpairarray_destroy(mop);
mop->count = new_mop.count;
mop->msgarray = new_mop.msgarray;
mop->msgpair = new_mop.msgpair;
/* Push the msgarray_op and jump to msgpairarray.sm */
PINT_sm_push_frame(smcb,0,mop);
js_p->error_code=MIRROR_RETRY;
return SM_ACTION_COMPLETE;
}/*end small_io_check_for_retries*/
/**
 * Final state action of the small I/O machine: releases the msgpair array
 * and the per-datafile retry context array.
 *
 * js_p is deliberately left untouched so that the error code produced by
 * the earlier states is what the caller of this nested machine sees.
 *
 * \param smcb  state machine control block; the current frame is the
 *              client sm that owns the resources.
 * \param js_p  job status (not modified).
 *
 * \return SM_ACTION_COMPLETE always.
 */
static int small_io_cleanup(
    struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);

    PINT_msgpairarray_destroy(&sm_p->msgarray_op);

    /* release the ctx array; this array is allocated whether or not the
     * file to read is mirrored.  clear the pointer as well: sm_p outlives
     * this state, and a stale pointer would invite a use-after-free or a
     * double free.
     */
    free(sm_p->u.io.small_io_ctx);
    sm_p->u.io.small_io_ctx = NULL;

    return SM_ACTION_COMPLETE;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/