Skip to content

Commit

Permalink
Changes to support user tracking collections in queries and elsewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
joc02012 committed Jun 2, 2012
1 parent c6dbc75 commit a5f27ae
Show file tree
Hide file tree
Showing 18 changed files with 554 additions and 169 deletions.
2 changes: 1 addition & 1 deletion osd-target/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
-include ../Makedefs

SRC := attr.c db.c obj.c osd-schema.c osd.c cdb.c osd-sense.c list-entry.c
SRC += osd-schema.c coll.c mtq.c
SRC += osd-schema.c coll.c mtq.c tracking.c
INC := attr.h db.h obj.h osd-types.h osd.h cdb.h list-entry.h target-sense.h
INC += coll.h mtq.c
DEP := .depend
Expand Down
21 changes: 17 additions & 4 deletions osd-target/cdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct command {
int senselen;
};

/* get attribute by page has been made obsolete as of OSD3R02 */
static int get_attr_page(struct command *cmd, uint64_t pid, uint64_t oid,
uint8_t isembedded, uint16_t numoid, uint32_t cdb_cont_len)
{
Expand Down Expand Up @@ -799,6 +800,16 @@ static int cdb_query(struct command *cmd, uint64_t pid, uint64_t cid,
uint64_t alloc_len = get_ntohll(&cdb[32]);

uint64_t matches_cid = get_ntohll(&cdb[40]);
uint8_t immed_tr = cdb[11] >> 7;

/* osd2r04 6.26.1 - if immed_tr is one, then alloc_len should
be zero and matches_cid should be nonzero */
if (immed_tr && (alloc_len != 0 || matches_cid == 0))
goto out_query_cdb_err;

if (alloc_len != 0 && matches_cid != 0)
goto out_query_cdb_err;

struct cdb_continuation_descriptor *query_desc;
struct cdb_continuation_descriptor *descriptors = cmd->cont.descriptors;

Expand Down Expand Up @@ -830,7 +841,8 @@ static int cdb_query(struct command *cmd, uint64_t pid, uint64_t cid,

return osd_query(cmd->osd, pid, cid, query_desc->length, alloc_len,
query_desc->desc_specific_hdr, cmd->outdata,
&cmd->used_outlen, cdb_cont_len, cmd->sense);
&cmd->used_outlen, cdb_cont_len, immed_tr,
matches_cid, cmd->sense);
out_query_cdb_err:
return sense_basic_build(cmd->sense, OSD_SSK_ILLEGAL_REQUEST,
OSD_ASC_INVALID_FIELD_IN_CDB, pid, cid);
Expand Down Expand Up @@ -1391,7 +1403,7 @@ osd_warning("%s:%d:", __FILE__, __LINE__);
(typeof(desc_hdr))cdb_cont;
uint16_t type = get_ntohs(&desc_hdr->type);
uint32_t length = get_ntohl(&desc_hdr->length);
uint8_t pad_length = desc_hdr->pad_length & 0x7;
uint8_t pad_length = desc_hdr->pad_length;

uint32_t desc_len = length + pad_length;

Expand Down Expand Up @@ -1674,8 +1686,9 @@ static void exec_service_action(struct command *cmd)
uint64_t requested_cid = get_ntohll(&cdb[24]);
uint64_t source_cid = get_ntohll(&cdb[40]);

ret = osd_create_user_tracking_collection(osd, pid, requested_cid,
cdb_cont_len, source_cid, sense);
ret = osd_create_user_tracking_collection(osd, pid,requested_cid,
source_cid,
cdb_cont_len, sense);
break;
}
case OSD_DETACH_CLONE: {
Expand Down
53 changes: 53 additions & 0 deletions osd-target/coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct coll_tab {
sqlite3_stmt *delcid; /* delete collection cid */
sqlite3_stmt *deloid; /* delete oid from all collections */
sqlite3_stmt *emptycid; /* is collection empty? */
sqlite3_stmt *max; /* max object collection pointer value */
sqlite3_stmt *getcid; /* get collection */
sqlite3_stmt *getoids; /* get objects in a collection */
sqlite3_stmt *copyoids; /* copy oids from one collection to another */
Expand Down Expand Up @@ -117,6 +118,12 @@ int coll_initialize(struct db_context *dbc)
if (ret != SQLITE_OK)
goto out_finalize_emptycid;

sprintf(SQL, "SELECT MAX (number) FROM %s WHERE pid = ? AND cid = ?;",
dbc->coll->name);
ret = sqlite3_prepare(dbc->db, SQL, -1, &dbc->coll->max, NULL);
if (ret != SQLITE_OK)
goto out_finalize_max;

sprintf(SQL, "SELECT cid FROM %s WHERE pid = ? AND oid = ? AND "
" number = ?;", dbc->coll->name);
ret = sqlite3_prepare(dbc->db, SQL, -1, &dbc->coll->getcid, NULL);
Expand Down Expand Up @@ -147,6 +154,9 @@ int coll_initialize(struct db_context *dbc)
out_finalize_getcid:
db_sqfinalize(dbc->db, dbc->coll->getcid, SQL);
SQL[0] = '\0';
out_finalize_max:
db_sqfinalize(dbc->db, dbc->coll->max, SQL);
SQL[0] = '\0';
out_finalize_emptycid:
db_sqfinalize(dbc->db, dbc->coll->emptycid, SQL);
SQL[0] = '\0';
Expand Down Expand Up @@ -178,6 +188,7 @@ int coll_finalize(struct db_context *dbc)
sqlite3_finalize(dbc->coll->delcid);
sqlite3_finalize(dbc->coll->deloid);
sqlite3_finalize(dbc->coll->emptycid);
sqlite3_finalize(dbc->coll->max);
sqlite3_finalize(dbc->coll->getcid);
sqlite3_finalize(dbc->coll->getoids);
sqlite3_finalize(dbc->coll->copyoids);
Expand Down Expand Up @@ -375,6 +386,48 @@ int coll_isempty_cid(struct db_context *dbc, uint64_t pid, uint64_t cid,
}


/*
* return the max object pointer for a particular pid,cid
*
* returns:
* -EINVAL: invalid arg, ignore value of isempty
* OSD_ERROR: in case of other errors, ignore value of isempty
* OSD_OK: success, isempty is set to:
* ==1: if collection is empty or absent
* ==0: if not empty
*/
int coll_max_pointer(struct db_context *dbc, uint64_t pid, uint64_t cid,
uint32_t *number)
{
int ret = 0;
int bound = 0;
*number = 0;

assert(dbc && dbc->db && dbc->coll && dbc->coll->max);

repeat:
ret = 0;
ret |= sqlite3_bind_int64(dbc->coll->max, 1, pid);
ret |= sqlite3_bind_int64(dbc->coll->max, 2, cid);
bound = (ret == SQLITE_OK);
if (!bound) {
error_sql(dbc->db, "%s: bind failed", __func__);
goto out_reset;
}

while ((ret = sqlite3_step(dbc->coll->max)) == SQLITE_BUSY);
if (ret == SQLITE_ROW)
*number = (0 == sqlite3_column_int(dbc->coll->max, 0));

out_reset:
ret = db_reset_stmt(dbc, dbc->coll->max, bound, __func__);
if (ret == OSD_REPEAT)
goto repeat;
out:
return ret;
}


/*
* returns:
* -EINVAL: invalid arg, cid is not set
Expand Down
3 changes: 3 additions & 0 deletions osd-target/coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ int coll_get_oids_in_cid(struct db_context *dbc, uint64_t pid, uint64_t cid,
uint8_t *outdata, uint64_t *used_outlen,
uint64_t *add_len, uint64_t *cont_id);

int coll_max_pointer(struct db_context *dbc, uint64_t pid, uint64_t cid,
uint32_t *number);

int coll_copyoids(struct db_context *dbc, uint64_t pid, uint64_t dest_cid,
uint64_t source_cid);

Expand Down
53 changes: 44 additions & 9 deletions osd-target/mtq.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
*/
int mtq_run_query(struct db_context *dbc, uint64_t pid, uint64_t cid,
struct query_criteria *qc, void *outdata,
uint32_t alloc_len, uint64_t *used_outlen)
uint32_t alloc_len, uint64_t *used_outlen,
uint64_t matches_cid)
{
int ret = 0;
int pos = 0;
Expand All @@ -55,6 +56,7 @@ int mtq_run_query(struct db_context *dbc, uint64_t pid, uint64_t cid,
uint8_t *p = NULL;
uint32_t i = 0;
uint32_t sqlen = 0;
uint32_t number;
uint32_t factor = 2; /* this query fills space quickly */
uint64_t len = 0;
const char *op = NULL;
Expand Down Expand Up @@ -90,10 +92,24 @@ int mtq_run_query(struct db_context *dbc, uint64_t pid, uint64_t cid,
*/

/* build the SQL statment */
sprintf(select_stmt, "SELECT attr.oid FROM %s as coll, %s as attr "
" WHERE coll.pid = coll.pid AND coll.oid = attr.oid AND "
" coll.pid = %llu AND coll.cid = %llu ", coll, attr,
llu(pid), llu(cid));
if (matches_cid != 0) {
ret = coll_max_pointer(dbc, pid, cid, &number);
if (ret != SQLITE_OK) {
goto out;
}
sprintf(select_stmt, "SELECT coll.pid, %llu, attr.oid, %d "
"FROM %s AS coll, %s AS attr WHERE "
"coll.pid=coll.pid AND coll.oid=attr.oid "
"AND coll.pid = %llu AND coll.cid = %llu ",
llu(matches_cid), number, coll, attr, llu(pid),
llu(cid));
} else {
sprintf(select_stmt, "SELECT attr.oid FROM %s AS coll, "
"%s AS attr WHERE coll.pid = coll.pid AND "
"coll.oid = attr.oid AND coll.pid = %llu "
"AND coll.cid = %llu ",
coll, attr, llu(pid), llu(cid));
}
strncpy(cp, select_stmt, MAXSQLEN*factor);
sqlen += strlen(cp);
cp += sqlen;
Expand Down Expand Up @@ -123,13 +139,25 @@ int mtq_run_query(struct db_context *dbc, uint64_t pid, uint64_t cid,
}
cp = strcat(cp, " GROUP BY attr.oid ORDER BY 1;");

if (matches_cid != 0) {
char *SQL2 = Malloc(strlen(SQL) + 100);
if (SQL2 == NULL) {
ret = -ENOMEM;
goto out;
}
sprintf(SQL2, "INSERT INTO %s %s", coll, SQL);
free(SQL);
SQL = SQL2;
}

ret = sqlite3_prepare(dbc->db, SQL, strlen(SQL)+1, &stmt, NULL);
if (ret != SQLITE_OK) {
error_sql(dbc->db, "%s: sqlite3_prepare", __func__);
ret = -EIO;
goto out;
}

repeat:
/* bind the values */
pos = 1;
for (i = 0; i < qc->qc_cnt; i++) {
Expand Down Expand Up @@ -159,6 +187,13 @@ int mtq_run_query(struct db_context *dbc, uint64_t pid, uint64_t cid,
}
}

if (matches_cid != 0) {
ret = db_exec_dms(dbc, stmt, ret, __func__);
if (ret == OSD_REPEAT)
goto repeat;
goto out_finalize;
}

/* execute the query */
p = outdata;
p += ML_ODL_OFF;
Expand Down Expand Up @@ -442,10 +477,9 @@ int mtq_set_member_attrs(struct db_context *dbc, uint64_t pid, uint64_t cid,
char *SQL = NULL;
size_t sqlen = 0;
sqlite3_stmt *stmt = NULL;
const char *coll = coll_getname(dbc);
const char *attr = attr_getname(dbc);

assert(dbc && dbc->db && set_attr && coll && attr);
assert(dbc && dbc->db && set_attr && attr);

if (set_attr->sz == 0) {
ret = 0;
Expand All @@ -466,8 +500,9 @@ int mtq_set_member_attrs(struct db_context *dbc, uint64_t pid, uint64_t cid,

for (i = 0; i < set_attr->sz; i++) {
sprintf(cp, " SELECT %llu, oid, %u, %u, ? FROM %s "
" WHERE cid = %llu ", llu(pid), set_attr->le[i].page,
set_attr->le[i].number, coll, llu(cid));
" WHERE pid = %llu AND page = %u AND value = %llu",
llu(pid), set_attr->le[i].page, set_attr->le[i].number,
attr, llu(pid), USER_COLL_PG, llu(cid));
if (i < (set_attr->sz - 1))
cp = strcat(cp, " UNION ALL ");
sqlen += strlen(cp);
Expand Down
3 changes: 2 additions & 1 deletion osd-target/mtq.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

int mtq_run_query(struct db_context *dbc, uint64_t pid, uint64_t cid,
struct query_criteria *qc, void *outdata,
uint32_t alloc_len, uint64_t *used_outlen);
uint32_t alloc_len, uint64_t *used_outlen,
uint64_t matches_cid);

int mtq_list_oids_attr(struct db_context *dbc, uint64_t pid,
uint64_t initial_oid, struct getattr_list *get_attr,
Expand Down
11 changes: 7 additions & 4 deletions osd-target/obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int obj_initialize(struct db_context *dbc)
goto out;
}

sprintf(SQL, "INSERT INTO %s VALUES (?, ?, ?);", dbc->obj->name);
sprintf(SQL, "INSERT INTO %s VALUES (?, ?, ?, ?);", dbc->obj->name);
ret = sqlite3_prepare(dbc->db, SQL, -1, &dbc->obj->insert, NULL);
if (ret != SQLITE_OK)
goto out_finalize_insert;
Expand Down Expand Up @@ -131,7 +131,7 @@ int obj_initialize(struct db_context *dbc)
if (ret != SQLITE_OK)
goto out_finalize_pcount;

sprintf(SQL, "SELECT type FROM %s WHERE pid = ? AND oid = ?;",
sprintf(SQL, "SELECT type, coll_type FROM %s WHERE pid = ? AND oid = ?;",
dbc->obj->name);
ret = sqlite3_prepare(dbc->db, SQL, -1, &dbc->obj->gettype, NULL);
if (ret != SQLITE_OK)
Expand Down Expand Up @@ -240,7 +240,7 @@ const char *obj_getname(struct db_context *dbc)
* OSD_OK: success
*/
int obj_insert(struct db_context *dbc, uint64_t pid, uint64_t oid,
uint32_t type)
uint8_t type, uint8_t coll_type)
{
int ret = 0;

Expand All @@ -252,6 +252,7 @@ int obj_insert(struct db_context *dbc, uint64_t pid, uint64_t oid,
ret |= sqlite3_bind_int64(dbc->obj->insert, 1, pid);
ret |= sqlite3_bind_int64(dbc->obj->insert, 2, oid);
ret |= sqlite3_bind_int(dbc->obj->insert, 3, type);
ret |= sqlite3_bind_int(dbc->obj->insert, 4, coll_type);
ret = db_exec_dms(dbc, dbc->obj->insert, ret, __func__);
if (ret == OSD_REPEAT)
goto repeat;
Expand Down Expand Up @@ -493,7 +494,7 @@ int obj_pcount(struct db_context *dbc, uint64_t *pcount)
* obj_types set to the determined type.
*/
int obj_get_type(struct db_context *dbc, uint64_t pid, uint64_t oid,
uint8_t *obj_type)
uint8_t *obj_type, uint8_t *coll_type)
{
int ret = 0;
int bound = 0;
Expand All @@ -514,6 +515,8 @@ int obj_get_type(struct db_context *dbc, uint64_t pid, uint64_t oid,
while ((ret = sqlite3_step(dbc->obj->gettype)) == SQLITE_BUSY);
if (ret == SQLITE_ROW) {
*obj_type = sqlite3_column_int(dbc->obj->gettype, 0);
if (coll_type)
*coll_type = sqlite3_column_int(dbc->obj->gettype, 1);
} else if (ret == SQLITE_DONE) {
osd_debug("%s: object (%llu %llu) doesn't exist", __func__,
llu(pid), llu(oid));
Expand Down
4 changes: 2 additions & 2 deletions osd-target/obj.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ int obj_finalize(struct db_context *dbc);
const char *obj_getname(struct db_context *dbc);

int obj_insert(struct db_context *dbc, uint64_t pid, uint64_t oid,
uint32_t type);
uint8_t type, uint8_t coll_type);

int obj_delete(struct db_context *dbc, uint64_t pid, uint64_t oid);

Expand All @@ -46,7 +46,7 @@ int obj_isempty_pid(struct db_context *dbc, uint64_t pid, int *isempty);
int obj_pcount(struct db_context *dbc, uint64_t *pcount);

int obj_get_type(struct db_context *dbc, uint64_t pid, uint64_t oid,
uint8_t *obj_type);
uint8_t *obj_type, uint8_t *coll_type);

int obj_get_oids_in_pid(struct db_context *dbc, uint64_t pid,
uint64_t initial_oid, uint64_t alloc_len,
Expand Down
Loading

0 comments on commit a5f27ae

Please sign in to comment.