diff options
Diffstat (limited to 'db2/btree/bt_split.c')
-rw-r--r-- | db2/btree/bt_split.c | 212 |
1 files changed, 114 insertions, 98 deletions
diff --git a/db2/btree/bt_split.c b/db2/btree/bt_split.c index da9417c781..1d8e926d85 100644 --- a/db2/btree/bt_split.c +++ b/db2/btree/bt_split.c @@ -44,7 +44,7 @@ #include "config.h" #ifndef lint -static const char sccsid[] = "@(#)bt_split.c 10.23 (Sleepycat) 5/23/98"; +static const char sccsid[] = "@(#)bt_split.c 10.33 (Sleepycat) 10/13/98"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES @@ -59,27 +59,31 @@ static const char sccsid[] = "@(#)bt_split.c 10.23 (Sleepycat) 5/23/98"; #include "db_page.h" #include "btree.h" -static int __bam_page __P((DB *, EPG *, EPG *)); -static int __bam_pinsert __P((DB *, EPG *, PAGE *, PAGE *)); -static int __bam_psplit __P((DB *, EPG *, PAGE *, PAGE *, int)); -static int __bam_root __P((DB *, EPG *)); +static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *)); +static int __bam_page __P((DBC *, EPG *, EPG *)); +static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *)); +static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *)); +static int __bam_root __P((DBC *, EPG *)); +static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *)); /* * __bam_split -- * Split a page. * - * PUBLIC: int __bam_split __P((DB *, void *)); + * PUBLIC: int __bam_split __P((DBC *, void *)); */ int -__bam_split(dbp, arg) - DB *dbp; +__bam_split(dbc, arg) + DBC *dbc; void *arg; { - BTREE *t; + CURSOR *cp; + DB *dbp; enum { UP, DOWN } dir; int exact, level, ret; - t = dbp->internal; + dbp = dbc->dbp; + cp = dbc->internal; /* * The locking protocol we use to avoid deadlock to acquire locks by @@ -113,15 +117,16 @@ __bam_split(dbp, arg) * Acquire a page and its parent, locked. */ if ((ret = (dbp->type == DB_BTREE ? - __bam_search(dbp, arg, S_WRPAIR, level, NULL, &exact) : - __bam_rsearch(dbp, + __bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) : + __bam_rsearch(dbc, (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0) return (ret); /* Split the page. */ - ret = t->bt_csp[0].page->pgno == PGNO_ROOT ? - __bam_root(dbp, &t->bt_csp[0]) : - __bam_page(dbp, &t->bt_csp[-1], &t->bt_csp[0]); + ret = cp->csp[0].page->pgno == PGNO_ROOT ? + __bam_root(dbc, &cp->csp[0]) : + __bam_page(dbc, &cp->csp[-1], &cp->csp[0]); + BT_STK_CLR(cp); switch (ret) { case 0: @@ -155,15 +160,16 @@ __bam_split(dbp, arg) * Split the root page of a btree. */ static int -__bam_root(dbp, cp) - DB *dbp; +__bam_root(dbc, cp) + DBC *dbc; EPG *cp; { - BTREE *t; + DB *dbp; PAGE *lp, *rp; + db_indx_t split; int ret; - t = dbp->internal; + dbp = dbc->dbp; /* Yeah, right. */ if (cp->page->level >= MAXBTREELEVEL) { @@ -173,8 +179,8 @@ __bam_root(dbp, cp) /* Create new left and right pages for the split. */ lp = rp = NULL; - if ((ret = __bam_new(dbp, TYPE(cp->page), &lp)) != 0 || - (ret = __bam_new(dbp, TYPE(cp->page), &rp)) != 0) + if ((ret = __bam_new(dbc, TYPE(cp->page), &lp)) != 0 || + (ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0) goto err; P_INIT(lp, dbp->pgsize, lp->pgno, PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno, @@ -184,18 +190,18 @@ __bam_root(dbp, cp) cp->page->level, TYPE(cp->page)); /* Split the page. */ - if ((ret = __bam_psplit(dbp, cp, lp, rp, 1)) != 0) + if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0) goto err; /* Log the change. */ - if (DB_LOGGING(dbp)) { + if (DB_LOGGING(dbc)) { DBT __a; DB_LSN __lsn; memset(&__a, 0, sizeof(__a)); __a.data = cp->page; __a.size = dbp->pgsize; ZERO_LSN(__lsn); - if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbp->txn, + if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn, &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn, &__a)) != 0) @@ -205,26 +211,27 @@ __bam_root(dbp, cp) /* Clean up the new root page. */ if ((ret = (dbp->type == DB_RECNO ? - __ram_root(dbp, cp->page, lp, rp) : - __bam_broot(dbp, cp->page, lp, rp))) != 0) + __ram_root(dbc, cp->page, lp, rp) : + __bam_broot(dbc, cp->page, lp, rp))) != 0) goto err; + /* Adjust any cursors. Do it last so we don't have to undo it. */ + __bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1); + /* Success -- write the real pages back to the store. */ (void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY); - (void)__BT_TLPUT(dbp, cp->lock); + (void)__BT_TLPUT(dbc, cp->lock); (void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY); (void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY); - ++t->lstat.bt_split; - ++t->lstat.bt_rootsplit; return (0); err: if (lp != NULL) - (void)__bam_free(dbp, lp); + (void)__bam_free(dbc, lp); if (rp != NULL) - (void)__bam_free(dbp, rp); + (void)__bam_free(dbc, rp); (void)memp_fput(dbp->mpf, cp->page, 0); - (void)__BT_TLPUT(dbp, cp->lock); + (void)__BT_TLPUT(dbc, cp->lock); return (ret); } @@ -233,19 +240,22 @@ err: if (lp != NULL) * Split the non-root page of a btree. */ static int -__bam_page(dbp, pp, cp) - DB *dbp; +__bam_page(dbc, pp, cp) + DBC *dbc; EPG *pp, *cp; { + DB *dbp; DB_LOCK tplock; PAGE *lp, *rp, *tp; + db_indx_t split; int ret; + dbp = dbc->dbp; lp = rp = tp = NULL; ret = -1; /* Create new right page for the split. */ - if ((ret = __bam_new(dbp, TYPE(cp->page), &rp)) != 0) + if ((ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0) goto err; P_INIT(rp, dbp->pgsize, rp->pgno, ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno, @@ -253,13 +263,8 @@ __bam_page(dbp, pp, cp) cp->page->level, TYPE(cp->page)); /* Create new left page for the split. */ - if ((lp = (PAGE *)__db_malloc(dbp->pgsize)) == NULL) { - ret = ENOMEM; + if ((ret = __os_malloc(dbp->pgsize, NULL, &lp)) != 0) goto err; - } -#ifdef DIAGNOSTIC - memset(lp, 0xff, dbp->pgsize); -#endif P_INIT(lp, dbp->pgsize, cp->page->pgno, ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->prev_pgno, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno, @@ -276,7 +281,7 @@ __bam_page(dbp, pp, cp) * change, we swap the original and the allocated left page after the * split. */ - if ((ret = __bam_psplit(dbp, cp, lp, rp, 0)) != 0) + if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0) goto err; /* @@ -293,19 +298,19 @@ __bam_page(dbp, pp, cp) * the page we're splitting. */ if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) { - if ((ret = __bam_lget(dbp, + if ((ret = __bam_lget(dbc, 0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0) goto err; - if ((ret = __bam_pget(dbp, &tp, &rp->next_pgno, 0)) != 0) + if ((ret = memp_fget(dbp->mpf, &rp->next_pgno, 0, &tp)) != 0) goto err; } /* Insert the new pages into the parent page. */ - if ((ret = __bam_pinsert(dbp, pp, lp, rp)) != 0) + if ((ret = __bam_pinsert(dbc, pp, lp, rp)) != 0) goto err; /* Log the change. */ - if (DB_LOGGING(dbp)) { + if (DB_LOGGING(dbc)) { DBT __a; DB_LSN __lsn; memset(&__a, 0, sizeof(__a)); @@ -313,7 +318,7 @@ __bam_page(dbp, pp, cp) __a.size = dbp->pgsize; if (tp == NULL) ZERO_LSN(__lsn); - if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbp->txn, + if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn, &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page), &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), tp == NULL ? 0 : PGNO(tp), @@ -329,56 +334,69 @@ __bam_page(dbp, pp, cp) memcpy(cp->page, lp, LOFFSET(lp)); memcpy((u_int8_t *)cp->page + HOFFSET(lp), (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp)); - FREE(lp, dbp->pgsize); + __os_free(lp, dbp->pgsize); lp = NULL; /* Finish the next-page link. */ if (tp != NULL) tp->prev_pgno = rp->pgno; + /* Adjust any cursors. Do so last so we don't have to undo it. */ + __bam_ca_split(dbp, cp->page->pgno, cp->page->pgno, rp->pgno, split, 0); + /* Success -- write the real pages back to the store. */ (void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY); - (void)__BT_TLPUT(dbp, pp->lock); + (void)__BT_TLPUT(dbc, pp->lock); (void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY); - (void)__BT_TLPUT(dbp, cp->lock); + (void)__BT_TLPUT(dbc, cp->lock); (void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY); if (tp != NULL) { (void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY); - (void)__BT_TLPUT(dbp, tplock); + (void)__BT_TLPUT(dbc, tplock); } return (0); err: if (lp != NULL) - FREE(lp, dbp->pgsize); + __os_free(lp, dbp->pgsize); if (rp != NULL) - (void)__bam_free(dbp, rp); + (void)__bam_free(dbc, rp); if (tp != NULL) { (void)memp_fput(dbp->mpf, tp, 0); - (void)__BT_TLPUT(dbp, tplock); + if (ret == DB_NEEDSPLIT) + (void)__BT_LPUT(dbc, tplock); + else + (void)__BT_TLPUT(dbc, tplock); } (void)memp_fput(dbp->mpf, pp->page, 0); - (void)__BT_TLPUT(dbp, pp->lock); + if (ret == DB_NEEDSPLIT) + (void)__BT_LPUT(dbc, pp->lock); + else + (void)__BT_TLPUT(dbc, pp->lock); (void)memp_fput(dbp->mpf, cp->page, 0); - (void)__BT_TLPUT(dbp, cp->lock); + if (ret == DB_NEEDSPLIT) + (void)__BT_LPUT(dbc, cp->lock); + else + (void)__BT_TLPUT(dbc, cp->lock); return (ret); } /* * __bam_broot -- * Fix up the btree root page after it has been split. - * - * PUBLIC: int __bam_broot __P((DB *, PAGE *, PAGE *, PAGE *)); */ -int -__bam_broot(dbp, rootp, lp, rp) - DB *dbp; +static int +__bam_broot(dbc, rootp, lp, rp) + DBC *dbc; PAGE *rootp, *lp, *rp; { BINTERNAL bi, *child_bi; BKEYDATA *child_bk; + DB *dbp; DBT hdr, data; int ret; + dbp = dbc->dbp; + /* * If the root page was a leaf page, change it into an internal page. * We copy the key we split on (but not the key's data, in the case of @@ -405,7 +423,7 @@ __bam_broot(dbp, rootp, lp, rp) hdr.data = &bi; hdr.size = SSZA(BINTERNAL, data); if ((ret = - __db_pitem(dbp, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0) + __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0) return (ret); switch (TYPE(rp)) { @@ -424,13 +442,13 @@ __bam_broot(dbp, rootp, lp, rp) hdr.size = SSZA(BINTERNAL, data); data.data = child_bi->data; data.size = child_bi->len; - if ((ret = __db_pitem(dbp, rootp, 1, + if ((ret = __db_pitem(dbc, rootp, 1, BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0) return (ret); /* Increment the overflow ref count. */ if (B_TYPE(child_bi->type) == B_OVERFLOW) - if ((ret = __db_ovref(dbp, + if ((ret = __db_ovref(dbc, ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0) return (ret); break; @@ -450,7 +468,7 @@ __bam_broot(dbp, rootp, lp, rp) hdr.size = SSZA(BINTERNAL, data); data.data = child_bk->data; data.size = child_bk->len; - if ((ret = __db_pitem(dbp, rootp, 1, + if ((ret = __db_pitem(dbc, rootp, 1, BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0) return (ret); break; @@ -467,13 +485,13 @@ __bam_broot(dbp, rootp, lp, rp) hdr.size = SSZA(BINTERNAL, data); data.data = child_bk; data.size = BOVERFLOW_SIZE; - if ((ret = __db_pitem(dbp, rootp, 1, + if ((ret = __db_pitem(dbc, rootp, 1, BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0) return (ret); /* Increment the overflow ref count. */ if (B_TYPE(child_bk->type) == B_OVERFLOW) - if ((ret = __db_ovref(dbp, + if ((ret = __db_ovref(dbc, ((BOVERFLOW *)child_bk)->pgno, 1)) != 0) return (ret); break; @@ -490,18 +508,19 @@ __bam_broot(dbp, rootp, lp, rp) /* * __ram_root -- * Fix up the recno root page after it has been split. - * - * PUBLIC: int __ram_root __P((DB *, PAGE *, PAGE *, PAGE *)); */ -int -__ram_root(dbp, rootp, lp, rp) - DB *dbp; +static int +__ram_root(dbc, rootp, lp, rp) + DBC *dbc; PAGE *rootp, *lp, *rp; { + DB *dbp; DBT hdr; RINTERNAL ri; int ret; + dbp = dbc->dbp; + /* Initialize the page. */ P_INIT(rootp, dbp->pgsize, PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO); @@ -514,12 +533,12 @@ __ram_root(dbp, rootp, lp, rp) /* Insert the left and right keys, set the header information. */ ri.pgno = lp->pgno; ri.nrecs = __bam_total(lp); - if ((ret = __db_pitem(dbp, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0) + if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0) return (ret); RE_NREC_SET(rootp, ri.nrecs); ri.pgno = rp->pgno; ri.nrecs = __bam_total(rp); - if ((ret = __db_pitem(dbp, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0) + if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0) return (ret); RE_NREC_ADJ(rootp, ri.nrecs); return (0); @@ -530,14 +549,15 @@ __ram_root(dbp, rootp, lp, rp) * Insert a new key into a parent page, completing the split. */ static int -__bam_pinsert(dbp, parent, lchild, rchild) - DB *dbp; +__bam_pinsert(dbc, parent, lchild, rchild) + DBC *dbc; EPG *parent; PAGE *lchild, *rchild; { BINTERNAL bi, *child_bi; BKEYDATA *child_bk, *tmp_bk; BTREE *t; + DB *dbp; DBT a, b, hdr, data; PAGE *ppage; RINTERNAL ri; @@ -546,6 +566,7 @@ __bam_pinsert(dbp, parent, lchild, rchild) u_int32_t n, nbytes, nksize; int ret; + dbp = dbc->dbp; t = dbp->internal; ppage = parent->page; @@ -600,13 +621,13 @@ __bam_pinsert(dbp, parent, lchild, rchild) memset(&data, 0, sizeof(data)); data.data = child_bi->data; data.size = child_bi->len; - if ((ret = __db_pitem(dbp, ppage, off, + if ((ret = __db_pitem(dbc, ppage, off, BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0) return (ret); /* Increment the overflow ref count. */ if (B_TYPE(child_bi->type) == B_OVERFLOW) - if ((ret = __db_ovref(dbp, + if ((ret = __db_ovref(dbc, ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0) return (ret); break; @@ -630,10 +651,9 @@ __bam_pinsert(dbp, parent, lchild, rchild) b.size = child_bk->len; b.data = child_bk->data; nksize = t->bt_prefix(&a, &b); - if ((n = BINTERNAL_PSIZE(nksize)) < nbytes) { - t->lstat.bt_pfxsaved += nbytes - n; + if ((n = BINTERNAL_PSIZE(nksize)) < nbytes) nbytes = n; - } else + else noprefix: nksize = child_bk->len; if (P_FREESPACE(ppage) < nbytes) @@ -650,7 +670,7 @@ noprefix: nksize = child_bk->len; memset(&data, 0, sizeof(data)); data.data = child_bk->data; data.size = nksize; - if ((ret = __db_pitem(dbp, ppage, off, + if ((ret = __db_pitem(dbc, ppage, off, BINTERNAL_SIZE(nksize), &hdr, &data)) != 0) return (ret); break; @@ -672,13 +692,13 @@ noprefix: nksize = child_bk->len; memset(&data, 0, sizeof(data)); data.data = child_bk; data.size = BOVERFLOW_SIZE; - if ((ret = __db_pitem(dbp, ppage, off, + if ((ret = __db_pitem(dbc, ppage, off, BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0) return (ret); /* Increment the overflow ref count. */ if (B_TYPE(child_bk->type) == B_OVERFLOW) - if ((ret = __db_ovref(dbp, + if ((ret = __db_ovref(dbc, ((BOVERFLOW *)child_bk)->pgno, 1)) != 0) return (ret); break; @@ -699,7 +719,7 @@ noprefix: nksize = child_bk->len; hdr.size = RINTERNAL_SIZE; ri.pgno = rchild->pgno; ri.nrecs = nrecs; - if ((ret = __db_pitem(dbp, + if ((ret = __db_pitem(dbc, ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0) return (ret); break; @@ -710,9 +730,9 @@ noprefix: nksize = child_bk->len; /* Adjust the parent page's left page record count. */ if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) { /* Log the change. */ - if (DB_LOGGING(dbp) && + if (DB_LOGGING(dbc) && (ret = __bam_cadjust_log(dbp->dbenv->lg_info, - dbp->txn, &LSN(ppage), 0, dbp->log_fileid, + dbc->txn, &LSN(ppage), 0, dbp->log_fileid, PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx, -(int32_t)nrecs, (int32_t)0)) != 0) return (ret); @@ -732,18 +752,18 @@ noprefix: nksize = child_bk->len; * Do the real work of splitting the page. */ static int -__bam_psplit(dbp, cp, lp, rp, cleft) - DB *dbp; +__bam_psplit(dbc, cp, lp, rp, splitret) + DBC *dbc; EPG *cp; PAGE *lp, *rp; - int cleft; + db_indx_t *splitret; { - BTREE *t; + DB *dbp; PAGE *pp; db_indx_t half, nbytes, off, splitp, top; int adjust, cnt, isbigkey, ret; - t = dbp->internal; + dbp = dbc->dbp; pp = cp->page; adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX; @@ -762,11 +782,8 @@ __bam_psplit(dbp, cp, lp, rp, cleft) else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0) off = adjust; - ++t->lstat.bt_split; - if (off != 0) { - ++t->lstat.bt_fastsplit; + if (off != 0) goto sort; - } /* * Split the data to the left and right pages. Try not to split on @@ -887,8 +904,7 @@ sort: splitp = off; if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0) return (ret); - /* Adjust the cursors. */ - __bam_ca_split(dbp, pp->pgno, lp->pgno, rp->pgno, splitp, cleft); + *splitret = splitp; return (0); } |