diff options
Diffstat (limited to 'db2/btree/bt_recno.c')
-rw-r--r-- | db2/btree/bt_recno.c | 975 |
1 files changed, 534 insertions, 441 deletions
diff --git a/db2/btree/bt_recno.c b/db2/btree/bt_recno.c index 38dbbd1c55..c69877ff7f 100644 --- a/db2/btree/bt_recno.c +++ b/db2/btree/bt_recno.c @@ -8,7 +8,7 @@ #include "config.h" #ifndef lint -static const char sccsid[] = "@(#)bt_recno.c 10.37 (Sleepycat) 5/23/98"; +static const char sccsid[] = "@(#)bt_recno.c 10.53 (Sleepycat) 12/11/98"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES @@ -22,64 +22,89 @@ static const char sccsid[] = "@(#)bt_recno.c 10.37 (Sleepycat) 5/23/98"; #include "db_int.h" #include "db_page.h" #include "btree.h" - -static int __ram_add __P((DB *, db_recno_t *, DBT *, u_int32_t, u_int32_t)); -static int __ram_c_close __P((DBC *)); -static int __ram_c_del __P((DBC *, u_int32_t)); -static int __ram_c_get __P((DBC *, DBT *, DBT *, u_int32_t)); -static int __ram_c_put __P((DBC *, DBT *, DBT *, u_int32_t)); -static int __ram_fmap __P((DB *, db_recno_t)); -static int __ram_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t)); -static int __ram_iget __P((DB *, DBT *, DBT *)); +#include "db_ext.h" +#include "shqueue.h" +#include "db_shash.h" +#include "lock.h" +#include "lock_ext.h" + +static int __ram_add __P((DBC *, db_recno_t *, DBT *, u_int32_t, u_int32_t)); +static int __ram_delete __P((DB *, DB_TXN *, DBT *, u_int32_t)); +static int __ram_fmap __P((DBC *, db_recno_t)); +static int __ram_i_delete __P((DBC *)); static int __ram_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t)); static int __ram_source __P((DB *, RECNO *, const char *)); static int __ram_sync __P((DB *, u_int32_t)); -static int __ram_update __P((DB *, db_recno_t, int)); -static int __ram_vmap __P((DB *, db_recno_t)); -static int __ram_writeback __P((DB *)); +static int __ram_update __P((DBC *, db_recno_t, int)); +static int __ram_vmap __P((DBC *, db_recno_t)); +static int __ram_writeback __P((DBC *)); /* - * If we're renumbering records, then we have to detect in the cursor that a - * record was deleted, and adjust the cursor as necessary. If not renumbering - * records, then we can detect this by looking at the actual record, so we - * ignore the cursor delete flag. + * In recno, there are two meanings to the on-page "deleted" flag. If we're + * re-numbering records, it means the record was implicitly created. We skip + * over implicitly created records if doing a cursor "next" or "prev", and + * return DB_KEYEMPTY if they're explicitly requested.. If not re-numbering + * records, it means that the record was implicitly created, or was deleted. + * We skip over implicitly created or deleted records if doing a cursor "next" + * or "prev", and return DB_KEYEMPTY if they're explicitly requested. + * + * If we're re-numbering records, then we have to detect in the cursor that + * a record was deleted, and adjust the cursor as necessary on the next get. + * If we're not re-numbering records, then we can detect that a record has + * been deleted by looking at the actual on-page record, so we completely + * ignore the cursor's delete flag. This is different from the B+tree code. + * It also maintains whether the cursor references a deleted record in the + * cursor, and it doesn't always check the on-page value. */ #define CD_SET(dbp, cp) { \ if (F_ISSET(dbp, DB_RE_RENUMBER)) \ - F_SET(cp, CR_DELETED); \ + F_SET(cp, C_DELETED); \ } #define CD_CLR(dbp, cp) { \ if (F_ISSET(dbp, DB_RE_RENUMBER)) \ - F_CLR(cp, CR_DELETED); \ + F_CLR(cp, C_DELETED); \ } #define CD_ISSET(dbp, cp) \ - (F_ISSET(dbp, DB_RE_RENUMBER) && F_ISSET(cp, CR_DELETED)) + (F_ISSET(dbp, DB_RE_RENUMBER) && F_ISSET(cp, C_DELETED)) /* * __ram_open -- * Recno open function. * - * PUBLIC: int __ram_open __P((DB *, DBTYPE, DB_INFO *)); + * PUBLIC: int __ram_open __P((DB *, DB_INFO *)); */ int -__ram_open(dbp, type, dbinfo) +__ram_open(dbp, dbinfo) DB *dbp; - DBTYPE type; DB_INFO *dbinfo; { BTREE *t; + DBC *dbc; RECNO *rp; - int ret; - - COMPQUIET(type, DB_RECNO); + int ret, t_ret; - ret = 0; + /* Allocate and initialize the private btree structure. */ + if ((ret = __os_calloc(1, sizeof(BTREE), &t)) != 0) + return (ret); + dbp->internal = t; + __bam_setovflsize(dbp); - /* Allocate and initialize the private RECNO structure. */ - if ((rp = (RECNO *)__db_calloc(1, sizeof(*rp))) == NULL) - return (ENOMEM); + /* Allocate and initialize the private recno structure. */ + if ((ret = __os_calloc(1, sizeof(*rp), &rp)) != 0) + return (ret); + /* Link in the private recno structure. */ + t->recno = rp; - if (dbinfo != NULL) { + /* + * Intention is to make sure all of the user's selections are okay + * here and then use them without checking. + */ + if (dbinfo == NULL) { + rp->re_delim = '\n'; + rp->re_pad = ' '; + rp->re_fd = -1; + F_SET(rp, RECNO_EOF); + } else { /* * If the user specified a source tree, open it and map it in. * @@ -111,31 +136,40 @@ __ram_open(dbp, type, dbinfo) } } else rp->re_len = 0; - } else { - rp->re_delim = '\n'; - rp->re_pad = ' '; - rp->re_fd = -1; - F_SET(rp, RECNO_EOF); } - /* Open the underlying btree. */ - if ((ret = __bam_open(dbp, DB_RECNO, dbinfo)) != 0) - goto err; - - /* Set the routines necessary to make it look like a recno tree. */ - dbp->cursor = __ram_cursor; + /* Initialize the remaining fields/methods of the DB. */ + dbp->am_close = __ram_close; dbp->del = __ram_delete; - dbp->get = __ram_get; dbp->put = __ram_put; + dbp->stat = __bam_stat; dbp->sync = __ram_sync; - /* Link in the private recno structure. */ - ((BTREE *)dbp->internal)->bt_recno = rp; + /* Start up the tree. */ + if ((ret = __bam_read_root(dbp)) != 0) + goto err; + + /* Set the overflow page size. */ + __bam_setovflsize(dbp); /* If we're snapshotting an underlying source file, do it now. */ - if (dbinfo != NULL && F_ISSET(dbinfo, DB_SNAPSHOT)) - if ((ret = __ram_snapshot(dbp)) != 0 && ret != DB_NOTFOUND) + if (dbinfo != NULL && F_ISSET(dbinfo, DB_SNAPSHOT)) { + /* Allocate a cursor. */ + if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0) + goto err; + + /* Do the snapshot. */ + if ((ret = __ram_update(dbc, + DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND) + ret = 0; + + /* Discard the cursor. */ + if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + if (ret != 0) goto err; + } return (0); @@ -145,143 +179,169 @@ err: /* If we mmap'd a source file, discard it. */ /* If we opened a source file, discard it. */ if (rp->re_fd != -1) - (void)__db_close(rp->re_fd); + (void)__os_close(rp->re_fd); if (rp->re_source != NULL) - FREES(rp->re_source); - - /* If we allocated room for key/data return, discard it. */ - t = dbp->internal; - if (t != NULL && t->bt_rkey.data != NULL) - __db_free(t->bt_rkey.data); + __os_freestr(rp->re_source); - FREE(rp, sizeof(*rp)); + __os_free(rp, sizeof(*rp)); return (ret); } /* - * __ram_cursor -- - * Recno db->cursor function. - * - * PUBLIC: int __ram_cursor __P((DB *, DB_TXN *, DBC **)); + * __ram_delete -- + * Recno db->del function. */ -int -__ram_cursor(dbp, txn, dbcp) +static int +__ram_delete(dbp, txn, key, flags) DB *dbp; DB_TXN *txn; - DBC **dbcp; + DBT *key; + u_int32_t flags; { - RCURSOR *cp; + CURSOR *cp; DBC *dbc; + db_recno_t recno; + int ret, t_ret; - DEBUG_LWRITE(dbp, txn, "ram_cursor", NULL, NULL, 0); - - if ((dbc = (DBC *)__db_calloc(1, sizeof(DBC))) == NULL) - return (ENOMEM); - if ((cp = (RCURSOR *)__db_calloc(1, sizeof(RCURSOR))) == NULL) { - __db_free(dbc); - return (ENOMEM); - } - - cp->dbc = dbc; - cp->recno = RECNO_OOB; - - dbc->dbp = dbp; - dbc->txn = txn; - dbc->internal = cp; - dbc->c_close = __ram_c_close; - dbc->c_del = __ram_c_del; - dbc->c_get = __ram_c_get; - dbc->c_put = __ram_c_put; - - /* - * All cursors are queued from the master DB structure. Add the - * cursor to that queue. - */ - CURSOR_SETUP(dbp); - TAILQ_INSERT_HEAD(&dbp->curs_queue, dbc, links); - CURSOR_TEARDOWN(dbp); + DB_PANIC_CHECK(dbp); - *dbcp = dbc; - return (0); -} + /* Check for invalid flags. */ + if ((ret = __db_delchk(dbp, + key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0) + return (ret); -/* - * __ram_get -- - * Recno db->get function. - */ -static int -__ram_get(argdbp, txn, key, data, flags) - DB *argdbp; - DB_TXN *txn; - DBT *key, *data; - u_int32_t flags; -{ - DB *dbp; - int ret; + /* Acquire a cursor. */ + if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0) + return (ret); - DEBUG_LWRITE(argdbp, txn, "ram_get", key, NULL, flags); + DEBUG_LWRITE(dbc, txn, "ram_delete", key, NULL, flags); - /* Check for invalid flags. */ - if ((ret = __db_getchk(argdbp, key, data, flags)) != 0) - return (ret); + /* Check the user's record number and fill in as necessary. */ + if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0) + goto err; - GETHANDLE(argdbp, txn, &dbp, ret); + /* Do the delete. */ + cp = dbc->internal; + cp->recno = recno; + ret = __ram_i_delete(dbc); - ret = __ram_iget(dbp, key, data); + /* Release the cursor. */ +err: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + ret = t_ret; - PUTHANDLE(dbp); return (ret); } /* - * __ram_iget -- - * Internal ram get function, called for both standard and cursor - * get after the flags have been checked. + * __ram_i_delete -- + * Internal version of recno delete, called by __ram_delete and + * __ram_c_del. */ static int -__ram_iget(dbp, key, data) - DB *dbp; - DBT *key, *data; +__ram_i_delete(dbc) + DBC *dbc; { + BKEYDATA bk; BTREE *t; + CURSOR *cp; + DB *dbp; + DBT hdr, data; PAGE *h; db_indx_t indx; - db_recno_t recno; int exact, ret, stack; - stack = 0; + dbp = dbc->dbp; + cp = dbc->internal; t = dbp->internal; + stack = 0; - /* Check the user's record number and fill in as necessary. */ - if ((ret = __ram_getno(dbp, key, &recno, 0)) != 0) - goto done; + /* + * If this is CDB and this isn't a write cursor, then it's an error. + * If it is a write cursor, but we don't yet hold the write lock, then + * we need to upgrade to the write lock. + */ + if (F_ISSET(dbp, DB_AM_CDB)) { + /* Make sure it's a valid update cursor. */ + if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER)) + return (EINVAL); + + if (F_ISSET(dbc, DBC_RMW) && + (ret = lock_get(dbp->dbenv->lk_info, dbc->locker, + DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, + &dbc->mylock)) != 0) + return (EAGAIN); + } - /* Search the tree for the record. */ - if ((ret = __bam_rsearch(dbp, &recno, S_FIND, 1, &exact)) != 0) - goto done; - if (!exact) - return (DB_NOTFOUND); + /* Search the tree for the key; delete only deletes exact matches. */ + if ((ret = __bam_rsearch(dbc, &cp->recno, S_DELETE, 1, &exact)) != 0) + goto err; + if (!exact) { + ret = DB_NOTFOUND; + goto err; + } stack = 1; - h = t->bt_csp->page; - indx = t->bt_csp->indx; + h = cp->csp->page; + indx = cp->csp->indx; - /* If the record has already been deleted, we couldn't have found it. */ + /* + * If re-numbering records, the on-page deleted flag can only mean + * that this record was implicitly created. Applications aren't + * permitted to delete records they never created, return an error. + * + * If not re-numbering records, the on-page deleted flag means that + * this record was implicitly created, or, was deleted at some time. + * The former is an error because applications aren't permitted to + * delete records they never created, the latter is an error because + * if the record was "deleted", we could never have found it. + */ if (B_DISSET(GET_BKEYDATA(h, indx)->type)) { ret = DB_KEYEMPTY; - goto done; + goto err; } - /* Return the data item. */ - ret = __db_ret(dbp, - h, indx, data, &t->bt_rdata.data, &t->bt_rdata.ulen); - ++t->lstat.bt_get; + if (F_ISSET(dbp, DB_RE_RENUMBER)) { + /* Delete the item, adjust the counts, adjust the cursors. */ + if ((ret = __bam_ditem(dbc, h, indx)) != 0) + goto err; + __bam_adjust(dbc, -1); + __ram_ca(dbp, cp->recno, CA_DELETE); + + /* + * If the page is empty, delete it. The whole tree is locked + * so there are no preparations to make. + */ + if (NUM_ENT(h) == 0 && h->pgno != PGNO_ROOT) { + stack = 0; + ret = __bam_dpages(dbc); + } + } else { + /* Use a delete/put pair to replace the record with a marker. */ + if ((ret = __bam_ditem(dbc, h, indx)) != 0) + goto err; + + B_TSET(bk.type, B_KEYDATA, 1); + bk.len = 0; + memset(&hdr, 0, sizeof(hdr)); + hdr.data = &bk; + hdr.size = SSZA(BKEYDATA, data); + memset(&data, 0, sizeof(data)); + data.data = (char *)""; + data.size = 0; + if ((ret = __db_pitem(dbc, + h, indx, BKEYDATA_SIZE(0), &hdr, &data)) != 0) + goto err; + } + F_SET(t->recno, RECNO_MODIFIED); -done: /* Discard the stack. */ - if (stack) - __bam_stkrel(dbp); +err: if (stack) + __bam_stkrel(dbc, 0); + /* If we upgraded the CDB lock upon entry; downgrade it now. */ + if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW)) + (void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock, + DB_LOCK_IWRITE, 0); return (ret); } @@ -290,46 +350,50 @@ done: /* Discard the stack. */ * Recno db->put function. */ static int -__ram_put(argdbp, txn, key, data, flags) - DB *argdbp; +__ram_put(dbp, txn, key, data, flags) + DB *dbp; DB_TXN *txn; DBT *key, *data; u_int32_t flags; { - BTREE *t; - DB *dbp; + DBC *dbc; db_recno_t recno; - int ret; + int ret, t_ret; - DEBUG_LWRITE(argdbp, txn, "ram_put", key, data, flags); + DB_PANIC_CHECK(dbp); /* Check for invalid flags. */ - if ((ret = __db_putchk(argdbp, - key, data, flags, F_ISSET(argdbp, DB_AM_RDONLY), 0)) != 0) + if ((ret = __db_putchk(dbp, + key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0) + return (ret); + + /* Allocate a cursor. */ + if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0) return (ret); - GETHANDLE(argdbp, txn, &dbp, ret); + DEBUG_LWRITE(dbc, txn, "ram_put", key, data, flags); /* * If we're appending to the tree, make sure we've read in all of * the backing source file. Otherwise, check the user's record * number and fill in as necessary. */ - ret = LF_ISSET(DB_APPEND) ? - __ram_snapshot(dbp) : __ram_getno(dbp, key, &recno, 1); + ret = flags == DB_APPEND ? + __ram_update(dbc, DB_MAX_RECORDS, 0) : + __ram_getno(dbc, key, &recno, 1); /* Add the record. */ if (ret == 0) - ret = __ram_add(dbp, &recno, data, flags, 0); + ret = __ram_add(dbc, &recno, data, flags, 0); - /* If we're appending to the tree, we have to return the record. */ - if (ret == 0 && LF_ISSET(DB_APPEND)) { - t = dbp->internal; - ret = __db_retcopy(key, &recno, sizeof(recno), - &t->bt_rkey.data, &t->bt_rkey.ulen, dbp->db_malloc); - } + /* Discard the cursor. */ + if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + /* Return the record number if we're appending to the tree. */ + if (ret == 0 && flags == DB_APPEND) + *(db_recno_t *)key->data = recno; - PUTHANDLE(dbp); return (ret); } @@ -338,23 +402,35 @@ __ram_put(argdbp, txn, key, data, flags) * Recno db->sync function. */ static int -__ram_sync(argdbp, flags) - DB *argdbp; +__ram_sync(dbp, flags) + DB *dbp; u_int32_t flags; { - DB *dbp; - int ret; + DBC *dbc; + int ret, t_ret; - DEBUG_LWRITE(argdbp, NULL, "ram_sync", NULL, NULL, flags); + /* + * Sync the underlying btree. + * + * !!! + * We don't need to do a panic check or flags check, the "real" + * sync function does all that for us. + */ + if ((ret = __db_sync(dbp, flags)) != 0) + return (ret); - /* Sync the underlying btree. */ - if ((ret = __bam_sync(argdbp, flags)) != 0) + /* Allocate a cursor. */ + if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0) return (ret); + DEBUG_LWRITE(dbc, NULL, "ram_sync", NULL, NULL, flags); + /* Copy back the backing source file. */ - GETHANDLE(argdbp, NULL, &dbp, ret); - ret = __ram_writeback(dbp); - PUTHANDLE(dbp); + ret = __ram_writeback(dbc); + + /* Discard the cursor. */ + if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + ret = t_ret; return (ret); } @@ -366,14 +442,12 @@ __ram_sync(argdbp, flags) * PUBLIC: int __ram_close __P((DB *)); */ int -__ram_close(argdbp) - DB *argdbp; +__ram_close(dbp) + DB *dbp; { RECNO *rp; - DEBUG_LWRITE(argdbp, NULL, "ram_close", NULL, NULL, 0); - - rp = ((BTREE *)argdbp->internal)->bt_recno; + rp = ((BTREE *)dbp->internal)->recno; /* Close any underlying mmap region. */ if (rp->re_smap != NULL) @@ -381,136 +455,133 @@ __ram_close(argdbp) /* Close any backing source file descriptor. */ if (rp->re_fd != -1) - (void)__db_close(rp->re_fd); + (void)__os_close(rp->re_fd); /* Free any backing source file name. */ if (rp->re_source != NULL) - FREES(rp->re_source); + __os_freestr(rp->re_source); /* Free allocated memory. */ - FREE(rp, sizeof(RECNO)); - ((BTREE *)argdbp->internal)->bt_recno = NULL; + __os_free(rp, sizeof(RECNO)); + ((BTREE *)dbp->internal)->recno = NULL; /* Close the underlying btree. */ - return (__bam_close(argdbp)); -} - -/* - * __ram_c_close -- - * Recno cursor->close function. - */ -static int -__ram_c_close(dbc) - DBC *dbc; -{ - DEBUG_LWRITE(dbc->dbp, dbc->txn, "ram_c_close", NULL, NULL, 0); - - return (__ram_c_iclose(dbc->dbp, dbc)); -} - -/* - * __ram_c_iclose -- - * Close a single cursor -- internal version. - * - * PUBLIC: int __ram_c_iclose __P((DB *, DBC *)); - */ -int -__ram_c_iclose(dbp, dbc) - DB *dbp; - DBC *dbc; -{ - /* Remove the cursor from the queue. */ - CURSOR_SETUP(dbp); - TAILQ_REMOVE(&dbp->curs_queue, dbc, links); - CURSOR_TEARDOWN(dbp); - - /* Discard the structures. */ - FREE(dbc->internal, sizeof(RCURSOR)); - FREE(dbc, sizeof(DBC)); - - return (0); + return (__bam_close(dbp)); } /* * __ram_c_del -- * Recno cursor->c_del function. + * + * PUBLIC: int __ram_c_del __P((DBC *, u_int32_t)); */ -static int +int __ram_c_del(dbc, flags) DBC *dbc; u_int32_t flags; { - DBT key; - RCURSOR *cp; + CURSOR *cp; + DB *dbp; int ret; - DEBUG_LWRITE(dbc->dbp, dbc->txn, "ram_c_del", NULL, NULL, flags); - + dbp = dbc->dbp; cp = dbc->internal; + DB_PANIC_CHECK(dbp); + /* Check for invalid flags. */ - if ((ret = __db_cdelchk(dbc->dbp, flags, - F_ISSET(dbc->dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0) + if ((ret = __db_cdelchk(dbp, flags, + F_ISSET(dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0) return (ret); - /* If already deleted, return failure. */ - if (CD_ISSET(dbc->dbp, cp)) - return (DB_KEYEMPTY); + DEBUG_LWRITE(dbc, dbc->txn, "ram_c_del", NULL, NULL, flags); - /* Build a normal delete request. */ - memset(&key, 0, sizeof(key)); - key.data = &cp->recno; - key.size = sizeof(db_recno_t); - if ((ret = __ram_delete(dbc->dbp, dbc->txn, &key, 0)) == 0) - CD_SET(dbc->dbp, cp); + /* + * If we are running CDB, this had better be either a write + * cursor or an immediate writer. + */ + if (F_ISSET(dbp, DB_AM_CDB)) + if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER)) + return (EINVAL); - return (ret); + /* + * The semantics of cursors during delete are as follows: if record + * numbers are mutable (DB_RE_RENUMBER is set), deleting a record + * causes the cursor to automatically point to the record immediately + * following. In this case it is possible to use a single cursor for + * repeated delete operations, without intervening operations. + * + * If record numbers are not mutable, then records are replaced with + * a marker containing a delete flag. If the record referenced by + * this cursor has already been deleted, we will detect that as part + * of the delete operation, and fail. + */ + return (__ram_i_delete(dbc)); } /* * __ram_c_get -- * Recno cursor->c_get function. + * + * PUBLIC: int __ram_c_get __P((DBC *, DBT *, DBT *, u_int32_t)); */ -static int +int __ram_c_get(dbc, key, data, flags) DBC *dbc; DBT *key, *data; u_int32_t flags; { - BTREE *t; + CURSOR *cp, copy; DB *dbp; - RCURSOR *cp, copy; - int ret; - - DEBUG_LREAD(dbc->dbp, dbc->txn, "ram_c_get", - flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, - NULL, flags); + PAGE *h; + db_indx_t indx; + int exact, ret, stack, tmp_rmw; - cp = dbc->internal; dbp = dbc->dbp; + cp = dbc->internal; + + DB_PANIC_CHECK(dbp); /* Check for invalid flags. */ if ((ret = __db_cgetchk(dbc->dbp, key, data, flags, cp->recno != RECNO_OOB)) != 0) return (ret); - GETHANDLE(dbc->dbp, dbc->txn, &dbp, ret); - t = dbp->internal; + /* Clear OR'd in additional bits so we can check for flag equality. */ + tmp_rmw = 0; + if (LF_ISSET(DB_RMW)) { + if (!F_ISSET(dbp, DB_AM_CDB)) { + tmp_rmw = 1; + F_SET(dbc, DBC_RMW); + } + LF_CLR(DB_RMW); + } + + DEBUG_LREAD(dbc, dbc->txn, "ram_c_get", + flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); /* Initialize the cursor for a new retrieval. */ copy = *cp; retry: /* Update the record number. */ + stack = 0; switch (flags) { case DB_CURRENT: - if (CD_ISSET(dbp, cp)) { - PUTHANDLE(dbp); - return (DB_KEYEMPTY); - } + /* + * If record numbers are mutable: if we just deleted a record, + * there is no action necessary, we return the record following + * the deleted item by virtue of renumbering the tree. + */ break; case DB_NEXT: + /* + * If record numbers are mutable: if we just deleted a record, + * we have to avoid incrementing the record number so that we + * return the right record by virtue of renumbering the tree. + */ if (CD_ISSET(dbp, cp)) break; + if (cp->recno != RECNO_OOB) { ++cp->recno; break; @@ -522,86 +593,133 @@ retry: /* Update the record number. */ break; case DB_PREV: if (cp->recno != RECNO_OOB) { - if (cp->recno == 1) - return (DB_NOTFOUND); + if (cp->recno == 1) { + ret = DB_NOTFOUND; + goto err; + } --cp->recno; break; } /* FALLTHROUGH */ case DB_LAST: flags = DB_PREV; - if (((ret = __ram_snapshot(dbp)) != 0) && ret != DB_NOTFOUND) + if (((ret = __ram_update(dbc, + DB_MAX_RECORDS, 0)) != 0) && ret != DB_NOTFOUND) goto err; - if ((ret = __bam_nrecs(dbp, &cp->recno)) != 0) + if ((ret = __bam_nrecs(dbc, &cp->recno)) != 0) goto err; - if (cp->recno == 0) - return (DB_NOTFOUND); + if (cp->recno == 0) { + ret = DB_NOTFOUND; + goto err; + } break; case DB_SET: case DB_SET_RANGE: - if ((ret = __ram_getno(dbp, key, &cp->recno, 0)) != 0) + if ((ret = __ram_getno(dbc, key, &cp->recno, 0)) != 0) goto err; break; } - /* - * Return the key if the user didn't give us one, and then pass it - * into __ram_iget(). - */ + /* Return the key if the user didn't give us one. */ if (flags != DB_SET && flags != DB_SET_RANGE && (ret = __db_retcopy(key, &cp->recno, sizeof(cp->recno), - &t->bt_rkey.data, &t->bt_rkey.ulen, dbp->db_malloc)) != 0) - return (ret); + &dbc->rkey.data, &dbc->rkey.ulen, dbp->db_malloc)) != 0) + goto err; - /* - * The cursor was reset, so the delete adjustment is no - * longer necessary. - */ - CD_CLR(dbp, cp); + /* Search the tree for the record. */ + if ((ret = __bam_rsearch(dbc, &cp->recno, + F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND, 1, &exact)) != 0) + goto err; + stack = 1; + if (!exact) { + ret = DB_NOTFOUND; + goto err; + } + h = cp->csp->page; + indx = cp->csp->indx; /* - * Retrieve the record. - * - * Skip any keys that don't really exist. + * If re-numbering records, the on-page deleted flag means this record + * was implicitly created. If not re-numbering records, the on-page + * deleted flag means this record was implicitly created, or, it was + * deleted at some time. Regardless, we skip such records if doing + * cursor next/prev operations, and fail if the application requested + * them explicitly. */ - if ((ret = __ram_iget(dbp, key, data)) != 0) - if (ret == DB_KEYEMPTY && - (flags == DB_NEXT || flags == DB_PREV)) + if (B_DISSET(GET_BKEYDATA(h, indx)->type)) { + if (flags == DB_NEXT || flags == DB_PREV) { + (void)__bam_stkrel(dbc, 0); goto retry; + } + ret = DB_KEYEMPTY; + goto err; + } + + /* Return the data item. */ + if ((ret = __db_ret(dbp, + h, indx, data, &dbc->rdata.data, &dbc->rdata.ulen)) != 0) + goto err; + + /* The cursor was reset, no further delete adjustment is necessary. */ + CD_CLR(dbp, cp); + +err: if (stack) + (void)__bam_stkrel(dbc, 0); + + /* Release temporary lock upgrade. */ + if (tmp_rmw) + F_CLR(dbc, DBC_RMW); -err: if (ret != 0) + if (ret != 0) *cp = copy; - PUTHANDLE(dbp); return (ret); } /* * __ram_c_put -- * Recno cursor->c_put function. + * + * PUBLIC: int __ram_c_put __P((DBC *, DBT *, DBT *, u_int32_t)); */ -static int +int __ram_c_put(dbc, key, data, flags) DBC *dbc; DBT *key, *data; u_int32_t flags; { - BTREE *t; - RCURSOR *cp, copy; + CURSOR *cp, copy; DB *dbp; int exact, ret; void *arg; - DEBUG_LWRITE(dbc->dbp, dbc->txn, "ram_c_put", NULL, data, flags); - + dbp = dbc->dbp; cp = dbc->internal; + DB_PANIC_CHECK(dbp); + if ((ret = __db_cputchk(dbc->dbp, key, data, flags, F_ISSET(dbc->dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0) return (ret); - GETHANDLE(dbc->dbp, dbc->txn, &dbp, ret); - t = dbp->internal; + DEBUG_LWRITE(dbc, dbc->txn, "ram_c_put", NULL, data, flags); + + /* + * If we are running CDB, this had better be either a write + * cursor or an immediate writer. If it's a regular writer, + * that means we have an IWRITE lock and we need to upgrade + * it to a write lock. + */ + if (F_ISSET(dbp, DB_AM_CDB)) { + if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER)) + return (EINVAL); + + if (F_ISSET(dbc, DBC_RMW) && + (ret = lock_get(dbp->dbenv->lk_info, dbc->locker, + DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, + &dbc->mylock)) != 0) + return (EAGAIN); + } /* Initialize the cursor for a new retrieval. */ copy = *cp; @@ -614,23 +732,23 @@ __ram_c_put(dbc, key, data, flags) */ if (0) { split: arg = &cp->recno; - if ((ret = __bam_split(dbp, arg)) != 0) + if ((ret = __bam_split(dbc, arg)) != 0) goto err; } - if ((ret = __bam_rsearch(dbp, &cp->recno, S_INSERT, 1, &exact)) != 0) + if ((ret = __bam_rsearch(dbc, &cp->recno, S_INSERT, 1, &exact)) != 0) goto err; if (!exact) { ret = DB_NOTFOUND; goto err; } - if ((ret = __bam_iitem(dbp, &t->bt_csp->page, - &t->bt_csp->indx, key, data, flags, 0)) == DB_NEEDSPLIT) { - if ((ret = __bam_stkrel(dbp)) != 0) + if ((ret = __bam_iitem(dbc, &cp->csp->page, + &cp->csp->indx, key, data, flags, 0)) == DB_NEEDSPLIT) { + if ((ret = __bam_stkrel(dbc, 0)) != 0) goto err; goto split; } - if ((ret = __bam_stkrel(dbp)) != 0) + if ((ret = __bam_stkrel(dbc, 0)) != 0) goto err; switch (flags) { @@ -650,16 +768,16 @@ split: arg = &cp->recno; break; } - /* - * The cursor was reset, so the delete adjustment is no - * longer necessary. - */ + /* The cursor was reset, no further delete adjustment is necessary. */ CD_CLR(dbp, cp); -err: if (ret != 0) +err: if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW)) + (void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock, + DB_LOCK_IWRITE, 0); + + if (ret != 0) *cp = copy; - PUTHANDLE(dbp); return (ret); } @@ -675,20 +793,22 @@ __ram_ca(dbp, recno, op) db_recno_t recno; ca_recno_arg op; { + CURSOR *cp; DBC *dbc; - RCURSOR *cp; /* * Adjust the cursors. See the comment in __bam_ca_delete(). */ - CURSOR_SETUP(dbp); - for (dbc = TAILQ_FIRST(&dbp->curs_queue); + DB_THREAD_LOCK(dbp); + for (dbc = TAILQ_FIRST(&dbp->active_queue); dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (RCURSOR *)dbc->internal; + cp = dbc->internal; switch (op) { case CA_DELETE: if (recno > cp->recno) --cp->recno; + if (recno == cp->recno) + CD_SET(dbp, cp); break; case CA_IAFTER: if (recno > cp->recno) @@ -700,51 +820,27 @@ __ram_ca(dbp, recno, op) break; } } - CURSOR_TEARDOWN(dbp); + DB_THREAD_UNLOCK(dbp); } -#ifdef DEBUG -/* - * __ram_cprint -- - * Display the current recno cursor list. - * - * PUBLIC: int __ram_cprint __P((DB *)); - */ -int -__ram_cprint(dbp) - DB *dbp; -{ - DBC *dbc; - RCURSOR *cp; - - CURSOR_SETUP(dbp); - for (dbc = TAILQ_FIRST(&dbp->curs_queue); - dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) { - cp = (RCURSOR *)dbc->internal; - fprintf(stderr, - "%#0x: recno: %lu\n", (u_int)cp, (u_long)cp->recno); - } - CURSOR_TEARDOWN(dbp); - - return (0); -} -#endif /* DEBUG */ - /* * __ram_getno -- * Check the user's record number, and make sure we've seen it. * - * PUBLIC: int __ram_getno __P((DB *, const DBT *, db_recno_t *, int)); + * PUBLIC: int __ram_getno __P((DBC *, const DBT *, db_recno_t *, int)); */ int -__ram_getno(dbp, key, rep, can_create) - DB *dbp; +__ram_getno(dbc, key, rep, can_create) + DBC *dbc; const DBT *key; db_recno_t *rep; int can_create; { + DB *dbp; db_recno_t recno; + dbp = dbc->dbp; + /* Check the user's record number. */ if ((recno = *(db_recno_t *)key->data) == 0) { __db_err(dbp->dbenv, "illegal record number of 0"); @@ -754,24 +850,11 @@ __ram_getno(dbp, key, rep, can_create) *rep = recno; /* - * Btree can neither create records or read them in. Recno can + * Btree can neither create records nor read them in. Recno can * do both, see if we can find the record. */ return (dbp->type == DB_RECNO ? - __ram_update(dbp, recno, can_create) : 0); -} - -/* - * __ram_snapshot -- - * Read in any remaining records from the backing input file. - * - * PUBLIC: int __ram_snapshot __P((DB *)); - */ -int -__ram_snapshot(dbp) - DB *dbp; -{ - return (__ram_update(dbp, DB_MAX_RECORDS, 0)); + __ram_update(dbc, recno, can_create) : 0); } /* @@ -779,18 +862,20 @@ __ram_snapshot(dbp) * Ensure the tree has records up to and including the specified one. */ static int -__ram_update(dbp, recno, can_create) - DB *dbp; +__ram_update(dbc, recno, can_create) + DBC *dbc; db_recno_t recno; int can_create; { BTREE *t; + DB *dbp; RECNO *rp; db_recno_t nrecs; int ret; + dbp = dbc->dbp; t = dbp->internal; - rp = t->bt_recno; + rp = t->recno; /* * If we can't create records and we've read the entire backing input @@ -803,12 +888,12 @@ __ram_update(dbp, recno, can_create) * If we haven't seen this record yet, try to get it from the original * file. */ - if ((ret = __bam_nrecs(dbp, &nrecs)) != 0) + if ((ret = __bam_nrecs(dbc, &nrecs)) != 0) return (ret); if (!F_ISSET(rp, RECNO_EOF) && recno > nrecs) { - if ((ret = rp->re_irec(dbp, recno)) != 0) + if ((ret = rp->re_irec(dbc, recno)) != 0) return (ret); - if ((ret = __bam_nrecs(dbp, &nrecs)) != 0) + if ((ret = __bam_nrecs(dbc, &nrecs)) != 0) return (ret); } @@ -819,28 +904,27 @@ __ram_update(dbp, recno, can_create) if (!can_create || recno <= nrecs + 1) return (0); - t->bt_rdata.dlen = 0; - t->bt_rdata.doff = 0; - t->bt_rdata.flags = 0; + dbc->rdata.dlen = 0; + dbc->rdata.doff = 0; + dbc->rdata.flags = 0; if (F_ISSET(dbp, DB_RE_FIXEDLEN)) { - if (t->bt_rdata.ulen < rp->re_len) { - t->bt_rdata.data = t->bt_rdata.data == NULL ? - (void *)__db_malloc(rp->re_len) : - (void *)__db_realloc(t->bt_rdata.data, rp->re_len); - if (t->bt_rdata.data == NULL) { - t->bt_rdata.ulen = 0; - return (ENOMEM); + if (dbc->rdata.ulen < rp->re_len) { + if ((ret = + __os_realloc(&dbc->rdata.data, rp->re_len)) != 0) { + dbc->rdata.ulen = 0; + dbc->rdata.data = NULL; + return (ret); } - t->bt_rdata.ulen = rp->re_len; + dbc->rdata.ulen = rp->re_len; } - t->bt_rdata.size = rp->re_len; - memset(t->bt_rdata.data, rp->re_pad, rp->re_len); + dbc->rdata.size = rp->re_len; + memset(dbc->rdata.data, rp->re_pad, rp->re_len); } else - t->bt_rdata.size = 0; + dbc->rdata.size = 0; while (recno > ++nrecs) - if ((ret = __ram_add(dbp, - &nrecs, &t->bt_rdata, 0, BI_DELETED)) != 0) + if ((ret = __ram_add(dbc, + &nrecs, &dbc->rdata, 0, BI_DELETED)) != 0) return (ret); return (0); } @@ -859,6 +943,11 @@ __ram_source(dbp, rp, fname) u_int32_t bytes, mbytes, oflags; int ret; + /* + * !!! + * The caller has full responsibility for cleaning up on error -- + * (it has to anyway, in case it fails after this routine succeeds). + */ if ((ret = __db_appname(dbp->dbenv, DB_APP_DATA, NULL, fname, 0, NULL, &rp->re_source)) != 0) return (ret); @@ -867,7 +956,7 @@ __ram_source(dbp, rp, fname) if ((ret = __db_open(rp->re_source, oflags, oflags, 0, &rp->re_fd)) != 0) { __db_err(dbp->dbenv, "%s: %s", rp->re_source, strerror(ret)); - goto err; + return (ret); } /* @@ -878,10 +967,10 @@ __ram_source(dbp, rp, fname) * compiler will perpetrate, doing the comparison in a portable way is * flatly impossible. Hope that mmap fails if the file is too large. */ - if ((ret = __db_ioinfo(rp->re_source, + if ((ret = __os_ioinfo(rp->re_source, rp->re_fd, &mbytes, &bytes, NULL)) != 0) { __db_err(dbp->dbenv, "%s: %s", rp->re_source, strerror(ret)); - goto err; + return (ret); } if (mbytes == 0 && bytes == 0) { F_SET(rp, RECNO_EOF); @@ -891,14 +980,11 @@ __ram_source(dbp, rp, fname) size = mbytes * MEGABYTE + bytes; if ((ret = __db_mapfile(rp->re_source, rp->re_fd, (size_t)size, 1, &rp->re_smap)) != 0) - goto err; + return (ret); rp->re_cmap = rp->re_smap; rp->re_emap = (u_int8_t *)rp->re_smap + (rp->re_msize = size); rp->re_irec = F_ISSET(dbp, DB_RE_FIXEDLEN) ? __ram_fmap : __ram_vmap; return (0); - -err: FREES(rp->re_source) - return (ret); } /* @@ -906,17 +992,19 @@ err: FREES(rp->re_source) * Rewrite the backing file. */ static int -__ram_writeback(dbp) - DB *dbp; +__ram_writeback(dbc) + DBC *dbc; { - RECNO *rp; + DB *dbp; DBT key, data; + RECNO *rp; db_recno_t keyno; ssize_t nw; int fd, ret, t_ret; u_int8_t delim, *pad; - rp = ((BTREE *)dbp->internal)->bt_recno; + dbp = dbc->dbp; + rp = ((BTREE *)dbp->internal)->recno; /* If the file wasn't modified, we're done. */ if (!F_ISSET(rp, RECNO_MODIFIED)) @@ -931,7 +1019,7 @@ __ram_writeback(dbp) /* * Read any remaining records into the tree. * - * XXX + * !!! * This is why we can't support transactions when applications specify * backing (re_source) files. At this point we have to read in the * rest of the records from the file so that we can write all of the @@ -946,7 +1034,8 @@ __ram_writeback(dbp) * protecting the backing source file, i.e. mpool would have to know * about it, and we don't want to go there. */ - if ((ret = __ram_snapshot(dbp)) != 0 && ret != DB_NOTFOUND) + if ((ret = + __ram_update(dbc, DB_MAX_RECORDS, 0)) != 0 && ret != DB_NOTFOUND) return (ret); /* @@ -962,7 +1051,7 @@ __ram_writeback(dbp) /* Get rid of any backing file descriptor, just on GP's. */ if (rp->re_fd != -1) { - (void)__db_close(rp->re_fd); + (void)__os_close(rp->re_fd); rp->re_fd = -1; } @@ -990,10 +1079,8 @@ __ram_writeback(dbp) */ delim = rp->re_delim; if (F_ISSET(dbp, DB_RE_FIXEDLEN)) { - if ((pad = (u_int8_t *)__db_malloc(rp->re_len)) == NULL) { - ret = ENOMEM; + if ((ret = __os_malloc(rp->re_len, NULL, &pad)) != 0) goto err; - } memset(pad, rp->re_pad, rp->re_len); } else COMPQUIET(pad, NULL); @@ -1001,7 +1088,7 @@ __ram_writeback(dbp) switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) { case 0: if ((ret = - __db_write(fd, data.data, data.size, &nw)) != 0) + __os_write(fd, data.data, data.size, &nw)) != 0) goto err; if (nw != (ssize_t)data.size) { ret = EIO; @@ -1011,7 +1098,7 @@ __ram_writeback(dbp) case DB_KEYEMPTY: if (F_ISSET(dbp, DB_RE_FIXEDLEN)) { if ((ret = - __db_write(fd, pad, rp->re_len, &nw)) != 0) + __os_write(fd, pad, rp->re_len, &nw)) != 0) goto err; if (nw != (ssize_t)rp->re_len) { ret = EIO; @@ -1024,7 +1111,7 @@ __ram_writeback(dbp) goto done; } if (!F_ISSET(dbp, DB_RE_FIXEDLEN)) { - if ((ret = __db_write(fd, &delim, 1, &nw)) != 0) + if ((ret = __os_write(fd, &delim, 1, &nw)) != 0) goto err; if (nw != 1) { ret = EIO; @@ -1035,7 +1122,7 @@ __ram_writeback(dbp) err: done: /* Close the file descriptor. */ - if ((t_ret = __db_close(fd)) != 0 || ret == 0) + if ((t_ret = __os_close(fd)) != 0 || ret == 0) ret = t_ret; if (ret == 0) @@ -1048,11 +1135,11 @@ done: /* Close the file descriptor. */ * Get fixed length records from a file. */ static int -__ram_fmap(dbp, top) - DB *dbp; +__ram_fmap(dbc, top) + DBC *dbc; db_recno_t top; { - BTREE *t; + DB *dbp; DBT data; RECNO *rp; db_recno_t recno; @@ -1060,24 +1147,23 @@ __ram_fmap(dbp, top) u_int8_t *sp, *ep, *p; int ret; - if ((ret = __bam_nrecs(dbp, &recno)) != 0) + if ((ret = __bam_nrecs(dbc, &recno)) != 0) return (ret); - t = dbp->internal; - rp = t->bt_recno; - if (t->bt_rdata.ulen < rp->re_len) { - t->bt_rdata.data = t->bt_rdata.data == NULL ? - (void *)__db_malloc(rp->re_len) : - (void *)__db_realloc(t->bt_rdata.data, rp->re_len); - if (t->bt_rdata.data == NULL) { - t->bt_rdata.ulen = 0; - return (ENOMEM); + dbp = dbc->dbp; + rp = ((BTREE *)(dbp->internal))->recno; + + if (dbc->rdata.ulen < rp->re_len) { + if ((ret = __os_realloc(&dbc->rdata.data, rp->re_len)) != 0) { + dbc->rdata.ulen = 0; + dbc->rdata.data = NULL; + return (ret); } - t->bt_rdata.ulen = rp->re_len; + dbc->rdata.ulen = rp->re_len; } memset(&data, 0, sizeof(data)); - data.data = t->bt_rdata.data; + data.data = dbc->rdata.data; data.size = rp->re_len; sp = (u_int8_t *)rp->re_cmap; @@ -1088,7 +1174,7 @@ __ram_fmap(dbp, top) return (DB_NOTFOUND); } len = rp->re_len; - for (p = t->bt_rdata.data; + for (p = dbc->rdata.data; sp < ep && len > 0; *p++ = *sp++, --len) ; @@ -1108,7 +1194,7 @@ __ram_fmap(dbp, top) memset(p, rp->re_pad, len); ++recno; - if ((ret = __ram_add(dbp, &recno, &data, 0, 0)) != 0) + if ((ret = __ram_add(dbc, &recno, &data, 0, 0)) != 0) return (ret); } ++rp->re_last; @@ -1122,21 +1208,19 @@ __ram_fmap(dbp, top) * Get variable length records from a file. */ static int -__ram_vmap(dbp, top) - DB *dbp; +__ram_vmap(dbc, top) + DBC *dbc; db_recno_t top; { - BTREE *t; DBT data; RECNO *rp; db_recno_t recno; u_int8_t *sp, *ep; int delim, ret; - t = dbp->internal; - rp = t->bt_recno; + rp = ((BTREE *)(dbc->dbp->internal))->recno; - if ((ret = __bam_nrecs(dbp, &recno)) != 0) + if ((ret = __bam_nrecs(dbc, &recno)) != 0) return (ret); memset(&data, 0, sizeof(data)); @@ -1163,7 +1247,7 @@ __ram_vmap(dbp, top) if (rp->re_last >= recno) { data.size = sp - (u_int8_t *)data.data; ++recno; - if ((ret = __ram_add(dbp, &recno, &data, 0, 0)) != 0) + if ((ret = __ram_add(dbc, &recno, &data, 0, 0)) != 0) return (ret); } ++rp->re_last; @@ -1178,40 +1262,47 @@ __ram_vmap(dbp, top) * Add records into the tree. */ static int -__ram_add(dbp, recnop, data, flags, bi_flags) - DB *dbp; +__ram_add(dbc, recnop, data, flags, bi_flags) + DBC *dbc; db_recno_t *recnop; DBT *data; u_int32_t flags, bi_flags; { BKEYDATA *bk; - BTREE *t; + CURSOR *cp; + DB *dbp; PAGE *h; db_indx_t indx; int exact, isdeleted, ret, stack; - t = dbp->internal; + dbp = dbc->dbp; + cp = dbc->internal; retry: /* Find the slot for insertion. */ - if ((ret = __bam_rsearch(dbp, recnop, - S_INSERT | (LF_ISSET(DB_APPEND) ? S_APPEND : 0), 1, &exact)) != 0) + if ((ret = __bam_rsearch(dbc, recnop, + S_INSERT | (flags == DB_APPEND ? S_APPEND : 0), 1, &exact)) != 0) return (ret); - h = t->bt_csp->page; - indx = t->bt_csp->indx; + h = cp->csp->page; + indx = cp->csp->indx; stack = 1; /* + * If re-numbering records, the on-page deleted flag means this record + * was implicitly created. If not re-numbering records, the on-page + * deleted flag means this record was implicitly created, or, it was + * deleted at some time. + * * If DB_NOOVERWRITE is set and the item already exists in the tree, - * return an error unless the item has been marked for deletion. + * return an error unless the item was either marked for deletion or + * only implicitly created. */ isdeleted = 0; if (exact) { bk = GET_BKEYDATA(h, indx); - if (B_DISSET(bk->type)) { + if (B_DISSET(bk->type)) isdeleted = 1; - __bam_ca_replace(dbp, h->pgno, indx, REPLACE_SETUP); - } else - if (LF_ISSET(DB_NOOVERWRITE)) { + else + if (flags == DB_NOOVERWRITE) { ret = DB_KEYEXIST; goto err; } @@ -1224,40 +1315,42 @@ retry: /* Find the slot for insertion. */ * match, we're inserting a new key/data pair, before the search * location. */ - switch (ret = __bam_iitem(dbp, + switch (ret = __bam_iitem(dbc, &h, &indx, NULL, data, exact ? DB_CURRENT : DB_BEFORE, bi_flags)) { case 0: /* - * Done. Clean up the cursor and adjust the internal page - * counts. + * Don't adjust anything. + * + * If we inserted a record, no cursors need adjusting because + * the only new record it's possible to insert is at the very + * end of the tree. The necessary adjustments to the internal + * page counts were made by __bam_iitem(). + * + * If we overwrote a record, no cursors need adjusting because + * future DBcursor->get calls will simply return the underlying + * record (there's no adjustment made for the DB_CURRENT flag + * when a cursor get operation immediately follows a cursor + * delete operation, and the normal adjustment for the DB_NEXT + * flag is still correct). */ - if (isdeleted) - __bam_ca_replace(dbp, h->pgno, indx, REPLACE_SUCCESS); break; case DB_NEEDSPLIT: - /* - * We have to split the page. Back out the cursor setup, - * discard the stack of pages, and do the split. - */ - if (isdeleted) - __bam_ca_replace(dbp, h->pgno, indx, REPLACE_FAILED); - - (void)__bam_stkrel(dbp); + /* Discard the stack of pages and split the page. */ + (void)__bam_stkrel(dbc, 0); stack = 0; - if ((ret = __bam_split(dbp, recnop)) != 0) - break; + if ((ret = __bam_split(dbc, recnop)) != 0) + goto err; goto retry; /* NOTREACHED */ default: - if (isdeleted) - __bam_ca_replace(dbp, h->pgno, indx, REPLACE_FAILED); - break; + goto err; } + err: if (stack) - __bam_stkrel(dbp); + __bam_stkrel(dbc, 0); return (ret); } |