From 0508affcc8342b96480fa67655731bfa24dc2c12 Mon Sep 17 00:00:00 2001 From: "John A. Chandy" Date: Sun, 17 Mar 2013 19:27:49 -0400 Subject: [PATCH] Fixes to support multithreading Many changes to support multithreading in OSD - mainly in osd.c to allow separate database connections per thread. --- osd-target/db.c | 14 +++ osd-target/db.h | 2 + osd-target/osd-types.h | 25 ++-- osd-target/osd.c | 255 +++++++++++++++++++++++++++-------------- osd-util/osd-util.c | 2 +- 5 files changed, 193 insertions(+), 105 deletions(-) diff --git a/osd-target/db.c b/osd-target/db.c index f19a272..21ace0e 100644 --- a/osd-target/db.c +++ b/osd-target/db.c @@ -218,6 +218,20 @@ int db_finalize(struct db_context *dbc) return OSD_ERROR; } +static const char db_delete_sql[] = "DELETE * FROM obj;\n" + "DELETE * FROM attr;\n" + "DELETE * FROM coll;\n"; +int db_delete_data(struct db_context *dbc) +{ + char *err = NULL; + /* empty out the database */ + int ret = sqlite3_exec(dbc->db, db_delete_sql, NULL, NULL, &err); + if (ret != SQLITE_OK) { + sqlite3_free(err); + return OSD_ERROR; + } + return OSD_OK; +} int db_begin_txn(struct db_context *dbc) { diff --git a/osd-target/db.h b/osd-target/db.h index 5a9b4bd..0821f93 100644 --- a/osd-target/db.h +++ b/osd-target/db.h @@ -29,6 +29,8 @@ int db_initialize(struct db_context *dbc); int db_finalize(struct db_context *dbc); +int db_delete_data(struct db_context *dbc); + int db_begin_txn(struct db_context *dbc); int db_end_txn(struct db_context *dbc); diff --git a/osd-target/osd-types.h b/osd-target/osd-types.h index 1a10661..0cc0e4e 100644 --- a/osd-target/osd-types.h +++ b/osd-target/osd-types.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "osd-util/osd-defs.h" @@ -145,6 +146,7 @@ enum { struct id_cache { uint64_t cur_pid; /* last pid referenced */ uint64_t next_id; /* next free oid/cid within partition (cur_pid) */ + pthread_mutex_t lock; }; struct buffer { @@ -179,25 +181,12 @@ struct db_context { struct attr_tab *attr; }; -/* - * 'osd_context' will replace 'osd_device' in future. Each osd context is a - * thread safe structure allocated as a private data for the thread. Thread - * specific state information will be contained in this struct - */ -struct osd_context { - struct db_context *dbc; - struct cur_cmd_attr_pg ccap; - struct id_cache ic; - struct id_list idl; -}; - - struct osd_device { - char *root; - struct db_context *dbc; - struct cur_cmd_attr_pg ccap; - struct id_cache ic; - struct id_list idl; + struct id_cache *ic; /* per process */ + char *root; /* per process */ + struct db_context *dbc; /* per thread */ + struct cur_cmd_attr_pg ccap; /* per thread */ + struct id_list idl; /* per thread */ }; enum { diff --git a/osd-target/osd.c b/osd-target/osd.c index ca3486e..1164be9 100644 --- a/osd-target/osd.c +++ b/osd-target/osd.c @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -93,6 +94,73 @@ static const char *dbname = "osd.db"; static const char *dfiles = "dfiles"; static const char *stranded = "stranded"; +static inline void osd_invalidate_id_cache(struct osd_device *osd) +{ + pthread_mutex_lock(&osd->ic->lock); + osd->ic->cur_pid = osd->ic->next_id = 0; + pthread_mutex_unlock(&osd->ic->lock); +} + +static int osd_get_nextoid(struct osd_device *osd, uint64_t pid, uint64_t *oid, + int numoid) +{ + int ret = 0; + pthread_mutex_lock(&osd->ic->lock); + + /* + * XXX: there should be a better way of getting next maximum + * oid using SQL itself + */ + if (osd->ic->cur_pid == pid) { /* cache hit */ + *oid = osd->ic->next_id; + osd->ic->next_id += numoid; + } else { + int ret = obj_get_nextoid(osd->dbc, pid, oid); + if (ret != 0) + goto out_err; + osd->ic->cur_pid = pid; + osd->ic->next_id = *oid + numoid; + } + if (*oid == 1) { + *oid = USEROBJECT_OID_LB; /* first oid in partition */ + osd->ic->next_id = *oid + numoid; + } + +out_err: + pthread_mutex_unlock(&osd->ic->lock); + return ret; +} + +static int osd_get_nextcid(struct osd_device *osd, uint64_t pid, uint64_t *cid) +{ + int ret = 0; + + pthread_mutex_lock(&osd->ic->lock); + + /* + * XXX: there should be a better way of getting next maximum + * cid using SQL itself + */ + if (osd->ic->cur_pid == pid) { /* cache hit */ + *cid = osd->ic->next_id; + osd->ic->next_id++; + } else { + ret = obj_get_nextoid(osd->dbc, pid, cid); + if (ret != 0) + goto out_err; + osd->ic->cur_pid = pid; + osd->ic->next_id = *cid + 1; + } + if (*cid == 1) { + *cid = COLLECTION_OID_LB; /* first oid in partition */ + osd->ic->next_id = *cid + 1; + } + +out_err: + pthread_mutex_unlock(&osd->ic->lock); + return ret; +} + static inline uint8_t get_obj_type(struct osd_device *osd, uint64_t pid, uint64_t oid) { @@ -1051,10 +1119,6 @@ static int osd_initialize_db(struct osd_device *osd) if (!osd) return -EINVAL; - memset(&osd->ccap, 0, sizeof(osd->ccap)); - memset(&osd->ic, 0, sizeof(osd->ic)); - memset(&osd->idl, 0, sizeof(osd->idl)); - /* tables already created by osd_db_open, so insertions can be done */ ret = obj_insert(osd->dbc, ROOT_PID, ROOT_OID, ROOT, -1); if (ret != SQLITE_OK) @@ -1086,6 +1150,9 @@ static int osd_initialize_db(struct osd_device *osd) ret = attr_set_attr(osd->dbc, ROOT_PID, ROOT_OID, PARTITION_PG + 1, 1, &pid, sizeof(pid)); + memset(&osd->ccap, 0, sizeof(osd->ccap)); + memset(&osd->idl, 0, sizeof(osd->idl)); + out: return ret; } @@ -1095,6 +1162,7 @@ struct osd_device *osd_device_alloc(void) struct osd_device *osd; osd = malloc(sizeof(*osd)); + return osd; } @@ -1103,7 +1171,19 @@ void osd_device_free(struct osd_device *osd) free(osd); } -int osd_open(const char *root, struct osd_device *osd) +static pthread_mutex_t osd_open_lock = PTHREAD_MUTEX_INITIALIZER; + +/* _osd_open initializes the parts of the osd_device that are common + to all threads - the root path, the id cache, and also the + directory structure (if necessary). The db_context is per-thread + so that we have separate connections per thread. While we could + have one connection shared amongst therads, this doesn't allow + transactions, because transactions could get mixed on the same + connection. _osd_open is called by a single thread with the help + of the osd_open_lock. +*/ + +static int _osd_open(const char *root, struct osd_device *osd, int connect_to_db) { int i = 0; int ret = 0; @@ -1119,8 +1199,6 @@ int osd_open(const char *root, struct osd_device *osd) goto out; } - memset(osd, 0, sizeof(*osd)); - /* test if root exists and is a directory */ ret = create_dir(root); if (ret != 0) { @@ -1156,6 +1234,11 @@ int osd_open(const char *root, struct osd_device *osd) goto out; } + if (!connect_to_db) { + ret = 1; + goto init_db; + } + /* create 'md' sub-directory */ sprintf(path, "%s/%s/", root, md); ret = create_dir(path); @@ -1178,6 +1261,7 @@ int osd_open(const char *root, struct osd_device *osd) osd_error("!osd_db_open(%s)", path); goto out; } +init_db: if (ret == 1) { ret = osd_initialize_db(osd); if (ret != 0) { @@ -1186,13 +1270,55 @@ int osd_open(const char *root, struct osd_device *osd) } } ret = db_exec_pragma(osd->dbc); + + osd_invalidate_id_cache(osd); out: if (ret != 0) - osd_error("!db_exec_pragma => %d", ret); + osd_error("!_osd_open => %d", ret); return ret; } +/* this routine initializes thread-specific data */ +int osd_open(const char *root, struct osd_device *osd) +{ + int ret = 0; + static char * _root = NULL; + static struct id_cache _ic; + + memset(osd, 0, sizeof(*osd)); + osd->ic = &_ic; + + /* use osd_open_lock to make sure only one thread calls + _osd_open. We could use pthread_once() but that doesn't + allow init_routines with arguments. + */ + pthread_mutex_lock(&osd_open_lock); + if (!_root) { + ret = _osd_open(root, osd, 1); + _root = osd->root; + pthread_mutex_init(&osd->ic->lock, NULL); + } else { + char path[MAXNAMELEN]; + + osd->root = _root; + + get_dbname(path, root); + + ret = osd_db_open(path, osd); + if (ret != 0) { + osd_error("!osd_db_open(%s)", path); + goto out; + } + } + pthread_mutex_unlock(&osd_open_lock); + + memset(&osd->ccap, 0, sizeof(osd->ccap)); + memset(&osd->idl, 0, sizeof(osd->idl)); +out: + return ret; +} + int osd_set_name(struct osd_device *osd, char *osdname) { int ret = 0; @@ -1661,20 +1787,9 @@ int osd_copy_user_objects(struct osd_device *osd, uint64_t pid, uint64_t request goto out_cdb_err; if (requested_oid == 0) { - if (osd->ic.cur_pid == pid) { /* cache hit */ - oid = osd->ic.next_id; - osd->ic.next_id++; - } else { - ret = obj_get_nextoid(osd->dbc, pid, &oid); - if (ret != 0) - goto out_hw_err; - osd->ic.cur_pid = pid; - osd->ic.next_id = oid + 1; - } - if (oid == 1) { - oid = USEROBJECT_OID_LB; /* first oid in partition */ - osd->ic.next_id = oid + 1; - } + ret = osd_get_nextoid(osd, pid, &oid, 1); + if (ret != 0) + goto out_hw_err; } else { ret = obj_ispresent(osd->dbc, pid, requested_oid, &present); if (ret != OSD_OK || present) @@ -1682,7 +1797,7 @@ int osd_copy_user_objects(struct osd_device *osd, uint64_t pid, uint64_t request oid = requested_oid; /* requested_oid works! */ /*XXX: invalidate cache */ - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); } if (dupl_method == DEFAULT) { @@ -1703,7 +1818,7 @@ int osd_copy_user_objects(struct osd_device *osd, uint64_t pid, uint64_t request OSD_ASC_INVALID_FIELD_IN_CDB, pid, requested_oid); out_hw_err: - osd->ic.cur_pid = osd->ic.next_id = 0; /* invalidate cache */ + osd_invalidate_id_cache(osd); return sense_build_sdd(sense, OSD_SSK_HARDWARE_ERROR, OSD_ASC_INVALID_FIELD_IN_CDB, pid, requested_oid); @@ -1739,38 +1854,22 @@ int osd_create(struct osd_device *osd, uint64_t pid, uint64_t requested_oid, if (numoid > 1 && requested_oid != 0) goto out_illegal_req; + if (numoid == 0) + numoid = 1; /* create atleast one object */ + if (requested_oid == 0) { - /* - * XXX: there should be a better way of getting next maximum - * oid using SQL itself - */ - if (osd->ic.cur_pid == pid) { /* cache hit */ - oid = osd->ic.next_id; - osd->ic.next_id++; - } else { - ret = obj_get_nextoid(osd->dbc, pid, &oid); - if (ret != 0) - goto out_hw_err; - osd->ic.cur_pid = pid; - osd->ic.next_id = oid + 1; - } - if (oid == 1) { - oid = USEROBJECT_OID_LB; /* first oid in partition */ - osd->ic.next_id = oid + 1; - } + ret = osd_get_nextoid(osd, pid, &oid, numoid); + if (ret != 0) + goto out_hw_err; } else { ret = obj_ispresent(osd->dbc, pid, requested_oid, &present); if (ret != OSD_OK || present) goto out_illegal_req; /* requested_oid exists! */ oid = requested_oid; /* requested_oid works! */ - /*XXX: invalidate cache */ - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); } - if (numoid == 0) - numoid = 1; /* create atleast one object */ - for (i = oid; i < (oid + numoid); i++) { ret = obj_insert(osd->dbc, pid, i, USEROBJECT, -1); if (ret != 0) { @@ -1798,7 +1897,6 @@ int osd_create(struct osd_device *osd, uint64_t pid, uint64_t requested_oid, } #endif } - osd->ic.next_id += (numoid - 1); /* fill CCAP with highest oid, osd2r00 Sec 6.3, 3rd last para */ fill_ccap(&osd->ccap, NULL, USEROBJECT, pid, (oid+numoid-1), 0); @@ -1811,7 +1909,7 @@ int osd_create(struct osd_device *osd, uint64_t pid, uint64_t requested_oid, pid, requested_oid); out_hw_err: - osd->ic.cur_pid = osd->ic.next_id = 0; /* invalidate cache */ + osd_invalidate_id_cache(osd); return sense_build_sdd(sense, OSD_SSK_HARDWARE_ERROR, OSD_ASC_INVALID_FIELD_IN_CDB, pid, requested_oid); @@ -1877,20 +1975,9 @@ int osd_create_collection(struct osd_device *osd, uint64_t pid, * XXX: there should be a better way of getting next maximum * oid using SQL itself */ - if (osd->ic.cur_pid == pid) { /* cache hit */ - cid = osd->ic.next_id; - osd->ic.next_id++; - } else { - ret = obj_get_nextoid(osd->dbc, pid, &cid); - if (ret != 0) - goto out_hw_err; - osd->ic.cur_pid = pid; - osd->ic.next_id = cid + 1; - } - if (cid == 1) { - cid = COLLECTION_OID_LB; /* first id in partition */ - osd->ic.next_id = cid + 1; - } + ret = osd_get_nextcid(osd, pid, &cid); + if (ret != 0) + goto out_hw_err; } else { /* Make sure requested_cid doesn't already exist */ ret = obj_ispresent(osd->dbc, pid, requested_cid, &present); @@ -1899,7 +1986,7 @@ int osd_create_collection(struct osd_device *osd, uint64_t pid, cid = requested_cid; /*XXX: invalidate cache */ - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); } /* if cid already exists, obj_insert will fail */ @@ -1918,7 +2005,7 @@ int osd_create_collection(struct osd_device *osd, uint64_t pid, OSD_ASC_INVALID_FIELD_IN_CDB, requested_cid, 0); out_hw_err: - osd->ic.cur_pid = osd->ic.next_id = 0; /* invalidate cache */ + osd_invalidate_id_cache(osd); return sense_build_sdd(sense, OSD_SSK_HARDWARE_ERROR, OSD_ASC_INVALID_FIELD_IN_CDB, requested_cid, 0); @@ -2042,21 +2129,10 @@ int osd_create_user_tracking_collection(struct osd_device *osd, uint64_t pid, } if (requested_cid == 0) { - - if (osd->ic.cur_pid == pid) { /* cache hit */ - cid = osd->ic.next_id; - osd->ic.next_id++; - }else { - ret = obj_get_nextoid(osd->dbc, pid, &cid); - if (ret != 0) - goto out_hw_err; - osd->ic.cur_pid = pid; - osd->ic.next_id = cid + 1; - } - if (cid == 1) { - cid = COLLECTION_OID_LB; /* first id in partition */ - osd->ic.next_id = cid + 1; - } + + ret = osd_get_nextcid(osd, pid, &cid); + if (ret != 0) + goto out_hw_err; } else { @@ -2067,7 +2143,7 @@ int osd_create_user_tracking_collection(struct osd_device *osd, uint64_t pid, cid = requested_cid; /*XXX: invalidate cache */ - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); } if (source_cid != 0) { @@ -2091,7 +2167,7 @@ int osd_create_user_tracking_collection(struct osd_device *osd, uint64_t pid, OSD_ASC_INVALID_FIELD_IN_CDB, requested_cid, 0); out_hw_err: - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); return sense_build_sdd(sense, OSD_SSK_HARDWARE_ERROR, OSD_ASC_INVALID_FIELD_IN_CDB, requested_cid, 0); @@ -2230,6 +2306,10 @@ int osd_format_osd(struct osd_device *osd, uint64_t capacity, uint32_t cdb_cont_ goto create; } + /* do not close the OSD or empty the md directory - just + delete the data from the db. if we close the OSD, then the + dbc pointers in other threads will no longer be valid + ret = osd_close(osd); if (ret) { osd_error("%s: DB close failed, ret %d", __func__, ret); @@ -2242,6 +2322,8 @@ int osd_format_osd(struct osd_device *osd, uint64_t capacity, uint32_t cdb_cont_ osd_error("%s: empty_dir %s failed", __func__, path); goto out_sense; } + */ + db_delete_data(osd->dbc); sprintf(path, "%s/%s", root, stranded); ret = empty_dir(path); @@ -2260,11 +2342,12 @@ int osd_format_osd(struct osd_device *osd, uint64_t capacity, uint32_t cdb_cont_ #endif create: - ret = osd_open(root, osd); /* will create files/dirs under root */ + ret = _osd_open(root, osd, 0); /* will create files/dirs under root */ if (ret != 0) { osd_error("%s: osd_open %s failed", __func__, root); goto out_sense; } + memset(&osd->ccap, 0, sizeof(osd->ccap)); /* reset ccap */ ret = OSD_OK; goto out; @@ -3484,7 +3567,7 @@ int osd_remove(struct osd_device *osd, uint64_t pid, uint64_t oid, goto out_cdb_err; /* XXX: invalidate ic_cache immediately */ - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); /* if userobject is absent unlink will fail */ get_dfile_name(path, osd->root, pid, oid); @@ -3545,7 +3628,7 @@ int osd_remove_collection(struct osd_device *osd, uint64_t pid, uint64_t cid, goto out_cdb_err; /* XXX: invalidate ic_cache */ - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); ret = coll_isempty_cid(osd->dbc, pid, cid, &isempty); if (ret != OSD_OK) @@ -3619,7 +3702,7 @@ int osd_remove_partition(struct osd_device *osd, uint64_t pid, uint32_t cdb_cont goto out_not_empty; /* XXX: invalidate ic_cache */ - osd->ic.cur_pid = osd->ic.next_id = 0; + osd_invalidate_id_cache(osd); ret = attr_delete_all(osd->dbc, pid, PARTITION_OID); if (ret != 0) diff --git a/osd-util/osd-util.c b/osd-util/osd-util.c index 757fd5f..dab3028 100644 --- a/osd-util/osd-util.c +++ b/osd-util/osd-util.c @@ -104,7 +104,7 @@ osd_error(const char *fmt, ...) va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); - fprintf(stderr, ".\n"); + fprintf(stderr, " (%s).\n", strerror(errno)); } /*