Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
3325 lines (2849 sloc) 106 KB
/*
* (C) 2003 Clemson University and The University of Chicago
*
* See COPYING in top-level directory.
*
*/
/** \file
* \ingroup sysint
*
* PVFS2 system interface routines for reading and writing files.
*/
#include <string.h>
#include <assert.h>
#ifndef WIN32
#include <unistd.h>
#endif
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "pint-util.h"
#include "pvfs2-internal.h"
#include "osd-util/osd-util.h"
#define IO_MAX_SEGMENT_NUM 50
#define IO_ATTR_MASKS (PVFS_ATTR_META_ALL|PVFS_ATTR_COMMON_TYPE)
extern job_context_id pint_client_sm_context;
enum {
IO_DO_OSD_IO = 2001,
OSD_MDFILE_MSGPAIR = 2002
};
enum
{
IO_NO_DATA = 132,
IO_DATAFILE_TRANSFERS_COMPLETE,
IO_RETRY,
IO_RETRY_NODELAY,
IO_GET_DATAFILE_SIZE,
IO_ANALYZE_SIZE_RESULTS,
IO_DO_SMALL_IO,
IO_UNSTUFF,
IO_GETATTR_SERVER,
IO_MIRRORING,
IO_NO_MIRRORING,
IO_FATAL_ERROR,
};
/* Helper functions local to sys-io.sm. */
static inline int io_complete_context_send_or_recv(
PINT_smcb *smcb, job_status_s *js_p);
static inline int io_decode_ack_response(
PINT_client_io_ctx *cur_ctx,
struct PINT_decoded_msg *decoded_resp,
struct PVFS_server_resp **resp);
static inline int io_post_flow(
PINT_smcb *smcb, PINT_client_io_ctx *cur_ctx);
static inline int io_post_write_ack_recv(
PINT_smcb *smcb, PINT_client_io_ctx * cur_ctx);
static inline int io_process_context_recv(
PINT_client_sm *sm_p, job_status_s *js_p, PINT_client_io_ctx **out_ctx);
static inline int io_check_context_status(
PINT_client_io_ctx *cur_ctx, int io_type,
PVFS_size *total_size);
static int io_find_target_datafiles(
PVFS_Request mem_req,
PVFS_Request file_req,
PVFS_offset file_req_offset,
PINT_dist *dist_p,
PVFS_fs_id fs_id,
enum PVFS_io_type io_type,
PVFS_handle *input_handle_array,
int input_handle_count,
int *handle_index_array,
int *handle_index_out_count,
int *sio_handle_index_array,
int *sio_handle_index_count);
static int io_find_total_size(
PINT_client_sm * sm_p,
PVFS_offset final_offset,
PVFS_size * total_return_size);
int io_find_offset(
PINT_client_sm * sm_p,
PVFS_size contig_size,
PVFS_size * total_return_offset);
static int io_get_max_unexp_size(
struct PINT_Request * file_req,
PVFS_handle handle,
PVFS_fs_id fs_id,
enum PVFS_io_type type,
int * max_unexp_payload);
static int io_zero_fill_holes(
PINT_client_sm *sm_p,
PVFS_size eof,
int datafile_count,
PVFS_size * datafile_size_array,
int * datafile_index_array);
static int io_contexts_init(PINT_client_sm *sm_p, int count,
PVFS_object_attr *attr);
static void io_contexts_destroy(PINT_client_sm *sm_p);
static int unstuff_needed(
PVFS_Request mem_req,
PVFS_offset file_req_offset,
PINT_dist *dist_p,
uint32_t mask,
enum PVFS_io_type io_type);
static int unstuff_comp_fn(
void *v_p,
struct PVFS_server_resp *resp_p,
int i);
/* misc constants and helper macros */
#define IO_RECV_COMPLETED 1
/* possible I/O state machine phases (status_user_tag) */
#define IO_SM_PHASE_REQ_MSGPAIR_RECV 0
#define IO_SM_PHASE_REQ_MSGPAIR_SEND 1
#define IO_SM_PHASE_FLOW 2
#define IO_SM_PHASE_FINAL_ACK 3
#define IO_SM_NUM_PHASES 4
#define STATUS_USER_TAG_TYPE(tag, type) \
((tag % IO_SM_NUM_PHASES) == type)
#define STATUS_USER_TAG_GET_INDEX(tag, type) \
(tag / IO_SM_NUM_PHASES)
#define STATUS_USER_TAG_IS_SEND_OR_RECV(tag) \
(STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_RECV) || \
STATUS_USER_TAG_TYPE(tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
static int io_datafile_index_array_init(
PINT_client_sm *sm_p,
int datafile_count);
static void io_datafile_index_array_destroy(
PINT_client_sm *sm_p);
%%
machine pvfs2_client_io_sm
{
state init
{
run io_init;
default => io_getattr;
}
state io_getattr
{
jump pvfs2_client_getattr_sm;
success => inspect_attr;
default => io_cleanup;
}
state inspect_attr
{
run io_inspect_attr;
IO_UNSTUFF => unstuff_setup_msgpair;
IO_GETATTR_SERVER => unstuff_setup_msgpair;
success => io_datafile_setup_msgpairs;
default => io_cleanup;
}
state unstuff_setup_msgpair
{
run io_unstuff_setup_msgpair;
success => unstuff_xfer_msgpair;
default => io_cleanup;
}
state unstuff_xfer_msgpair
{
jump pvfs2_msgpairarray_sm;
success => io_datafile_setup_msgpairs;
default => io_cleanup;
}
state io_datafile_setup_msgpairs
{
run io_datafile_setup_msgpairs;
IO_DO_OSD_IO => osd_io;
IO_NO_DATA => io_cleanup;
IO_DO_SMALL_IO => small_io;
success => io_datafile_post_msgpairs;
default => io_cleanup;
}
state osd_io
{
jump pvfs2_client_osd_io_sm;
default => io_cleanup;
}
state small_io
{
jump pvfs2_client_small_io_sm;
success => io_analyze_results;
default => io_cleanup;
}
state io_datafile_post_msgpairs
{
run io_datafile_post_msgpairs;
IO_RETRY => io_datafile_post_msgpairs_retry;
IO_FATAL_ERROR => io_cleanup;
default => io_datafile_complete_operations;
}
state io_datafile_post_msgpairs_retry
{
run io_datafile_post_msgpairs_retry;
IO_MIRRORING => io_datafile_mirror_retry;
IO_NO_MIRRORING => io_datafile_no_mirror_retry;
default => io_datafile_no_mirror_retry;
}
state io_datafile_no_mirror_retry
{
run io_datafile_no_mirror_retry;
IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
default => io_datafile_post_msgpairs;
}
state io_datafile_mirror_retry
{
run io_datafile_mirror_retry;
IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
default => io_datafile_post_msgpairs;
}
state io_datafile_complete_operations
{
run io_datafile_complete_operations;
IO_DATAFILE_TRANSFERS_COMPLETE => io_analyze_results;
IO_RETRY => io_datafile_post_msgpairs_retry;
default => io_datafile_complete_operations;
}
state io_analyze_results
{
run io_analyze_results;
IO_RETRY => init;
/*IO_ANALYZE_SIZE_RESULTS => io_analyze_size_results;*/
IO_GET_DATAFILE_SIZE => io_datafile_size;
default => io_cleanup;
}
state io_datafile_size
{
jump pvfs2_client_datafile_getattr_sizes_sm;
success => io_analyze_size_results;
default => io_cleanup;
}
state io_analyze_size_results
{
run io_analyze_size_results;
default => io_cleanup;
}
state io_cleanup
{
run io_cleanup;
default => terminate;
}
}
%%
/** Initiate a read or write operation.
*
* \param type specifies if the operation is a read or write.
*/
PVFS_error PVFS_isys_io(
PVFS_object_ref ref,
PVFS_Request file_req,
PVFS_offset file_req_offset,
void *buffer,
PVFS_Request mem_req,
const PVFS_credentials *credentials,
PVFS_sysresp_io *resp_p,
enum PVFS_io_type io_type,
PVFS_sys_op_id *op_id,
PVFS_hint hints,
void *user_ptr)
{
PVFS_error ret = -PVFS_EINVAL;
PINT_smcb *smcb = NULL;
PINT_client_sm *sm_p = NULL;
struct filesystem_configuration_s* cur_fs = NULL;
struct server_configuration_s *server_config = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_io entered [%llu]\n",
llu(ref.handle));
server_config = PINT_get_server_config_struct(ref.fs_id);
cur_fs = PINT_config_find_fs_id(server_config, ref.fs_id);
PINT_put_server_config_struct(server_config);
if ((ref.handle == PVFS_HANDLE_NULL) ||
(ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
{
if(!server_config->post_create) {
gossip_err("invalid (NULL) required argument\n");
return ret;
}
}
if ((io_type != PVFS_IO_READ) && (io_type != PVFS_IO_WRITE))
{
gossip_err("invalid (unknown) I/O type specified\n");
return ret;
}
if (!cur_fs)
{
gossip_err("invalid (unknown) fs id specified\n");
return ret;
}
/* look for zero byte operations */
if ((PINT_REQUEST_TOTAL_BYTES(mem_req) == 0) ||
(PINT_REQUEST_TOTAL_BYTES(file_req) == 0))
{
gossip_ldebug(GOSSIP_IO_DEBUG, "Warning: 0 byte I/O operation "
"attempted.\n");
resp_p->total_completed = 0;
return 1;
}
PINT_smcb_alloc(&smcb, PVFS_SYS_IO,
sizeof(struct PINT_client_sm),
client_op_state_get_machine,
client_state_machine_terminate,
pint_client_sm_context);
if (smcb == NULL)
{
return -PVFS_ENOMEM;
}
sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PINT_init_msgarray_params(sm_p, ref.fs_id);
PINT_init_sysint_credentials(sm_p->cred_p, credentials);
sm_p->u.io.io_type = io_type;
sm_p->u.io.file_req = file_req;
sm_p->u.io.file_req_offset = file_req_offset;
sm_p->u.io.io_resp_p = resp_p;
sm_p->u.io.mem_req = mem_req;
sm_p->u.io.buffer = buffer;
sm_p->u.io.flowproto_type = cur_fs->flowproto;
sm_p->u.io.encoding = cur_fs->encoding;
sm_p->u.io.stored_error_code = 0;
sm_p->u.io.retry_count = 0;
sm_p->msgarray_op.msgarray = NULL;
sm_p->msgarray_op.count = 0;
sm_p->u.io.datafile_index_array = NULL;
sm_p->u.io.datafile_count = 0;
sm_p->u.io.total_size = 0;
sm_p->u.io.small_io = 0;
sm_p->object_ref = ref;
PVFS_hint_copy(hints, &sm_p->hints);
PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle), &ref.handle);
return PINT_client_state_machine_post(
smcb, op_id, user_ptr);
}
/** Perform a read or write operation.
*
* \param type specifies if the operation is a read or write.
*/
PVFS_error PVFS_sys_io(
PVFS_object_ref ref,
PVFS_Request file_req,
PVFS_offset file_req_offset,
void *buffer,
PVFS_Request mem_req,
const PVFS_credentials *credentials,
PVFS_sysresp_io *resp_p,
enum PVFS_io_type io_type,
PVFS_hint hints)
{
PVFS_error ret = -PVFS_EINVAL, error = 0;
PVFS_sys_op_id op_id;
gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_io entered\n");
ret = PVFS_isys_io(ref, file_req, file_req_offset, buffer, mem_req,
credentials, resp_p, io_type, &op_id, hints, NULL);
if (ret == 1)
return 0;
else if (ret < 0)
{
PVFS_perror_gossip("PVFS_isys_io call", ret);
error = ret;
}
else
{
ret = PVFS_sys_wait(op_id, "io", &error);
if (ret)
{
PVFS_perror_gossip("PVFS_sys_wait call", ret);
error = ret;
}
PINT_sys_release(op_id);
}
return error;
}
/*******************************************************************/
static PINT_sm_action io_init(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
job_id_t tmp_id;
struct server_configuration_s *server_config = NULL;
server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
PINT_put_server_config_struct(server_config);
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: io_init\n", sm_p);
assert((js_p->error_code == 0) ||
(js_p->error_code == IO_RETRY));
PINT_SM_GETATTR_STATE_FILL(
sm_p->getattr,
sm_p->object_ref,
IO_ATTR_MASKS,
PVFS_TYPE_METAFILE,
0);
if (js_p->error_code == IO_RETRY ||
(js_p->error_code == IO_RETRY_NODELAY))
{
js_p->error_code = 0;
io_datafile_index_array_destroy(sm_p);
io_contexts_destroy(sm_p);
if (PINT_smcb_cancelled(smcb))
{
js_p->error_code = -PVFS_ECANCEL;
return SM_ACTION_COMPLETE;
}
if(js_p->error_code == IO_RETRY_NODELAY)
{
gossip_debug(GOSSIP_IO_DEBUG, " sys-io retrying without delay.\n");
js_p->error_code = 0;
return 1;
}
gossip_debug(GOSSIP_IO_DEBUG, " sys-io retrying with delay.\n");
return job_req_sched_post_timer(
sm_p->msgarray_op.params.retry_delay, smcb, 0, js_p, &tmp_id,
pint_client_sm_context);
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action io_inspect_attr(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int is_osd_meta = fsid_is_osd_meta(sm_p->getattr.object_ref.fs_id);
int is_osd_md = fsid_is_osd_md(sm_p->getattr.object_ref.fs_id);
if (PINT_smcb_cancelled(smcb))
{
js_p->error_code = -PVFS_ECANCEL;
return SM_ACTION_COMPLETE;
}
/* determine if we need to unstuff or not to service this request */
js_p->error_code = unstuff_needed(
sm_p->u.io.mem_req,
sm_p->u.io.file_req_offset,
sm_p->getattr.attr.u.meta.dist,
sm_p->getattr.attr.mask,
sm_p->u.io.io_type);
if (is_osd_meta || is_osd_md)
{
js_p->error_code = 0;
}
return(SM_ACTION_COMPLETE);
}
static PINT_sm_action io_unstuff_setup_msgpair(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL;
PINT_sm_msgpair_state *msg_p = NULL;
PINT_msgpair_init(&sm_p->msgarray_op);
msg_p = &sm_p->msgarray_op.msgpair;
if(js_p->error_code == IO_UNSTUFF)
{
/* note that unstuff must request the same attr mask that we requested
* earlier. If the file has already been unstuffed then we need an
* updated authoritative copy of all of the attrs relevant to I/O.
*/
PINT_SERVREQ_UNSTUFF_FILL(
msg_p->req,
(*sm_p->cred_p),
sm_p->object_ref.fs_id,
sm_p->object_ref.handle,
IO_ATTR_MASKS);
}
else if(js_p->error_code == IO_GETATTR_SERVER)
{
PINT_SERVREQ_GETATTR_FILL(
msg_p->req,
(*sm_p->cred_p),
sm_p->object_ref.fs_id,
sm_p->object_ref.handle,
IO_ATTR_MASKS,
sm_p->hints);
}
else
{
assert(0);
}
js_p->error_code = 0;
msg_p->fs_id = sm_p->object_ref.fs_id;
msg_p->handle = sm_p->object_ref.handle;
msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
msg_p->comp_fn = unstuff_comp_fn;
ret = PINT_cached_config_map_to_server(
&msg_p->svr_addr,
msg_p->handle,
msg_p->fs_id);
if (ret)
{
gossip_err("Failed to map meta server address\n");
js_p->error_code = ret;
}
PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
return SM_ACTION_COMPLETE;
}
static PINT_sm_action io_datafile_setup_msgpairs(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL, i = 0;
PVFS_object_attr *attr = NULL;
int target_datafile_count = 0;
int * sio_array;
int sio_count;
/* Is the underlying storage system OSD? */
int is_osd = fsid_is_osd(sm_p->object_ref.fs_id);
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
"io_datafile_setup_msgpairs\n", sm_p);
if (PINT_smcb_cancelled(smcb))
{
js_p->error_code = -PVFS_ECANCEL;
goto exit;
}
js_p->error_code = 0;
attr = &sm_p->getattr.attr;
assert(attr);
switch(attr->objtype)
{
case PVFS_TYPE_METAFILE:
assert(attr->mask & PVFS_ATTR_META_DFILES);
assert(attr->mask & PVFS_ATTR_META_DIST);
assert(attr->u.meta.dist_size > 0);
assert(attr->u.meta.dfile_array);
//assert(attr->u.meta.dfile_count > 0);
if (attr->mask & PVFS_ATTR_META_MIRROR_DFILES && !is_osd)
{
assert(attr->u.meta.mirror_dfile_array);
assert(attr->u.meta.mirror_copies_count);
}
break;
case PVFS_TYPE_DIRECTORY:
js_p->error_code = -PVFS_EISDIR;
goto exit;
default:
js_p->error_code = -PVFS_EBADF;
goto exit;
}
/* cannot write to an immutable file */
if (sm_p->u.io.io_type == PVFS_IO_WRITE
&& (attr->u.meta.hint.flags & PVFS_IMMUTABLE_FL))
{
js_p->error_code = -PVFS_EPERM;
goto exit;
}
ret = PINT_dist_lookup(attr->u.meta.dist);
if (ret)
{
PVFS_perror_gossip("PINT_dist_lookup failed; aborting I/O", ret);
js_p->error_code = -PVFS_EBADF;
goto exit;
}
ret = io_datafile_index_array_init(sm_p, attr->u.meta.dfile_count);
if(ret < 0)
{
js_p->error_code = ret;
goto error_exit;
}
PINT_SM_DATAFILE_SIZE_ARRAY_INIT(
&sm_p->u.io.dfile_size_array,
attr->u.meta.dfile_count);
/* initialize the array of indexes to datafiles in the file request
* that have requests small enough to do small I/O
* (pack data in unexpected message)
*/
sio_array = malloc(sizeof(int) * attr->u.meta.dfile_count);
if(!sio_array)
{
js_p->error_code = -PVFS_ENOMEM;
goto datafile_index_array_destroy;
}
if (is_osd & !attr->u.meta.dfile_count)
{
/* underlying storage is OSD */
gossip_debug(GOSSIP_IO_DEBUG, " %s: doing OSD I/O\n", __func__);
sm_p->u.io.small_io = 1; /* disable some flow IO checks */
js_p->error_code = IO_DO_OSD_IO;
goto sio_array_destroy;
}
ret = io_find_target_datafiles(
sm_p->u.io.mem_req,
sm_p->u.io.file_req,
sm_p->u.io.file_req_offset,
attr->u.meta.dist,
sm_p->getattr.object_ref.fs_id,
sm_p->u.io.io_type,
attr->u.meta.dfile_array,
attr->u.meta.dfile_count,
sm_p->u.io.datafile_index_array,
&target_datafile_count,
sio_array,
&sio_count);
if(ret < 0)
{
js_p->error_code = ret;
goto sio_array_destroy;
}
sm_p->u.io.datafile_count = target_datafile_count;
if (target_datafile_count == 0)
{
gossip_debug(GOSSIP_IO_DEBUG, " datafile_setup_msgpairs: no "
"datafiles have data; aborting\n");
js_p->error_code = IO_NO_DATA;
goto sio_array_destroy;
}
gossip_debug(GOSSIP_IO_DEBUG,
" %s: %d datafiles "
"might have data\n", __func__, target_datafile_count);
if (is_osd)
{
/* underlying storage is OSD */
gossip_debug(GOSSIP_IO_DEBUG, " %s: doing OSD I/O\n", __func__);
sm_p->u.io.small_io = 1; /* disable some flow IO checks */
js_p->error_code = IO_DO_OSD_IO;
goto sio_array_destroy;
}
/* look at sio_array and sio_count to see if there are any
* servers that we can do small I/O to, instead of setting up
* flows. For now, we're going to stick with the semantics that
* small I/O is only done if all of the sizes for the target datafiles
* are small enough (sio_count == target_datafile_count). This can
* be changed in the future, for example, if sio_count is some
* percentage of the target_datafile_count, then do small I/O to
* the sio_array servers, etc.
*/
if(sio_count == target_datafile_count)
{
gossip_debug(GOSSIP_IO_DEBUG, " %s: doing small I/O\n", __func__);
sm_p->u.io.small_io = 1;
js_p->error_code = IO_DO_SMALL_IO;
goto sio_array_destroy;
}
ret = io_contexts_init(sm_p, target_datafile_count, attr);
if(ret < 0)
{
js_p->error_code = ret;
goto sio_array_destroy;
}
sm_p->u.io.total_cancellations_remaining = 0;
/* initialize all per server I/O operation contexts and requests */
for(i = 0; i < target_datafile_count; i++)
{
gossip_debug(GOSSIP_IO_DEBUG, " filling I/O request "
"for %llu\n", llu(sm_p->u.io.contexts[i].data_handle));
PINT_SERVREQ_IO_FILL(
sm_p->u.io.contexts[i].msg.req,
*sm_p->cred_p,
sm_p->object_ref.fs_id,
sm_p->u.io.contexts[i].data_handle,
sm_p->u.io.io_type,
sm_p->u.io.flowproto_type,
sm_p->u.io.datafile_index_array[i],
attr->u.meta.dfile_count,
attr->u.meta.dist,
sm_p->u.io.file_req,
sm_p->u.io.file_req_offset,
PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
sm_p->hints);
}
js_p->error_code = 0;
sio_array_destroy:
free(sio_array);
goto exit;
datafile_index_array_destroy:
io_datafile_index_array_destroy(sm_p);
error_exit:
exit:
return SM_ACTION_COMPLETE;
}
/*
This is based on msgpairarray_post() in msgpairarray.c. It's
different enough in that we don't have to wait on the msgpairarray
operations to all complete before posting flows as we can do so for each
server individually when we're ready. this avoids the msgpairarray
sync point implicit in the design
*/
static PINT_sm_action io_datafile_post_msgpairs(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL, i = 0;
unsigned long status_user_tag = 0;
int must_loop_encodings = 0;
struct server_configuration_s *server_config = NULL;
gossip_debug(GOSSIP_CLIENT_DEBUG, "io_datafile_post_msgpairs "
"state: post (%d message(s))\n", sm_p->u.io.datafile_count);
if (PINT_smcb_cancelled(smcb))
{
js_p->error_code = -PVFS_ECANCEL;
return SM_ACTION_COMPLETE;
}
js_p->error_code = 0;
/* completion count tracks sends/recvs separately, will increment
* as we go through the loop to maintain a count of outstanding msgpairs */
sm_p->u.io.msgpair_completion_count = 0;
for(i = 0; i < sm_p->u.io.context_count; i++)
{
PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
PINT_sm_msgpair_state *msg = &cur_ctx->msg;
/* do not do this one again in retry case */
if (cur_ctx->msg_recv_has_been_posted &&
cur_ctx->msg_recv_in_progress)
{
++sm_p->u.io.msgpair_completion_count;
goto recv_already_posted;
}
if (!ENCODING_IS_VALID(sm_p->u.io.encoding))
{
PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
must_loop_encodings = 1;
sm_p->u.io.encoding = (ENCODING_INVALID_MIN + 1);
}
else if (!ENCODING_IS_SUPPORTED(sm_p->u.io.encoding))
{
PRINT_ENCODING_ERROR("supported", sm_p->u.io.encoding);
must_loop_encodings = 1;
sm_p->u.io.encoding = ENCODING_SUPPORTED_MIN;
}
try_next_encoding:
assert(ENCODING_IS_VALID(sm_p->u.io.encoding));
ret = PINT_encode(&msg->req, PINT_ENCODE_REQ, &msg->encoded_req,
msg->svr_addr, sm_p->u.io.encoding);
if (ret)
{
if (must_loop_encodings)
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "Looping through "
"encodings [%d/%d]\n", sm_p->u.io.encoding,
ENCODING_INVALID_MAX);
sm_p->u.io.encoding++;
if (ENCODING_IS_VALID(sm_p->u.io.encoding))
{
goto try_next_encoding;
}
}
/*
FIXME: make this a clean error transition by adjusting
the completion count and/or (not) exiting
*/
/* If one of the msgpairs gets this type of error, then the entire
* request should be aborted. Becky Ligon.
*/
PVFS_perror_gossip("PINT_encode failed", ret);
sm_p->u.io.stored_error_code = ret;
js_p->error_code = IO_FATAL_ERROR;
return SM_ACTION_COMPLETE;
}
/* calculate maximum response message size and allocate it */
msg->max_resp_sz = PINT_encode_calc_max_size(
PINT_ENCODE_RESP, msg->req.op, sm_p->u.io.encoding);
msg->encoded_resp_p = BMI_memalloc(
msg->svr_addr, msg->max_resp_sz, BMI_RECV);
if (!msg->encoded_resp_p)
{
/* FIXME: see above FIXME */
sm_p->u.io.stored_error_code = -PVFS_ENOMEM;
js_p->error_code = IO_FATAL_ERROR;
return SM_ACTION_COMPLETE;
}
/*
recalculate the status user tag based on this the progress
of the current context like this: status_user_tag = (4 *
(context index) + context phase)
*/
assert(cur_ctx->index == i);
status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_RECV);
gossip_debug(GOSSIP_IO_DEBUG," posting recv with "
"status_user_tag=%lu (max_size %d)\n",
status_user_tag, msg->max_resp_sz);
cur_ctx->session_tag = PINT_util_get_next_tag();
cur_ctx->msg_recv_has_been_posted = 0;
cur_ctx->msg_recv_in_progress = 0;
server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
ret = job_bmi_recv(
msg->svr_addr, msg->encoded_resp_p, msg->max_resp_sz,
cur_ctx->session_tag, BMI_PRE_ALLOC, smcb, status_user_tag,
&msg->recv_status, &msg->recv_id, pint_client_sm_context,
server_config->client_job_bmi_timeout, sm_p->hints);
PINT_put_server_config_struct(server_config);
/* ret -1: problem, do not look at msg recv_status */
/* ret 1: immediate completion, see status */
/* ret 0: okay */
if (ret < 0) {
PVFS_perror_gossip("Post of receive failed", ret);
js_p->error_code = ret;
continue;
}
if (ret == 0) {
int tmp = 0;
/* perform a quick test to see if the recv failed before
* posting the send; if it reports an error quickly then
* we can save the confusion of sending a request for
* which we can't recv a response
*/
ret = job_test(msg->recv_id, &tmp, NULL,
&msg->recv_status, 0,
pint_client_sm_context);
if (ret < 0) {
PVFS_perror_gossip("Post of receive failed", ret);
js_p->error_code = ret;
continue;
}
}
/* either from job_bmi_recv or from job_test finding something */
if (ret == 1) {
/*
* This recv must have completed with an error because the
* server has not yet been sent our request.
*/
PVFS_perror_gossip("Receive immediately failed",
msg->recv_status.error_code);
//ret = msg->recv_status.error_code;
js_p->error_code = IO_RETRY;
continue;
}
cur_ctx->msg_recv_has_been_posted = 1;
cur_ctx->msg_recv_in_progress = 1;
/* posted the receive okay */
++sm_p->u.io.msgpair_completion_count;
recv_already_posted:
if (cur_ctx->msg_send_has_been_posted &&
cur_ctx->msg_send_in_progress)
{
++sm_p->u.io.msgpair_completion_count;
continue;
}
status_user_tag = ((4 * i) + IO_SM_PHASE_REQ_MSGPAIR_SEND);
cur_ctx->msg_send_has_been_posted = 0;
cur_ctx->msg_send_in_progress = 0;
gossip_debug(GOSSIP_IO_DEBUG," posting send with "
"status_user_tag=%lu\n", status_user_tag);
server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
ret = job_bmi_send_list(
msg->encoded_req.dest, msg->encoded_req.buffer_list,
msg->encoded_req.size_list, msg->encoded_req.list_count,
msg->encoded_req.total_size, cur_ctx->session_tag,
msg->encoded_req.buffer_type, 1, smcb, status_user_tag,
&msg->send_status, &msg->send_id, pint_client_sm_context,
server_config->client_job_bmi_timeout, sm_p->hints);
PINT_put_server_config_struct(server_config);
if (ret < 0) {
PVFS_perror_gossip("Post of send failed, cancelling recv", ret);
msg->op_status = msg->send_status.error_code;
msg->send_id = 0;
job_bmi_cancel(msg->recv_id, pint_client_sm_context);
js_p->error_code = ret;
continue;
}
if (ret == 1) {
if (msg->send_status.error_code == 0) {
gossip_debug(GOSSIP_IO_DEBUG, " io_datafile_post_msgpairs: "
"send completed immediately.\n");
/* 0 is the valid "completed job id" value */
cur_ctx->msg_send_has_been_posted = 1;
msg->send_id = 0;
} else {
PVFS_perror_gossip("Send immediately failed, cancelling recv",
msg->recv_status.error_code);
msg->op_status = msg->send_status.error_code;
msg->send_id = 0;
/* still wait for the recv to complete */
job_bmi_cancel(msg->recv_id, pint_client_sm_context);
js_p->error_code = msg->send_status.error_code;
continue;
}
} else {
/* posted the send */
cur_ctx->msg_send_in_progress = 1;
cur_ctx->msg_send_has_been_posted = 1;
++sm_p->u.io.msgpair_completion_count;
}
}/*end for*/
gossip_debug(GOSSIP_IO_DEBUG, "io_datafile_post_msgpairs: "
"completion count is %d\n",
sm_p->u.io.msgpair_completion_count);
/* if anything posted, just wait for that to complete, else
* go sleep then try the remaining msgpairs again */
if (sm_p->u.io.msgpair_completion_count
|| sm_p->u.io.flow_completion_count
|| sm_p->u.io.write_ack_completion_count)
return SM_ACTION_DEFERRED; /* means go find another machine to run */
else {
js_p->error_code = IO_RETRY;
return SM_ACTION_COMPLETE; /* means look at error_code and run my */
/* machine again. */
}
}
/*
* For IO retry, come here to sleep a bit then go back and post
* some more msgpairs. If mirroring, then we have more setup before a
* retry can happen. Also, the retry-count is calculated differently.
*/
static int io_datafile_post_msgpairs_retry (struct PINT_smcb *smcb
,job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_object_attr *attr = &(sm_p->getattr.attr);
struct PINT_client_io_sm *io = &(sm_p->u.io);
gossip_debug(GOSSIP_IO_DEBUG,"Executing io_datafile_post_msgpairs_retry...\n");
/* Are we mirroring on a READ request? */
if ( (attr->mask & PVFS_ATTR_META_MIRROR_DFILES) &&
io->io_type == PVFS_IO_READ )
{
js_p->error_code = IO_MIRRORING;
return SM_ACTION_COMPLETE;
}
js_p->error_code = IO_NO_MIRRORING;
return SM_ACTION_COMPLETE;
}
static int io_datafile_mirror_retry(struct PINT_smcb *smcb
,job_status_s *js_p )
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_object_attr *attr = &(sm_p->getattr.attr);
PVFS_metafile_attr *meta = &(attr->u.meta);
struct PINT_client_io_sm *io = &(sm_p->u.io);
PINT_sm_msgpair_state *msg = NULL;
PINT_client_io_ctx *ctx = NULL;
uint32_t index = 0;
uint32_t copies = 0;
int i,j,ret;
char *enc_req_bytes = NULL;
gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s...\n",__func__);
gossip_debug(GOSSIP_IO_DEBUG,"Executing io_datafile_mirror_retry...\n");
/* Have we exhausted the number of retries */
if (io->retry_count >= sm_p->msgarray_op.params.retry_limit)
{
js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
return SM_ACTION_COMPLETE;
}
/* Find failed contexts and prepare them for retry.
*/
for (i=0; i<io->context_count; i++)
{
ctx = &(io->contexts[i]);
msg = &(ctx->msg);
if (ctx->msg_recv_has_been_posted && ctx->msg_send_has_been_posted)
/* this context has not failed. */
continue;
/* cleanup the failed context */
enc_req_bytes = (char *)&(msg->encoded_req);
for (j=0; j<sizeof(msg->encoded_req); j++)
{
if (enc_req_bytes[j] != '\0')
{
PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
break;
}
}/*end for*/
if (msg->encoded_resp_p)
{
BMI_memfree(msg->svr_addr
,msg->encoded_resp_p
,msg->max_resp_sz
,BMI_RECV);
}
memset(&(msg->encoded_req),0,sizeof(msg->encoded_req));
memset(&(msg->svr_addr),0,sizeof(msg->svr_addr));
msg->encoded_resp_p = NULL;
/* use the primary data handle */
if (ctx->retry_original)
{
/* setup context to retry the original */
ctx->data_handle = meta->dfile_array[ctx->server_nr];
ctx->retry_original = 0;
msg->handle = ctx->data_handle;
msg->req.u.io.handle = ctx->data_handle;
ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
,msg->handle
,msg->fs_id);
if (ret)
{
gossip_lerr("Unable to determine the server address "
"for this handle (%llu)"
,llu(msg->handle));
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
PINT_flow_reset(&(ctx->flow_desc));
continue;
}
/* get next mirrored handle. note: if a mirrored handle is zero, then
* this means that the creation of this mirrored object failed for its
* particular server. if so, then get the next valid handle. as a
* last resort, retry the original handle.
*/
copies = ctx->current_copies_count;
for (;copies < meta->mirror_copies_count; copies++)
{
index = (copies*meta->dfile_count) + ctx->server_nr;
if (meta->mirror_dfile_array[index] != 0)
{ /* we have found a valid mirrored handle */
ctx->data_handle = meta->mirror_dfile_array[index];
break;
}
}
/* we have NOT found a valid mirrored handle, so retry the primary */
if ( copies == meta->mirror_copies_count )
{
ctx->data_handle = meta->dfile_array[ctx->server_nr];
ctx->retry_original = 0;
ctx->current_copies_count = 0;
io->retry_count++;
/* setup context to retry original */
msg->handle = ctx->data_handle;
msg->req.u.io.handle = ctx->data_handle;
ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
,msg->handle
,msg->fs_id);
if (ret)
{
gossip_lerr("Unable to determine the server address "
"for this handle (%llu)"
,llu(msg->handle));
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
PINT_flow_reset(&(ctx->flow_desc));
continue;
}
/* setup the context for the discovered mirrored handle */
msg->handle = ctx->data_handle;
msg->req.u.io.handle = ctx->data_handle;
ret=PINT_cached_config_map_to_server(&(msg->svr_addr)
,msg->handle
,msg->fs_id);
if (ret)
{
gossip_lerr("Unable to determine the server address "
"for this handle (%llu)"
,llu(msg->handle));
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
}
PINT_flow_reset(&(ctx->flow_desc));
/* setup for the NEXT io-retry event for this context */
ctx->current_copies_count++;
if ( ctx->current_copies_count == meta->mirror_copies_count )
{/* we have gone through all of the mirrored handles, after this
* iteration executes; so, indicate original for the next retry event.
*/
ctx->current_copies_count = 0;
ctx->retry_original = 1;
io->retry_count++;
}
}/*end for each context*/
/* sleep a small while before starting the next round of retries */
return (job_req_sched_post_timer(sm_p->msgarray_op.params.retry_delay
,smcb
,0
,js_p
,NULL
,pint_client_sm_context));
}/*end io_datafile_mirror_retry*/
static int io_datafile_no_mirror_retry(struct PINT_smcb *smcb
,job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
/* give up if beyond retry limit */
++sm_p->u.io.retry_count;
if (sm_p->u.io.retry_count > sm_p->msgarray_op.params.retry_limit) {
gossip_debug(GOSSIP_CLIENT_DEBUG, "%s: retry %d exceeds limit %d\n"
,__func__
,sm_p->u.io.retry_count
,sm_p->msgarray_op.params.retry_delay);
js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
return SM_ACTION_COMPLETE;
}
gossip_debug(GOSSIP_CLIENT_DEBUG, "%s: retry %d, wait %d ms\n", __func__,
sm_p->u.io.retry_count, sm_p->msgarray_op.params.retry_delay);
return job_req_sched_post_timer(sm_p->msgarray_op.params.retry_delay,
smcb, 0, js_p, NULL, pint_client_sm_context);
}
/*
This state allows us to make sure all posted operations complete and
are accounted for. since this handles ALL operation completions,
there's special case handling of completing the msgpair recv. in
this case we post the flow operations as soon as we see them (the
main motivation for not using the common msgpairarray code).
*/
static PINT_sm_action io_datafile_complete_operations(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL, index = 0, i;
unsigned long status_user_tag = (unsigned long)
js_p->status_user_tag;
PINT_client_io_ctx *cur_ctx = NULL;
PVFS_object_attr * attr;
int matched_send_or_recv = 0;
struct server_configuration_s *server_config = NULL;
gossip_debug(
GOSSIP_CLIENT_DEBUG, "(%p) io_datafile_complete_operations "
"(tag %lu)\n", sm_p, status_user_tag);
assert(sm_p->u.io.msgpair_completion_count > -1);
assert(sm_p->u.io.flow_completion_count > -1);
assert(sm_p->u.io.write_ack_completion_count > -1);
attr = &sm_p->getattr.attr;
assert(attr);
/* check if we're completing a send or recv msgpair */
if (STATUS_USER_TAG_IS_SEND_OR_RECV(status_user_tag))
{
/*
* The completion count might validly be zero when recovering from
* a cancellation.
*/
if (sm_p->u.io.msgpair_completion_count)
{
ret = io_complete_context_send_or_recv(smcb, js_p);
if (ret < 0) {
/* problem */
PVFS_perror_gossip(
"io_complete_context_send_or_recv failed", ret);
js_p->error_code = ret;
return SM_ACTION_COMPLETE;
} else if (ret == 0) {
/* is a send */
gossip_debug(GOSSIP_IO_DEBUG, " matched send in context "
"%d; continuing.\n", index);
js_p->error_code = 0;
/* If send had problem, BMI will apparently ensure that the
* recv will fail too, so handle the retry stuff there.
*/
return SM_ACTION_DEFERRED;
} else {
/* is a recv */
assert(ret == IO_RECV_COMPLETED);
matched_send_or_recv = 1;
}
}
}
/* if we've just completed a recv above, process the receive
* and post the flow if we're doing a read
*/
if (ret == IO_RECV_COMPLETED)
{
ret = io_process_context_recv(sm_p, js_p, &cur_ctx);
if (ret < 0)
{
char buf[64] = {0};
PVFS_strerror_r(ret, buf, 64);
gossip_debug(GOSSIP_IO_DEBUG,
"%s: io_process_context_recv failed: "
"%s (%d remaining msgpairs)\n",
__func__, buf, sm_p->u.io.msgpair_completion_count);
js_p->error_code = ret;
/* if recv failed, probably have to do the send again too */
cur_ctx->msg_send_has_been_posted = 0;
cur_ctx->msg_recv_has_been_posted = 0;
goto check_next_step;
}
if(sm_p->u.io.io_type == PVFS_IO_WRITE)
{
/* we expect this write to _not_ succeed immediately, because we
* have not posted the flow yet.
*/
ret = io_post_write_ack_recv(smcb, cur_ctx);
if(ret < 0)
{
PVFS_perror_gossip("Post of write-ack recv failed", ret);
js_p->error_code = ret;
goto check_next_step;
}
}
/* for now we wait to post the flow until we get back
* the response from the server for both reads and writes
*/
ret = io_post_flow(smcb, cur_ctx);
if(ret < 0)
{
char buf[64] = {0};
PVFS_strerror_r(ret, buf, 64);
gossip_debug(GOSSIP_IO_DEBUG,
"%s: io_post_flow failed: "
"%s (%d remaining msgpairs)\n",
__func__,
buf,
sm_p->u.io.msgpair_completion_count);
PVFS_perror_gossip("Flow post failed", ret);
cur_ctx->msg_send_has_been_posted = 0;
cur_ctx->msg_recv_has_been_posted = 0;
js_p->error_code = ret;
goto check_next_step;
}
}
/* check if we've completed all msgpairs and posted all flows */
if (matched_send_or_recv)
{
if (sm_p->u.io.msgpair_completion_count == 0)
{
gossip_debug(GOSSIP_IO_DEBUG, "*** all msgpairs complete "
"(all flows posted)\n");
}
else
{
gossip_debug(
GOSSIP_IO_DEBUG, "*** %d msgpair completions "
"pending\n", sm_p->u.io.msgpair_completion_count);
}
}
/* at this point, we're either completing a flow or a write ack */
if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FLOW))
{
assert(sm_p->u.io.flow_completion_count);
index = STATUS_USER_TAG_GET_INDEX(
status_user_tag, IO_SM_PHASE_FLOW);
cur_ctx = &sm_p->u.io.contexts[index];
assert(cur_ctx);
cur_ctx->flow_status = *js_p;
if (cur_ctx->write_ack_in_progress)
{
int ret = 0;
assert(sm_p->u.io.write_ack_completion_count);
server_config = PINT_get_server_config_struct(
sm_p->object_ref.fs_id);
ret = job_reset_timeout(cur_ctx->write_ack.recv_id,
server_config->client_job_bmi_timeout);
PINT_put_server_config_struct(server_config);
/*
allow -PVFS_EINVAL errors in case the recv has already
completed (before we've processed it)
*/
assert((ret == 0) || (ret == -PVFS_EINVAL));
}
gossip_debug(GOSSIP_IO_DEBUG, " matched completed flow for "
"context %p%s\n", cur_ctx,
((cur_ctx->write_ack_in_progress ?
" and reset write_recv timeout" : "")));
cur_ctx->flow_in_progress = 0;
sm_p->u.io.flow_completion_count--;
assert(sm_p->u.io.flow_completion_count > -1);
/* look for flow error when no write ack is in progress (usually a
* read case)
*/
if (js_p->error_code < 0 && !cur_ctx->write_ack_in_progress)
{
if ((PVFS_ERROR_CLASS(-js_p->error_code) == PVFS_ERROR_BMI) ||
(PVFS_ERROR_CLASS(-js_p->error_code) == PVFS_ERROR_FLOW) ||
(js_p->error_code == -ECONNRESET) ||
(js_p->error_code == -PVFS_EPROTO))
{
/* if this is a an error that we can retry */
gossip_err(
"%s: flow failed, retrying from msgpair\n", __func__);
cur_ctx->msg_send_has_been_posted = 0;
cur_ctx->msg_recv_has_been_posted = 0;
}
else
{
/* do not retry on remaining error codes */
gossip_err(
"%s: flow failed, not retrying\n", __func__);
/* forcing the count high insures that the sm won't restart */
sm_p->u.io.retry_count = sm_p->msgarray_op.params.retry_limit;
}
}
/*To test fail-over uncomment the following. This will allow the code
* to go through the retry state at least one time on a read operation.
*/
//if (!cur_ctx->write_ack_in_progress && sm_p->u.io.retry_count==0)
//{
// cur_ctx->msg_send_has_been_posted = 0;
// cur_ctx->msg_recv_has_been_posted = 0;
//}
}
else if (STATUS_USER_TAG_TYPE(status_user_tag, IO_SM_PHASE_FINAL_ACK))
{
assert(sm_p->u.io.write_ack_completion_count);
index = STATUS_USER_TAG_GET_INDEX(
status_user_tag, IO_SM_PHASE_FINAL_ACK);
cur_ctx = &sm_p->u.io.contexts[index];
assert(cur_ctx);
assert(cur_ctx->write_ack.recv_status.actual_size <=
cur_ctx->write_ack.max_resp_sz);
cur_ctx->write_ack.recv_id = 0;
cur_ctx->write_ack.recv_status = *js_p;
gossip_debug(GOSSIP_IO_DEBUG, " matched completed ack for "
"context %p\n", cur_ctx);
cur_ctx->write_ack_in_progress = 0;
sm_p->u.io.write_ack_completion_count--;
assert(sm_p->u.io.write_ack_completion_count > -1);
if (js_p->error_code < 0) {
gossip_debug(GOSSIP_IO_DEBUG,
"%s: write-ack failed, retrying from msgpair\n", __func__);
cur_ctx->msg_send_has_been_posted = 0;
cur_ctx->msg_recv_has_been_posted = 0;
}
else
{
/* if we successfully received an ack. If the flow has _not_
* finished, then we should go ahead and cancel it (the ack is
* reporting an error, no point in waiting for flow timeout).
*/
if(cur_ctx->flow_in_progress != 0)
{
job_flow_cancel(cur_ctx->flow_job_id,
pint_client_sm_context);
/* bump up the retry count to prevent the state machine from
* restarting after this error propigates
*/
sm_p->u.io.retry_count = sm_p->msgarray_op.params.retry_limit;
}
}
}
check_next_step:
/*
* If something is pending, return SM_ACTION_DEFERRED to let SM find the
* next thing to do.
*/
if (sm_p->u.io.msgpair_completion_count
|| sm_p->u.io.flow_completion_count
|| sm_p->u.io.write_ack_completion_count) {
if (PINT_smcb_cancelled(smcb))
{
gossip_debug(GOSSIP_IO_DEBUG, "detected I/O cancellation with "
"%d flows and %d write acks pending\n",
sm_p->u.io.flow_completion_count,
sm_p->u.io.write_ack_completion_count);
}
else
{
gossip_debug(GOSSIP_IO_DEBUG, " %d flows pending, %d write acks "
"pending, %d msgpair\n",
sm_p->u.io.flow_completion_count,
sm_p->u.io.write_ack_completion_count,
sm_p->u.io.msgpair_completion_count);
}
return SM_ACTION_DEFERRED;
}
/*
* Else either we've finished it all or have some msgpairs to retry
* that failed earlier.
*/
for (i=0; i < sm_p->u.io.datafile_count; i++) {
PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
if (!cur_ctx->msg_recv_has_been_posted)
break;
if (!cur_ctx->msg_send_has_been_posted)
break;
}
if (i < sm_p->u.io.datafile_count && !PINT_smcb_cancelled(smcb)) {
gossip_debug(GOSSIP_IO_DEBUG,
"*** %s: some msgpairs to repost\n", __func__);
js_p->error_code = IO_RETRY;
} else {
gossip_debug(GOSSIP_IO_DEBUG, "*** all operations %s "
"(msgpairs, flows, write acks)\n",
(PINT_smcb_cancelled(smcb) ? "cancelled" : "completed"));
js_p->error_code = IO_DATAFILE_TRANSFERS_COMPLETE;
}
return SM_ACTION_COMPLETE;
}
static PINT_sm_action io_analyze_results(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = -PVFS_EINVAL, i = 0;
PVFS_object_attr *attr;
gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) io state: "
"io_analyze_results\n", sm_p);
attr = &sm_p->getattr.attr;
/* I/O op failed or cancelled if the transfers didn't complete or
* the error code is non-zero when returning from small I/O sm
*/
if (js_p->error_code != IO_DATAFILE_TRANSFERS_COMPLETE &&
(js_p->error_code == 0 && !sm_p->u.io.small_io))
{
ret = (sm_p->u.io.stored_error_code ?
sm_p->u.io.stored_error_code :
js_p->error_code);
if (ret == 0)
{
ret = (PINT_smcb_cancelled(smcb) ? -PVFS_ECANCEL : -PVFS_EIO);
}
}
else if (!PINT_smcb_cancelled(smcb))
{
/*
* look through all the contexts for errors, saving the first
* one to return (if any) while adding up the size of the
* transfer (in case things actually completed).
*
* in the case of small I/O, we don't have to add up the size
* based on the contexts (which weren't initialized), since the
* small I/O state machine sets the total_size directly
*/
ret = 0;
if(!sm_p->u.io.small_io)
{
for(i = 0; i < sm_p->u.io.context_count; i++)
{
PINT_client_io_ctx *cur_ctx = &sm_p->u.io.contexts[i];
assert(cur_ctx);
ret = io_check_context_status(
cur_ctx, sm_p->u.io.io_type, &sm_p->u.io.total_size);
if (ret < 0)
{
if (ret == -PVFS_ECANCEL)
{
gossip_debug(GOSSIP_IO_DEBUG, "*** I/O operation "
"cancelled\n");
}
else
{
gossip_debug(GOSSIP_IO_DEBUG,
"io_check_context_status found error: %d", ret);
}
break;
}
gossip_debug(
GOSSIP_IO_DEBUG, "[%d/%d] running size is %lld\n",
(i + 1), sm_p->u.io.datafile_count,
lld(sm_p->u.io.total_size));
}
gossip_debug(GOSSIP_IO_DEBUG, "[%d/%d] running size is %lld\n",
(i + 1), sm_p->u.io.datafile_count,
lld(sm_p->u.io.total_size));
}
/*
at this point, we may know an error occurred. if we
couldn't find any errors in the context, use the preserved
error code from the complete_operations state (which may be
success)
*/
if (ret == 0)
{
char buf[64] = {0};
ret = (PINT_smcb_cancelled(smcb) ? -PVFS_ECANCEL :
sm_p->u.io.stored_error_code);
PVFS_strerror_r(ret, buf, 64);
gossip_debug(GOSSIP_IO_DEBUG, "no context errors found; "
"using: %s\n", buf);
}
}
else
{
ret = (PINT_smcb_cancelled(smcb) ? -PVFS_ECANCEL : -PVFS_EIO);
}
/* be sure there are no jobs still laying around */
assert((sm_p->u.io.msgpair_completion_count == 0) &&
(sm_p->u.io.flow_completion_count == 0) &&
(sm_p->u.io.write_ack_completion_count == 0));
/*
FIXME: non bmi errors pop out in flow failures above -- they are
not properly marked as flow errors either, so we check for them
explicitly here (but not all -- fix it for real).
*/
if (((PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_BMI) ||
(PVFS_ERROR_CLASS(-ret) == PVFS_ERROR_FLOW) ||
(ret == -ECONNRESET) || (ret == -PVFS_EPROTO)) &&
(sm_p->u.io.retry_count < sm_p->msgarray_op.params.retry_limit))
{
assert(!PINT_smcb_cancelled(smcb));
sm_p->u.io.stored_error_code = 0;
sm_p->u.io.total_size = 0; /* start from the beginning again */
sm_p->u.io.retry_count++;
gossip_debug(GOSSIP_IO_DEBUG, "Retrying I/O operation "
"(attempt number %d)\n", sm_p->u.io.retry_count);
if(ret == -BMI_ECANCEL)
{
/* if we got a BMI cancellation, then it probably indicates a
* that a BMI timeout has expired; we should retry without
* introducing another delay
*/
js_p->error_code = IO_RETRY_NODELAY;
}
else
{
js_p->error_code = IO_RETRY;
}
goto analyze_results_exit;
}
/* all other errors we should just propigate immediately */
if(ret != 0)
{
js_p->error_code = ret;
goto analyze_results_exit;
}
gossip_debug(GOSSIP_IO_DEBUG, "total bytes transferred is %lld\n",
lld(sm_p->u.io.total_size));
if(sm_p->u.io.io_type == PVFS_IO_WRITE)
{
js_p->error_code = 0;
sm_p->u.io.io_resp_p->total_completed = sm_p->u.io.total_size;
/* we don't know if the file size has changed here so we invalidate
* the size in the attribute cache. The only sure-fire way to
* recompute the file size (if our write was past eof) is to
* get all the datafile sizes (we could have written to what was
* previously a hole). That's too expensive for just a cached
* size update.
*/
PINT_acache_invalidate_size(sm_p->object_ref);
/* we can skip the check for holes since its only needed in the
* case of reads
*/
goto analyze_results_exit;
}
/* In order to give the sysint caller the correct value for length
* of bytes read, we have to check for holes in the logical file.
* The algorithm is as follows:
*
* 1. If the size of the memory request is equivalent to the number of
* bytes read, we know there are no holes and the total size
* is the correct value to return back to the caller.
*
* 2. If 1. is false, either there's a hole in the file within the
* region of the file request, or the request is past EOF. If the request
* is NOT past EOF, then the return value for length of bytes read
* is equivalent to the size of the memory request. To check that the
* request is not past EOF, we iterate through the target datafiles,
* calculate the logical file offsets of each based on the their physical
* bstream size, looking for a logical offset that is >= the upper bound
* of the memory request. If we find a target datafile that matches this
* criteria, we know that the request is not past EOF, and that
* returned bytes read is equivalent to the size of the memory request.
*
* 3. If none of the target datafile logical offsets are >= the upper
* bound of the file request, we still must check all the datafiles
* that are beyond the last target datafile. To do this we have to
* go and get the sizes of each one, and perform the above comparison
* on them as well. Again, if one of their logical offsets >= the upper
* bound of the file request, we know the returned bytes read is the
* size of the file request.
*
* 4. If we don't find any datafiles with
* logical offset >= the upper bound of the file request, the returned
* bytes read value is the size of the file request minus the last offset
* of the datafiles (where the EOF occurs).
*/
if(sm_p->u.io.total_size == PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req))
{
sm_p->u.io.io_resp_p->total_completed = sm_p->u.io.total_size;
}
else
{
PVFS_offset eor; /* end-of-request */
PVFS_offset max_datafile_logical_offset;
PVFS_offset filereq_ub_offset;
/* compute the upper bound of the file request (based on the
* memory request. This is the logical offset used to compare
* against all the datafile logical offsets
*/
ret = io_find_offset(
sm_p,
PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
&filereq_ub_offset);
if(ret)
{
js_p->error_code = ret;
goto analyze_results_exit;
}
eor = filereq_ub_offset + sm_p->u.io.file_req_offset;
max_datafile_logical_offset = attr->u.meta.dist->methods->
logical_file_size(
attr->u.meta.dist->params,
attr->u.meta.dfile_count,
sm_p->u.io.dfile_size_array);
if(max_datafile_logical_offset > eor)
{
eor = max_datafile_logical_offset;
/* we found a logical offset that is past the end of the
* request, so we know the request is not past EOF.
*
* In this case, we don't need to change the size of the
* file stored in the attribute cache
*/
sm_p->u.io.io_resp_p->total_completed =
PINT_REQUEST_TOTAL_BYTES(
sm_p->u.io.mem_req);
ret = io_zero_fill_holes(sm_p, eor,
sm_p->u.io.datafile_count,
sm_p->u.io.dfile_size_array,
sm_p->u.io.datafile_index_array);
if(ret < 0)
{
js_p->error_code = ret;
goto analyze_results_exit;
}
}
else
{
/* if we fail to find a datafile that matches, we still don't
* know if the request is past EOF or we just don't have all
* the datafiles sizes. If it turns out we already
* have all the datafile sizes, we
* can go straight to computing the total size. Otherwise, we
* need to get the rest of the datafiles.
* At some point we should fix
* this and the getattr state machine to allow us to only get
* the remaining datafile sizes that we need, instead of
* getting all of them. Right now the getattr state machine
* just gets them all.
*/
if(sm_p->u.io.datafile_count == attr->u.meta.dfile_count)
{
/* we skip getting all the datafile sizes (since we already
* have them) and just compute the total size.
*/
PVFS_size total_size;
ret = io_find_total_size(
sm_p, max_datafile_logical_offset, &total_size);
if(ret < 0)
{
js_p->error_code = ret;
goto analyze_results_exit;
}
sm_p->u.io.io_resp_p->total_completed = total_size;
ret = io_zero_fill_holes(sm_p, eor,
sm_p->u.io.datafile_count,
sm_p->u.io.dfile_size_array,
NULL);
if(ret < 0)
{
js_p->error_code = ret;
goto analyze_results_exit;
}
js_p->error_code = 0;
goto analyze_results_exit;
}
/* looks like we don't have all the datafile sizes, so
* we need to go and get them
*/
/* NOTE: when jumping to getattr_datafile_sizes, results will be */
/* allocated and stored in sm_p->getattr.size_array. */
/* setting this state result will cause the state machine to
* jump to getattr_datafile_sizes and get all the
* datafile sizes from all the
* servers. Once complete, we will return back to the
* io state machine at the analyze_size_results state.
*/
js_p->error_code = IO_GET_DATAFILE_SIZE;
goto analyze_results_exit;
}
}
js_p->error_code = ret;
analyze_results_exit:
return SM_ACTION_COMPLETE;
}
static PINT_sm_action io_analyze_size_results(
struct PINT_smcb *smcb, job_status_s *js_p)
{
/* Now that we have all the datafile sizes,
* this state allows us to finish our check that the file request
* is not beyond EOF, and return the appropriate value for bytes
* read to the sysint caller.
*
* The check iterates through all the datafiles and compares
* their logical sizes with the upper bound of the file request.
* If one of the datafile's logical sizes is >= than the ub,
* we know the request is not past EOF. Otherwise it must be, and
* the return value for bytes read is calculated from the size
* of the file request and the greatest logical offset of
* the datafiles (the actual EOF).
*/
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
PVFS_offset eof = 0;
PVFS_offset eor;
PVFS_offset filereq_ub_offset;
int ret;
PVFS_object_attr * attr;
attr = &sm_p->getattr.attr;
assert(attr);
ret = io_find_offset(
sm_p,
PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
&filereq_ub_offset);
if(ret < 0)
{
js_p->error_code = ret;
goto error_exit;
}
eor = filereq_ub_offset + sm_p->u.io.file_req_offset;
eof = attr->u.meta.dist->methods->
logical_file_size(
attr->u.meta.dist->params,
attr->u.meta.dfile_count,
sm_p->getattr.size_array);
if(eof > eor)
{
eor = eof;
/* we found a logical offset that is past the end of the
* request, so we know the request is not past EOF
*/
sm_p->u.io.io_resp_p->total_completed =
PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);
ret = io_zero_fill_holes(sm_p, eor,
attr->u.meta.dfile_count,
sm_p->getattr.size_array,
NULL);
if(ret < 0)
{
js_p->error_code = ret;
goto error_exit;
}
}
else
{
PVFS_size total_size;
ret = io_find_total_size(sm_p, eof, &total_size);
if(ret < 0)
{
js_p->error_code = ret;
goto error_exit;
}
sm_p->u.io.io_resp_p->total_completed = total_size;
ret = io_zero_fill_holes(sm_p, eof,
attr->u.meta.dfile_count,
sm_p->getattr.size_array,
NULL);
if(ret < 0)
{
js_p->error_code = ret;
goto error_exit;
}
}
js_p->error_code = 0;
error_exit:
return SM_ACTION_COMPLETE;
}
static PINT_sm_action io_cleanup(
struct PINT_smcb *smcb, job_status_s *js_p)
{
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_CLIENT_DEBUG,
"(%p) io state: io_cleanup\n", sm_p);
io_contexts_destroy(sm_p);
io_datafile_index_array_destroy(sm_p);
PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
if(sm_p->u.io.dfile_size_array)
{
PINT_SM_DATAFILE_SIZE_ARRAY_DESTROY(&sm_p->u.io.dfile_size_array);
}
/*these errors occur only within THIS machine and indicate an error that
*occurred BEFORE starting msgpairs or small-io.
*/
if (js_p->error_code == IO_FATAL_ERROR)
js_p->error_code = sm_p->u.io.stored_error_code;
sm_p->error_code = js_p->error_code;
if (sm_p->error_code)
{
char buf[64] = {0};
PINT_acache_invalidate(sm_p->object_ref);
PVFS_strerror_r(sm_p->error_code, buf, 64);
gossip_debug(GOSSIP_IO_DEBUG,
"*** Final I/O operation error is %s\n", buf);
}
PINT_SET_OP_COMPLETE;
return SM_ACTION_TERMINATE;
}
static inline int io_decode_ack_response(
PINT_client_io_ctx *cur_ctx,
struct PINT_decoded_msg *decoded_resp,
struct PVFS_server_resp **resp)
{
int ret = -PVFS_EINVAL;
gossip_debug(GOSSIP_IO_DEBUG, "- io_process_context_recv called\n");
assert(cur_ctx && decoded_resp && resp);
ret = PINT_serv_decode_resp(
cur_ctx->msg.fs_id, cur_ctx->msg.encoded_resp_p, decoded_resp,
&cur_ctx->msg.svr_addr,
cur_ctx->msg.recv_status.actual_size, resp);
if (ret)
{
PVFS_perror("PINT_server_decode_resp failed", ret);
return ret;
}
assert((*resp)->status < 1);
cur_ctx->msg.op_status = (*resp)->status;
if (cur_ctx->msg.recv_status.error_code || cur_ctx->msg.op_status)
{
gossip_debug(
GOSSIP_IO_DEBUG, " error %d with status %d related "
"to response from context %p; not submitting flow.\n",
cur_ctx->msg.recv_status.error_code,
cur_ctx->msg.op_status, cur_ctx);
if (cur_ctx->msg.recv_status.error_code)
{
PVFS_perror_gossip(
"io_process_context_recv (recv_status.error_code)",
cur_ctx->msg.recv_status.error_code);
ret = cur_ctx->msg.recv_status.error_code;
}
else if (cur_ctx->msg.op_status)
{
PVFS_perror_gossip("io_process_context_recv (op_status)",
cur_ctx->msg.op_status);
ret = cur_ctx->msg.op_status;
}
PINT_serv_free_msgpair_resources(
&cur_ctx->msg.encoded_req, cur_ctx->msg.encoded_resp_p,
decoded_resp, &cur_ctx->msg.svr_addr,
cur_ctx->msg.max_resp_sz);
memset(&cur_ctx->msg.encoded_req,0,sizeof(cur_ctx->msg.encoded_req));
cur_ctx->msg.encoded_resp_p = NULL;
}
return ret;
}
/* post flow sets up the flow and posts it. This may be called
* either immediately after the request is posted (in the case of writes
* at present), or not until the ack from the request is received (as
* in the case of reads).
*/
static inline int io_post_flow(
PINT_smcb *smcb,
PINT_client_io_ctx *cur_ctx)
{
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret = 0;
PVFS_object_attr *attr = NULL;
struct server_configuration_s *server_config = NULL;
unsigned long status_user_tag = 0;
struct filesystem_configuration_s * fs_config;
gossip_debug(GOSSIP_IO_DEBUG, "%s: entry\n", __func__);
if (!sm_p || !cur_ctx)
{
return -PVFS_EINVAL;
}
/* We need the file's metadata info (distribution and datafile count) */
attr = &sm_p->getattr.attr;
assert(attr);
/*
* Notify BMI about the memory buffer the user passed in. For transports
* that need registration, this allows them to work with one large region
* rather than lots of small stripe-size regions. But only bother if the
* request is contiguous; too complex and likely no faster in the highly
* fragmented case.
*/
if (sm_p->u.io.mem_req->num_contig_chunks == 1)
{
struct bmi_optimistic_buffer_info binfo;
binfo.buffer = sm_p->u.io.buffer;
binfo.len = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
binfo.rw = sm_p->u.io.io_type;
BMI_set_info(cur_ctx->msg.svr_addr, BMI_OPTIMISTIC_BUFFER_REG, &binfo);
}
gossip_debug(GOSSIP_IO_DEBUG, "* mem req size is %lld, "
"file_req size is %lld (bytes)\n",
lld(PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req)),
lld(PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.file_req)));
/* must reset the error_code and internal PINT_distribute fields
* in case of a retry */
PINT_flow_reset(&cur_ctx->flow_desc);
cur_ctx->flow_desc.file_data.fsize =
sm_p->u.io.dfile_size_array[cur_ctx->index];
cur_ctx->flow_desc.file_data.dist = attr->u.meta.dist;
cur_ctx->flow_desc.file_data.server_nr = cur_ctx->server_nr;
cur_ctx->flow_desc.file_data.server_ct = attr->u.meta.dfile_count;
cur_ctx->flow_desc.file_req = sm_p->u.io.file_req;
cur_ctx->flow_desc.file_req_offset = sm_p->u.io.file_req_offset;
cur_ctx->flow_desc.mem_req = sm_p->u.io.mem_req;
cur_ctx->flow_desc.tag = cur_ctx->session_tag;
cur_ctx->flow_desc.type = sm_p->u.io.flowproto_type;
cur_ctx->flow_desc.user_ptr = NULL;
gossip_debug(GOSSIP_IO_DEBUG, " bstream_size = %lld, datafile "
"nr=%d, ct=%d, file_req_off = %lld\n",
lld(cur_ctx->flow_desc.file_data.fsize),
cur_ctx->flow_desc.file_data.server_nr,
cur_ctx->flow_desc.file_data.server_ct,
lld(cur_ctx->flow_desc.file_req_offset));
if (sm_p->u.io.io_type == PVFS_IO_READ)
{
cur_ctx->flow_desc.file_data.extend_flag = 0;
cur_ctx->flow_desc.src.endpoint_id = BMI_ENDPOINT;
cur_ctx->flow_desc.src.u.bmi.address = cur_ctx->msg.svr_addr;
cur_ctx->flow_desc.dest.endpoint_id = MEM_ENDPOINT;
cur_ctx->flow_desc.dest.u.mem.buffer = sm_p->u.io.buffer;
}
else
{
assert(sm_p->u.io.io_type == PVFS_IO_WRITE);
cur_ctx->flow_desc.file_data.extend_flag = 1;
cur_ctx->flow_desc.src.endpoint_id = MEM_ENDPOINT;
cur_ctx->flow_desc.src.u.mem.buffer = sm_p->u.io.buffer;
cur_ctx->flow_desc.dest.endpoint_id = BMI_ENDPOINT;
cur_ctx->flow_desc.dest.u.bmi.address = cur_ctx->msg.svr_addr;
}
status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FLOW);
server_config = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
fs_config = PINT_config_find_fs_id(server_config, cur_ctx->msg.fs_id);
if(fs_config)
{
/* pick up any buffer settings overrides from fs conf */
cur_ctx->flow_desc.buffer_size = fs_config->fp_buffer_size;
cur_ctx->flow_desc.buffers_per_flow = fs_config->fp_buffers_per_flow;
}
ret = job_flow(
&cur_ctx->flow_desc, smcb, status_user_tag,
&cur_ctx->flow_status, &cur_ctx->flow_job_id,
pint_client_sm_context,
server_config->client_job_flow_timeout, sm_p->hints);
PINT_put_server_config_struct(server_config);
/* if the flow fails immediately, then we have to do some special
* handling. This function is not equiped to handle the failure
* directly, so we instead post a null job that will propigate the error
* to the normal state where we interpret flow errors
*/
if((ret < 0) || (ret == 1 && cur_ctx->flow_status.error_code != 0))
{
/* make sure the error code is stored in the flow descriptor */
if(ret == 1)
{
cur_ctx->flow_desc.error_code = cur_ctx->flow_status.error_code;
}
else
{
cur_ctx->flow_desc.error_code = ret;
}
gossip_debug(GOSSIP_IO_DEBUG, " immediate flow failure for "
"context %p, error code: %d\n", cur_ctx,
cur_ctx->flow_desc.error_code);
gossip_debug(GOSSIP_IO_DEBUG, " posting job_null() to propigate.\n");
/* post a fake job to propigate the flow failure to a later state */
ret = job_null(cur_ctx->flow_desc.error_code, sm_p,
status_user_tag, &cur_ctx->flow_status,
&cur_ctx->flow_job_id, pint_client_sm_context);
if(ret !=0)
{
return(ret);
}
}
else
{
gossip_debug(GOSSIP_IO_DEBUG, " posted flow for "
"context %p\n", cur_ctx);
}
cur_ctx->flow_has_been_posted = 1;
cur_ctx->flow_in_progress = 1;
sm_p->u.io.flow_completion_count++;
return 0;
}
static inline int io_post_write_ack_recv(
PINT_smcb *smcb,
PINT_client_io_ctx * cur_ctx)
{
PINT_client_sm * sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
int ret;
unsigned long status_user_tag;
gossip_debug(GOSSIP_IO_DEBUG, " preposting write "
"ack for context %p.\n", cur_ctx);
cur_ctx->write_ack.max_resp_sz = PINT_encode_calc_max_size(
PINT_ENCODE_RESP, PVFS_SERV_WRITE_COMPLETION,
sm_p->u.io.encoding);
cur_ctx->write_ack.encoded_resp_p = BMI_memalloc(
cur_ctx->msg.svr_addr, cur_ctx->write_ack.max_resp_sz,
BMI_RECV);
if (!cur_ctx->write_ack.encoded_resp_p)
{
gossip_err("BMI_memalloc (for write ack) failed\n");
return -PVFS_ENOMEM;
}
/*
we're pre-posting the final write ack here, even though it's
ahead of the flow phase; reads are at the flow phase.
the timeout used here is a scaling one that needs to be long
enough for the entire flow to occur
*/
status_user_tag = ((4 * cur_ctx->index) + IO_SM_PHASE_FINAL_ACK);
/*
pre-post this recv with an infinite timeout and adjust it
after the flow completes since we don't know how long a flow
can take at this point
*/
ret = job_bmi_recv(
cur_ctx->msg.svr_addr, cur_ctx->write_ack.encoded_resp_p,
cur_ctx->write_ack.max_resp_sz, cur_ctx->session_tag,
BMI_PRE_ALLOC, smcb, status_user_tag,
&cur_ctx->write_ack.recv_status, &cur_ctx->write_ack.recv_id,
pint_client_sm_context, JOB_TIMEOUT_INF, sm_p->hints);
if (ret < 0)
{
gossip_err("job_bmi_recv (write ack) failed\n");
return ret;
}
assert(ret == 0);
cur_ctx->write_ack_has_been_posted = 1;
cur_ctx->write_ack_in_progress = 1;
sm_p->u.io.write_ack_completion_count++;
return 0;
}
/*
returns 0 on send completion; IO_RECV_COMPLETED on recv completion,
and -PVFS_error on failure
*/
static inline int io_complete_context_send_or_recv(
PINT_smcb *smcb,
job_status_s *js_p)
{
int ret = -PVFS_EINVAL, index = 0;
unsigned long status_user_tag = 0;
PINT_client_io_ctx *cur_ctx = NULL;
PINT_sm_msgpair_state *msg = NULL;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
gossip_debug(GOSSIP_IO_DEBUG,
"- complete_context_send_or_recv called\n");
assert(smcb && js_p);
assert(smcb->op == PVFS_SYS_IO);
status_user_tag = (unsigned long)js_p->status_user_tag;
if (STATUS_USER_TAG_TYPE(
status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV))
{
index = STATUS_USER_TAG_GET_INDEX(
status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
gossip_debug(GOSSIP_IO_DEBUG, "got a recv completion with "
"context index %d\n", index);
cur_ctx = &sm_p->u.io.contexts[index];
assert(cur_ctx);
msg = &cur_ctx->msg;
msg->recv_id = 0;
msg->recv_status = *js_p;
assert(msg->recv_status.error_code <= 0);
assert(msg->recv_status.actual_size <= msg->max_resp_sz);
cur_ctx->msg_recv_in_progress = 0;
sm_p->u.io.msgpair_completion_count--;
ret = IO_RECV_COMPLETED;
}
else if (STATUS_USER_TAG_TYPE(
status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_SEND))
{
index = STATUS_USER_TAG_GET_INDEX(
status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
gossip_debug(GOSSIP_IO_DEBUG, "got a send completion with "
"context index %d\n", index);
cur_ctx = &sm_p->u.io.contexts[index];
assert(cur_ctx);
msg = &cur_ctx->msg;
msg->send_id = 0;
msg->send_status = *js_p;
assert(msg->send_status.error_code <= 0);
cur_ctx->msg_send_in_progress = 0;
sm_p->u.io.msgpair_completion_count--;
ret = 0;
}
return ret;
}
/**
* process_context_recv handles the ack or nack from the server
* in response to the I/O request. This is called for each I/O context
* i.e. each specific server response for each datafile.
*/
static inline int io_process_context_recv(
PINT_client_sm *sm_p,
job_status_s *js_p,
PINT_client_io_ctx **out_ctx)
{
int ret = -PVFS_EINVAL, index = 0;
unsigned long status_user_tag = 0;
struct PINT_decoded_msg decoded_resp;
struct PVFS_server_resp *resp = NULL;
PINT_client_io_ctx *cur_ctx = NULL;
gossip_debug(GOSSIP_IO_DEBUG,
"- io_process_context_recv called\n");
assert(sm_p && js_p);
assert(STATUS_USER_TAG_TYPE(
status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV));
status_user_tag = (unsigned long)js_p->status_user_tag;
index = STATUS_USER_TAG_GET_INDEX(
status_user_tag, IO_SM_PHASE_REQ_MSGPAIR_RECV);
cur_ctx = &sm_p->u.io.contexts[index];
assert(cur_ctx);
*out_ctx = cur_ctx;
if (js_p->error_code)
{
{
char buf[1024];
PVFS_strerror_r(js_p->error_code, buf, sizeof(buf));
buf[sizeof(buf)-1] = '\0';
gossip_debug(GOSSIP_IO_DEBUG, "%s: entered with error: %s\n",
__func__, buf);
}
return js_p->error_code;
}
/* decode the response from the server */
ret = io_decode_ack_response(cur_ctx, &decoded_resp, &resp);
if (ret)
{
{
char buf[1024];
PVFS_strerror_r(js_p->error_code, buf, sizeof(buf));
buf[sizeof(buf)-1] = '\0';
gossip_debug(GOSSIP_IO_DEBUG, "%s: failed: %s\n", __func__, buf);
}
return ret;
}
/* save the datafile size */
sm_p->u.io.dfile_size_array[cur_ctx->index] = resp->u.io.bstream_size;
/* now we can destroy I/O request response resources */
ret = PINT_serv_free_msgpair_resources(
&cur_ctx->msg.encoded_req, cur_ctx->msg.encoded_resp_p,
&decoded_resp, &cur_ctx->msg.svr_addr,
cur_ctx->msg.max_resp_sz);
memset(&cur_ctx->msg.encoded_req,0,sizeof(cur_ctx->msg.encoded_req));
cur_ctx->msg.encoded_resp_p = NULL;
if (ret)
{
PVFS_perror_gossip("PINT_serv_free_msgpair_resources "
"failed", ret);
return ret;
}
return ret;
}
static inline int io_check_context_status(
PINT_client_io_ctx *cur_ctx,
int io_type,
PVFS_size *total_size)
{
int ret = 0;
gossip_debug(GOSSIP_IO_DEBUG, "- io_check_context_status called\n");
assert(cur_ctx && total_size);
if (cur_ctx->msg.send_status.error_code)
{
gossip_debug(GOSSIP_IO_DEBUG,
" error (%d) in msgpair send for context %p\n",
cur_ctx->msg.send_status.error_code, cur_ctx);
ret = cur_ctx->msg.send_status.error_code;
}
else if (cur_ctx->msg.recv_status.error_code)
{
gossip_debug(GOSSIP_IO_DEBUG,
" error (%d) in msgpair recv for context %p\n",
cur_ctx->msg.recv_status.error_code, cur_ctx);
ret = cur_ctx->msg.recv_status.error_code;
}
else if (io_type == PVFS_IO_WRITE)
{
/* we check the write ack status before the flow status so that the
* error code that the server reported in the ack takes precedence
*/
if (cur_ctx->write_ack.recv_status.error_code)
{
gossip_debug(
GOSSIP_IO_DEBUG,
" error (%d) in final ack for context %p\n",
cur_ctx->write_ack.recv_status.error_code, cur_ctx);
assert(cur_ctx->write_ack_has_been_posted);
ret = cur_ctx->write_ack.recv_status.error_code;
}
else if (cur_ctx->write_ack_has_been_posted)
{
struct PINT_decoded_msg decoded_resp;
struct PVFS_server_resp *resp = NULL;
/*
size for writes are reported in the final ack, but we
have to decode it first
*/
ret = PINT_serv_decode_resp(
cur_ctx->msg.fs_id, cur_ctx->write_ack.encoded_resp_p,
&decoded_resp, &cur_ctx->msg.svr_addr,
cur_ctx->write_ack.recv_status.actual_size, &resp);
if (ret == 0)
{
gossip_debug(
GOSSIP_IO_DEBUG,
" %lld bytes written to context %p\n",
lld(resp->u.write_completion.total_completed),
cur_ctx);
*total_size += resp->u.write_completion.total_completed;
/* pass along the error code from the server as well */
ret = resp->status;
PINT_decode_release(&decoded_resp, PINT_DECODE_RESP);
}
else
{
PVFS_perror_gossip("PINT_serv_decode_resp failed", ret);
}
PINT_flow_reset(&cur_ctx->flow_desc);
BMI_memfree(cur_ctx->msg.svr_addr,
cur_ctx->write_ack.encoded_resp_p,
cur_ctx->write_ack.max_resp_sz, BMI_RECV);
}
else if (cur_ctx->flow_status.error_code)
{
gossip_debug(GOSSIP_IO_DEBUG,
" error (%d) in flow for context %p\n",
cur_ctx->flow_status.error_code, cur_ctx);
PINT_flow_reset(&cur_ctx->flow_desc);
ret = cur_ctx->flow_status.error_code;
}
}
else if (cur_ctx->flow_status.error_code)
{
gossip_debug(GOSSIP_IO_DEBUG,
" error (%d) in flow for context %p\n",
cur_ctx->flow_status.error_code, cur_ctx);
PINT_flow_reset(&cur_ctx->flow_desc);
ret = cur_ctx->flow_status.error_code;
}
else if (io_type == PVFS_IO_READ)
{
gossip_debug(
GOSSIP_IO_DEBUG, " %lld bytes read from context %p\n",
lld(cur_ctx->flow_desc.total_transferred), cur_ctx);
/* size for reads are reported in the flow */
*total_size += cur_ctx->flow_desc.total_transferred;
/*
we can't reset the flow here in case we have to do a zero
fill adjustment that we haven't detected yet
*/
}
return ret;
}
static int io_get_max_unexp_size(
struct PINT_Request * file_req,
PVFS_handle handle,
PVFS_fs_id fs_id,
enum PVFS_io_type type,
int * max_unexp_payload)
{
int ret;
PVFS_BMI_addr_t server_addr;
int bmi_max_unexp_payload;
/* we need to get the server address for the particular server
* with this datafile so that we can use it to get the size
* of the unexpected payload for the bmi interface associated
* with that server. We used the payload size to determine
* whether to do small I/O.
*/
ret = PINT_cached_config_map_to_server(
&server_addr,
handle,
fs_id);
if(ret < 0)
{
return ret;
}
/* now get the value for the BMI interface in use for that server */
ret = BMI_get_info(server_addr, BMI_GET_UNEXP_SIZE,
(void *)&bmi_max_unexp_payload);
if(ret < 0)
{
return ret;
}
*max_unexp_payload = bmi_max_unexp_payload;
return 0;
}
/*
determines what subset of the datafiles actually contain data that
we are interested in for this request. returns 0 on success,
-PVFS_error on failure
*/
static int io_find_target_datafiles(
PVFS_Request mem_req,
PVFS_Request file_req,
PVFS_offset file_req_offset,
PINT_dist *dist_p,
PVFS_fs_id fs_id,
enum PVFS_io_type io_type,
PVFS_handle *input_handle_array,
int input_handle_count,
int *handle_index_array,
int *handle_index_out_count,
int *sio_handle_index_array,
int *sio_handle_index_count)
{
int ret = -PVFS_EINVAL, i = 0;
struct PINT_Request_state *req_state = NULL;
struct PINT_Request_state *mem_req_state = NULL;
PINT_request_file_data tmp_file_data;
PINT_Request_result tmp_result;
PVFS_offset offsets;
PVFS_size sizes;
int total_bytes = 0;
struct server_configuration_s * server_config;
struct filesystem_configuration_s * fs_config;
int small_io_size;
gossip_debug(GOSSIP_IO_DEBUG, "- io_find_target_datafiles called\n");
if (!handle_index_array || !handle_index_out_count)
{
return ret;
}
*handle_index_out_count = 0;
*sio_handle_index_count = 0;
req_state = PINT_new_request_state(file_req);
if (!req_state)
{
return -PVFS_ENOMEM;
}
mem_req_state = PINT_new_request_state(mem_req);
if (!mem_req_state)
{
PINT_free_request_state(req_state);
return -PVFS_ENOMEM;
}
tmp_file_data.dist = dist_p;
tmp_file_data.server_ct = input_handle_count;
tmp_file_data.extend_flag = 1;
/* for each datafile handle, calculate the unexp request or response
* size (may be different for each server), and then calculate if
* any data exists on that server (in the case of reads)
* or if any data should be written
* (in the case of writes).
*/
for(i = 0; i < input_handle_count; i++)
{
int max_unexp_payload;
ret = io_get_max_unexp_size(
file_req, input_handle_array[i], fs_id,
io_type, &max_unexp_payload);
if(ret < 0)
{
PINT_free_request_state(mem_req_state);
PINT_free_request_state(req_state);
return ret;
}
/* NOTE: we don't have to give an accurate file size here, as
* long as we set the extend flag to tell the I/O req
* processor to continue past eof if needed
*/
tmp_file_data.fsize = 0;
tmp_file_data.server_nr = i;
PINT_REQUEST_STATE_RESET(req_state);
PINT_REQUEST_STATE_RESET(mem_req_state);
/* if a file datatype offset was specified, go ahead and skip
* ahead before calculating
*/
if (file_req_offset)
{
PINT_REQUEST_STATE_SET_TARGET(req_state, file_req_offset);
}
PINT_REQUEST_STATE_SET_FINAL(req_state,
file_req_offset+PINT_REQUEST_TOTAL_BYTES(mem_req));
memset(&tmp_result, 0, sizeof(PINT_Request_result));
/* + 1 here so that the total_bytes processed can exist
* be > max_unexp_payload
*/
tmp_result.bytemax = max_unexp_payload + 1;
tmp_result.segmax = 1;
tmp_result.offset_array = &offsets;
tmp_result.size_array = &sizes;
total_bytes = 0;
/* we need to keep processing the request (not just check for non-zero)
* so that we can figure out whether to do small I/O.
*/
do
{
tmp_result.bytes = 0;
tmp_result.segs = 0;
/* PINT_process_request() returns number of bytes processed */
ret = PINT_process_request(
req_state, mem_req_state, &tmp_file_data,
&tmp_result, PINT_CLIENT);
if (ret < 0)
{
PINT_free_request_state(mem_req_state);
PINT_free_request_state(req_state);
return ret;
}
total_bytes += tmp_result.bytes;
/* we limit the request processing for each datafile to only
* check that the size is as least as big as max_unexp_size.
* That way we know whether to do small I/O. Calculating the
* entire size for each datafile isn't necessary (and may be
* expensive).
*/
} while(!PINT_REQUEST_DONE(req_state)
&& total_bytes <= max_unexp_payload);
/* check if we found data that belongs to this handle */
if (total_bytes != 0)
{
handle_index_array[(*handle_index_out_count)++] = i;
#ifndef PVFS2_SMALL_IO_OFF
/* we need the encoding type from the server config */
server_config = PINT_get_server_config_struct(fs_id);
if(!server_config)
{
return -PVFS_EINVAL;
}
fs_config = PINT_config_find_fs_id(server_config, fs_id);
if(!fs_config)
{
return -PVFS_EINVAL;
}
/* finally, compute the exact payload from the bmi_max_unexp_payload
* and the small io message size
*/
small_io_size = PINT_encode_calc_max_size(
(io_type == PVFS_IO_READ ? PINT_ENCODE_RESP : PINT_ENCODE_REQ),
PVFS_SERV_SMALL_IO,
fs_config->encoding);
PINT_put_server_config_struct(server_config);
if(io_type == PVFS_IO_WRITE)
{
/* add the size of the entire file request */
small_io_size += (PVFS_REQUEST_ENCODED_SIZE *
(file_req->num_nested_req + 1));
}
/* encode/decode funcs malloc a max size for small I/O independent
* of the bmi max unexp payload, so we need to subtract that
* extra here
*/
small_io_size -= (io_type == PVFS_IO_READ ?
extra_size_PVFS_servreq_small_io :
extra_size_PVFS_servresp_small_io);
if(total_bytes + small_io_size <= max_unexp_payload)
{
sio_handle_index_array[(*sio_handle_index_count)++] = i;
}
#endif
gossip_debug(GOSSIP_IO_DEBUG, "%s: "
"datafile[%d] might have data (out=%d)\n",
__func__, i, *handle_index_out_count);
}
}
PINT_free_request_state(req_state);
PINT_free_request_state(mem_req_state);
return 0;
}
/* If there are no datafiles that have a logical
* offset past the upper bound of the file request, we know that
* the request is beyond the EOF of the file. We compute
* the return value for bytes read by finding the upper bound of the
* memory request *within* the logical file (before EOF). This is
* the end of the contiguous segment in the file request < EOF.
* The number of bytes read is then the length of the file request
* from start to this point.
*/
static int
io_find_total_size(
PINT_client_sm * sm_p,
PVFS_offset final_offset,
PVFS_size * total_return_size)
{
int res;
PVFS_offset current_offset;
PVFS_offset offsets[IO_MAX_SEGMENT_NUM];
PVFS_size sizes[IO_MAX_SEGMENT_NUM];
PINT_Request_state * filereq_state;
PINT_Request_state * memreq_state;
PINT_request_file_data rfdata;
PINT_Request_result result;
PVFS_size total_size = 0;
PVFS_object_attr * attr;
int index = 0;
/* if the final offset is zero, then the file size is zero */
if(final_offset == 0)
{
*total_return_size = 0;
return 0;
}
attr = &sm_p->getattr.attr;
filereq_state = PINT_new_request_state(sm_p->u.io.file_req);
memreq_state = PINT_new_request_state(sm_p->u.io.mem_req);
rfdata.server_nr = 0;
rfdata.server_ct = 1;
rfdata.fsize = final_offset;
rfdata.dist = attr->u.meta.dist;
rfdata.extend_flag = 0;
result.offset_array = offsets;
result.size_array = sizes;
result.segmax = IO_MAX_SEGMENT_NUM;
result.bytemax = final_offset;
PINT_REQUEST_STATE_SET_FINAL(
filereq_state, final_offset);
do
{
result.segs = 0;
result.bytes = 0;
res = PINT_process_request(filereq_state, memreq_state,
&rfdata, &result, PINT_SERVER);
if(res < 0)
{
goto exit;
}
for(index = 0; index < result.segs; ++index)
{
current_offset = sm_p->u.io.file_req_offset + offsets[index];
if((final_offset >= current_offset) &&
(final_offset <= (current_offset + sizes[index])))
{
total_size += (final_offset - current_offset);
break;
}
else if(final_offset < current_offset)
{
break;
}
else
{
total_size += sizes[index];
}
}
} while(!PINT_REQUEST_DONE(filereq_state) && result.segs);
*total_return_size = total_size;
exit:
PINT_free_request_state(filereq_state);
PINT_free_request_state(memreq_state);
return 0;
}
/* computes the logical offset in the file request from the size
* of contiguous buffer. This function acts only on the file request
* since the actual size of the file doesn't matter.
*/
int
io_find_offset(
PINT_client_sm * sm_p,
PVFS_size contig_size,
PVFS_offset * total_return_offset)
{
PINT_Request_state * filereq_state;
PINT_Request_state * memreq_state;
PINT_request_file_data rfdata;
PINT_Request_result result;
int res;
PVFS_offset offsets[IO_MAX_SEGMENT_NUM];
PVFS_size sizes[IO_MAX_SEGMENT_NUM];
PVFS_offset total_offset = 0;
PVFS_size total_size = 0;
PVFS_object_attr * attr;
int index = 0;
attr = &sm_p->getattr.attr;
filereq_state = PINT_new_request_state(sm_p->u.io.file_req);
memreq_state = PINT_new_request_state(sm_p->u.io.mem_req);
rfdata.server_nr = 0;
rfdata.server_ct = 1;
rfdata.fsize = 0;
rfdata.dist = attr->u.meta.dist;
rfdata.extend_flag = 1;
result.offset_array = offsets;
result.size_array = sizes;
result.segmax = IO_MAX_SEGMENT_NUM;
result.bytemax = contig_size;
PINT_REQUEST_STATE_SET_FINAL(
filereq_state, contig_size);
do
{
result.segs = 0;
result.bytes = 0;
res = PINT_process_request(filereq_state, memreq_state,
&rfdata, &result, PINT_SERVER);
if(res < 0)
{
PINT_free_request_state(filereq_state);
PINT_free_request_state(memreq_state);
return res;
}
for(index = 0; index < result.segs; ++index)
{
if(contig_size <= (total_size + sizes[index]))
{
total_offset = offsets[index] + (contig_size - total_size);
break;
}
else
{
total_offset = offsets[index] + sizes[index];
total_size += sizes[index];
}
}
} while(!PINT_REQUEST_DONE(filereq_state) && result.segs);
*total_return_offset = total_offset;
PINT_free_request_state(filereq_state);
PINT_free_request_state(memreq_state);
return 0;
}
static int io_zero_fill_holes(
PINT_client_sm *sm_p,
PVFS_size eof,
int datafile_count,
PVFS_size * datafile_size_array,
int * datafile_index_array)
{
PVFS_object_attr * attr;
PINT_request_file_data fdata;
PINT_Request_result result;
PINT_Request_state * file_req_state = NULL;
PINT_Request_state * mem_req_state = NULL;
int i = 0;
int j = 0;
int ret;
PVFS_offset offsets[IO_MAX_SEGMENT_NUM];
PVFS_size sizes[IO_MAX_SEGMENT_NUM];
attr = &sm_p->getattr.attr;
fdata.server_ct = attr->u.meta.dfile_count;
fdata.dist = attr->u.meta.dist;
fdata.extend_flag = 0;
result.bytemax = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);
result.segmax = IO_MAX_SEGMENT_NUM;
result.offset_array = offsets;
result.size_array = sizes;
file_req_state = PINT_new_request_state(sm_p->u.io.file_req);
mem_req_state = PINT_new_request_state(sm_p->u.io.mem_req);
for(; i < datafile_count; ++i)
{
PVFS_size real_bstream_size, actual_bstream_size;
fdata.server_nr = (datafile_index_array ? datafile_index_array[i] : i);
actual_bstream_size = datafile_size_array[i];
fdata.fsize = actual_bstream_size;
result.bytemax = PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req);
result.segmax = IO_MAX_SEGMENT_NUM;
fdata.extend_flag = 0;
fdata.dist = attr->u.meta.dist;
real_bstream_size =
(PVFS_size) fdata.dist->methods->logical_to_physical_offset(
fdata.dist->params, &fdata, eof);
if(actual_bstream_size < real_bstream_size)
{
/* We've got holes! */
PVFS_offset real_logical_offset =
fdata.dist->methods->physical_to_logical_offset(
fdata.dist->params, &fdata, real_bstream_size);
PVFS_offset actual_logical_offset =
fdata.dist->methods->physical_to_logical_offset(
fdata.dist->params, &fdata, actual_bstream_size);
PINT_REQUEST_STATE_SET_TARGET(
file_req_state,
sm_p->u.io.file_req_offset);
PINT_REQUEST_STATE_SET_FINAL(
file_req_state,
actual_logical_offset);
do
{
result.bytes = 0;
result.segs = 0;
ret = PINT_process_request(file_req_state, mem_req_state,
&fdata, &result, PINT_CLIENT);
if(ret < 0)
{
PVFS_perror_gossip("PINT_process_request failed", ret);
return ret;
}
} while(!PINT_REQUEST_DONE(file_req_state) && result.segs);
PINT_REQUEST_STATE_SET_FINAL(
file_req_state,
real_logical_offset);
fdata.extend_flag = 1;
result.bytes = 0;
result.segs = 0;
do
{
result.bytes = 0;
result.segs = 0;
ret = PINT_process_request(file_req_state, mem_req_state,
&fdata, &result, PINT_CLIENT);
if(ret < 0)
{
PVFS_perror_gossip("PINT_process_request failed", ret);
return ret;
}
if(result.segs)
{
for(j = 0; j < result.segs; ++j)
{
memset((void *)(((size_t)sm_p->u.io.buffer) +
((size_t)offsets[j])),
0, sizes[j]);
}
}
} while(!PINT_REQUEST_DONE(file_req_state) && result.segs);
PINT_REQUEST_STATE_RESET(file_req_state);
PINT_REQUEST_STATE_RESET(mem_req_state);
}
}
PINT_free_request_state(file_req_state);
PINT_free_request_state(mem_req_state);
return 0;
}
static int io_datafile_index_array_init(
PINT_client_sm *sm_p,
int datafile_count)
{
sm_p->u.io.datafile_index_array =
(int *)malloc(datafile_count * sizeof(int));
if(!sm_p->u.io.datafile_index_array)
{
return -PVFS_ENOMEM;
}
memset(sm_p->u.io.datafile_index_array, 0,
(datafile_count * sizeof(int)));
sm_p->u.io.datafile_count = datafile_count;
return 0;
}
static void io_datafile_index_array_destroy(
PINT_client_sm *sm_p)
{
free(sm_p->u.io.datafile_index_array);
sm_p->u.io.datafile_index_array = NULL;
sm_p->u.io.datafile_count = 0;
}
static int io_contexts_init(
PINT_client_sm *sm_p,
int context_count,
PVFS_object_attr *attr)
{
int ret;
int i = 0;
sm_p->u.io.contexts = (PINT_client_io_ctx *)malloc(
context_count * sizeof(PINT_client_io_ctx));
if(!sm_p->u.io.contexts)
{
return -PVFS_ENOMEM;
}
memset(sm_p->u.io.contexts, 0,
(context_count * sizeof(PINT_client_io_ctx)));
sm_p->u.io.context_count = context_count;
for(i = 0; i < context_count; ++i)
{
PINT_client_io_ctx * cur_ctx = &sm_p->u.io.contexts[i];
PINT_sm_msgpair_state *msg = &cur_ctx->msg;
msg->fs_id = sm_p->object_ref.fs_id;
msg->handle =
attr->u.meta.dfile_array[
sm_p->u.io.datafile_index_array[i]];
msg->retry_flag = PVFS_MSGPAIR_NO_RETRY;
msg->comp_fn = NULL;
ret = PINT_cached_config_map_to_server(
&msg->svr_addr, msg->handle, msg->fs_id);
if(ret)
{
gossip_err("Failed to map meta server address\n");
free(sm_p->u.io.contexts);
return ret;
}
gossip_debug(GOSSIP_IO_DEBUG, "initializing context[%d] %p\n",
i, cur_ctx);
cur_ctx->index = i;
cur_ctx->server_nr = sm_p->u.io.datafile_index_array[i];
cur_ctx->data_handle =
attr->u.meta.dfile_array[cur_ctx->server_nr];
PINT_flow_reset(&cur_ctx->flow_desc);
}
return 0;
}
static void io_contexts_destroy(PINT_client_sm *sm_p)
{
int i = 0;
for(; i < sm_p->u.io.context_count; ++i)
{
PINT_flow_clear(&(sm_p->u.io.contexts[i].flow_desc));
}
free(sm_p->u.io.contexts);
sm_p->u.io.contexts = NULL;
sm_p->u.io.context_count = 0;
}
/* unstuff_needed()
*
* looks at the I/O pattern requested and compares against the distribution
* to determine if a stuffed file would have to be "unstuffed" in order to
* service the request
*
* returns IO_UNSTUFF if unstuff is needed
* returns IO_GETATTR_SERVER if current stuffed status needs to be confirmed
* returns 0 otherwise
*/
static int unstuff_needed(
PVFS_Request mem_req,
PVFS_offset file_req_offset,
PINT_dist *dist_p,
uint32_t mask,
enum PVFS_io_type io_type)
{
PVFS_offset max_offset = 0;
PVFS_offset first_unstuffed_offset = 0;
PINT_request_file_data fake_file_data;
gossip_debug(GOSSIP_IO_DEBUG, "sys-io checking to see if file should be unstuffed.\n");
/* check the flag first to see if file is already explicitly marked as
* unstuffed
*/
if(mask & PVFS_ATTR_META_UNSTUFFED)
{
gossip_debug(GOSSIP_IO_DEBUG, "sys-io detected file is already unstuffed.\n");
return(0);
}
/* calculate maximum logical file offset from the callers's parameters */
/* file request is tiled, so we only need to know the beginning file
* offset and size of the memory offset */
max_offset = file_req_offset + PINT_REQUEST_TOTAL_BYTES(mem_req);
gossip_debug(GOSSIP_IO_DEBUG, "sys-io calculated max offset of I/O request as %lld.\n", lld(max_offset));
/* we need to query the distribution to determine what the first offset
* is that does not belong to the first server/datafile. We construct a
* fake server data struct for 2 servers and find out what the first
* offset (above zero) is that hits the second server */
fake_file_data.dist = dist_p;
fake_file_data.server_ct = 2;
fake_file_data.extend_flag = 1;
fake_file_data.fsize = 0;
fake_file_data.server_nr = 1;
/* call next mapped offset to find the next logical offset that appears
* on the 2nd server
*/
first_unstuffed_offset = dist_p->methods->next_mapped_offset(
dist_p->params,
&fake_file_data,
0);
gossip_debug(GOSSIP_IO_DEBUG, "sys-io calculated first unstuffed offset as %lld.\n", lld(first_unstuffed_offset));
/* compare to see if the file needs to be unstuffed yet */
if(max_offset > first_unstuffed_offset)
{
if(io_type == PVFS_IO_READ)
{
/* reads should not unstuff, but we do need to confirm that the
* attributes are up to date before proceeding
*/
gossip_debug(GOSSIP_IO_DEBUG, "sys-io will perform an extra getattr to confirm file is still stuffed.\n");
return(IO_GETATTR_SERVER);
}
else
{
gossip_debug(GOSSIP_IO_DEBUG, "sys-io will unstuff the file.\n");
return(IO_UNSTUFF);
}
}
gossip_debug(GOSSIP_IO_DEBUG, "sys-io will not unstuff the file.\n");
return(0);
}
/* unstuff_comp_fn()
*
* completion function for unstuff msgpair array
*/
static int unstuff_comp_fn(
void *v_p,
struct PVFS_server_resp *resp_p,
int i)
{
PINT_smcb *smcb = v_p;
PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
gossip_debug(GOSSIP_IO_DEBUG,
"unstuff/getattr completion fn: unstuff_comp_fn\n");
/* only posted one msgpair */
assert(i==0);
if (resp_p->status != 0)
{
gossip_debug(GOSSIP_IO_DEBUG,
"unstuff negative response with error code: %d\n",
resp_p->status);
return resp_p->status;
}
assert(resp_p->op == PVFS_SERV_UNSTUFF || resp_p->op ==
PVFS_SERV_GETATTR);
if(resp_p->op == PVFS_SERV_UNSTUFF)
{
PINT_acache_update(sm_p->object_ref,
&resp_p->u.unstuff.attr,
NULL);
/* replace attrs found by getattr */
/* PINT_copy_object_attr() takes care of releasing old memory */
PINT_copy_object_attr(&sm_p->getattr.attr, &resp_p->u.unstuff.attr);
}
else
{
gossip_debug(GOSSIP_CLIENT_DEBUG, "Updating attributes before reading beyond stuffing boundary.\n");
PINT_acache_update(sm_p->object_ref,
&resp_p->u.getattr.attr,
NULL);
/* replace attrs found by getattr */
/* PINT_copy_object_attr() takes care of releasing old memory */
PINT_copy_object_attr(&sm_p->getattr.attr, &resp_p->u.getattr.attr);
}
return(0);
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/