diff options
Diffstat (limited to 'db2/db/db_dup.c')
-rw-r--r-- | db2/db/db_dup.c | 511 |
1 files changed, 395 insertions, 116 deletions
diff --git a/db2/db/db_dup.c b/db2/db/db_dup.c index 6379fc1729..2673bbcd61 100644 --- a/db2/db/db_dup.c +++ b/db2/db/db_dup.c @@ -8,7 +8,7 @@ #include "config.h" #ifndef lint -static const char sccsid[] = "@(#)db_dup.c 10.18 (Sleepycat) 5/31/98"; +static const char sccsid[] = "@(#)db_dup.c 10.35 (Sleepycat) 12/2/98"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES @@ -23,25 +23,25 @@ static const char sccsid[] = "@(#)db_dup.c 10.18 (Sleepycat) 5/31/98"; #include "btree.h" #include "db_am.h" -static int __db_addpage __P((DB *, - PAGE **, db_indx_t *, int (*)(DB *, u_int32_t, PAGE **))); -static int __db_dsplit __P((DB *, - PAGE **, db_indx_t *, u_int32_t, int (*)(DB *, u_int32_t, PAGE **))); +static int __db_addpage __P((DBC *, + PAGE **, db_indx_t *, int (*)(DBC *, u_int32_t, PAGE **))); +static int __db_dsplit __P((DBC *, + PAGE **, db_indx_t *, u_int32_t, int (*)(DBC *, u_int32_t, PAGE **))); /* * __db_dput -- * Put a duplicate item onto a duplicate page at the given index. * - * PUBLIC: int __db_dput __P((DB *, - * PUBLIC: DBT *, PAGE **, db_indx_t *, int (*)(DB *, u_int32_t, PAGE **))); + * PUBLIC: int __db_dput __P((DBC *, DBT *, + * PUBLIC: PAGE **, db_indx_t *, int (*)(DBC *, u_int32_t, PAGE **))); */ int -__db_dput(dbp, dbt, pp, indxp, newfunc) - DB *dbp; +__db_dput(dbc, dbt, pp, indxp, newfunc) + DBC *dbc; DBT *dbt; PAGE **pp; db_indx_t *indxp; - int (*newfunc) __P((DB *, u_int32_t, PAGE **)); + int (*newfunc) __P((DBC *, u_int32_t, PAGE **)); { BOVERFLOW bo; DBT *data_dbtp, hdr_dbt, *hdr_dbtp; @@ -54,10 +54,12 @@ __db_dput(dbp, dbt, pp, indxp, newfunc) * We need some access method independent threshold for when we put * a duplicate item onto an overflow page. */ - if (dbt->size > 0.25 * dbp->pgsize) { - if ((ret = __db_poff(dbp, dbt, &pgno, newfunc)) != 0) + if (dbt->size > 0.25 * dbc->dbp->pgsize) { + if ((ret = __db_poff(dbc, dbt, &pgno, newfunc)) != 0) return (ret); + UMRW(bo.unused1); B_TSET(bo.type, B_OVERFLOW, 0); + UMRW(bo.unused2); bo.tlen = dbt->size; bo.pgno = pgno; hdr_dbt.data = &bo; @@ -75,11 +77,14 @@ __db_dput(dbp, dbt, pp, indxp, newfunc) pagep = *pp; if (size > P_FREESPACE(pagep)) { if (*indxp == NUM_ENT(*pp) && NEXT_PGNO(*pp) == PGNO_INVALID) - ret = __db_addpage(dbp, pp, indxp, newfunc); + ret = __db_addpage(dbc, pp, indxp, newfunc); else - ret = __db_dsplit(dbp, pp, indxp, isize, newfunc); + ret = __db_dsplit(dbc, pp, indxp, isize, newfunc); if (ret != 0) - /* XXX: Pages not returned to free list. */ + /* + * XXX + * Pages not returned to free list. + */ return (ret); pagep = *pp; } @@ -88,11 +93,11 @@ __db_dput(dbp, dbt, pp, indxp, newfunc) * Now, pagep references the page on which to insert and indx is the * the location to insert. */ - if ((ret = __db_pitem(dbp, + if ((ret = __db_pitem(dbc, pagep, (u_int32_t)*indxp, isize, hdr_dbtp, data_dbtp)) != 0) return (ret); - (void)memp_fset(dbp->mpf, pagep, DB_MPOOL_DIRTY); + (void)memp_fset(dbc->dbp->mpf, pagep, DB_MPOOL_DIRTY); return (0); } @@ -100,15 +105,15 @@ __db_dput(dbp, dbt, pp, indxp, newfunc) * __db_drem -- * Remove a duplicate at the given index on the given page. * - * PUBLIC: int __db_drem __P((DB *, - * PUBLIC: PAGE **, u_int32_t, int (*)(DB *, PAGE *))); + * PUBLIC: int __db_drem __P((DBC *, + * PUBLIC: PAGE **, u_int32_t, int (*)(DBC *, PAGE *))); */ int -__db_drem(dbp, pp, indx, freefunc) - DB *dbp; +__db_drem(dbc, pp, indx, freefunc) + DBC *dbc; PAGE **pp; u_int32_t indx; - int (*freefunc) __P((DB *, PAGE *)); + int (*freefunc) __P((DBC *, PAGE *)); { PAGE *pagep; int ret; @@ -117,12 +122,12 @@ __db_drem(dbp, pp, indx, freefunc) /* Check if we are freeing a big item. */ if (B_TYPE(GET_BKEYDATA(pagep, indx)->type) == B_OVERFLOW) { - if ((ret = __db_doff(dbp, + if ((ret = __db_doff(dbc, GET_BOVERFLOW(pagep, indx)->pgno, freefunc)) != 0) return (ret); - ret = __db_ditem(dbp, pagep, indx, BOVERFLOW_SIZE); + ret = __db_ditem(dbc, pagep, indx, BOVERFLOW_SIZE); } else - ret = __db_ditem(dbp, pagep, indx, + ret = __db_ditem(dbc, pagep, indx, BKEYDATA_SIZE(GET_BKEYDATA(pagep, indx)->len)); if (ret != 0) return (ret); @@ -137,12 +142,12 @@ __db_drem(dbp, pp, indx, freefunc) * !!! * __db_relink will set the dirty bit for us. */ - if ((ret = __db_relink(dbp, pagep, pp, 0)) != 0) + if ((ret = __db_relink(dbc, DB_REM_PAGE, pagep, pp, 0)) != 0) return (ret); - if ((ret = freefunc(dbp, pagep)) != 0) + if ((ret = freefunc(dbc, pagep)) != 0) return (ret); } else - (void)memp_fset(dbp->mpf, pagep, DB_MPOOL_DIRTY); + (void)memp_fset(dbc->dbp->mpf, pagep, DB_MPOOL_DIRTY); return (0); } @@ -151,32 +156,41 @@ __db_drem(dbp, pp, indx, freefunc) * __db_dend -- * Find the last page in a set of offpage duplicates. * - * PUBLIC: int __db_dend __P((DB *, db_pgno_t, PAGE **)); + * PUBLIC: int __db_dend __P((DBC *, db_pgno_t, PAGE **)); */ int -__db_dend(dbp, pgno, pagep) - DB *dbp; +__db_dend(dbc, pgno, pp) + DBC *dbc; db_pgno_t pgno; - PAGE **pagep; + PAGE **pp; { + DB *dbp; PAGE *h; int ret; + dbp = dbc->dbp; + /* * This implements DB_KEYLAST. The last page is returned in pp; pgno * should be the page number of the first page of the duplicate chain. + * + * *pp may be non-NULL -- if given a valid page use it. */ + if (*pp != NULL) + goto started; for (;;) { - if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) { + if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) { (void)__db_pgerr(dbp, pgno); return (ret); } +started: h = *pp; + if ((pgno = NEXT_PGNO(h)) == PGNO_INVALID) break; - (void)memp_fput(dbp->mpf, h, 0); - } - *pagep = h; + if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) + return (ret); + } return (0); } @@ -191,41 +205,44 @@ __db_dend(dbp, pgno, pagep) * the page on which the insert should happen, not yet put. */ static int -__db_dsplit(dbp, hp, indxp, size, newfunc) - DB *dbp; +__db_dsplit(dbc, hp, indxp, size, newfunc) + DBC *dbc; PAGE **hp; db_indx_t *indxp; u_int32_t size; - int (*newfunc) __P((DB *, u_int32_t, PAGE **)); + int (*newfunc) __P((DBC *, u_int32_t, PAGE **)); { PAGE *h, *np, *tp; BKEYDATA *bk; DBT page_dbt; + DB *dbp; + size_t pgsize; db_indx_t halfbytes, i, indx, lastsum, nindex, oindex, s, sum; - int did_indx, ret; + int did_indx, ret, t_ret; h = *hp; indx = *indxp; + ret = 0; + dbp = dbc->dbp; + pgsize = dbp->pgsize; /* Create a temporary page to do compaction onto. */ - if ((tp = (PAGE *)__db_malloc(dbp->pgsize)) == NULL) - return (ENOMEM); -#ifdef DIAGNOSTIC - memset(tp, 0xff, dbp->pgsize); -#endif + if ((ret = __os_malloc(pgsize, NULL, &tp)) != 0) + return (ret); + /* Create new page for the split. */ - if ((ret = newfunc(dbp, P_DUPLICATE, &np)) != 0) { - FREE(tp, dbp->pgsize); + if ((ret = newfunc(dbc, P_DUPLICATE, &np)) != 0) { + __os_free(tp, pgsize); return (ret); } - P_INIT(np, dbp->pgsize, PGNO(np), PGNO(h), NEXT_PGNO(h), 0, + P_INIT(np, pgsize, PGNO(np), PGNO(h), NEXT_PGNO(h), 0, P_DUPLICATE); - P_INIT(tp, dbp->pgsize, PGNO(h), PREV_PGNO(h), PGNO(np), 0, + P_INIT(tp, pgsize, PGNO(h), PREV_PGNO(h), PGNO(np), 0, P_DUPLICATE); /* Figure out the split point */ - halfbytes = (dbp->pgsize - HOFFSET(h)) / 2; + halfbytes = (pgsize - HOFFSET(h)) / 2; did_indx = 0; for (sum = 0, lastsum = 0, i = 0; i < NUM_ENT(h); i++) { if (i == indx) { @@ -237,7 +254,6 @@ __db_dsplit(dbp, hp, indxp, size, newfunc) (db_indx_t)(sum - halfbytes)) { *hp = np; *indxp = 0; - i--; } else *indxp = i; break; @@ -252,29 +268,28 @@ __db_dsplit(dbp, hp, indxp, size, newfunc) if (lastsum < halfbytes && sum >= halfbytes) { /* We've crossed the halfway point. */ - if ((db_indx_t)(halfbytes - lastsum) < - (db_indx_t)(sum - halfbytes)) - i--; + if ((db_indx_t)(sum - halfbytes) < + (db_indx_t)(halfbytes - lastsum)) + i++; break; } } - /* * Check if we have set the return values of the index pointer and * page pointer. */ if (!did_indx) { *hp = np; - *indxp = indx - i - 1; + *indxp = indx - i; } - if (DB_LOGGING(dbp)) { + if (DB_LOGGING(dbc)) { page_dbt.size = dbp->pgsize; page_dbt.data = h; if ((ret = __db_split_log(dbp->dbenv->lg_info, - dbp->txn, &LSN(h), 0, DB_SPLITOLD, dbp->log_fileid, + dbc->txn, &LSN(h), 0, DB_SPLITOLD, dbp->log_fileid, PGNO(h), &page_dbt, &LSN(h))) != 0) { - FREE(tp, dbp->pgsize); + __os_free(tp, pgsize); return (ret); } LSN(tp) = LSN(h); @@ -283,12 +298,12 @@ __db_dsplit(dbp, hp, indxp, size, newfunc) /* * If it's a btree, adjust the cursors. * - * i is the index of the last element to stay on the page. + * i is the index of the first element to move onto the new page. */ - if (dbp->type == DB_BTREE || dbp->type == DB_RECNO) - __bam_ca_split(dbp, PGNO(h), PGNO(h), PGNO(np), i + 1, 0); + if (dbp->type == DB_BTREE) + __bam_ca_split(dbp, PGNO(h), PGNO(h), PGNO(np), i, 0); - for (nindex = 0, oindex = i + 1; oindex < NUM_ENT(h); oindex++) { + for (nindex = 0, oindex = i; oindex < NUM_ENT(h); oindex++) { bk = GET_BKEYDATA(h, oindex); if (B_TYPE(bk->type) == B_KEYDATA) s = BKEYDATA_SIZE(bk->len); @@ -304,7 +319,7 @@ __db_dsplit(dbp, hp, indxp, size, newfunc) * Now do data compaction by copying the remaining stuff onto the * temporary page and then copying it back to the real page. */ - for (nindex = 0, oindex = 0; oindex <= i; oindex++) { + for (nindex = 0, oindex = 0; oindex < i; oindex++) { bk = GET_BKEYDATA(h, oindex); if (B_TYPE(bk->type) == B_KEYDATA) s = BKEYDATA_SIZE(bk->len); @@ -324,59 +339,73 @@ __db_dsplit(dbp, hp, indxp, size, newfunc) */ memcpy(h, tp, LOFFSET(tp)); memcpy((u_int8_t *)h + HOFFSET(tp), - (u_int8_t *)tp + HOFFSET(tp), dbp->pgsize - HOFFSET(tp)); - FREE(tp, dbp->pgsize); + (u_int8_t *)tp + HOFFSET(tp), pgsize - HOFFSET(tp)); + __os_free(tp, pgsize); - if (DB_LOGGING(dbp)) { - page_dbt.size = dbp->pgsize; + if (DB_LOGGING(dbc)) { + /* + * XXX + * If either of these fails, are we leaving pages pinned? + * Yes, but it seems like this happens in error case. + */ + page_dbt.size = pgsize; page_dbt.data = h; if ((ret = __db_split_log(dbp->dbenv->lg_info, - dbp->txn, &LSN(h), 0, DB_SPLITNEW, dbp->log_fileid, + dbc->txn, &LSN(h), 0, DB_SPLITNEW, dbp->log_fileid, PGNO(h), &page_dbt, &LSN(h))) != 0) return (ret); - page_dbt.size = dbp->pgsize; + page_dbt.size = pgsize; page_dbt.data = np; if ((ret = __db_split_log(dbp->dbenv->lg_info, - dbp->txn, &LSN(np), 0, DB_SPLITNEW, dbp->log_fileid, + dbc->txn, &LSN(np), 0, DB_SPLITNEW, dbp->log_fileid, PGNO(np), &page_dbt, &LSN(np))) != 0) return (ret); } /* + * Finally, if there was a next page after the page being + * split, fix its prev pointer. + */ + if (np->next_pgno != PGNO_INVALID) + ret = __db_relink(dbc, DB_ADD_PAGE, np, NULL, 1); + + /* * Figure out if the location we're interested in is on the new * page, and if so, reset the callers' pointer. Push the other * page back to the store. */ if (*hp == h) - ret = memp_fput(dbp->mpf, np, DB_MPOOL_DIRTY); + t_ret = memp_fput(dbp->mpf, np, DB_MPOOL_DIRTY); else - ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY); + t_ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY); - return (ret); + return (ret != 0 ? ret : t_ret); } /* * __db_ditem -- * Remove an item from a page. * - * PUBLIC: int __db_ditem __P((DB *, PAGE *, u_int32_t, u_int32_t)); + * PUBLIC: int __db_ditem __P((DBC *, PAGE *, u_int32_t, u_int32_t)); */ int -__db_ditem(dbp, pagep, indx, nbytes) - DB *dbp; +__db_ditem(dbc, pagep, indx, nbytes) + DBC *dbc; PAGE *pagep; u_int32_t indx, nbytes; { + DB *dbp; DBT ldbt; db_indx_t cnt, offset; int ret; u_int8_t *from; - if (DB_LOGGING(dbp)) { + dbp = dbc->dbp; + if (DB_LOGGING(dbc)) { ldbt.data = P_ENTRY(pagep, indx); ldbt.size = nbytes; - if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbp->txn, + if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbc->txn, &LSN(pagep), 0, DB_REM_DUP, dbp->log_fileid, PGNO(pagep), (u_int32_t)indx, nbytes, &ldbt, NULL, &LSN(pagep))) != 0) return (ret); @@ -413,7 +442,7 @@ __db_ditem(dbp, pagep, indx, nbytes) sizeof(db_indx_t) * (NUM_ENT(pagep) - indx)); /* If it's a btree, adjust the cursors. */ - if (dbp->type == DB_BTREE || dbp->type == DB_RECNO) + if (dbp->type == DB_BTREE) __bam_ca_di(dbp, PGNO(pagep), indx, -1); return (0); @@ -424,16 +453,17 @@ __db_ditem(dbp, pagep, indx, nbytes) * Put an item on a page. * * PUBLIC: int __db_pitem - * PUBLIC: __P((DB *, PAGE *, u_int32_t, u_int32_t, DBT *, DBT *)); + * PUBLIC: __P((DBC *, PAGE *, u_int32_t, u_int32_t, DBT *, DBT *)); */ int -__db_pitem(dbp, pagep, indx, nbytes, hdr, data) - DB *dbp; +__db_pitem(dbc, pagep, indx, nbytes, hdr, data) + DBC *dbc; PAGE *pagep; u_int32_t indx; u_int32_t nbytes; DBT *hdr, *data; { + DB *dbp; BKEYDATA bk; DBT thdr; int ret; @@ -456,8 +486,9 @@ __db_pitem(dbp, pagep, indx, nbytes, hdr, data) * the passed in header sizes must be adjusted for the structure's * placeholder for the trailing variable-length data field. */ - if (DB_LOGGING(dbp)) - if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbp->txn, + dbp = dbc->dbp; + if (DB_LOGGING(dbc)) + if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbc->txn, &LSN(pagep), 0, DB_ADD_DUP, dbp->log_fileid, PGNO(pagep), (u_int32_t)indx, nbytes, hdr, data, &LSN(pagep))) != 0) return (ret); @@ -485,7 +516,7 @@ __db_pitem(dbp, pagep, indx, nbytes, hdr, data) memcpy(p + hdr->size, data->data, data->size); /* If it's a btree, adjust the cursors. */ - if (dbp->type == DB_BTREE || dbp->type == DB_RECNO) + if (dbp->type == DB_BTREE) __bam_ca_di(dbp, PGNO(pagep), indx, 1); return (0); @@ -495,14 +526,16 @@ __db_pitem(dbp, pagep, indx, nbytes, hdr, data) * __db_relink -- * Relink around a deleted page. * - * PUBLIC: int __db_relink __P((DB *, PAGE *, PAGE **, int)); + * PUBLIC: int __db_relink __P((DBC *, u_int32_t, PAGE *, PAGE **, int)); */ int -__db_relink(dbp, pagep, new_next, needlock) - DB *dbp; +__db_relink(dbc, add_rem, pagep, new_next, needlock) + DBC *dbc; + u_int32_t add_rem; PAGE *pagep, **new_next; int needlock; { + DB *dbp; PAGE *np, *pp; DB_LOCK npl, ppl; DB_LSN *nlsnp, *plsnp; @@ -512,10 +545,15 @@ __db_relink(dbp, pagep, new_next, needlock) np = pp = NULL; npl = ppl = LOCK_INVALID; nlsnp = plsnp = NULL; + dbp = dbc->dbp; - /* Retrieve and lock the two pages. */ + /* + * Retrieve and lock the one/two pages. For a remove, we may need + * two pages (the before and after). For an add, we only need one + * because, the split took care of the prev. + */ if (pagep->next_pgno != PGNO_INVALID) { - if (needlock && (ret = __bam_lget(dbp, + if (needlock && (ret = __bam_lget(dbc, 0, pagep->next_pgno, DB_LOCK_WRITE, &npl)) != 0) goto err; if ((ret = memp_fget(dbp->mpf, @@ -525,8 +563,8 @@ __db_relink(dbp, pagep, new_next, needlock) } nlsnp = &np->lsn; } - if (pagep->prev_pgno != PGNO_INVALID) { - if (needlock && (ret = __bam_lget(dbp, + if (add_rem == DB_REM_PAGE && pagep->prev_pgno != PGNO_INVALID) { + if (needlock && (ret = __bam_lget(dbc, 0, pagep->prev_pgno, DB_LOCK_WRITE, &ppl)) != 0) goto err; if ((ret = memp_fget(dbp->mpf, @@ -538,9 +576,10 @@ __db_relink(dbp, pagep, new_next, needlock) } /* Log the change. */ - if (DB_LOGGING(dbp)) { - if ((ret = __db_relink_log(dbp->dbenv->lg_info, dbp->txn, - &pagep->lsn, 0, dbp->log_fileid, pagep->pgno, &pagep->lsn, + if (DB_LOGGING(dbc)) { + if ((ret = __db_relink_log(dbp->dbenv->lg_info, dbc->txn, + &pagep->lsn, 0, add_rem, dbp->log_fileid, + pagep->pgno, &pagep->lsn, pagep->prev_pgno, plsnp, pagep->next_pgno, nlsnp)) != 0) goto err; if (np != NULL) @@ -558,7 +597,10 @@ __db_relink(dbp, pagep, new_next, needlock) * set to NULL. */ if (np != NULL) { - np->prev_pgno = pagep->prev_pgno; + if (add_rem == DB_ADD_PAGE) + np->prev_pgno = pagep->pgno; + else + np->prev_pgno = pagep->prev_pgno; if (new_next == NULL) ret = memp_fput(dbp->mpf, np, DB_MPOOL_DIRTY); else { @@ -568,7 +610,7 @@ __db_relink(dbp, pagep, new_next, needlock) if (ret != 0) goto err; if (needlock) - (void)__bam_lput(dbp, npl); + (void)__bam_lput(dbc, npl); } else if (new_next != NULL) *new_next = NULL; @@ -577,18 +619,18 @@ __db_relink(dbp, pagep, new_next, needlock) if ((ret = memp_fput(dbp->mpf, pp, DB_MPOOL_DIRTY)) != 0) goto err; if (needlock) - (void)__bam_lput(dbp, ppl); + (void)__bam_lput(dbc, ppl); } return (0); err: if (np != NULL) (void)memp_fput(dbp->mpf, np, 0); if (needlock && npl != LOCK_INVALID) - (void)__bam_lput(dbp, npl); + (void)__bam_lput(dbc, npl); if (pp != NULL) (void)memp_fput(dbp->mpf, pp, 0); if (needlock && ppl != LOCK_INVALID) - (void)__bam_lput(dbp, ppl); + (void)__bam_lput(dbc, ppl); return (ret); } @@ -596,34 +638,37 @@ err: if (np != NULL) * __db_ddup -- * Delete an offpage chain of duplicates. * - * PUBLIC: int __db_ddup __P((DB *, db_pgno_t, int (*)(DB *, PAGE *))); + * PUBLIC: int __db_ddup __P((DBC *, db_pgno_t, int (*)(DBC *, PAGE *))); */ int -__db_ddup(dbp, pgno, freefunc) - DB *dbp; +__db_ddup(dbc, pgno, freefunc) + DBC *dbc; db_pgno_t pgno; - int (*freefunc) __P((DB *, PAGE *)); + int (*freefunc) __P((DBC *, PAGE *)); { + DB *dbp; PAGE *pagep; DBT tmp_dbt; int ret; + dbp = dbc->dbp; do { if ((ret = memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0) { (void)__db_pgerr(dbp, pgno); return (ret); } - if (DB_LOGGING(dbp)) { + if (DB_LOGGING(dbc)) { tmp_dbt.data = pagep; tmp_dbt.size = dbp->pgsize; - if ((ret = __db_split_log(dbp->dbenv->lg_info, dbp->txn, - &LSN(pagep), 0, DB_SPLITOLD, dbp->log_fileid, - PGNO(pagep), &tmp_dbt, &LSN(pagep))) != 0) + if ((ret = __db_split_log(dbp->dbenv->lg_info, + dbc->txn, &LSN(pagep), 0, DB_SPLITOLD, + dbp->log_fileid, PGNO(pagep), &tmp_dbt, + &LSN(pagep))) != 0) return (ret); } pgno = pagep->next_pgno; - if ((ret = freefunc(dbp, pagep)) != 0) + if ((ret = freefunc(dbc, pagep)) != 0) return (ret); } while (pgno != PGNO_INVALID); @@ -636,21 +681,23 @@ __db_ddup(dbp, pgno, freefunc) * current page. */ static int -__db_addpage(dbp, hp, indxp, newfunc) - DB *dbp; +__db_addpage(dbc, hp, indxp, newfunc) + DBC *dbc; PAGE **hp; db_indx_t *indxp; - int (*newfunc) __P((DB *, u_int32_t, PAGE **)); + int (*newfunc) __P((DBC *, u_int32_t, PAGE **)); { + DB *dbp; PAGE *newpage; int ret; - if ((ret = newfunc(dbp, P_DUPLICATE, &newpage)) != 0) + dbp = dbc->dbp; + if ((ret = newfunc(dbc, P_DUPLICATE, &newpage)) != 0) return (ret); - if (DB_LOGGING(dbp)) { + if (DB_LOGGING(dbc)) { if ((ret = __db_addpage_log(dbp->dbenv->lg_info, - dbp->txn, &LSN(*hp), 0, dbp->log_fileid, + dbc->txn, &LSN(*hp), 0, dbp->log_fileid, PGNO(*hp), &LSN(*hp), PGNO(newpage), &LSN(newpage))) != 0) { return (ret); } @@ -666,3 +713,235 @@ __db_addpage(dbp, hp, indxp, newfunc) *indxp = 0; return (0); } + +/* + * __db_dsearch -- + * Search a set of duplicates for the proper position for a new duplicate. + * + * + pgno is the page number of the page on which to begin searching. + * Since we can continue duplicate searches, it might not be the first + * page. + * + * + If we are continuing a search, then *pp may be non-NULL in which + * case we do not have to retrieve the page. + * + * + If we are continuing a search, then *indxp contains the first + * on pgno of where we should begin the search. + * + * NOTE: if there is no comparison function, then continuing is + * meaningless, and *pp should always be NULL and *indxp will be + * ignored. + * + * 3 return values:: + * + * + pp is the returned page pointer of where this element should go. + * + indxp is the returned index on that page + * + cmpp is the returned final comparison result. + * + * PUBLIC: int __db_dsearch __P((DBC *, + * PUBLIC: int, DBT *, db_pgno_t, db_indx_t *, PAGE **, int *)); + */ +int +__db_dsearch(dbc, is_insert, dbt, pgno, indxp, pp, cmpp) + DBC *dbc; + int is_insert, *cmpp; + DBT *dbt; + db_pgno_t pgno; + db_indx_t *indxp; + PAGE **pp; +{ + DB *dbp; + PAGE *h; + db_indx_t base, indx, lim, save_indx; + db_pgno_t save_pgno; + int ret; + + dbp = dbc->dbp; + + if (dbp->dup_compare == NULL) { + /* + * We may have been given a valid page, but we may not be + * able to use it. The problem is that the application is + * doing a join and we're trying to continue the search, + * but since the items aren't sorted, we can't. Discard + * the page if it's not the one we're going to start with + * anyway. + */ + if (*pp != NULL && (*pp)->pgno != pgno) { + if ((ret = memp_fput(dbp->mpf, *pp, 0)) != 0) + return (ret); + *pp = NULL; + } + + /* + * If no duplicate function is specified, just go to the end + * of the duplicate set. + */ + if (is_insert) { + if ((ret = __db_dend(dbc, pgno, pp)) != 0) + return (ret); + *indxp = NUM_ENT(*pp); + return (0); + } + + /* + * We are looking for a specific duplicate, so do a linear + * search. + */ + if (*pp != NULL) + goto nocmp_started; + for (;;) { + if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) + goto pg_err; +nocmp_started: h = *pp; + + for (*indxp = 0; *indxp < NUM_ENT(h); ++*indxp) { + if ((*cmpp = __bam_cmp(dbp, + dbt, h, *indxp, __bam_defcmp)) != 0) + continue; + /* + * The duplicate may have already been deleted, + * if it's a btree page, in which case we skip + * it. + */ + if (dbp->type == DB_BTREE && + B_DISSET(GET_BKEYDATA(h, *indxp)->type)) + continue; + + return (0); + } + + if ((pgno = h->next_pgno) == PGNO_INVALID) + break; + + if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) + return (ret); + } + *cmpp = 1; /* We didn't succeed... */ + return (0); + } + + /* + * We have a comparison routine, i.e., the duplicates are sorted. + * Walk through the chain of duplicates, checking the last entry + * on each page to decide if it's the page we want to search. + * + * *pp may be non-NULL -- if we were given a valid page (e.g., are + * in mid-search), then use the provided page. + */ + if (*pp != NULL) + goto cmp_started; + for (;;) { + if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) + goto pg_err; +cmp_started: h = *pp; + + if ((pgno = h->next_pgno) == PGNO_INVALID || __bam_cmp(dbp, + dbt, h, h->entries - 1, dbp->dup_compare) <= 0) + break; + /* + * Even when continuing a search, make sure we don't skip + * entries on a new page + */ + *indxp = 0; + + if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) + return (ret); + } + + /* Next, do a binary search on the page. */ + base = F_ISSET(dbc, DBC_CONTINUE) ? *indxp : 0; + for (lim = NUM_ENT(h) - base; lim != 0; lim >>= 1) { + indx = base + (lim >> 1); + if ((*cmpp = __bam_cmp(dbp, + dbt, h, indx, dbp->dup_compare)) == 0) { + *indxp = indx; + + if (dbp->type != DB_BTREE || + !B_DISSET(GET_BKEYDATA(h, *indxp)->type)) + return (0); + goto check_delete; + } + if (*cmpp > 0) { + base = indx + 1; + lim--; + } + } + + /* + * Base references the smallest index larger than the supplied DBT's + * data item, potentially both 0 and NUM_ENT. + */ + *indxp = base; + return (0); + +check_delete: + /* + * The duplicate may have already been deleted, if it's a btree page, + * in which case we wander around, hoping to find an entry that hasn't + * been deleted. First, wander in a forwardly direction. + */ + save_pgno = (*pp)->pgno; + save_indx = *indxp; + for (++*indxp;;) { + for (; *indxp < NUM_ENT(h); ++*indxp) { + if ((*cmpp = __bam_cmp(dbp, + dbt, h, *indxp, dbp->dup_compare)) != 0) + goto check_delete_rev; + + if (!B_DISSET(GET_BKEYDATA(h, *indxp)->type)) + return (0); + } + if ((pgno = h->next_pgno) == PGNO_INVALID) + break; + + if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) + return (ret); + + if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) + goto pg_err; + h = *pp; + + *indxp = 0; + } + +check_delete_rev: + /* Go back to where we started, and wander in a backwardly direction. */ + if (h->pgno != save_pgno) { + if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) + return (ret); + if ((ret = memp_fget(dbp->mpf, &save_pgno, 0, pp)) != 0) + goto pg_err; + h = *pp; + } + + for (;;) { + while (*indxp > 0) { + --*indxp; + if ((*cmpp = __bam_cmp(dbp, + dbt, h, *indxp, dbp->dup_compare)) != 0) + goto check_delete_fail; + + if (!B_DISSET(GET_BKEYDATA(h, *indxp)->type)) + return (0); + } + if ((pgno = h->prev_pgno) == PGNO_INVALID) + break; + + if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) + return (ret); + + if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) + goto pg_err; + h = *pp; + + *indxp = NUM_ENT(h); + } + +check_delete_fail: + *cmpp = 1; /* We didn't succeed... */ + return (0); + +pg_err: __db_pgerr(dbp, pgno); + return (ret); +} |