about summary refs log tree commit diff
path: root/db2/btree/bt_cursor.c
diff options
context:
space:
mode:
Diffstat (limited to 'db2/btree/bt_cursor.c')
-rw-r--r--db2/btree/bt_cursor.c381
1 files changed, 250 insertions, 131 deletions
diff --git a/db2/btree/bt_cursor.c b/db2/btree/bt_cursor.c
index f526c965e5..cfa388741e 100644
--- a/db2/btree/bt_cursor.c
+++ b/db2/btree/bt_cursor.c
@@ -1,22 +1,20 @@
 /*-
  * See the file LICENSE for redistribution information.
  *
- * Copyright (c) 1996, 1997
+ * Copyright (c) 1996, 1997, 1998
  *	Sleepycat Software.  All rights reserved.
  */
 
 #include "config.h"
 
 #ifndef lint
-static const char sccsid[] = "@(#)bt_cursor.c	10.41 (Sleepycat) 1/8/98";
+static const char sccsid[] = "@(#)bt_cursor.c	10.53 (Sleepycat) 5/25/98";
 #endif /* not lint */
 
 #ifndef NO_SYSTEM_INCLUDES
 #include <sys/types.h>
 
 #include <errno.h>
-#include <stdio.h>
-#include <stdlib.h>
 #include <string.h>
 #endif
 
@@ -25,24 +23,30 @@ static const char sccsid[] = "@(#)bt_cursor.c	10.41 (Sleepycat) 1/8/98";
 #include "btree.h"
 
 static int __bam_c_close __P((DBC *));
-static int __bam_c_del __P((DBC *, int));
+static int __bam_c_del __P((DBC *, u_int32_t));
 static int __bam_c_first __P((DB *, CURSOR *));
-static int __bam_c_get __P((DBC *, DBT *, DBT *, int));
+static int __bam_c_get __P((DBC *, DBT *, DBT *, u_int32_t));
+static int __bam_c_getstack __P((DB *, CURSOR *));
 static int __bam_c_last __P((DB *, CURSOR *));
 static int __bam_c_next __P((DB *, CURSOR *, int));
 static int __bam_c_physdel __P((DB *, CURSOR *, PAGE *));
 static int __bam_c_prev __P((DB *, CURSOR *));
-static int __bam_c_put __P((DBC *, DBT *, DBT *, int));
-static int __bam_c_rget __P((DB *, CURSOR *, DBT *, int));
-static int __bam_c_search __P((DB *, CURSOR *, const DBT *, u_int, int, int *));
+static int __bam_c_put __P((DBC *, DBT *, DBT *, u_int32_t));
+static int __bam_c_rget __P((DB *, CURSOR *, DBT *, u_int32_t));
+static int __bam_c_search
+	       __P((DB *, CURSOR *, const DBT *, u_int32_t, int, int *));
 
 /* Discard the current page/lock held by a cursor. */
 #undef	DISCARD
 #define	DISCARD(dbp, cp) {						\
-	(void)memp_fput(dbp->mpf, (cp)->page, 0);			\
-	(cp)->page = NULL;						\
-	(void)__BT_TLPUT((dbp), (cp)->lock);				\
-	(cp)->lock = LOCK_INVALID;					\
+	if ((cp)->page != NULL) {					\
+		(void)memp_fput(dbp->mpf, (cp)->page, 0);		\
+		(cp)->page = NULL;					\
+	}								\
+	if ((cp)->lock != LOCK_INVALID) {				\
+		(void)__BT_TLPUT((dbp), (cp)->lock);			\
+		(cp)->lock = LOCK_INVALID;				\
+	}								\
 }
 
 /*
@@ -85,9 +89,9 @@ __bam_cursor(dbp, txn, dbcp)
 	 * All cursors are queued from the master DB structure.  Add the
 	 * cursor to that queue.
 	 */
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	TAILQ_INSERT_HEAD(&dbp->curs_queue, dbc, links);
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
 
 	*dbcp = dbc;
 	return (0);
@@ -128,13 +132,6 @@ __bam_c_iclose(dbp, dbc)
 	CURSOR *cp;
 	int ret;
 
-	/*
-	 * All cursors are queued from the master DB structure.  For
-	 * now, discard the DB handle which triggered this call, and
-	 * replace it with the cursor's reference.
-	 */
-	dbp = dbc->dbp;
-
 	/* If a cursor key was deleted, perform the actual deletion.  */
 	cp = dbc->internal;
 	ret = F_ISSET(cp, C_DELETED) ? __bam_c_physdel(dbp, cp, NULL) : 0;
@@ -144,9 +141,9 @@ __bam_c_iclose(dbp, dbc)
 		(void)__BT_TLPUT(dbp, cp->lock);
 
 	/* Remove the cursor from the queue. */
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	TAILQ_REMOVE(&dbp->curs_queue, dbc, links);
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
 
 	/* Discard the structures. */
 	FREE(dbc->internal, sizeof(CURSOR));
@@ -162,8 +159,9 @@ __bam_c_iclose(dbp, dbc)
 static int
 __bam_c_del(dbc, flags)
 	DBC *dbc;
-	int flags;
+	u_int32_t flags;
 {
+	BTREE *t;
 	CURSOR *cp;
 	DB *dbp;
 	DB_LOCK lock;
@@ -175,6 +173,7 @@ __bam_c_del(dbc, flags)
 	DEBUG_LWRITE(dbc->dbp, dbc->txn, "bam_c_del", NULL, NULL, flags);
 
 	cp = dbc->internal;
+	h = NULL;
 
 	/* Check for invalid flags. */
 	if ((ret = __db_cdelchk(dbc->dbp, flags,
@@ -186,6 +185,7 @@ __bam_c_del(dbc, flags)
 		return (DB_KEYEMPTY);
 
 	GETHANDLE(dbc->dbp, dbc->txn, &dbp, ret);
+	t = dbp->internal;
 
 	/*
 	 * We don't physically delete the record until the cursor moves,
@@ -235,8 +235,21 @@ __bam_c_del(dbc, flags)
 	(void)__bam_ca_delete(dbp, pgno, indx, NULL, 0);
 
 	ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY);
+	h = NULL;
+
+	/*
+	 * If it's a btree with record numbers, we have to adjust the
+	 * counts.
+	 */
+	if (F_ISSET(dbp, DB_BT_RECNUM) &&
+	    (ret = __bam_c_getstack(dbp, cp)) == 0) {
+		ret = __bam_adjust(dbp, t, -1);
+		(void)__bam_stkrel(dbp);
+	}
 
-err:	PUTHANDLE(dbp);
+err:	if (h != NULL)
+		(void)memp_fput(dbp->mpf, h, 0);
+	PUTHANDLE(dbp);
 	return (ret);
 }
 
@@ -244,14 +257,14 @@ err:	PUTHANDLE(dbp);
  * __bam_get --
  *	Retrieve a key/data pair from the tree.
  *
- * PUBLIC: int __bam_get __P((DB *, DB_TXN *, DBT *, DBT *, int));
+ * PUBLIC: int __bam_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
  */
 int
 __bam_get(argdbp, txn, key, data, flags)
 	DB *argdbp;
 	DB_TXN *txn;
 	DBT *key, *data;
-	int flags;
+	u_int32_t flags;
 {
 	DBC dbc;
 	CURSOR cp;
@@ -289,7 +302,7 @@ static int
 __bam_c_get(dbc, key, data, flags)
 	DBC *dbc;
 	DBT *key, *data;
-	int flags;
+	u_int32_t flags;
 {
 	BTREE *t;
 	CURSOR *cp, copy;
@@ -448,7 +461,7 @@ __bam_c_rget(dbp, cp, data, flags)
 	DB *dbp;
 	CURSOR *cp;
 	DBT *data;
-	int flags;
+	u_int32_t flags;
 {
 	BTREE *t;
 	DBT dbt;
@@ -491,7 +504,7 @@ static int
 __bam_c_put(dbc, key, data, flags)
 	DBC *dbc;
 	DBT *key, *data;
-	int flags;
+	u_int32_t flags;
 {
 	BTREE *t;
 	CURSOR *cp, copy;
@@ -499,7 +512,8 @@ __bam_c_put(dbc, key, data, flags)
 	DBT dbt;
 	db_indx_t indx;
 	db_pgno_t pgno;
-	int exact, needkey, ret;
+	u_int32_t iiflags;
+	int exact, needkey, ret, stack;
 	void *arg;
 
 	DEBUG_LWRITE(dbc->dbp, dbc->txn, "bam_c_put",
@@ -524,29 +538,34 @@ __bam_c_put(dbc, key, data, flags)
 	 * To split, we need a valid key for the page.  Since it's a cursor,
 	 * we have to build one.
 	 */
+	stack = 0;
 	if (0) {
-split:		if (needkey) {
+split:		/* Acquire a copy of a key from the page. */
+		if (needkey) {
 			memset(&dbt, 0, sizeof(DBT));
-			ret = __db_ret(dbp, cp->page, indx,
-			    &dbt, &t->bt_rkey.data, &t->bt_rkey.ulen);
-
-			DISCARD(dbp, cp);
-
-			if (ret)
+			if ((ret = __db_ret(dbp, cp->page, indx,
+			    &dbt, &t->bt_rkey.data, &t->bt_rkey.ulen)) != 0)
 				goto err;
 			arg = &dbt;
-		} else {
-			(void)__bam_stkrel(dbp);
+		} else
 			arg = key;
-		}
+
+		/* Discard any pinned pages. */
+		if (stack) {
+			(void)__bam_stkrel(dbp);
+			stack = 0;
+		} else
+			DISCARD(dbp, cp);
+
 		if ((ret = __bam_split(dbp, arg)) != 0)
 			goto err;
 	}
 
-	/* If there's no key supplied, use the cursor. */
-	if (flags == DB_KEYFIRST || flags == DB_KEYLAST)
-		needkey = 0;
-	else {
+	ret = 0;
+	switch (flags) {
+	case DB_AFTER:
+	case DB_BEFORE:
+	case DB_CURRENT:
 		needkey = 1;
 		if (cp->dpgno == PGNO_INVALID) {
 			pgno = cp->pgno;
@@ -555,41 +574,53 @@ split:		if (needkey) {
 			pgno = cp->dpgno;
 			indx = cp->dindx;
 		}
-		/* Acquire the current page. */
-		if ((ret = __bam_lget(dbp,
-		    0, cp->pgno, DB_LOCK_WRITE, &cp->lock)) != 0)
-			goto err;
-		if ((ret = __bam_pget(dbp, &cp->page, &pgno, 0)) != 0)
-			goto err;
-	}
+		/*
+		 * XXX
+		 * This test is right -- we don't currently support duplicates
+		 * in the presence of record numbers, so we don't worry about
+		 * them if DB_BT_RECNUM is set.
+		 */
+		if (F_ISSET(dbp, DB_BT_RECNUM) &&
+		    (flags != DB_CURRENT || F_ISSET(cp, C_DELETED))) {
+			/* Acquire a complete stack. */
+			if ((ret = __bam_c_getstack(dbp, cp)) != 0)
+				goto err;
+			cp->page = t->bt_csp->page;
 
-	ret = 0;
-	switch (flags) {
-	case DB_AFTER:
-	case DB_BEFORE:
-	case DB_CURRENT:
+			stack = 1;
+			iiflags = BI_DOINCR;
+		} else {
+			/* Acquire the current page. */
+			if ((ret = __bam_lget(dbp,
+			    0, cp->pgno, DB_LOCK_WRITE, &cp->lock)) == 0)
+				ret = __bam_pget(dbp, &cp->page, &pgno, 0);
+			if (ret != 0)
+				goto err;
+
+			iiflags = 0;
+		}
 		if ((ret = __bam_iitem(dbp, &cp->page,
-		    &indx, key, data, flags, 0)) == DB_NEEDSPLIT)
+		    &indx, key, data, flags, iiflags)) == DB_NEEDSPLIT)
 			goto split;
 		break;
 	case DB_KEYFIRST:
-		exact = 0;
+		exact = needkey = 0;
 		if ((ret =
 		    __bam_c_search(dbp, cp, key, S_KEYFIRST, 0, &exact)) != 0)
 			goto err;
+		stack = 1;
 
 		indx = cp->dpgno == PGNO_INVALID ? cp->indx : cp->dindx;
 		if ((ret = __bam_iitem(dbp, &cp->page, &indx, key,
 		    data, DB_BEFORE, exact ? 0 : BI_NEWKEY)) == DB_NEEDSPLIT)
 			goto split;
-		if (ret)
-			goto err;
 		break;
 	case DB_KEYLAST:
-		exact = 0;
+		exact = needkey = 0;
 		if ((ret =
 		    __bam_c_search(dbp, cp, key, S_KEYLAST, 0, &exact)) != 0)
 			goto err;
+		stack = 1;
 
 		indx = cp->dpgno == PGNO_INVALID ? cp->indx : cp->dindx;
 		if ((ret = __bam_iitem(dbp, &cp->page, &indx, key,
@@ -623,13 +654,27 @@ split:		if (needkey) {
 	if (copy.lock != LOCK_INVALID)
 		(void)__BT_TLPUT(dbp, copy.lock);
 
-	/* Discard the pinned page. */
-	ret = memp_fput(dbp->mpf, cp->page, 0);
+	/*
+	 * Discard any pages pinned in the tree and their locks, except for
+	 * the leaf page, for which we only discard the pin, not the lock.
+	 *
+	 * Note, the leaf page participated in the stack we acquired, and so
+	 * we have to adjust the stack as necessary.  If there was only a
+	 * single page on the stack, we don't have to free further stack pages.
+	 */
+
+	if (stack && BT_STK_POP(t) != NULL)
+		(void)__bam_stkrel(dbp);
+
+	if ((ret = memp_fput(dbp->mpf, cp->page, 0)) != 0)
+		goto err;
+
 	if (0) {
-err:		if (cp->page != NULL)
-			(void)memp_fput(dbp->mpf, cp->page, 0);
-		if (cp->lock != LOCK_INVALID)
-			(void)__BT_TLPUT(dbp, cp->lock);
+err:		/* Discard any pinned pages. */
+		if (stack)
+			(void)__bam_stkrel(dbp);
+		else
+			DISCARD(dbp, cp);
 		*cp = copy;
 	}
 
@@ -976,7 +1021,7 @@ __bam_c_search(dbp, cp, key, flags, isrecno, exactp)
 	DB *dbp;
 	CURSOR *cp;
 	const DBT *key;
-	u_int flags;
+	u_int32_t flags;
 	int isrecno, *exactp;
 {
 	BTREE *t;
@@ -1032,6 +1077,18 @@ __bam_c_search(dbp, cp, key, flags, isrecno, exactp)
 		} else
 			if ((ret = __bam_c_next(dbp, cp, 0)) != 0)
 				return (ret);
+	/*
+	 * If we don't specify an exact match (the DB_KEYFIRST/DB_KEYLAST or
+	 * DB_SET_RANGE flags were set) __bam_search() may return a deleted
+	 * item.  For DB_KEYFIRST/DB_KEYLAST, we don't care since we're only
+	 * using it for a tree position.  For DB_SET_RANGE, we're returning
+	 * the key, so we have to adjust it.
+	 */
+	if (LF_ISSET(S_DELNO) && cp->dpgno == PGNO_INVALID &&
+	    B_DISSET(GET_BKEYDATA(cp->page, cp->indx + O_INDX)->type))
+		if ((ret = __bam_c_next(dbp, cp, 0)) != 0)
+			return (ret);
+
 	return (0);
 }
 
@@ -1101,7 +1158,7 @@ __bam_cprint(dbp)
 	CURSOR *cp;
 	DBC *dbc;
 
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	for (dbc = TAILQ_FIRST(&dbp->curs_queue);
 	    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
 		cp = (CURSOR *)dbc->internal;
@@ -1113,7 +1170,8 @@ __bam_cprint(dbp)
 			fprintf(stderr, "(deleted)");
 		fprintf(stderr, "\n");
 	}
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
+
 	return (0);
 }
 #endif /* DEBUG */
@@ -1135,7 +1193,7 @@ __bam_ca_delete(dbp, pgno, indx, curs, key_delete)
 {
 	DBC *dbc;
 	CURSOR *cp;
-	int count;
+	int count;		/* !!!: Has to contain max number of cursors. */
 
 	/*
 	 * Adjust the cursors.  We don't have to review the cursors for any
@@ -1148,8 +1206,7 @@ __bam_ca_delete(dbp, pgno, indx, curs, key_delete)
 	 * locks on the same page, but, cursors within a thread must be single
 	 * threaded, so all we're locking here is the cursor linked list.
 	 */
-	DB_THREAD_LOCK(dbp);
-
+	CURSOR_SETUP(dbp);
 	for (count = 0, dbc = TAILQ_FIRST(&dbp->curs_queue);
 	    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
 		cp = (CURSOR *)dbc->internal;
@@ -1180,8 +1237,8 @@ __bam_ca_delete(dbp, pgno, indx, curs, key_delete)
 				F_SET(cp, C_DELETED);
 			}
 	}
+	CURSOR_TEARDOWN(dbp);
 
-	DB_THREAD_UNLOCK(dbp);
 	return (count);
 }
 
@@ -1192,11 +1249,11 @@ __bam_ca_delete(dbp, pgno, indx, curs, key_delete)
  * PUBLIC: void __bam_ca_di __P((DB *, db_pgno_t, u_int32_t, int));
  */
 void
-__bam_ca_di(dbp, pgno, indx, value)
+__bam_ca_di(dbp, pgno, indx, adjust)
 	DB *dbp;
 	db_pgno_t pgno;
 	u_int32_t indx;
-	int value;
+	int adjust;
 {
 	CURSOR *cp;
 	DBC *dbc;
@@ -1208,16 +1265,16 @@ __bam_ca_di(dbp, pgno, indx, value)
 	/*
 	 * Adjust the cursors.  See the comment in __bam_ca_delete().
 	 */
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	for (dbc = TAILQ_FIRST(&dbp->curs_queue);
 	    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
 		cp = (CURSOR *)dbc->internal;
 		if (cp->pgno == pgno && cp->indx >= indx)
-			cp->indx += value;
+			cp->indx += adjust;
 		if (cp->dpgno == pgno && cp->dindx >= indx)
-			cp->dindx += value;
+			cp->dindx += adjust;
 	}
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
 }
 
 /*
@@ -1242,7 +1299,7 @@ __bam_ca_dup(dbp, fpgno, first, fi, tpgno, ti)
 	 * No need to test duplicates, this only gets called when moving
 	 * leaf page data items onto a duplicates page.
 	 */
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	for (dbc = TAILQ_FIRST(&dbp->curs_queue);
 	    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
 		cp = (CURSOR *)dbc->internal;
@@ -1258,7 +1315,7 @@ __bam_ca_dup(dbp, fpgno, first, fi, tpgno, ti)
 			cp->dindx = ti;
 		}
 	}
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
 }
 
 /*
@@ -1285,14 +1342,14 @@ __bam_ca_move(dbp, fpgno, tpgno)
 	 * No need to test duplicates, this only gets called when copying
 	 * over the root page with a leaf or internal page.
 	 */
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	for (dbc = TAILQ_FIRST(&dbp->curs_queue);
 	    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
 		cp = (CURSOR *)dbc->internal;
 		if (cp->pgno == fpgno)
 			cp->pgno = tpgno;
 	}
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
 }
 
 /*
@@ -1333,7 +1390,7 @@ __bam_ca_replace(dbp, pgno, indx, pass)
 	 * for the cursor as it may have been changed by other cursor update
 	 * routines as the item was deleted/inserted.
 	 */
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	switch (pass) {
 	case REPLACE_SETUP:			/* Setup. */
 		for (dbc = TAILQ_FIRST(&dbp->curs_queue);
@@ -1372,7 +1429,7 @@ __bam_ca_replace(dbp, pgno, indx, pass)
 		}
 		break;
 	}
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
 }
 
 /*
@@ -1406,7 +1463,7 @@ __bam_ca_split(dbp, ppgno, lpgno, rpgno, split_indx, cleft)
 	 * the cursor is on the right page, it is decremented by the number of
 	 * records split to the left page.
 	 */
-	DB_THREAD_LOCK(dbp);
+	CURSOR_SETUP(dbp);
 	for (dbc = TAILQ_FIRST(&dbp->curs_queue);
 	    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
 		cp = (CURSOR *)dbc->internal;
@@ -1427,7 +1484,7 @@ __bam_ca_split(dbp, ppgno, lpgno, rpgno, split_indx, cleft)
 				cp->dindx -= split_indx;
 			}
 	}
-	DB_THREAD_UNLOCK(dbp);
+	CURSOR_TEARDOWN(dbp);
 }
 
 /*
@@ -1440,16 +1497,17 @@ __bam_c_physdel(dbp, cp, h)
 	CURSOR *cp;
 	PAGE *h;
 {
+	enum { DELETE_ITEM, DELETE_PAGE, NOTHING_FURTHER } cmd;
 	BOVERFLOW bo;
 	BTREE *t;
 	DBT dbt;
 	DB_LOCK lock;
 	db_indx_t indx;
 	db_pgno_t pgno, next_pgno, prev_pgno;
-	int local, normal, ret;
+	int delete_page, local_page, ret;
 
 	t = dbp->internal;
-	ret = 0;
+	delete_page = ret = 0;
 
 	/* Figure out what we're deleting. */
 	if (cp->dpgno == PGNO_INVALID) {
@@ -1476,9 +1534,9 @@ __bam_c_physdel(dbp, cp, h)
 			return (ret);
 		if ((ret = __bam_pget(dbp, &h, &pgno, 0)) != 0)
 			return (ret);
-		local = 1;
+		local_page = 1;
 	} else
-		local = 0;
+		local_page = 0;
 
 	/*
 	 * If we're deleting a duplicate entry and there are other duplicate
@@ -1515,9 +1573,9 @@ __bam_c_physdel(dbp, cp, h)
 
 		if (NUM_ENT(h) == 1 &&
 		    prev_pgno == PGNO_INVALID && next_pgno == PGNO_INVALID)
-			normal = 1;
+			cmd = DELETE_PAGE;
 		else {
-			normal = 0;
+			cmd = DELETE_ITEM;
 
 			/* Delete the duplicate. */
 			if ((ret = __db_drem(dbp, &h, indx, __bam_free)) != 0)
@@ -1536,18 +1594,27 @@ __bam_c_physdel(dbp, cp, h)
 			 */
 			if ((h != NULL && pgno == h->pgno) ||
 			    prev_pgno != PGNO_INVALID)
-				goto done;
+				cmd = NOTHING_FURTHER;
 		}
 
-		/* Release any page we're holding and its lock. */
-		if (local) {
+		/*
+		 * Release any page we're holding and its lock.
+		 *
+		 * !!!
+		 * If there is no subsequent page in the duplicate chain, then
+		 * __db_drem will have put page "h" and set it to NULL.
+		*/
+		if (local_page) {
 			if (h != NULL)
 				(void)memp_fput(dbp->mpf, h, 0);
 			(void)__BT_TLPUT(dbp, lock);
-			local = 0;
+			local_page = 0;
 		}
 
-		/* Acquire the parent page. */
+		if (cmd == NOTHING_FURTHER)
+			goto done;
+
+		/* Acquire the parent page and switch the index to its entry. */
 		if ((ret =
 		    __bam_lget(dbp, 0, cp->pgno, DB_LOCK_WRITE, &lock)) != 0)
 			goto err;
@@ -1555,11 +1622,10 @@ __bam_c_physdel(dbp, cp, h)
 			(void)__BT_TLPUT(dbp, lock);
 			goto err;
 		}
-		local = 1;
-
-		/* Switch to the parent page's entry. */
+		local_page = 1;
 		indx = cp->indx;
-		if (normal)
+
+		if (cmd == DELETE_PAGE)
 			goto btd;
 
 		/*
@@ -1582,47 +1648,60 @@ __bam_c_physdel(dbp, cp, h)
 		goto done;
 	}
 
-	/* Otherwise, do a normal btree delete. */
-btd:	if ((ret = __bam_ditem(dbp, h, indx)) != 0)
-		goto err;
-	if ((ret = __bam_ditem(dbp, h, indx)) != 0)
-		goto err;
-
-	/*
-	 * If the page is empty, delete it.  To delete a leaf page we need a
-	 * copy of a key from the page.  We use the first one that was there,
-	 * since it's the last key that the page held.  We malloc the page
-	 * information instead of using the return key/data memory because
-	 * we've already set them -- the reason that we've already set them
-	 * is because we're (potentially) about to do a reverse split, which
-	 * would make our saved page information useless.
+btd:	/*
+	 * If the page is going to be emptied, delete it.  To delete a leaf
+	 * page we need a copy of a key from the page.  We use the 0th page
+	 * index since it's the last key that the page held.
+	 *
+	 * We malloc the page information instead of using the return key/data
+	 * memory because we've already set them -- the reason we've already
+	 * set them is because we're (potentially) about to do a reverse split,
+	 * which would make our saved page information useless.
 	 *
 	 * XXX
 	 * The following operations to delete a page might deadlock.  I think
 	 * that's OK.  The problem is if we're deleting an item because we're
 	 * closing cursors because we've already deadlocked and want to call
-	 * txn_abort().  If we fail due to deadlock, we'll leave an locked
-	 * empty page in the tree, which won't be empty long because we're
-	 * going to undo the delete.
+	 * txn_abort().  If we fail due to deadlock, we leave a locked empty
+	 * page in the tree, which won't be empty long because we're going to
+	 * undo the delete.
 	 */
-	if (NUM_ENT(h) == 0 && h->pgno != PGNO_ROOT) {
+	if (NUM_ENT(h) == 2 && h->pgno != PGNO_ROOT) {
 		memset(&dbt, 0, sizeof(DBT));
 		dbt.flags = DB_DBT_MALLOC | DB_DBT_INTERNAL;
 		if ((ret = __db_ret(dbp, h, 0, &dbt, NULL, NULL)) != 0)
 			goto err;
+		delete_page = 1;
+	}
 
-		if (local) {
-			(void)memp_fput(dbp->mpf, h, 0);
-			(void)__BT_TLPUT(dbp, lock);
-			local = 0;
-		}
+	/*
+	 * Do a normal btree delete.
+	 *
+	 * XXX
+	 * Delete the key item first, otherwise the duplicate checks in
+	 * __bam_ditem() won't work!
+	 */
+	if ((ret = __bam_ditem(dbp, h, indx)) != 0)
+		goto err;
+	if ((ret = __bam_ditem(dbp, h, indx)) != 0)
+		goto err;
 
-		ret = __bam_dpage(dbp, &dbt);
-		__db_free(dbt.data);
+	/* Discard any remaining locks/pages. */
+	if (local_page) {
+		(void)memp_fput(dbp->mpf, h, 0);
+		(void)__BT_TLPUT(dbp, lock);
+		local_page = 0;
 	}
 
+	/* Delete the page if it was emptied. */
+	if (delete_page)
+		ret = __bam_dpage(dbp, &dbt);
+
 err:
-done:	if (local) {
+done:	if (delete_page)
+		__db_free(dbt.data);
+
+	if (local_page) {
 		(void)memp_fput(dbp->mpf, h, 0);
 		(void)__BT_TLPUT(dbp, lock);
 	}
@@ -1631,3 +1710,43 @@ done:	if (local) {
 		++t->lstat.bt_deleted;
 	return (ret);
 }
+
+/*
+ * __bam_c_getstack --
+ *	Acquire a full stack for a cursor.
+ */
+static int
+__bam_c_getstack(dbp, cp)
+	DB *dbp;
+	CURSOR *cp;
+{
+	DBT dbt;
+	PAGE *h;
+	db_pgno_t pgno;
+	int exact, ret;
+
+	ret = 0;
+	h = NULL;
+	memset(&dbt, 0, sizeof(DBT));
+
+	/* Get the page with the current item on it. */
+	pgno = cp->pgno;
+	if ((ret = __bam_pget(dbp, &h, &pgno, 0)) != 0)
+		return (ret);
+
+	/* Get a copy of a key from the page. */
+	dbt.flags = DB_DBT_MALLOC | DB_DBT_INTERNAL;
+	if ((ret = __db_ret(dbp, h, 0, &dbt, NULL, NULL)) != 0)
+		goto err;
+
+	/* Get a write-locked stack for that page. */
+	exact = 0;
+	ret = __bam_search(dbp, &dbt, S_KEYFIRST, 1, NULL, &exact);
+
+	/* We no longer need the key or the page. */
+err:	if (h != NULL)
+		(void)memp_fput(dbp->mpf, h, 0);
+	if (dbt.data != NULL)
+		__db_free(dbt.data);
+	return (ret);
+}