Skip to content
Permalink
1139b72d5e
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
977 lines (824 sloc) 35.2 KB
/*
* OSD I/O State Machine.
*
* Copyright (C) 2007 OSD Team <pvfs-osd@osc.edu>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <assert.h>
#include "client-state-machine.h"
#include "pvfs2-debug.h"
#include "job.h"
#include "gossip.h"
#include "str-utils.h"
#include "pint-cached-config.h"
#include "PINT-reqproto-encode.h"
#include "pint-util.h"
#include "pvfs2-internal.h"
#include "osd-util/osd-util.h"
#include "sense.h"
#include "osd-util/osd-sense.h"
/*
 * Upper bound, in bytes, on what osd_io_setup_msgpairs requests per datafile
 * in one pass. Larger transfers are clamped to this value and completed by
 * looping through the state machine again.
 */
#define KERNEL_BUFSIZE (400*1024)
/*
 * State-machine result code. osd_io_maybe_xfer_more stores it in
 * js_p->error_code to send the machine back to setup_msgpairs.
 */
enum {
LOOP_NEXT_CHUNK = 1012
};
/* Completion callback installed on every message pair by osd_io_init. */
static int osd_io_completion_fn(void *user_args,
struct PVFS_server_resp *resp_p, int index);
%%
nested machine pvfs2_client_osd_io_sm
{
state init
{
/* Allocate the msgpair array, request states and per-datafile arrays. */
run osd_io_init;
success => setup_msgpairs;
default => return;
}
state setup_msgpairs
{
/* Build the OSD command(s) for the next chunk of the transfer. */
run osd_io_setup_msgpairs;
success => xfer_osd_msgpairs;
default => return;
}
state xfer_osd_msgpairs
{
/* We jump to the OSD state machine to transfer the data */
jump pvfs2_osd_msgpairarray_sm;
success => maybe_xfer_more;
default => cleanup;
}
state maybe_xfer_more
{
/* Loop back for another chunk while LOOP_NEXT_CHUNK is reported. */
run osd_io_maybe_xfer_more;
LOOP_NEXT_CHUNK => setup_msgpairs;
default => analyze_results;
}
state analyze_results
{
/* Post-transfer bookkeeping; every result continues to test_reconstruct. */
run osd_io_analyze_results;
default => test_reconstruct;
}
state test_reconstruct
{
/*
 * Build a single reconstruct command.
 * NOTE(review): this runs after every I/O and every result code of the
 * action leads to the jump below -- confirm that is intended.
 */
run osd_reconstruct_setup_msgpair;
default => test_reconstruct_xfer_osd_msgpairs;
}
state test_reconstruct_xfer_osd_msgpairs
{
/* Send the reconstruct command through the OSD msgpair machine. */
jump pvfs2_osd_msgpairarray_sm;
default => cleanup;
}
state cleanup
{
/* Release the per-I/O state and report the byte count to the caller. */
run osd_io_cleanup;
default => return;
}
}
%%
/*
 * osd_io_init - first action of the OSD I/O machine.
 *
 * Forces the transfer onto a single datafile, allocates the message pair
 * array, the request-processing states and the per-datafile bookkeeping
 * arrays, and fills in the message pair fields that do not change from one
 * chunk to the next.
 *
 * The outcome is stored in js_p->error_code (0 on success); the function
 * itself always returns 1.
 */
static int osd_io_init(struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    struct PINT_client_io_sm *io = &sm_p->u.io;
    PVFS_object_attr *attr = &sm_p->getattr.attr;
    int idx;
    int rc;

    /* this path always talks to exactly one object */
    io->datafile_count = 1;
    attr->u.meta.dfile_count = 1;

    rc = PINT_msgpairarray_init(&sm_p->msgarray_op, io->datafile_count);
    if (rc != 0)
        goto done;
    sm_p->msgarray_op.count = io->datafile_count;

    /* every allocation failure below is reported with the same code */
    rc = -ENOMEM;

    /* one request state per server for both the file and memory side */
    io->file_req_state = PINT_new_request_states(io->file_req,
                                                 io->datafile_count);
    if (io->file_req_state == NULL)
        goto done;

    io->mem_req_state = PINT_new_request_states(io->mem_req,
                                                io->datafile_count);
    if (io->mem_req_state == NULL)
        goto done;

    io->temp_req_state = PINT_new_request_state(io->file_req);
    if (io->temp_req_state == NULL)
        goto done;

    /* zeroed flags used to notice short reads without a GETATTR */
    io->short_read = calloc(io->datafile_count, sizeof(*io->short_read));
    if (io->short_read == NULL)
        goto done;

    io->file_data = malloc(io->datafile_count * sizeof(*io->file_data));
    if (io->file_data == NULL)
        goto done;

    /* template entry; copied to the other slots inside the loop */
    io->file_data[0].fsize = 0;
    io->file_data[0].server_ct = attr->u.meta.dfile_count;
    io->file_data[0].dist = attr->u.meta.dist;
    io->file_data[0].extend_flag = 1; /* always disable fsize checking */

    for (idx = 0; idx < io->datafile_count; idx++) {
        /* the object handle is used directly, not an entry of dfile_array */
        PVFS_handle datafile_handle = sm_p->object_ref.handle;

        gossip_debug(GOSSIP_IO_DEBUG, "%s: server %d/%d handle %llu\n",
                     __func__, idx, io->datafile_count,
                     llu(datafile_handle));

        rc = PINT_cached_config_map_to_server(
            &sm_p->msgarray_op.msgarray[idx].svr_addr, datafile_handle,
            sm_p->object_ref.fs_id);
        if (rc != 0)
            goto done;

        io->file_req_state[idx].target_offset = io->file_req_offset;
        io->file_req_state[idx].final_offset =
            io->file_req_offset + io->mem_req->aggregate_size;

        if (idx > 0)
            io->file_data[idx] = io->file_data[0];
        io->file_data[idx].server_nr = io->datafile_index_array[idx];

        /* fields that stay the same for every chunk */
        sm_p->msgarray_op.msgarray[idx].fs_id = sm_p->object_ref.fs_id;
        sm_p->msgarray_op.msgarray[idx].handle = sm_p->object_ref.handle;
        sm_p->msgarray_op.msgarray[idx].retry_flag = PVFS_MSGPAIR_RETRY;
        sm_p->msgarray_op.msgarray[idx].comp_fn = osd_io_completion_fn;
    }

done:
    js_p->error_code = rc;
    return 1;
}
/*
 * Capacity of the on-stack offset/length arrays in osd_io_setup_msgpairs,
 * and the segmax limit that function hands to PINT_process_request.
 * NOTE(review): the original author marked this value "check this".
 */
#define OSD_INIT_MAX_IOVEC (1023)
/*
 * osd_io_setup_msgpairs - build the OSD command for the next chunk of I/O.
 *
 * For every datafile that still has work, the request engine is run once as
 * PINT_SERVER to obtain the object-side offset/length list and then
 * repeatedly as PINT_CLIENT to obtain the matching memory-side list. From
 * those two lists one of three command shapes is built:
 *   - one server segment: a plain READ or WRITE (or CREATE_AND_WRITE with
 *     attributes when post_create is set in the server config and the write
 *     starts at offset 0), using an iovec when memory is scattered;
 *   - several server segments of equal length and equal stride: DDT_VEC;
 *   - several server segments otherwise: DDT_SGL.
 * Datafiles with nothing to do get their msgpair marked "suppress".
 *
 * At most KERNEL_BUFSIZE bytes are requested per datafile per call.
 *
 * iovec and descriptor buffers allocated here are stored in the msgpair
 * (osd_iov / osd_sgl) and released by osd_io_completion_fn.
 *
 * The outcome is stored in js_p->error_code; the function returns 1.
 */
static int osd_io_setup_msgpairs(struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    struct PINT_client_io_sm *io = &sm_p->u.io;
    PVFS_object_attr *attr = &sm_p->getattr.attr;
    int i, ret = 0;
    struct osd_command *command;
    struct bsg_iovec *iov;
    int dfile_count = sm_p->u.io.datafile_count;
    int is_osd_md = fsid_is_osd_md(sm_p->object_ref.fs_id);
    struct server_configuration_s *server_config;
    int post_create;

    /*
     * Snapshot the one setting needed below while the config struct is
     * still held, instead of dereferencing it after it has been put back.
     */
    server_config = PINT_get_server_config_struct(
        sm_p->object_ref.fs_id);
    post_create = (server_config && server_config->post_create) ? 1 : 0;
    PINT_put_server_config_struct(server_config);
    //~ printf("MAX IOVEC is %d\n", OSD_INIT_MAX_IOVEC);

    /* clients offset and length pairs */
    PVFS_offset offseta[OSD_INIT_MAX_IOVEC];
    PVFS_size sizea[OSD_INIT_MAX_IOVEC];
    int csegs_count;
    PVFS_size cagg_len;
    /* temporary space for querying offset length pairs */
    PVFS_offset temp_offset[OSD_INIT_MAX_IOVEC];
    PVFS_size temp_size[OSD_INIT_MAX_IOVEC];
    /* targets offset and length pairs */
    PVFS_offset target_offset[OSD_INIT_MAX_IOVEC];
    PVFS_size target_size[OSD_INIT_MAX_IOVEC];
    /* total bytes to read/write from each io server */
    PVFS_size sbytemax = io->mem_req->aggregate_size;
    PVFS_size this_aggregate_size = 0;
    /* structures to pass into req proc engine */
    PINT_Request_result sresult, cresult;

    /* kernel SCSI layer can only handle so many pages at once */
    if (sbytemax > KERNEL_BUFSIZE) {
        gossip_debug(GOSSIP_IO_DEBUG, "%s: SCSI buffer %lld cant handle %lld\n",
                     __func__, lld(KERNEL_BUFSIZE), lld(sbytemax));
        sbytemax = KERNEL_BUFSIZE;
    }

    /* for each datafile/io server/osd write ALL of its data */
    for (i = 0; i < dfile_count; i++) {
        uint8_t *p;
        uint32_t len;
        PVFS_offset cur_offset;
        PVFS_handle datafile_handle;
        int j;

        //datafile_handle = attr->u.meta.dfile_array[io->datafile_index_array[i]];
        datafile_handle = sm_p->object_ref.handle;

        if (PINT_REQUEST_DONE(&io->file_req_state[i]) || io->short_read[i]) {
            gossip_debug(GOSSIP_IO_DEBUG, "%s: Nothing to do for server %d\n",
                         __func__, i);
            sm_p->msgarray_op.msgarray[i].suppress = 1; /* disable this entry */
            continue;
        }

        /* first time through, request processing will not have skipped in */
        cur_offset = io->file_req_state[i].type_offset;
        if (cur_offset == 0)
            cur_offset = io->file_req_state[i].target_offset;

        gossip_debug(GOSSIP_IO_DEBUG,
                     "%s: %d: frs type %lld target %lld final %lld cur %lld bytemax %lld\n",
                     __func__, i,
                     lld(io->file_req_state[i].type_offset),
                     lld(io->file_req_state[i].target_offset),
                     lld(io->file_req_state[i].final_offset),
                     lld(cur_offset),
                     lld(sbytemax));

        PINT_REQUEST_STATE_RST(io->temp_req_state);

        /*
         * Figure out the server's offset/lengths; should get all in one go.
         */
        sresult.segs = 0;
        sresult.bytes = 0;
        sresult.bytemax = sbytemax; /* request full size */
        sresult.offset_array = target_offset;
        sresult.size_array = target_size;
        sresult.segmax = OSD_INIT_MAX_IOVEC;

        /* set up the io req state to pass in */
        io->temp_req_state->target_offset = cur_offset; /* may or may not be 0 */
        io->temp_req_state->final_offset = io->file_req_state[i].final_offset;

        ret = PINT_process_request(io->temp_req_state, NULL, &io->file_data[i],
                                   &sresult, PINT_SERVER);
        if (ret) {
            gossip_err("%s: server %d process_request call failed\n",
                       __func__, i);
            goto out;
        }
        this_aggregate_size += sresult.bytes;

#if 0
        printf("SERVER INFO [%d]:\n", i);
        printf("Total size %lld \n", lld(sresult.bytes));
        for (j = 0; j < sresult.segs; j++) {
            printf("offset= %lld length= %lld\n", lld(target_offset[j]),
                   lld(target_size[j]));
        }
#endif

        /*
         * Figure out the client's offset/lengths one stripe at a time.
         */
        csegs_count = 0;
        cagg_len = 0;
        while (cagg_len != sresult.bytes) {
            cresult.segs = 0;
            cresult.bytes = 0;
            cresult.bytemax = sresult.bytes;
            cresult.offset_array = temp_offset;
            cresult.size_array = temp_size;
            cresult.segmax = OSD_INIT_MAX_IOVEC - csegs_count;

            ret = PINT_process_request(&io->file_req_state[i],
                                       &io->mem_req_state[i], &io->file_data[i],
                                       &cresult, PINT_CLIENT);
            if (ret) {
                gossip_err("%s: client %d process_request call failed\n",
                           __func__, i);
                goto out;
            }

            /* now move the results for this strip to the perm array */
            for (j = 0; j < cresult.segs; j++) {
                offseta[csegs_count + j] = temp_offset[j];
                sizea[csegs_count + j] = temp_size[j];
            }
            csegs_count += cresult.segs;
            cagg_len += cresult.bytes;

            if (cagg_len > sresult.bytes) {
                gossip_err("%s: Client Agg len too big\n", __func__);
                ret = -EINVAL;
                goto out;
            }
        }

#if 0
        printf("CLIENT INFO [%d]:\n", i);
        printf("Total size %lld\n", lld(cagg_len));
        for( j = 0; j < csegs_count; j++ ) {
            printf("offset= %lld length= %lld\n", lld(offseta[j]),
                   lld(sizea[j]));
        }
#endif

        gossip_debug(GOSSIP_IO_DEBUG, "%s: %d: %d Server Segments, %lld bytes\n",
                     __func__, i, sresult.segs, lld(sresult.bytes));
        gossip_debug(GOSSIP_IO_DEBUG, "%s: %d: %d Client Segments, %lld bytes\n",
                     __func__, i, csegs_count, lld(cagg_len));

        command = &sm_p->msgarray_op.msgarray[i].osd_command;

        /* must be initialized to NULL for later check */
        sm_p->msgarray_op.msgarray[i].osd_iov = NULL;
        sm_p->msgarray_op.msgarray[i].osd_sgl = NULL;

        p = io->buffer;

        if (sresult.segs == 1) { /* contiguous server buff to write to */
            if (csegs_count == 1) {
                p += offseta[0];
                len = sizea[0];
                csegs_count = 0; /* do not build a 1-unit iovec */
            } else if (csegs_count > 1) {
                iov = malloc(csegs_count * sizeof(*iov));
                if (iov == NULL) {
                    ret = -ENOMEM;
                    goto out;
                }
                len = 0;
                for (j = 0; j < csegs_count; j++) {
                    iov[j].iov_base = (uintptr_t) (p + offseta[j]);
                    iov[j].iov_len = sizea[j];
                    len += sizea[j];
                }
                p = (void *) iov;
                sm_p->msgarray_op.msgarray[i].osd_iov = p; /* free IOV later */
            } else {
                ret = -EINVAL;
                goto out;
            }

            /* Build the command for this server, using physical offset. */
            if (io->io_type == PVFS_IO_READ) {
                osd_command_set_read(command, PVFS_OSD_DATA_PID, datafile_handle,
                                     len, target_offset[0]);
                command->indata = p;
                command->inlen_alloc = len;
                command->iov_inlen = csegs_count;
                //~ printf("Contig READ %d\n", len);
            } else if (io->io_type == PVFS_IO_WRITE) {
                if (post_create && !target_offset[0])
                {
                    /*
                     * First write to the object: create it in the same
                     * command and attach attributes.
                     * NOTE(review): command->outdata is not assigned on
                     * this path (only in the plain-write branch below) --
                     * confirm osd_command_attr_build does not need it.
                     */
                    uint64_t attrval;
                    if (is_osd_md)
                    {
                        /* needed for osd_md */
                        char *dist_buf;
                        attr->perms = PVFS_PERM_VALID;
                        attr->mask = PVFS_ATTR_COMMON_UID |
                                     PVFS_ATTR_COMMON_GID |
                                     PVFS_ATTR_COMMON_PERM |
                                     PVFS_ATTR_COMMON_ATIME |
                                     PVFS_ATTR_COMMON_CTIME |
                                     PVFS_ATTR_COMMON_MTIME |
                                     PVFS_ATTR_META_DIST |
                                     PVFS_ATTR_META_DFILES |
                                     PVFS_ATTR_COMMON_TYPE;
                        attr->objtype = PVFS_TYPE_METAFILE;
                        dist_buf = (char *)malloc(PINT_DIST_PACK_SIZE(sm_p->getattr.attr.u.meta.dist));
                        if (!dist_buf) {
                            js_p->error_code = -PVFS_ENOMEM;
                            return 1;
                        }
                        PINT_dist_encode(dist_buf, sm_p->getattr.attr.u.meta.dist);
                        struct attribute_list attrs[] = {
                            {ATTR_SET, ANY_PG + PVFS_USEROBJECT_ATTR_PG, 0, &attr->owner, sizeof(PVFS_uid)},
                            {ATTR_SET, ANY_PG + PVFS_USEROBJECT_ATTR_PG, 1, &attr->group, sizeof(PVFS_gid)},
                            {ATTR_SET, ANY_PG + PVFS_USEROBJECT_ATTR_PG, 2, &attr->perms, sizeof(PVFS_permissions)},
                            {ATTR_SET, ANY_PG + PVFS_USEROBJECT_ATTR_PG, 3, &attr->mask, sizeof(uint32_t)},
                            {ATTR_SET, ANY_PG + PVFS_USEROBJECT_ATTR_PG, 4, &attr->objtype, sizeof(PVFS_ds_type)},
                            {ATTR_SET, ANY_PG + PVFS_USEROBJECT_ATTR_PG, 5, dist_buf, PINT_DIST_PACK_SIZE(sm_p->getattr.attr.u.meta.dist)},
                            {ATTR_SET, ANY_PG + PVFS_USEROBJECT_ATTR_PG, 6, attr->u.meta.dfile_array, sizeof(PVFS_handle) * sm_p->getattr.attr.u.meta.dfile_count },
                            {ATTR_SET, ANY_PG + USER_COLL_PG, 7, &attrval, 8}};
                        if(!sm_p->getattr.attr.cid) {
                            sm_p->getattr.attr.cid = COLLECTION_OID_LB; /* root directory */
                        }
                        set_htonll(&attrval, sm_p->getattr.attr.cid);
                        osd_command_set_create_and_write(command, PVFS_OSD_DATA_PID, datafile_handle, len, target_offset[0]);
                        osd_command_attr_build(command, attrs, 8);
                        /*
                         * Release the encoded distribution instead of
                         * leaking it. This relies on
                         * osd_command_attr_build copying attribute values,
                         * which the block-local attrval/attrs above already
                         * depend on since the command is sent after this
                         * function returns.
                         * NOTE(review): confirm against osd-initiator.
                         */
                        free(dist_buf);
                    }
                    else
                    {
                        struct attribute_list attrs = {ATTR_SET, ANY_PG + USER_COLL_PG, 1, &attrval, 8};
                        if(!sm_p->getattr.attr.cid) {
                            sm_p->getattr.attr.cid = COLLECTION_OID_LB; /* root directory */
                        }
                        set_htonll(&attrval, sm_p->getattr.attr.cid);
                        osd_command_set_create_and_write(command, PVFS_OSD_DATA_PID, datafile_handle, len, target_offset[0]);
                        osd_command_attr_build(command, &attrs, 1);
                    }
                } else {
                    osd_command_set_write(command, PVFS_OSD_DATA_PID, datafile_handle,
                                          len, target_offset[0]);
                    command->outdata = p;
                }
                command->outlen = len;
                command->iov_outlen = csegs_count;
                //~ printf("Contig WRITE %d\n", len);
            }
        } else if (sresult.segs > 0) { /* either need a SGL or optimized SGL */
            int flag;
            /*
             * 64-bit so that large offsets are compared exactly; these
             * were plain int and could truncate.
             */
            PVFS_offset stride;
            PVFS_size segl, total_len;
            void *sgl = NULL;
            len = 0;
            uint64_t ddt_size, hdr_offset;

            if (sresult.segs <= 1) {
                ret = -EINVAL;
                goto out;
            }

            /* check to see if we can optimize things a bit
             * much quicker to walk the array than transmit over
             * the network -- hopefully
             */
            flag = 1;
            stride = target_offset[1] - target_offset[0];
            segl = target_size[0];
            total_len = 0;
            for (j = 0; j < sresult.segs; j += 1) {
                PVFS_offset t;
                if (j+1 < sresult.segs) { /* really only j-1 segments for j offsets */
                    t = target_offset[j+1] - target_offset[j];
                    if (stride != t){
                        flag = 0;
                        break;
                    }
                }
                if (segl != target_size[j]) {
                    flag = 0;
                    break;
                }
                total_len += segl;
            }
            //~ flag=0; /* force SGL mode always so we can compare to VEC later */
            /* move this into a config file or something -- remove eventually*/

            if (io->io_type == PVFS_IO_WRITE) {
                /* slot 0 carries the descriptor, the rest carry user data */
                iov = malloc((csegs_count + 1) * sizeof(*iov));
                if (iov == NULL) {
                    ret = -ENOMEM;
                    goto out;
                }
                sm_p->msgarray_op.msgarray[i].osd_iov = iov; /* free iov later */
                len = 0;

                if (flag) {
                    ddt_size = sizeof(uint64_t) * 2;
                } else {
                    ddt_size = (sizeof(uint64_t) * sresult.segs * 2) + sizeof(uint64_t);
                }
                sgl = malloc(ddt_size);
                if (sgl == NULL) {
                    ret = -ENOMEM;
                    goto out;
                }
                sm_p->msgarray_op.msgarray[i].osd_sgl = sgl; /* free sgl later */

                hdr_offset = 0;
                if (flag) {
                    gossip_debug(GOSSIP_IO_DEBUG, "%s: Building STRIDED read from: Offset/Length pairs"
                                 " %d. DDT Size %lld\n", __func__, sresult.segs, lld(ddt_size));
                    set_htonll((uint8_t *)sgl, stride);
                    hdr_offset = sizeof(uint64_t);
                    set_htonll((uint8_t *)sgl + hdr_offset, segl);
                    hdr_offset += sizeof(uint64_t);
                    len = (uint32_t) total_len;
                } else {
                    gossip_debug(GOSSIP_IO_DEBUG, "%s: Building SGL: Offset/Length pairs"
                                 " %d. DDT Size %lld\n", __func__, sresult.segs, lld(ddt_size));
                    set_htonll(sgl, sresult.segs);
                    hdr_offset = sizeof(uint64_t);
                    for ( j = 0; j < sresult.segs; j+=1 ) {
                        set_htonll((uint8_t *)sgl + hdr_offset, target_offset[j]);
                        hdr_offset += sizeof(uint64_t);
                        set_htonll((uint8_t *)sgl + hdr_offset, target_size[j]);
                        len += target_size[j];
                        hdr_offset += sizeof(uint64_t);
                    }
                }

                iov[0].iov_base = (uintptr_t)sgl;
                iov[0].iov_len = ddt_size;
                for (j = 1; j <= csegs_count; j++) {
                    iov[j].iov_base = (uintptr_t)(p + offseta[j-1]);
                    iov[j].iov_len = sizea[j-1];
                }
                p = (void *) iov;

                if (len != sresult.bytes) {
                    ret = -EINVAL;
                    goto out;
                }
                len += ddt_size;

                osd_command_set_write(command, PVFS_OSD_DATA_PID, datafile_handle,
                                      len, 0);
                command->outdata = p;
                command->outlen = len;
                command->iov_outlen = csegs_count+1;
                if (flag)
                    osd_command_set_ddt(command, DDT_VEC);
                else
                    osd_command_set_ddt(command, DDT_SGL);
                //~ printf("SGL Write %d\n", len);
            } else if (io->io_type == PVFS_IO_READ) {
                if (csegs_count == 1) {
                    p += offseta[0];
                    len = sizea[0];
                    csegs_count = 0; /* do not build a 1-unit iovec */
                } else if (csegs_count > 1) {
                    iov = malloc(csegs_count * sizeof(*iov));
                    if (iov == NULL) {
                        ret = -ENOMEM;
                        goto out;
                    }
                    len = 0;
                    sm_p->msgarray_op.msgarray[i].osd_iov = iov; /* free iov later */
                    /*
                     * Use j here. This loop used to run on i, which
                     * clobbered the datafile index, so the osd_sgl store
                     * below landed outside msgarray.
                     */
                    for (j = 0; j < csegs_count; j++) {
                        iov[j].iov_base = (uintptr_t) (p + offseta[j]);
                        iov[j].iov_len = sizea[j];
                        len += sizea[j];
                    }
                    p = (void *) iov;
                }

                osd_command_set_read(command, PVFS_OSD_DATA_PID, datafile_handle,
                                     len, 0);
                command->indata = p;
                command->inlen_alloc = len;
                command->iov_inlen = csegs_count;

                len = 0;
                if (flag) {
                    osd_command_set_ddt(command, DDT_VEC);
                    ddt_size = sizeof(uint64_t) * 2;
                } else {
                    osd_command_set_ddt(command, DDT_SGL);
                    ddt_size = (sizeof(uint64_t) * sresult.segs * 2) + sizeof(uint64_t);
                }
                sgl = malloc(ddt_size);
                if (sgl == NULL) {
                    ret = -ENOMEM;
                    goto out;
                }
                sm_p->msgarray_op.msgarray[i].osd_sgl = sgl; /* free sgl later */

                hdr_offset = 0;
                if (flag) {
                    gossip_debug(GOSSIP_IO_DEBUG, "%s: Building STRIDED from: Offset/Length pairs"
                                 " %d. DDT Size %lld\n", __func__, sresult.segs, lld(ddt_size));
                    set_htonll((uint8_t *)sgl, stride);
                    hdr_offset += sizeof(uint64_t);
                    set_htonll((uint8_t *)sgl + hdr_offset, segl);
                    hdr_offset += sizeof(uint64_t);
                    len = (uint32_t) total_len;
                } else {
                    gossip_debug(GOSSIP_IO_DEBUG, "%s: Building SGL: Offset/Length pairs"
                                 " %d. DDT Size %lld\n", __func__, sresult.segs, lld(ddt_size));
                    set_htonll(sgl, sresult.segs);
                    hdr_offset = sizeof(uint64_t);
                    for ( j = 0; j < sresult.segs; j+=1 ) {
                        set_htonll((uint8_t *)sgl + hdr_offset, target_offset[j]);
                        hdr_offset += sizeof(uint64_t);
                        set_htonll((uint8_t *)sgl + hdr_offset, target_size[j]);
                        len += target_size[j];
                        hdr_offset += sizeof(uint64_t);
                    }
                }

                if (len != command->inlen_alloc) {
                    ret = -EINVAL;
                    goto out;
                }
                /* for a read the descriptor travels as the command's out data */
                command->outdata = sgl;
                command->outlen = ddt_size;
                //~ printf("SGL READ %d\n", len);
            } else {
                ret = -EINVAL;
                goto out;
            }
            //~ printf("NON-CONTIG\n");
        } else {
            /* Nothing to do for this server */
            gossip_debug(GOSSIP_IO_DEBUG, "%s: Nothing to do for server [%d]",
                         __func__, i);
            sm_p->msgarray_op.msgarray[i].suppress = 1; /* disable this entry */
            continue;
        }
    }

    gossip_debug(GOSSIP_IO_DEBUG, "%s: Aggreagte Transferred: %lld\n", __func__,
                 lld(this_aggregate_size));

out:
    js_p->error_code = ret;
    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
    return 1;
}
/**
* We assume that the response buffer hasn't been freed yet (before the
* completion function is called. The msgpairarray.sm doesn't free the
* response buffer until after the completion function is called.
*/
static int osd_io_completion_fn(
void *user_args,
struct PVFS_server_resp *resp_p __attribute__((unused)),
int index)
{
struct PINT_smcb *smcb = user_args;
struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
struct PINT_client_io_sm *io = &sm_p->u.io;
struct osd_command *command = &sm_p->msgarray_op.msgarray[index].osd_command;
PVFS_size result_size;
/* fixup short read error */
if (io->io_type == PVFS_IO_READ) {
if (command->status == SAM_STAT_CHECK_CONDITION) {
int key, code;
osd_sense_extract(command->sense, command->sense_len, &key, &code);
/* ignore "read past end of user object" */
if (key == OSD_SSK_RECOVERED_ERROR &&
code == OSD_ASC_READ_PAST_END_OF_USER_OBJECT) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: server %d short read\n",
__func__, index);
io->short_read[index] = 1;
command->status = 0;
}
}
}
if (command->status)
return osd_errno_from_status(command->status);
if (io->short_read[index]) {
/* grab actual result size from CSI in status */
const uint8_t *s;
s = osd_sense_extract_csi(command->sense, command->sense_len);
if (!s) {
gossip_err("%s: sense_extract_csi failed\n", __func__);
return -PVFS_EINVAL;
}
result_size = get_ntohll(s);
} else {
/*
* No residual on write, always same as what we asked for, or
* will get status error and fail.
*/
if (io->io_type == PVFS_IO_READ) {
gossip_debug(GOSSIP_IO_DEBUG, "%s: Read Completed\n", __func__);
result_size = command->inlen;
} else {
gossip_debug(GOSSIP_IO_DEBUG, "%s: Write Completed\n", __func__);
result_size = command->outlen;
if (osd_command_get_ddt(command) == DDT_CONTIG) {
/* do nothing */
gossip_debug(GOSSIP_IO_DEBUG, "%s: CONTIG: %lld bytes complete\n",
__func__, lld(result_size));
} else if (osd_command_get_ddt(command) == DDT_SGL) {
if (command->iov_outlen > 0) { /*buffer no longer has the data
directly in it but has IOVs*/
uint64_t addr;
memcpy(&addr, command->outdata, sizeof(uint64_t));
result_size -= ((get_ntohll((void *)addr) * sizeof(uint64_t) * 2)
+ sizeof(uint64_t));
gossip_debug(GOSSIP_IO_DEBUG, "%s: SGL: %lld bytes complete\n",
__func__, lld(result_size));
} else {
gossip_err("%s: Invalid IOV count\n", __func__);
return -PVFS_EINVAL;
}
} else if (osd_command_get_ddt(command) == DDT_VEC) {
result_size -= (2 * sizeof(uint64_t));
gossip_debug(GOSSIP_IO_DEBUG, "%s: VEC: %lld bytes complete\n",
__func__, lld(result_size));
} else {
gossip_err("%s: Inavalid DDT OP\n", __func__);
return -PVFS_EINVAL;
}
}
}
if (sm_p->msgarray_op.msgarray[index].osd_sgl != NULL)
free(sm_p->msgarray_op.msgarray[index].osd_sgl);
if (sm_p->msgarray_op.msgarray[index].osd_iov != NULL)
free(sm_p->msgarray_op.msgarray[index].osd_iov);
io->dfile_size_array[index] += result_size;
io->total_size += result_size;
return 0;
}
/*
 * osd_io_maybe_xfer_more - decide whether another chunk has to be moved.
 *
 * Stores LOOP_NEXT_CHUNK in js_p->error_code when the transfer is
 * incomplete and no datafile has reported a short read, 0 otherwise.
 * Always returns 1.
 */
static int osd_io_maybe_xfer_more(struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    struct PINT_client_io_sm *io = &sm_p->u.io;
    int keep_going;
    int srv;

    gossip_debug(GOSSIP_IO_DEBUG, "%s: total %lld want %lld.\n", __func__,
                 lld(io->total_size), lld(io->mem_req->aggregate_size));

    /* everything asked for has arrived: nothing left to do */
    keep_going = (io->total_size != io->mem_req->aggregate_size);

    /*
     * A short read from ANY datafile ends the transfer.
     *
     * Strictly, every I/O node should have to report short before we
     * stop; that is what catches files with holes. Doing so sent this
     * machine into an endless loop on at least one large file generated
     * by Flash, so for now a single short read is accepted as the end.
     * For files without holes this is correct and saves one trip round
     * the loop.
     *
     * A possible future alternative is a getattr for the exact size
     * rather than short-read detection. Coupling the getattr with the
     * read guarantees a stripe of data to pad up to the attribute offset;
     * a stand-alone getattr sits in the critical path and costs as much
     * as one more read that returns zero bytes. Prefetching it alongside
     * the first reads would help big files but hurt small ones. Hard.
     */
    for (srv = 0; keep_going && srv < io->datafile_count; srv++) {
        if (io->short_read[srv])
            keep_going = 0;
    }

    gossip_debug(GOSSIP_IO_DEBUG, "%s: End result is: More %d\n", __func__,
                 keep_going);

    if (keep_going)
        js_p->error_code = LOOP_NEXT_CHUNK;
    else
        js_p->error_code = 0;
    return 1;
}
/*
 * Defined in sys-io.sm. osd_io_analyze_results calls it to obtain the upper
 * offset of what was asked for in the request.
 */
extern int io_find_offset(struct PINT_client_sm *sm_p, PVFS_size contig_size,
PVFS_offset *total_return_offset);
/*
 * osd_io_analyze_results - post-transfer bookkeeping.
 *
 * After a short read the file may contain holes. The objects themselves
 * never have holes (the OSD returns zeroes), but a write at an offset past
 * anything written before can leave some servers with empty datafiles,
 * which then return 0 (or too few) bytes.
 *
 * For a write only the cached size is invalidated. For a complete read
 * nothing is done. For a short read the end-of-request offset is computed;
 * the hole handling that would use it is not implemented (see the note in
 * the body).
 *
 * The outcome is stored in js_p->error_code; the function returns 1.
 */
static int osd_io_analyze_results(struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    PVFS_offset eor, filereq_ub_offset;
    int rc = 0;

    gossip_debug(GOSSIP_IO_DEBUG, "%s: total bytes transferred %lld\n",
                 __func__, lld(sm_p->u.io.total_size));

    if (sm_p->u.io.io_type == PVFS_IO_WRITE) {
        /* Write, no holes possible. Blow the acache like sys-io. */
        PINT_acache_invalidate_size(sm_p->object_ref);
    } else if (sm_p->u.io.total_size !=
               PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req)) {
        /* Short read: find the upper offset of what was asked for. */
        rc = io_find_offset(sm_p,
                            PINT_REQUEST_TOTAL_BYTES(sm_p->u.io.mem_req),
                            &filereq_ub_offset);
        if (rc == 0) {
            eor = filereq_ub_offset + sm_p->u.io.file_req_offset;
            /*
             * Not implemented (was a disabled sketch in the source):
             * find the maximum bstream sizes to tell whether the global
             * file holds more data than was read. A server that returned
             * a short read need not be asked again, since its upper
             * limit is already known. If every server was short,
             * zero-fill without asking for sizes; otherwise ask all
             * servers and zero-fill using the sizes. The candidates are
             * the entries of datafile_index_array (datafile_count of
             * them), which index into attr->u.meta.dfile_array.
             */
        }
    }
    /* else: got everything we asked for, no holes */

    js_p->error_code = rc;
    return 1;
}
/*
 * osd_io_cleanup - final action of the OSD I/O machine.
 *
 * Detaches the message pair array from the state machine, frees the
 * request states and bookkeeping arrays created by osd_io_init, and copies
 * the number of bytes moved into the caller's response. Always returns 1
 * and leaves js_p->error_code untouched.
 */
static int osd_io_cleanup(struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    struct PINT_client_io_sm *io = &sm_p->u.io;
    struct server_configuration_s *cfg;

    /* the config struct is fetched and handed straight back; nothing is read */
    cfg = PINT_get_server_config_struct(sm_p->object_ref.fs_id);
    PINT_put_server_config_struct(cfg);

    sm_p->msgarray_op.count = 0;
    sm_p->msgarray_op.msgarray = NULL;

    PINT_free_request_states(io->file_req_state);
    PINT_free_request_states(io->mem_req_state);
    PINT_free_request_state(io->temp_req_state);

    free(io->short_read);
    free(io->file_data);

    /* return this to PVFS_sys_io */
    io->io_resp_p->total_completed = io->total_size;

    return 1;
}
/*
 * osd_reconstruct_setup_msgpair - build a single "reconstruct" OSD command.
 *
 * Initialises the single-entry msgpair, fills its osd_command with
 * osd_command_set_reconstruct() plus a 4-byte "test" payload, resolves the
 * server address for the object and pushes the msgarray frame for the jump
 * that follows in the state machine.
 *
 * The outcome is stored in js_p->error_code.
 *
 * NOTE(review): when osd_command_set_reconstruct() fails this returns
 * without pushing the frame, yet the test_reconstruct state jumps on every
 * result code -- confirm the intended handling.
 */
static PINT_sm_action osd_reconstruct_setup_msgpair(
    struct PINT_smcb *smcb, job_status_s *js_p)
{
    struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
    int ret = -PVFS_EINVAL;
    PINT_sm_msgpair_state *msg_p = NULL;

    PINT_msgpair_init(&sm_p->msgarray_op);
    msg_p = &sm_p->msgarray_op.msgpair;

    struct osd_command *command = &msg_p->osd_command;
    ret = osd_command_set_reconstruct(command, PVFS_OSD_DATA_PID);
    command->outdata = "test";
    command->outlen = 4;
    if (ret) {
        /* name the call that actually failed (message used to say "remove") */
        osd_error_xerrno(ret, "%s: osd_command_set_reconstruct failed",
                         __func__);
        js_p->error_code = ret;
        return 1;
    }

    js_p->error_code = 0;
    msg_p->fs_id = sm_p->object_ref.fs_id;
    msg_p->handle = sm_p->object_ref.handle;
    msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
    msg_p->comp_fn = NULL; /* no per-message completion handling */

    ret = PINT_cached_config_map_to_server(
        &msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
    if (ret)
    {
        gossip_err("Failed to map meta server address\n");
        js_p->error_code = ret;
    }

    PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
    return SM_ACTION_COMPLETE;
}
/*
* Local variables:
* mode: c
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/