about summary refs log tree commit diff
path: root/db2/hash/hash.c
diff options
context:
space:
mode:
Diffstat (limited to 'db2/hash/hash.c')
-rw-r--r--db2/hash/hash.c1151
1 files changed, 503 insertions, 648 deletions
diff --git a/db2/hash/hash.c b/db2/hash/hash.c
index 0265f19659..0d202fce20 100644
--- a/db2/hash/hash.c
+++ b/db2/hash/hash.c
@@ -47,7 +47,7 @@
 #include "config.h"
 
 #ifndef lint
-static const char sccsid[] = "@(#)hash.c	10.45 (Sleepycat) 5/11/98";
+static const char sccsid[] = "@(#)hash.c	10.63 (Sleepycat) 12/11/98";
 #endif /* not lint */
 
 #ifndef NO_SYSTEM_INCLUDES
@@ -64,23 +64,23 @@ static const char sccsid[] = "@(#)hash.c	10.45 (Sleepycat) 5/11/98";
 #include "db_am.h"
 #include "db_ext.h"
 #include "hash.h"
+#include "btree.h"
 #include "log.h"
+#include "db_shash.h"
+#include "lock.h"
+#include "lock_ext.h"
 
 static int  __ham_c_close __P((DBC *));
 static int  __ham_c_del __P((DBC *, u_int32_t));
+static int  __ham_c_destroy __P((DBC *));
 static int  __ham_c_get __P((DBC *, DBT *, DBT *, u_int32_t));
 static int  __ham_c_put __P((DBC *, DBT *, DBT *, u_int32_t));
-static int  __ham_c_init __P((DB *, DB_TXN *, DBC **));
-static int  __ham_cursor __P((DB *, DB_TXN *, DBC **));
 static int  __ham_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
-static int  __ham_dup_return __P((HTAB *, HASH_CURSOR *, DBT *, u_int32_t));
-static int  __ham_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
-static void __ham_init_htab __P((HTAB *, u_int32_t, u_int32_t));
-static int  __ham_lookup __P((HTAB *,
-		HASH_CURSOR *, const DBT *, u_int32_t, db_lockmode_t));
-static int  __ham_overwrite __P((HTAB *, HASH_CURSOR *, DBT *));
-static int  __ham_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
-static int  __ham_sync __P((DB *, u_int32_t));
+static int  __ham_dup_return __P((DBC *, DBT *, u_int32_t));
+static int  __ham_expand_table __P((DBC *));
+static void __ham_init_htab __P((DBC *, u_int32_t, u_int32_t));
+static int  __ham_lookup __P((DBC *, const DBT *, u_int32_t, db_lockmode_t));
+static int  __ham_overwrite __P((DBC *, DBT *));
 
 /************************** INTERFACE ROUTINES ***************************/
 /* OPEN/CLOSE */
@@ -96,65 +96,53 @@ __ham_open(dbp, dbinfo)
 	DB_INFO *dbinfo;
 {
 	DB_ENV *dbenv;
-	DBC *curs;
-	HTAB *hashp;
+	DBC *dbc;
+	HASH_CURSOR *hcp;
 	int file_existed, ret;
 
+	dbc = NULL;
 	dbenv = dbp->dbenv;
 
-	if ((hashp = (HTAB *)__db_calloc(1, sizeof(HTAB))) == NULL)
-		return (ENOMEM);
-	hashp->dbp = dbp;
-
 	/* Set the hash function if specified by the user. */
 	if (dbinfo != NULL && dbinfo->h_hash != NULL)
-		hashp->hash = dbinfo->h_hash;
+		dbp->h_hash = dbinfo->h_hash;
 
 	/*
-	 * Initialize the remaining fields of the dbp.  The type, close and
-	 * fd functions are all set in db_open.
+	 * Initialize the remaining fields of the dbp.  The only function
+	 * that differs from the default set is __ham_stat().
 	 */
-	dbp->internal = hashp;
-	dbp->cursor = __ham_cursor;
+	dbp->internal = NULL;
+	dbp->am_close = __ham_close;
 	dbp->del = __ham_delete;
-	dbp->get = __ham_get;
-	dbp->put = __ham_put;
-	dbp->sync = __ham_sync;
-
-	/* If locking is turned on, lock the meta data page. */
-	if (F_ISSET(dbp, DB_AM_LOCKING)) {
-		dbp->lock.pgno = BUCKET_INVALID;
-		if ((ret = lock_get(dbenv->lk_info, dbp->locker,
-		    0, &dbp->lock_dbt, DB_LOCK_READ, &hashp->hlock)) != 0) {
-			if (ret < 0)
-				ret = EAGAIN;
-			goto out;
-		}
-	}
+	dbp->stat = __ham_stat;
+
+	/* Get a cursor we can use for the rest of this function. */
+	if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
+		goto out;
+
+	hcp = (HASH_CURSOR *)dbc->internal;
+	GET_META(dbp, hcp, ret);
+	if (ret != 0)
+		goto out;
 
 	/*
-	 * Now, we can try to read the meta-data page and figure out
-	 * if we set up locking and get the meta-data page properly.
 	 * If this is a new file, initialize it, and put it back dirty.
 	 */
-	if ((ret = __ham_get_page(hashp->dbp, 0, (PAGE **)&hashp->hdr)) != 0)
-		goto out;
 
-	/* Initialize the hashp structure */
-	if (hashp->hdr->magic == DB_HASHMAGIC) {
+	/* Initialize the hdr structure */
+	if (hcp->hdr->magic == DB_HASHMAGIC) {
 		file_existed = 1;
 		/* File exists, verify the data in the header. */
-		if (hashp->hash == NULL)
-			hashp->hash =
-			    hashp->hdr->version < 5 ? __ham_func4 : __ham_func5;
-		if (hashp->hash(CHARKEY, sizeof(CHARKEY)) !=
-		    hashp->hdr->h_charkey) {
-			__db_err(hashp->dbp->dbenv,
-			    "hash: incompatible hash function");
+		if (dbp->h_hash == NULL)
+			dbp->h_hash =
+			    hcp->hdr->version < 5 ? __ham_func4 : __ham_func5;
+		if (dbp->h_hash(CHARKEY, sizeof(CHARKEY)) !=
+		    hcp->hdr->h_charkey) {
+			__db_err(dbp->dbenv, "hash: incompatible hash function");
 			ret = EINVAL;
 			goto out;
 		}
-		if (F_ISSET(hashp->hdr, DB_HASH_DUP))
+		if (F_ISSET(hcp->hdr, DB_HASH_DUP))
 			F_SET(dbp, DB_AM_DUP);
 	} else {
 		/*
@@ -163,59 +151,27 @@ __ham_open(dbp, dbinfo)
 		 */
 		file_existed = 0;
 		if (F_ISSET(dbp, DB_AM_LOCKING) &&
-		    ((ret = lock_put(dbenv->lk_info, hashp->hlock)) != 0 ||
-		    (ret = lock_get(dbenv->lk_info, dbp->locker, 0,
-		        &dbp->lock_dbt, DB_LOCK_WRITE, &hashp->hlock)) != 0)) {
+		    ((ret = lock_put(dbenv->lk_info, hcp->hlock)) != 0 ||
+		    (ret = lock_get(dbenv->lk_info, dbc->locker, 0,
+		        &dbc->lock_dbt, DB_LOCK_WRITE, &hcp->hlock)) != 0)) {
 			if (ret < 0)
 				ret = EAGAIN;
 			goto out;
 		}
 
-		__ham_init_htab(hashp,
-		    dbinfo != NULL ? dbinfo->h_nelem : 0,
+		__ham_init_htab(dbc, dbinfo != NULL ? dbinfo->h_nelem : 0,
 		    dbinfo != NULL ? dbinfo->h_ffactor : 0);
 		if (F_ISSET(dbp, DB_AM_DUP))
-			F_SET(hashp->hdr, DB_HASH_DUP);
-		if ((ret = __ham_dirty_page(hashp, (PAGE *)hashp->hdr)) != 0)
+			F_SET(hcp->hdr, DB_HASH_DUP);
+		if ((ret = __ham_dirty_page(dbp, (PAGE *)hcp->hdr)) != 0)
 			goto out;
 	}
 
-	/* Initialize the default cursor. */
-	__ham_c_init(dbp, NULL, &curs);
-	TAILQ_INSERT_TAIL(&dbp->curs_queue, curs, links);
-
-	/* Allocate memory for our split buffer. */
-	if ((hashp->split_buf = (PAGE *)__db_malloc(dbp->pgsize)) == NULL) {
-		ret = ENOMEM;
-		goto out;
-	}
-
-#ifdef NO_STATISTICS_FOR_DB_ERR
-	__db_err(dbp->dbenv,
-	    "%s%lx\n%s%ld\n%s%ld\n%s%ld\n%s%ld\n%s0x%lx\n%s0x%lx\n%s%ld\n%s%ld\n%s0x%lx",
-	    "TABLE POINTER   ", (long)hashp,
-	    "BUCKET SIZE     ", (long)hashp->hdr->pagesize,
-	    "FILL FACTOR     ", (long)hashp->hdr->ffactor,
-	    "MAX BUCKET      ", (long)hashp->hdr->max_bucket,
-	    "OVFL POINT      ", (long)hashp->hdr->ovfl_point,
-	    "LAST FREED      ", (long)hashp->hdr->last_freed,
-	    "HIGH MASK       ", (long)hashp->hdr->high_mask,
-	    "LOW  MASK       ", (long)hashp->hdr->low_mask,
-	    "NELEM           ", (long)hashp->hdr->nelem,
-	    "FLAGS           ", (long)hashp->hdr->flags);
-#endif
-
 	/* Release the meta data page */
-	(void)__ham_put_page(hashp->dbp, (PAGE *)hashp->hdr, 0);
-	if (F_ISSET(dbp, DB_AM_LOCKING) &&
-	    (ret = lock_put(dbenv->lk_info, hashp->hlock)) != 0) {
-		if (ret < 0)
-			ret = EAGAIN;
+	RELEASE_META(dbp, hcp);
+	if ((ret  = dbc->c_close(dbc)) != 0)
 		goto out;
-	}
 
-	hashp->hlock = 0;
-	hashp->hdr = NULL;
 	/* Sync the file so that we know that the meta data goes to disk. */
 	if (!file_existed && (ret = dbp->sync(dbp, 0)) != 0)
 		goto out;
@@ -232,27 +188,8 @@ int
 __ham_close(dbp)
 	DB *dbp;
 {
-	HTAB *hashp;
-	int ret, t_ret;
-
-	DEBUG_LWRITE(dbp, NULL, "ham_close", NULL, NULL, 0);
-	hashp = (HTAB *)dbp->internal;
-	ret = 0;
-
-	/* Free the split page. */
-	if (hashp->split_buf)
-		FREE(hashp->split_buf, dbp->pgsize);
-
-	if (hashp->hdr && (t_ret = __ham_put_page(hashp->dbp,
-	    (PAGE *)hashp->hdr, 0)) != 0 && ret == 0)
-		ret = t_ret;
-	if (hashp->hlock && (t_ret = lock_put(hashp->dbp->dbenv->lk_info,
-	    hashp->hlock)) != 0 && ret == 0)
-		ret = t_ret;
-
-	FREE(hashp, sizeof(HTAB));
-	dbp->internal = NULL;
-	return (ret);
+	COMPQUIET(dbp, NULL);
+	return (0);
 }
 
 /************************** LOCAL CREATION ROUTINES **********************/
@@ -260,408 +197,204 @@ __ham_close(dbp)
  * Returns 0 on No Error
  */
 static void
-__ham_init_htab(hashp, nelem, ffactor)
-	HTAB *hashp;
+__ham_init_htab(dbc, nelem, ffactor)
+	DBC *dbc;
 	u_int32_t nelem, ffactor;
 {
+	DB *dbp;
+	HASH_CURSOR *hcp;
 	int32_t l2, nbuckets;
 
-	memset(hashp->hdr, 0, sizeof(HASHHDR));
-	hashp->hdr->ffactor = ffactor;
-	hashp->hdr->pagesize = hashp->dbp->pgsize;
-	ZERO_LSN(hashp->hdr->lsn);
-	hashp->hdr->magic = DB_HASHMAGIC;
-	hashp->hdr->version = DB_HASHVERSION;
-	if (hashp->hash == NULL)
-		hashp->hash =
-		    hashp->hdr->version < 5 ? __ham_func4 : __ham_func5;
-	hashp->hdr->h_charkey = hashp->hash(CHARKEY, sizeof(CHARKEY));
-	if (nelem != 0 && hashp->hdr->ffactor != 0) {
-		nelem = (nelem - 1) / hashp->hdr->ffactor + 1;
+	hcp = (HASH_CURSOR *)dbc->internal;
+	dbp = dbc->dbp;
+	memset(hcp->hdr, 0, sizeof(HASHHDR));
+	hcp->hdr->ffactor = ffactor;
+	hcp->hdr->pagesize = dbp->pgsize;
+	ZERO_LSN(hcp->hdr->lsn);
+	hcp->hdr->magic = DB_HASHMAGIC;
+	hcp->hdr->version = DB_HASHVERSION;
+
+	if (dbp->h_hash == NULL)
+		dbp->h_hash = hcp->hdr->version < 5 ? __ham_func4 : __ham_func5;
+	hcp->hdr->h_charkey = dbp->h_hash(CHARKEY, sizeof(CHARKEY));
+	if (nelem != 0 && hcp->hdr->ffactor != 0) {
+		nelem = (nelem - 1) / hcp->hdr->ffactor + 1;
 		l2 = __db_log2(nelem > 2 ? nelem : 2);
 	} else
 		l2 = 2;
 
 	nbuckets = 1 << l2;
 
-	hashp->hdr->ovfl_point = l2;
-	hashp->hdr->last_freed = PGNO_INVALID;
+	hcp->hdr->ovfl_point = l2;
+	hcp->hdr->last_freed = PGNO_INVALID;
 
-	hashp->hdr->max_bucket = hashp->hdr->high_mask = nbuckets - 1;
-	hashp->hdr->low_mask = (nbuckets >> 1) - 1;
-	memcpy(hashp->hdr->uid, hashp->dbp->lock.fileid, DB_FILE_ID_LEN);
+	hcp->hdr->max_bucket = hcp->hdr->high_mask = nbuckets - 1;
+	hcp->hdr->low_mask = (nbuckets >> 1) - 1;
+	memcpy(hcp->hdr->uid, dbp->fileid, DB_FILE_ID_LEN);
 }
 
-/********************** DESTROY/CLOSE ROUTINES ************************/
-
-
-/*
- * Write modified pages to disk
- *
- * Returns:
- *	 0 == OK
- *	-1 ERROR
- */
 static int
-__ham_sync(dbp, flags)
-	DB *dbp;
-	u_int32_t flags;
-{
-	int ret;
-
-	DEBUG_LWRITE(dbp, NULL, "ham_sync", NULL, NULL, flags);
-	if ((ret = __db_syncchk(dbp, flags)) != 0)
-		return (ret);
-	if (F_ISSET(dbp, DB_AM_RDONLY))
-		return (0);
-
-	if ((ret = memp_fsync(dbp->mpf)) == DB_INCOMPLETE)
-		ret = 0;
-
-	return (ret);
-}
-
-/*******************************SEARCH ROUTINES *****************************/
-/*
- * All the access routines return
- *
- * Returns:
- *	 0 on SUCCESS
- *	 1 to indicate an external ERROR (i.e. key not found, etc)
- *	-1 to indicate an internal ERROR (i.e. out of memory, etc)
- */
-
-static int
-__ham_get(dbp, txn, key, data, flags)
+__ham_delete(dbp, txn, key, flags)
 	DB *dbp;
 	DB_TXN *txn;
 	DBT *key;
-	DBT *data;
 	u_int32_t flags;
 {
-	DB *ldbp;
-	HTAB *hashp;
+	DBC *dbc;
 	HASH_CURSOR *hcp;
-	int ret, t_ret;
+	int ret, tret;
 
-	DEBUG_LREAD(dbp, txn, "ham_get", key, NULL, flags);
-	if ((ret = __db_getchk(dbp, key, data, flags)) != 0)
-		return (ret);
+	DB_PANIC_CHECK(dbp);
 
-	ldbp = dbp;
-	if (F_ISSET(dbp, DB_AM_THREAD) &&
-	    (ret = __db_gethandle(dbp, __ham_hdup, &ldbp)) != 0)
+	if ((ret =
+	    __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
 		return (ret);
 
-	hashp = (HTAB *)ldbp->internal;
-	SET_LOCKER(ldbp, txn);
-	GET_META(ldbp, hashp);
-
-	hashp->hash_accesses++;
-	hcp = (HASH_CURSOR *)TAILQ_FIRST(&ldbp->curs_queue)->internal;
-	if ((ret = __ham_lookup(hashp, hcp, key, 0, DB_LOCK_READ)) == 0) {
-		if (F_ISSET(hcp, H_OK))
-			ret = __ham_dup_return(hashp, hcp, data, DB_FIRST);
-		else /* Key was not found */
-			ret = DB_NOTFOUND;
-	}
-
-	if ((t_ret = __ham_item_done(hashp, hcp, 0)) != 0 && ret == 0)
-		ret = t_ret;
-	RELEASE_META(ldbp, hashp);
-	if (F_ISSET(dbp, DB_AM_THREAD))
-		__db_puthandle(ldbp);
-	return (ret);
-}
-
-static int
-__ham_put(dbp, txn, key, data, flags)
-	DB *dbp;
-	DB_TXN *txn;
-	DBT *key;
-	DBT *data;
-	u_int32_t flags;
-{
-	DB *ldbp;
-	DBT tmp_val, *myval;
-	HASH_CURSOR *hcp;
-	HTAB *hashp;
-	u_int32_t nbytes;
-	int ret, t_ret;
-
-	DEBUG_LWRITE(dbp, txn, "ham_put", key, data, flags);
-	if ((ret = __db_putchk(dbp, key, data,
-	    flags, F_ISSET(dbp, DB_AM_RDONLY), F_ISSET(dbp, DB_AM_DUP))) != 0)
+	if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
 		return (ret);
 
-	ldbp = dbp;
-	if (F_ISSET(dbp, DB_AM_THREAD) &&
-	    (ret = __db_gethandle(dbp, __ham_hdup, &ldbp)) != 0)
-		return (ret);
+	DEBUG_LWRITE(dbc, txn, "ham_delete", key, NULL, flags);
 
-	hashp = (HTAB *)ldbp->internal;
-	SET_LOCKER(ldbp, txn);
-	GET_META(ldbp, hashp);
-	hcp = TAILQ_FIRST(&ldbp->curs_queue)->internal;
-
-	nbytes = (ISBIG(hashp, key->size) ? HOFFPAGE_PSIZE :
-	    HKEYDATA_PSIZE(key->size)) +
-	    (ISBIG(hashp, data->size) ? HOFFPAGE_PSIZE :
-	    HKEYDATA_PSIZE(data->size));
-
-	hashp->hash_accesses++;
-	ret = __ham_lookup(hashp, hcp, key, nbytes, DB_LOCK_WRITE);
-
-	if (ret == DB_NOTFOUND) {
-		ret = 0;
-		if (hcp->seek_found_page != PGNO_INVALID &&
-		    hcp->seek_found_page != hcp->pgno) {
-			if ((ret = __ham_item_done(hashp, hcp, 0)) != 0)
-				goto out;
-			hcp->pgno = hcp->seek_found_page;
-			hcp->bndx = NDX_INVALID;
-		}
+	hcp = (HASH_CURSOR *)dbc->internal;
+	GET_META(dbp, hcp, ret);
+	if (ret != 0)
+		goto out;
 
-		if (F_ISSET(data, DB_DBT_PARTIAL) && data->doff != 0) {
-			/*
-			 * Doing a partial put, but the key does not exist
-			 * and we are not beginning the write at 0.  We
-			 * must create a data item padded up to doff and
-			 * then write the new bytes represented by val.
-			 */
-			ret = __ham_init_dbt(&tmp_val, data->size + data->doff,
-			    &hcp->big_data, &hcp->big_datalen);
-			if (ret == 0) {
-				memset(tmp_val.data, 0, data->doff);
-				memcpy((u_int8_t *)tmp_val.data + data->doff,
-				    data->data, data->size);
-				myval = &tmp_val;
-			}
-		} else
-			myval = (DBT *)data;
-
-		if (ret == 0)
-			ret = __ham_add_el(hashp, hcp, key, myval, H_KEYDATA);
-	} else if (ret == 0 && F_ISSET(hcp, H_OK)) {
-		if (flags == DB_NOOVERWRITE)
-			ret = DB_KEYEXIST;
-		else if (F_ISSET(ldbp, DB_AM_DUP))
-			ret = __ham_add_dup(hashp, hcp, data, DB_KEYLAST);
+	hcp->stats.hash_deleted++;
+	if ((ret = __ham_lookup(dbc, key, 0, DB_LOCK_WRITE)) == 0) {
+		if (F_ISSET(hcp, H_OK))
+			ret = __ham_del_pair(dbc, 1);
 		else
-			ret = __ham_overwrite(hashp, hcp, data);
-	}
-
-	/* Free up all the cursor pages. */
-	if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0)
-		ret = t_ret;
-	/* Now check if we have to grow. */
-out:	if (ret == 0 && F_ISSET(hcp, H_EXPAND)) {
-		ret = __ham_expand_table(hashp);
-		F_CLR(hcp, H_EXPAND);
+			ret = DB_NOTFOUND;
 	}
 
-	if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0)
-		ret = t_ret;
-	RELEASE_META(ldbp, hashp);
-	if (F_ISSET(dbp, DB_AM_THREAD))
-		__db_puthandle(ldbp);
+	RELEASE_META(dbp, hcp);
+out:	if ((tret = dbc->c_close(dbc)) != 0 && ret == 0)
+		ret = tret;
 	return (ret);
 }
 
-static int
-__ham_cursor(dbp, txnid, dbcp)
-	DB *dbp;
-	DB_TXN *txnid;
-	DBC **dbcp;
-{
+/* ****************** CURSORS ********************************** */
+/*
+ * __ham_c_init --
+ *	Initialize the hash-specific portion of a cursor.
+ *
+ * PUBLIC: int __ham_c_init __P((DBC *));
+ */
+int
+__ham_c_init(dbc)
+	DBC *dbc;
+  {
+	HASH_CURSOR *new_curs;
 	int ret;
 
-	DEBUG_LWRITE(dbp, txnid, "ham_cursor", NULL, NULL, 0);
-	if ((ret = __ham_c_init(dbp, txnid, dbcp)) != 0)
+	if ((ret = __os_calloc(1, sizeof(struct cursor_t), &new_curs)) != 0)
+		return (ret);
+	if ((ret =
+	    __os_malloc(dbc->dbp->pgsize, NULL, &new_curs->split_buf)) != 0) {
+		__os_free(new_curs, sizeof(*new_curs));
 		return (ret);
-
-	DB_THREAD_LOCK(dbp);
-	TAILQ_INSERT_TAIL(&dbp->curs_queue, *dbcp, links);
-	DB_THREAD_UNLOCK(dbp);
-	return (ret);
-}
-
-static int
-__ham_c_init(dbp, txnid, dbcp)
-	DB *dbp;
-	DB_TXN *txnid;
-	DBC **dbcp;
-{
-	DBC *db_curs;
-	HASH_CURSOR *new_curs;
-
-	if ((db_curs = (DBC *)__db_calloc(sizeof(DBC), 1)) == NULL)
-		return (ENOMEM);
-
-	if ((new_curs =
-	    (HASH_CURSOR *)__db_calloc(sizeof(struct cursor_t), 1)) == NULL) {
-		FREE(db_curs, sizeof(DBC));
-		return (ENOMEM);
 	}
 
-	db_curs->internal = new_curs;
-	db_curs->c_close = __ham_c_close;
-	db_curs->c_del = __ham_c_del;
-	db_curs->c_get = __ham_c_get;
-	db_curs->c_put = __ham_c_put;
-	db_curs->txn = txnid;
-	db_curs->dbp = dbp;
+	new_curs->dbc = dbc;
+
+	dbc->internal = new_curs;
+	dbc->c_am_close = __ham_c_close;
+	dbc->c_am_destroy = __ham_c_destroy;
+	dbc->c_del = __ham_c_del;
+	dbc->c_get = __ham_c_get;
+	dbc->c_put = __ham_c_put;
 
-	new_curs->db_cursor = db_curs;
 	__ham_item_init(new_curs);
 
-	if (dbcp != NULL)
-		*dbcp = db_curs;
 	return (0);
 }
 
+/*
+ * __ham_c_close --
+ *	Close down the cursor from a single use.
+ */
 static int
-__ham_delete(dbp, txn, key, flags)
-	DB *dbp;
-	DB_TXN *txn;
-	DBT *key;
-	u_int32_t flags;
-{
-	DB *ldbp;
-	HTAB *hashp;
-	HASH_CURSOR *hcp;
-	int ret, t_ret;
-
-	DEBUG_LWRITE(dbp, txn, "ham_delete", key, NULL, flags);
-	if ((ret =
-	    __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
-		return (ret);
-
-	ldbp = dbp;
-	if (F_ISSET(dbp, DB_AM_THREAD) &&
-	    (ret = __db_gethandle(dbp, __ham_hdup, &ldbp)) != 0)
-		return (ret);
-	hashp = (HTAB *)ldbp->internal;
-	SET_LOCKER(ldbp, txn);
-	GET_META(ldbp, hashp);
-	hcp = TAILQ_FIRST(&ldbp->curs_queue)->internal;
-
-	hashp->hash_accesses++;
-	if ((ret = __ham_lookup(hashp, hcp, key, 0, DB_LOCK_WRITE)) == 0) {
-		if (F_ISSET(hcp, H_OK))
-			ret = __ham_del_pair(hashp, hcp, 1);
-		else
-			ret = DB_NOTFOUND;
-	}
-
-	if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0)
-		ret = t_ret;
-	RELEASE_META(ldbp, hashp);
-	if (F_ISSET(dbp, DB_AM_THREAD))
-		__db_puthandle(ldbp);
-	return (ret);
-}
-
-/* ****************** CURSORS ********************************** */
-static int
-__ham_c_close(cursor)
-	DBC *cursor;
+__ham_c_close(dbc)
+	DBC *dbc;
 {
-	DB  *ldbp;
 	int ret;
 
-	DEBUG_LWRITE(cursor->dbp, cursor->txn, "ham_c_close", NULL, NULL, 0);
-	/*
-	 * If the pagep, dpagep, and lock fields of the cursor are all NULL,
-	 * then there really isn't a need to get a handle here.  However,
-	 * the normal case is that at least one of those fields is non-NULL,
-	 * and putting those checks in here would couple the ham_item_done
-	 * functionality with cursor close which would be pretty disgusting.
-	 * Instead, we pay the overhead here of always getting the handle.
-	 */
-	ldbp = cursor->dbp;
-	if (F_ISSET(cursor->dbp, DB_AM_THREAD) &&
-	    (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0)
+	if ((ret = __ham_item_done(dbc, 0)) != 0)
 		return (ret);
 
-	ret = __ham_c_iclose(ldbp, cursor);
-
-	if (F_ISSET(ldbp, DB_AM_THREAD))
-		__db_puthandle(ldbp);
-	return (ret);
+	__ham_item_init((HASH_CURSOR *)dbc->internal);
+	return (0);
 }
+
 /*
- * __ham_c_iclose --
- *
- * Internal cursor close routine; assumes it is being passed the correct
- * handle, rather than getting and putting a handle.
- *
- * PUBLIC: int __ham_c_iclose __P((DB *, DBC *));
+ * __ham_c_destroy --
+ *	Cleanup the access method private part of a cursor.
  */
-int
-__ham_c_iclose(dbp, dbc)
-	DB *dbp;
+static int
+__ham_c_destroy(dbc)
 	DBC *dbc;
 {
 	HASH_CURSOR *hcp;
-	HTAB *hashp;
-	int ret;
 
-	hashp = (HTAB *)dbp->internal;
 	hcp = (HASH_CURSOR *)dbc->internal;
-	ret = __ham_item_done(hashp, hcp, 0);
-
-	if (hcp->big_key)
-		FREE(hcp->big_key, hcp->big_keylen);
-	if (hcp->big_data)
-		FREE(hcp->big_data, hcp->big_datalen);
+	if (hcp->split_buf != NULL)
+		__os_free(hcp->split_buf, dbc->dbp->pgsize);
+	__os_free(hcp, sizeof(HASH_CURSOR));
 
-	/*
-	 * All cursors (except the default ones) are linked off the master.
-	 * Therefore, when we close the cursor, we have to remove it from
-	 * the master, not the local one.
-	 * XXX I am always removing from the master; what about local cursors?
-	 */
-	DB_THREAD_LOCK(dbc->dbp);
-	TAILQ_REMOVE(&dbc->dbp->curs_queue, dbc, links);
-	DB_THREAD_UNLOCK(dbc->dbp);
-
-	FREE(hcp, sizeof(HASH_CURSOR));
-	FREE(dbc, sizeof(DBC));
-
-	return (ret);
+	return (0);
 }
 
 static int
-__ham_c_del(cursor, flags)
-	DBC *cursor;
+__ham_c_del(dbc, flags)
+	DBC *dbc;
 	u_int32_t flags;
 {
-	DB *ldbp;
+	DB *dbp;
+	DBT repldbt;
 	HASH_CURSOR *hcp;
 	HASH_CURSOR save_curs;
-	HTAB *hashp;
 	db_pgno_t ppgno, chg_pgno;
 	int ret, t_ret;
 
-	DEBUG_LWRITE(cursor->dbp, cursor->txn, "ham_c_del", NULL, NULL, flags);
-	ldbp = cursor->dbp;
-	if (F_ISSET(cursor->dbp, DB_AM_THREAD) &&
-	    (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0)
-		return (ret);
-	hashp = (HTAB *)ldbp->internal;
-	hcp = (HASH_CURSOR *)cursor->internal;
-	save_curs = *hcp;
-	if ((ret = __db_cdelchk(ldbp, flags,
-	    F_ISSET(ldbp, DB_AM_RDONLY), IS_VALID(hcp))) != 0)
+	DEBUG_LWRITE(dbc, dbc->txn, "ham_c_del", NULL, NULL, flags);
+	dbp = dbc->dbp;
+	DB_PANIC_CHECK(dbp);
+	hcp = (HASH_CURSOR *)dbc->internal;
+
+	if ((ret = __db_cdelchk(dbc->dbp, flags,
+	    F_ISSET(dbc->dbp, DB_AM_RDONLY), IS_VALID(hcp))) != 0)
 		return (ret);
+
 	if (F_ISSET(hcp, H_DELETED))
 		return (DB_NOTFOUND);
 
-	SET_LOCKER(hashp->dbp, cursor->txn);
-	GET_META(hashp->dbp, hashp);
-	hashp->hash_accesses++;
-	if ((ret = __ham_get_cpage(hashp, hcp, DB_LOCK_WRITE)) != 0)
+	/*
+	 * If we are in the concurrent DB product and this cursor
+	 * is not a write cursor, then this request is invalid.
+	 * If it is a simple write cursor, then we need to upgrade its
+	 * lock.
+	 */
+	if (F_ISSET(dbp, DB_AM_CDB)) {
+		/* Make sure it's a valid update cursor. */
+		if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER))
+			return (EINVAL);
+
+		if (F_ISSET(dbc, DBC_RMW) &&
+		    (ret = lock_get(dbp->dbenv->lk_info, dbc->locker,
+		    DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
+		    &dbc->mylock)) != 0)
+			return (EAGAIN);
+	}
+
+	GET_META(dbp, hcp, ret);
+	if (ret != 0)
+		return (ret);
+
+	SAVE_CURSOR(hcp, &save_curs);
+	hcp->stats.hash_deleted++;
+
+	if ((ret = __ham_get_cpage(dbc, DB_LOCK_WRITE)) != 0)
 		goto out;
 	if (F_ISSET(hcp, H_ISDUP) && hcp->dpgno != PGNO_INVALID) {
 		/*
@@ -695,20 +428,20 @@ __ham_c_del(cursor, flags)
 
 		/* Remove item from duplicate page. */
 		chg_pgno = hcp->dpgno;
-		if ((ret = __db_drem(hashp->dbp,
+		if ((ret = __db_drem(dbc,
 		    &hcp->dpagep, hcp->dndx, __ham_del_page)) != 0)
 			goto out;
 
 		if (hcp->dpagep == NULL) {
 			if (ppgno != PGNO_INVALID) {		/* Case 3 */
 				hcp->dpgno = ppgno;
-				if ((ret = __ham_get_cpage(hashp, hcp,
+				if ((ret = __ham_get_cpage(dbc,
 				    DB_LOCK_READ)) != 0)
 					goto out;
 				hcp->dndx = NUM_ENT(hcp->dpagep);
 				F_SET(hcp, H_DELETED);
 			} else {				/* Case 4 */
-				ret = __ham_del_pair(hashp, hcp, 1);
+				ret = __ham_del_pair(dbc, 1);
 				hcp->dpgno = PGNO_INVALID;
 				/*
 				 * Delpair updated the cursor queue, so we
@@ -723,6 +456,15 @@ __ham_c_del(cursor, flags)
 				memcpy(HOFFDUP_PGNO(P_ENTRY(hcp->pagep,
 				    H_DATAINDEX(hcp->bndx))),
 				    &hcp->dpgno, sizeof(db_pgno_t));
+			/*
+			 * We need to put the master page here, because
+			 * although we have a duplicate page, the master
+			 * page is dirty, and ham_item_done assumes that
+			 * if you have a duplicate page, it's the only one
+			 * that can be dirty.
+			 */
+			ret = __ham_put_page(dbp, hcp->pagep, 1);
+			hcp->pagep = NULL;
 			F_SET(hcp, H_DELETED);
 		} else						/* Case 1 */
 			F_SET(hcp, H_DELETED);
@@ -730,17 +472,17 @@ __ham_c_del(cursor, flags)
 			__ham_c_update(hcp, chg_pgno, 0, 0, 1);
 	} else if (F_ISSET(hcp, H_ISDUP)) {			/* on page */
 		if (hcp->dup_off == 0 && DUP_SIZE(hcp->dup_len) ==
-		    LEN_HDATA(hcp->pagep, hashp->hdr->pagesize, hcp->bndx))
-			ret = __ham_del_pair(hashp, hcp, 1);
+		    LEN_HDATA(hcp->pagep, hcp->hdr->pagesize, hcp->bndx))
+			ret = __ham_del_pair(dbc, 1);
 		else {
-			DBT repldbt;
-
 			repldbt.flags = 0;
 			F_SET(&repldbt, DB_DBT_PARTIAL);
 			repldbt.doff = hcp->dup_off;
 			repldbt.dlen = DUP_SIZE(hcp->dup_len);
 			repldbt.size = 0;
-			ret = __ham_replpair(hashp, hcp, &repldbt, 0);
+			repldbt.data =
+			    HKEYDATA_DATA(H_PAIRDATA(hcp->pagep, hcp->bndx));
+			ret = __ham_replpair(dbc, &repldbt, 0);
 			hcp->dup_tlen -= DUP_SIZE(hcp->dup_len);
 			F_SET(hcp, H_DELETED);
 			__ham_c_update(hcp, hcp->pgno,
@@ -749,48 +491,53 @@ __ham_c_del(cursor, flags)
 
 	} else
 		/* Not a duplicate */
-normal:		ret = __ham_del_pair(hashp, hcp, 1);
+normal:		ret = __ham_del_pair(dbc, 1);
 
-out:	if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0)
+out:	if ((t_ret = __ham_item_done(dbc, ret == 0)) != 0 && ret == 0)
 		ret = t_ret;
-	if (ret != 0)
-		*hcp = save_curs;
-	RELEASE_META(hashp->dbp, hashp);
-	if (F_ISSET(cursor->dbp, DB_AM_THREAD))
-		__db_puthandle(ldbp);
+	RELEASE_META(dbp, hcp);
+	RESTORE_CURSOR(dbp, hcp, &save_curs, ret);
+	if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW))
+		(void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock,
+		    DB_LOCK_IWRITE, 0);
 	return (ret);
 }
 
 static int
-__ham_c_get(cursor, key, data, flags)
-	DBC *cursor;
+__ham_c_get(dbc, key, data, flags)
+	DBC *dbc;
 	DBT *key;
 	DBT *data;
 	u_int32_t flags;
 {
-	DB *ldbp;
-	HTAB *hashp;
+	DB *dbp;
 	HASH_CURSOR *hcp, save_curs;
+	db_lockmode_t lock_type;
 	int get_key, ret, t_ret;
 
-	DEBUG_LREAD(cursor->dbp, cursor->txn, "ham_c_get",
+	DEBUG_LREAD(dbc, dbc->txn, "ham_c_get",
 	    flags == DB_SET || flags == DB_SET_RANGE ? key : NULL,
 	    NULL, flags);
-	ldbp = cursor->dbp;
-	if (F_ISSET(cursor->dbp, DB_AM_THREAD) &&
-	    (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0)
-		return (ret);
-	hashp = (HTAB *)(ldbp->internal);
-	hcp = (HASH_CURSOR *)cursor->internal;
-	save_curs = *hcp;
+
+	hcp = (HASH_CURSOR *)dbc->internal;
+	dbp = dbc->dbp;
+	DB_PANIC_CHECK(dbp);
+	SAVE_CURSOR(hcp, &save_curs);
 	if ((ret =
-	    __db_cgetchk(hashp->dbp, key, data, flags, IS_VALID(hcp))) != 0)
+	    __db_cgetchk(dbp, key, data, flags, IS_VALID(hcp))) != 0)
 		return (ret);
 
-	SET_LOCKER(hashp->dbp, cursor->txn);
-	GET_META(hashp->dbp, hashp);
-	hashp->hash_accesses++;
+	/* Clear OR'd in additional bits so we can check for flag equality. */
+	if (LF_ISSET(DB_RMW)) {
+		lock_type = DB_LOCK_WRITE;
+		LF_CLR(DB_RMW);
+	} else
+		lock_type = DB_LOCK_READ;
 
+	GET_META(dbp, hcp, ret);
+	if (ret != 0)
+		return (ret);
+	hcp->stats.hash_get++;
 	hcp->seek_size = 0;
 
 	ret = 0;
@@ -798,24 +545,39 @@ __ham_c_get(cursor, key, data, flags)
 	switch (flags) {
 	case DB_PREV:
 		if (hcp->bucket != BUCKET_INVALID) {
-			ret = __ham_item_prev(hashp, hcp, DB_LOCK_READ);
+			ret = __ham_item_prev(dbc, lock_type);
 			break;
 		}
 		/* FALLTHROUGH */
 	case DB_LAST:
-		ret = __ham_item_last(hashp, hcp, DB_LOCK_READ);
+		ret = __ham_item_last(dbc, lock_type);
 		break;
 	case DB_FIRST:
-		ret = __ham_item_first(hashp, hcp, DB_LOCK_READ);
+		ret = __ham_item_first(dbc, lock_type);
+		break;
+	case DB_NEXT_DUP:
+		if (hcp->bucket == BUCKET_INVALID)
+			ret = EINVAL;
+		else {
+			F_SET(hcp, H_DUPONLY);
+			ret = __ham_item_next(dbc, lock_type);
+		}
 		break;
 	case DB_NEXT:
 		if (hcp->bucket == BUCKET_INVALID)
 			hcp->bucket = 0;
-		ret = __ham_item_next(hashp, hcp, DB_LOCK_READ);
+		ret = __ham_item_next(dbc, lock_type);
 		break;
 	case DB_SET:
 	case DB_SET_RANGE:
-		ret = __ham_lookup(hashp, hcp, key, 0, DB_LOCK_READ);
+	case DB_GET_BOTH:
+		if (F_ISSET(dbc, DBC_CONTINUE)) {
+			F_SET(hcp, H_DUPONLY);
+			ret = __ham_item_next(dbc, lock_type);
+		} else if (F_ISSET(dbc, DBC_KEYSET))
+			ret = __ham_item(dbc, lock_type);
+		else
+			ret = __ham_lookup(dbc, key, 0, lock_type);
 		get_key = 0;
 		break;
 	case DB_CURRENT:
@@ -824,7 +586,7 @@ __ham_c_get(cursor, key, data, flags)
 			goto out;
 		}
 
-		ret = __ham_item(hashp, hcp, DB_LOCK_READ);
+		ret = __ham_item(dbc, lock_type);
 		break;
 	}
 
@@ -837,12 +599,12 @@ __ham_c_get(cursor, key, data, flags)
 			goto out1;
 		else if (F_ISSET(hcp, H_OK)) {
 			/* Get the key. */
-			if (get_key && (ret = __db_ret(hashp->dbp, hcp->pagep,
-			    H_KEYINDEX(hcp->bndx), key, &hcp->big_key,
-			    &hcp->big_keylen)) != 0)
+			if (get_key && (ret = __db_ret(dbp, hcp->pagep,
+			    H_KEYINDEX(hcp->bndx), key, &dbc->rkey.data,
+			    &dbc->rkey.size)) != 0)
 				goto out1;
 
-			ret = __ham_dup_return(hashp, hcp, data, flags);
+			ret = __ham_dup_return(dbc, data, flags);
 			break;
 		} else if (!F_ISSET(hcp, H_NOMORE)) {
 			abort();
@@ -855,7 +617,7 @@ __ham_c_get(cursor, key, data, flags)
 		switch (flags) {
 			case DB_LAST:
 			case DB_PREV:
-				ret = __ham_item_done(hashp, hcp, 0);
+				ret = __ham_item_done(dbc, 0);
 				if (hcp->bucket == 0) {
 					ret = DB_NOTFOUND;
 					goto out1;
@@ -863,24 +625,24 @@ __ham_c_get(cursor, key, data, flags)
 				hcp->bucket--;
 				hcp->bndx = NDX_INVALID;
 				if (ret == 0)
-					ret = __ham_item_prev(hashp,
-					    hcp, DB_LOCK_READ);
+					ret = __ham_item_prev(dbc, lock_type);
 				break;
 			case DB_FIRST:
 			case DB_NEXT:
-				ret = __ham_item_done(hashp, hcp, 0);
+				ret = __ham_item_done(dbc, 0);
 				hcp->bndx = NDX_INVALID;
 				hcp->bucket++;
 				hcp->pgno = PGNO_INVALID;
 				hcp->pagep = NULL;
-				if (hcp->bucket > hashp->hdr->max_bucket) {
+				if (hcp->bucket > hcp->hdr->max_bucket) {
 					ret = DB_NOTFOUND;
 					goto out1;
 				}
 				if (ret == 0)
-					ret = __ham_item_next(hashp,
-					    hcp, DB_LOCK_READ);
+					ret = __ham_item_next(dbc, lock_type);
 				break;
+			case DB_GET_BOTH:
+			case DB_NEXT_DUP:
 			case DB_SET:
 			case DB_SET_RANGE:
 				/* Key not found. */
@@ -888,85 +650,137 @@ __ham_c_get(cursor, key, data, flags)
 				goto out1;
 		}
 	}
-out1:	if ((t_ret = __ham_item_done(hashp, hcp, 0)) != 0 && ret == 0)
+out1:	if ((t_ret = __ham_item_done(dbc, 0)) != 0 && ret == 0)
 		ret = t_ret;
-out:	if (ret)
-		*hcp = save_curs;
-	RELEASE_META(hashp->dbp, hashp);
-	if (F_ISSET(cursor->dbp, DB_AM_THREAD))
-		__db_puthandle(ldbp);
+out:	RELEASE_META(dbp, hcp);
+	RESTORE_CURSOR(dbp, hcp, &save_curs, ret);
 	return (ret);
 }
 
 static int
-__ham_c_put(cursor, key, data, flags)
-	DBC *cursor;
+__ham_c_put(dbc, key, data, flags)
+	DBC *dbc;
 	DBT *key;
 	DBT *data;
 	u_int32_t flags;
 {
-	DB *ldbp;
+	DB *dbp;
+	DBT tmp_val, *myval;
 	HASH_CURSOR *hcp, save_curs;
-	HTAB *hashp;
 	u_int32_t nbytes;
 	int ret, t_ret;
 
-	DEBUG_LWRITE(cursor->dbp, cursor->txn, "ham_c_put",
+	dbp = dbc->dbp;
+	DB_PANIC_CHECK(dbp);
+	DEBUG_LWRITE(dbc, dbc->txn, "ham_c_put",
 	    flags == DB_KEYFIRST || flags == DB_KEYLAST ? key : NULL,
 	    data, flags);
-	ldbp = cursor->dbp;
-	if (F_ISSET(cursor->dbp, DB_AM_THREAD) &&
-	    (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0)
-		return (ret);
-	hashp = (HTAB *)(ldbp->internal);
-	hcp = (HASH_CURSOR *)cursor->internal;
-	save_curs = *hcp;
+	hcp = (HASH_CURSOR *)dbc->internal;
 
-	if ((ret = __db_cputchk(hashp->dbp, key, data, flags,
-	    F_ISSET(ldbp, DB_AM_RDONLY), IS_VALID(hcp))) != 0)
+	if ((ret = __db_cputchk(dbp, key, data, flags,
+	    F_ISSET(dbp, DB_AM_RDONLY), IS_VALID(hcp))) != 0)
 		return (ret);
-	if (F_ISSET(hcp, H_DELETED))
+
+	if (F_ISSET(hcp, H_DELETED) &&
+	    flags != DB_KEYFIRST && flags != DB_KEYLAST)
 		return (DB_NOTFOUND);
 
-	SET_LOCKER(hashp->dbp, cursor->txn);
-	GET_META(hashp->dbp, hashp);
-	ret = 0;
+	/*
+	 * If we are in the concurrent DB product and this cursor
+	 * is not a write cursor, then this request is invalid.
+	 * If it is a simple write cursor, then we need to upgrade its
+	 * lock.
+	 */
+	if (F_ISSET(dbp, DB_AM_CDB)) {
+		/* Make sure it's a valid update cursor. */
+		if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER))
+			return (EINVAL);
+
+		if (F_ISSET(dbc, DBC_RMW) &&
+		    (ret = lock_get(dbp->dbenv->lk_info, dbc->locker,
+		    DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
+		    &dbc->mylock)) != 0)
+			return (EAGAIN);
+	}
+
+	GET_META(dbp, hcp, ret);
+	if (ret != 0)
+		return (ret);
+
+	SAVE_CURSOR(hcp, &save_curs);
+	hcp->stats.hash_put++;
 
 	switch (flags) {
 	case DB_KEYLAST:
 	case DB_KEYFIRST:
-		nbytes = (ISBIG(hashp, key->size) ? HOFFPAGE_PSIZE :
+		nbytes = (ISBIG(hcp, key->size) ? HOFFPAGE_PSIZE :
 		    HKEYDATA_PSIZE(key->size)) +
-		    (ISBIG(hashp, data->size) ? HOFFPAGE_PSIZE :
+		    (ISBIG(hcp, data->size) ? HOFFPAGE_PSIZE :
 		    HKEYDATA_PSIZE(data->size));
-		ret = __ham_lookup(hashp, hcp, key, nbytes, DB_LOCK_WRITE);
+		if ((ret = __ham_lookup(dbc,
+		    key, nbytes, DB_LOCK_WRITE)) == DB_NOTFOUND) {
+			ret = 0;
+			if (hcp->seek_found_page != PGNO_INVALID &&
+			    hcp->seek_found_page != hcp->pgno) {
+				if ((ret = __ham_item_done(dbc, 0)) != 0)
+					goto out;
+				hcp->pgno = hcp->seek_found_page;
+				hcp->bndx = NDX_INVALID;
+			}
+
+			if (F_ISSET(data, DB_DBT_PARTIAL) && data->doff != 0) {
+				/*
+				 * A partial put, but the key does not exist
+				 * and we are not beginning the write at 0.
+				 * We must create a data item padded up to doff
+				 * and then write the new bytes represented by
+				 * val.
+				 */
+				if ((ret = __ham_init_dbt(&tmp_val,
+				    data->size + data->doff,
+				    &dbc->rdata.data, &dbc->rdata.size)) == 0) {
+					memset(tmp_val.data, 0, data->doff);
+					memcpy((u_int8_t *)tmp_val.data +
+					    data->doff, data->data, data->size);
+					myval = &tmp_val;
+				}
+			} else
+				myval = (DBT *)data;
+
+			if (ret == 0)
+				ret = __ham_add_el(dbc, key, myval, H_KEYDATA);
+			goto done;
+		}
 		break;
 	case DB_BEFORE:
 	case DB_AFTER:
 	case DB_CURRENT:
-		ret = __ham_item(hashp, hcp, DB_LOCK_WRITE);
+		ret = __ham_item(dbc, DB_LOCK_WRITE);
 		break;
 	}
 
 	if (ret == 0) {
-		if (flags == DB_CURRENT && !F_ISSET(ldbp, DB_AM_DUP))
-			ret = __ham_overwrite(hashp, hcp, data);
+		if ((flags == DB_CURRENT && !F_ISSET(hcp, H_ISDUP)) ||
+		    ((flags == DB_KEYFIRST || flags == DB_KEYLAST) &&
+		    !F_ISSET(dbp, DB_AM_DUP)))
+			ret = __ham_overwrite(dbc, data);
 		else
-			ret = __ham_add_dup(hashp, hcp, data, flags);
+			ret = __ham_add_dup(dbc, data, flags);
 	}
 
-	if (ret == 0 && F_ISSET(hcp, H_EXPAND)) {
-		ret = __ham_expand_table(hashp);
+done:	if (ret == 0 && F_ISSET(hcp, H_EXPAND)) {
+		ret = __ham_expand_table(dbc);
 		F_CLR(hcp, H_EXPAND);
 	}
 
-	if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0)
+	if ((t_ret = __ham_item_done(dbc, ret == 0)) != 0 && ret == 0)
 		ret = t_ret;
-	if (ret != 0)
-		*hcp = save_curs;
-	RELEASE_META(hashp->dbp, hashp);
-	if (F_ISSET(cursor->dbp, DB_AM_THREAD))
-		__db_puthandle(ldbp);
+
+out:	RELEASE_META(dbp, hcp);
+	RESTORE_CURSOR(dbp, hcp, &save_curs, ret);
+	if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW))
+		(void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock,
+		    DB_LOCK_IWRITE, 0);
 	return (ret);
 }
 
@@ -974,19 +788,21 @@ __ham_c_put(cursor, key, data, flags)
 
 /*
  * __ham_expand_table --
- *
- * PUBLIC: int __ham_expand_table __P((HTAB *));
  */
-int
-__ham_expand_table(hashp)
-	HTAB *hashp;
+static int
+__ham_expand_table(dbc)
+	DBC *dbc;
 {
+	DB *dbp;
+	HASH_CURSOR *hcp;
 	DB_LSN new_lsn;
 	u_int32_t old_bucket, new_bucket, spare_ndx;
 	int ret;
 
+	dbp = dbc->dbp;
+	hcp = (HASH_CURSOR *)dbc->internal;
 	ret = 0;
-	DIRTY_META(hashp, ret);
+	DIRTY_META(dbp, hcp, ret);
 	if (ret)
 		return (ret);
 
@@ -999,78 +815,78 @@ __ham_expand_table(hashp)
 	 * see what the log of one greater than that is; here we have to
 	 * look at the log of max + 2.  VERY NASTY STUFF.
 	 */
-	if (__db_log2(hashp->hdr->max_bucket + 2) > hashp->hdr->ovfl_point) {
+	if (__db_log2(hcp->hdr->max_bucket + 2) > hcp->hdr->ovfl_point) {
 		/*
 		 * We are about to shift the split point.  Make sure that
 		 * if the next doubling is going to be big (more than 8
 		 * pages), we have some extra pages around.
 		 */
-		if (hashp->hdr->max_bucket + 1 >= 8 &&
-		    hashp->hdr->spares[hashp->hdr->ovfl_point] <
-		    hashp->hdr->spares[hashp->hdr->ovfl_point - 1] +
-		    hashp->hdr->ovfl_point + 1)
-			__ham_init_ovflpages(hashp);
+		if (hcp->hdr->max_bucket + 1 >= 8 &&
+		    hcp->hdr->spares[hcp->hdr->ovfl_point] <
+		    hcp->hdr->spares[hcp->hdr->ovfl_point - 1] +
+		    hcp->hdr->ovfl_point + 1)
+			__ham_init_ovflpages(dbc);
 	}
 
 	/* Now we can log the meta-data split. */
-	if (DB_LOGGING(hashp->dbp)) {
-		if ((ret = __ham_splitmeta_log(hashp->dbp->dbenv->lg_info,
-		    (DB_TXN *)hashp->dbp->txn, &new_lsn, 0,
-		    hashp->dbp->log_fileid,
-		    hashp->hdr->max_bucket, hashp->hdr->ovfl_point,
-		    hashp->hdr->spares[hashp->hdr->ovfl_point],
-		    &hashp->hdr->lsn)) != 0)
+	if (DB_LOGGING(dbc)) {
+		if ((ret = __ham_splitmeta_log(dbp->dbenv->lg_info,
+		    dbc->txn, &new_lsn, 0, dbp->log_fileid,
+		    hcp->hdr->max_bucket, hcp->hdr->ovfl_point,
+		    hcp->hdr->spares[hcp->hdr->ovfl_point],
+		    &hcp->hdr->lsn)) != 0)
 			return (ret);
 
-		hashp->hdr->lsn = new_lsn;
+		hcp->hdr->lsn = new_lsn;
 	}
 
-	hashp->hash_expansions++;
-	new_bucket = ++hashp->hdr->max_bucket;
-	old_bucket = (hashp->hdr->max_bucket & hashp->hdr->low_mask);
+	hcp->stats.hash_expansions++;
+	new_bucket = ++hcp->hdr->max_bucket;
+	old_bucket = (hcp->hdr->max_bucket & hcp->hdr->low_mask);
 
 	/*
 	 * If the split point is increasing, copy the current contents
 	 * of the spare split bucket to the next bucket.
 	 */
-	spare_ndx = __db_log2(hashp->hdr->max_bucket + 1);
-	if (spare_ndx > hashp->hdr->ovfl_point) {
-		hashp->hdr->spares[spare_ndx] =
-		    hashp->hdr->spares[hashp->hdr->ovfl_point];
-		hashp->hdr->ovfl_point = spare_ndx;
+	spare_ndx = __db_log2(hcp->hdr->max_bucket + 1);
+	if (spare_ndx > hcp->hdr->ovfl_point) {
+		hcp->hdr->spares[spare_ndx] =
+		    hcp->hdr->spares[hcp->hdr->ovfl_point];
+		hcp->hdr->ovfl_point = spare_ndx;
 	}
 
-	if (new_bucket > hashp->hdr->high_mask) {
+	if (new_bucket > hcp->hdr->high_mask) {
 		/* Starting a new doubling */
-		hashp->hdr->low_mask = hashp->hdr->high_mask;
-		hashp->hdr->high_mask = new_bucket | hashp->hdr->low_mask;
+		hcp->hdr->low_mask = hcp->hdr->high_mask;
+		hcp->hdr->high_mask = new_bucket | hcp->hdr->low_mask;
 	}
 
-	if (BUCKET_TO_PAGE(hashp, new_bucket) > MAX_PAGES(hashp)) {
-		__db_err(hashp->dbp->dbenv,
+	if (BUCKET_TO_PAGE(hcp, new_bucket) > MAX_PAGES(hcp)) {
+		__db_err(dbp->dbenv,
 		    "hash: Cannot allocate new bucket.  Pages exhausted.");
 		return (ENOSPC);
 	}
 
 	/* Relocate records to the new bucket */
-	return (__ham_split_page(hashp, old_bucket, new_bucket));
+	return (__ham_split_page(dbc, old_bucket, new_bucket));
 }
 
 /*
- * PUBLIC: u_int32_t __ham_call_hash __P((HTAB *, u_int8_t *, int32_t));
+ * PUBLIC: u_int32_t __ham_call_hash __P((HASH_CURSOR *, u_int8_t *, int32_t));
  */
 u_int32_t
-__ham_call_hash(hashp, k, len)
-	HTAB *hashp;
+__ham_call_hash(hcp, k, len)
+	HASH_CURSOR *hcp;
 	u_int8_t *k;
 	int32_t len;
 {
 	u_int32_t n, bucket;
 
-	n = (u_int32_t)hashp->hash(k, len);
-	bucket = n & hashp->hdr->high_mask;
-	if (bucket > hashp->hdr->max_bucket)
-		bucket = bucket & hashp->hdr->low_mask;
+	n = (u_int32_t)(hcp->dbc->dbp->h_hash(k, len));
+
+	bucket = n & hcp->hdr->high_mask;
+	if (bucket > hcp->hdr->max_bucket)
+		bucket = bucket & hcp->hdr->low_mask;
 	return (bucket);
 }
 
@@ -1079,31 +895,36 @@ __ham_call_hash(hashp, k, len)
  * everything held by the cursor.
  */
 static int
-__ham_dup_return(hashp, hcp, val, flags)
-	HTAB *hashp;
-	HASH_CURSOR *hcp;
+__ham_dup_return(dbc, val, flags)
+	DBC *dbc;
 	DBT *val;
 	u_int32_t flags;
 {
+	DB *dbp;
+	HASH_CURSOR *hcp;
 	PAGE *pp;
 	DBT *myval, tmp_val;
 	db_indx_t ndx;
 	db_pgno_t pgno;
+	u_int32_t off, tlen;
 	u_int8_t *hk, type;
-	int ret;
+	int cmp, ret;
 	db_indx_t len;
 
 	/* Check for duplicate and return the first one. */
+	dbp = dbc->dbp;
+	hcp = (HASH_CURSOR *)dbc->internal;
 	ndx = H_DATAINDEX(hcp->bndx);
 	type = HPAGE_TYPE(hcp->pagep, ndx);
 	pp = hcp->pagep;
 	myval = val;
 
 	/*
-	 * There are 3 cases:
+	 * There are 4 cases:
 	 * 1. We are not in duplicate, simply call db_ret.
 	 * 2. We are looking at keys and stumbled onto a duplicate.
 	 * 3. We are in the middle of a duplicate set. (ISDUP set)
+	 * 4. This is a duplicate and we need to return a specific item.
 	 */
 
 	/*
@@ -1115,7 +936,7 @@ __ham_dup_return(hashp, hcp, val, flags)
 		if (type == H_DUPLICATE) {
 			F_SET(hcp, H_ISDUP);
 			hcp->dup_tlen = LEN_HDATA(hcp->pagep,
-			    hashp->hdr->pagesize, hcp->bndx);
+			    hcp->hdr->pagesize, hcp->bndx);
 			hk = H_PAIRDATA(hcp->pagep, hcp->bndx);
 			if (flags == DB_LAST || flags == DB_PREV) {
 				hcp->dndx = 0;
@@ -1141,18 +962,63 @@ __ham_dup_return(hashp, hcp, val, flags)
 			memcpy(&pgno, HOFFDUP_PGNO(P_ENTRY(hcp->pagep, ndx)),
 			    sizeof(db_pgno_t));
 			if (flags == DB_LAST || flags == DB_PREV) {
-				if ((ret = __db_dend(hashp->dbp,
+				if ((ret = __db_dend(dbc,
 				    pgno, &hcp->dpagep)) != 0)
 					return (ret);
 				hcp->dpgno = PGNO(hcp->dpagep);
 				hcp->dndx = NUM_ENT(hcp->dpagep) - 1;
-			} else if ((ret = __ham_next_cpage(hashp,
-			    hcp, pgno, 0, H_ISDUP)) != 0)
+			} else if ((ret = __ham_next_cpage(dbc,
+			    pgno, 0, H_ISDUP)) != 0)
 				return (ret);
 		}
 	}
 
 	/*
+	 * If we are retrieving a specific key/data pair, then we
+	 * may need to adjust the cursor before returning data.
+	 */
+	if (flags == DB_GET_BOTH) {
+		if (F_ISSET(hcp, H_ISDUP)) {
+			if (hcp->dpgno != PGNO_INVALID) {
+				if ((ret = __db_dsearch(dbc, 0, val,
+				    hcp->dpgno, &hcp->dndx, &hcp->dpagep, &cmp))
+				    != 0)
+					return (ret);
+				if (cmp == 0)
+					hcp->dpgno = PGNO(hcp->dpagep);
+			} else {
+				__ham_dsearch(dbc, val, &off, &cmp);
+				hcp->dup_off = off;
+			}
+		} else {
+			hk = H_PAIRDATA(hcp->pagep, hcp->bndx);
+			if (((HKEYDATA *)hk)->type == H_OFFPAGE) {
+				memcpy(&tlen,
+				    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
+				memcpy(&pgno,
+				    HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
+				if ((ret = __db_moff(dbp, val,
+				    pgno, tlen, dbp->dup_compare, &cmp)) != 0)
+					return (ret);
+			} else {
+				/*
+				 * We do not zero tmp_val since the comparison
+				 * routines may only look at data and size.
+				 */
+				tmp_val.data = HKEYDATA_DATA(hk);
+				tmp_val.size = LEN_HDATA(hcp->pagep,
+				    dbp->pgsize, hcp->bndx);
+				cmp = dbp->dup_compare == NULL ?
+				    __bam_defcmp(&tmp_val, val) :
+				    dbp->dup_compare(&tmp_val, val);
+			}
+		}
+
+		if (cmp != 0)
+			return (DB_NOTFOUND);
+	}
+
+	/*
 	 * Now, everything is initialized, grab a duplicate if
 	 * necessary.
 	 */
@@ -1162,14 +1028,34 @@ __ham_dup_return(hashp, hcp, val, flags)
 			ndx = hcp->dndx;
 		} else {
 			/*
-			 * Copy the DBT in case we are retrieving into
-			 * user memory and we need the parameters for
-			 * it.
+			 * Copy the DBT in case we are retrieving into user
+			 * memory and we need the parameters for it.  If the
+			 * user requested a partial, then we need to adjust
+			 * the user's parameters to get the partial of the
+			 * duplicate which is itself a partial.
 			 */
 			memcpy(&tmp_val, val, sizeof(*val));
-			F_SET(&tmp_val, DB_DBT_PARTIAL);
-			tmp_val.dlen = hcp->dup_len;
-			tmp_val.doff = hcp->dup_off + sizeof(db_indx_t);
+			if (F_ISSET(&tmp_val, DB_DBT_PARTIAL)) {
+				/*
+				 * Take the user's length unless it would go
+				 * beyond the end of the duplicate.
+				 */
+				if (tmp_val.doff + hcp->dup_off > hcp->dup_len)
+					tmp_val.dlen = 0;
+				else if (tmp_val.dlen + tmp_val.doff >
+				    hcp->dup_len)
+					tmp_val.dlen =
+					    hcp->dup_len - tmp_val.doff;
+
+				/*
+				 * Calculate the new offset.
+				 */
+				tmp_val.doff += hcp->dup_off;
+			} else {
+				F_SET(&tmp_val, DB_DBT_PARTIAL);
+				tmp_val.dlen = hcp->dup_len;
+				tmp_val.doff = hcp->dup_off + sizeof(db_indx_t);
+			}
 			myval = &tmp_val;
 		}
 	}
@@ -1178,8 +1064,8 @@ __ham_dup_return(hashp, hcp, val, flags)
 	 * Finally, if we had a duplicate, pp, ndx, and myval should be
 	 * set appropriately.
 	 */
-	if ((ret = __db_ret(hashp->dbp, pp, ndx, myval, &hcp->big_data,
-	    &hcp->big_datalen)) != 0)
+	if ((ret = __db_ret(dbp, pp, ndx, myval, &dbc->rdata.data,
+	    &dbc->rdata.size)) != 0)
 		return (ret);
 
 	/*
@@ -1193,16 +1079,17 @@ __ham_dup_return(hashp, hcp, val, flags)
 }
 
 static int
-__ham_overwrite(hashp, hcp, nval)
-	HTAB *hashp;
-	HASH_CURSOR *hcp;
+__ham_overwrite(dbc, nval)
+	DBC *dbc;
 	DBT *nval;
 {
+	HASH_CURSOR *hcp;
 	DBT *myval, tmp_val;
 	u_int8_t *hk;
 
-	if (F_ISSET(hashp->dbp, DB_AM_DUP))
-		return (__ham_add_dup(hashp, hcp, nval, DB_KEYLAST));
+	hcp = (HASH_CURSOR *)dbc->internal;
+	if (F_ISSET(dbc->dbp, DB_AM_DUP))
+		return (__ham_add_dup(dbc, nval, DB_KEYLAST));
 	else if (!F_ISSET(nval, DB_DBT_PARTIAL)) {
 		/* Put/overwrite */
 		memcpy(&tmp_val, nval, sizeof(*nval));
@@ -1214,12 +1101,12 @@ __ham_overwrite(hashp, hcp, nval)
 			    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
 		else
 			tmp_val.dlen = LEN_HDATA(hcp->pagep,
-			    hashp->hdr->pagesize,hcp->bndx);
+			    hcp->hdr->pagesize,hcp->bndx);
 		myval = &tmp_val;
 	} else /* Regular partial put */
 		myval = nval;
 
-	return (__ham_replpair(hashp, hcp, myval, 0));
+	return (__ham_replpair(dbc, myval, 0));
 }
 
 /*
@@ -1232,29 +1119,32 @@ __ham_overwrite(hashp, hcp, nval)
  * non of the cursor pointer field are valid.
  */
 static int
-__ham_lookup(hashp, hcp, key, sought, mode)
-	HTAB *hashp;
-	HASH_CURSOR *hcp;
+__ham_lookup(dbc, key, sought, mode)
+	DBC *dbc;
 	const DBT *key;
 	u_int32_t sought;
 	db_lockmode_t mode;
 {
+	DB *dbp;
+	HASH_CURSOR *hcp;
 	db_pgno_t pgno;
 	u_int32_t tlen;
 	int match, ret, t_ret;
 	u_int8_t *hk;
 
+	dbp = dbc->dbp;
+	hcp = (HASH_CURSOR *)dbc->internal;
 	/*
 	 * Set up cursor so that we're looking for space to add an item
 	 * as we cycle through the pages looking for the key.
 	 */
-	if ((ret = __ham_item_reset(hashp, hcp)) != 0)
+	if ((ret = __ham_item_reset(dbc)) != 0)
 		return (ret);
 	hcp->seek_size = sought;
 
-	hcp->bucket = __ham_call_hash(hashp, (u_int8_t *)key->data, key->size);
+	hcp->bucket = __ham_call_hash(hcp, (u_int8_t *)key->data, key->size);
 	while (1) {
-		if ((ret = __ham_item_next(hashp, hcp, mode)) != 0)
+		if ((ret = __ham_item_next(dbc, mode)) != 0)
 			return (ret);
 
 		if (F_ISSET(hcp, H_NOMORE))
@@ -1267,7 +1157,9 @@ __ham_lookup(hashp, hcp, key, sought, mode)
 			if (tlen == key->size) {
 				memcpy(&pgno,
 				    HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
-				match = __db_moff(hashp->dbp, key, pgno);
+				if ((ret = __db_moff(dbp,
+				    key, pgno, tlen, NULL, &match)) != 0)
+					return (ret);
 				if (match == 0) {
 					F_SET(hcp, H_OK);
 					return (0);
@@ -1276,7 +1168,7 @@ __ham_lookup(hashp, hcp, key, sought, mode)
 			break;
 		case H_KEYDATA:
 			if (key->size == LEN_HKEY(hcp->pagep,
-			    hashp->hdr->pagesize, hcp->bndx) &&
+			    hcp->hdr->pagesize, hcp->bndx) &&
 			    memcmp(key->data,
 			    HKEYDATA_DATA(hk), key->size) == 0) {
 				F_SET(hcp, H_OK);
@@ -1289,9 +1181,9 @@ __ham_lookup(hashp, hcp, key, sought, mode)
 			 * These are errors because keys are never
 			 * duplicated, only data items are.
 			 */
-			return (__db_pgfmt(hashp->dbp, PGNO(hcp->pagep)));
+			return (__db_pgfmt(dbp, PGNO(hcp->pagep)));
 		}
-		hashp->hash_collisions++;
+		hcp->stats.hash_collisions++;
 	}
 
 	/*
@@ -1301,7 +1193,7 @@ __ham_lookup(hashp, hcp, key, sought, mode)
 	if (sought != 0)
 		return (ret);
 
-	if ((t_ret = __ham_item_done(hashp, hcp, 0)) != 0 && ret == 0)
+	if ((t_ret = __ham_item_done(dbc, 0)) != 0 && ret == 0)
 		ret = t_ret;
 	return (ret);
 }
@@ -1318,12 +1210,13 @@ __ham_init_dbt(dbt, size, bufp, sizep)
 	void **bufp;
 	u_int32_t *sizep;
 {
+	int ret;
+
 	memset(dbt, 0, sizeof(*dbt));
 	if (*sizep < size) {
-		if ((*bufp = (void *)(*bufp == NULL ?
-		    __db_malloc(size) : __db_realloc(*bufp, size))) == NULL) {
+		if ((ret = __os_realloc(bufp, size)) != 0) {
 			*sizep = 0;
-			return (ENOMEM);
+			return (ret);
 		}
 		*sizep = size;
 	}
@@ -1352,8 +1245,8 @@ __ham_c_update(hcp, chg_pgno, len, add, is_dup)
 	u_int32_t len;
 	int add, is_dup;
 {
+	DB *dbp;
 	DBC *cp;
-	HTAB *hp;
 	HASH_CURSOR *lcp;
 	int page_deleted;
 
@@ -1379,10 +1272,10 @@ __ham_c_update(hcp, chg_pgno, len, add, is_dup)
 		page_deleted =
 		    chg_pgno != PGNO_INVALID && chg_pgno != hcp->dpgno;
 
-	hp = hcp->db_cursor->dbp->master->internal;
-	DB_THREAD_LOCK(hp->dbp);
+	dbp = hcp->dbc->dbp;
+	DB_THREAD_LOCK(dbp);
 
-	for (cp = TAILQ_FIRST(&hp->dbp->curs_queue); cp != NULL;
+	for (cp = TAILQ_FIRST(&dbp->active_queue); cp != NULL;
 	    cp = TAILQ_NEXT(cp, links)) {
 		if (cp->internal == hcp)
 			continue;
@@ -1440,43 +1333,5 @@ __ham_c_update(hcp, chg_pgno, len, add, is_dup)
 			}
 		}
 	}
-	DB_THREAD_UNLOCK(hp->dbp);
-}
-
-/*
- * __ham_hdup --
- *	This function gets called when we create a duplicate handle for a
- *	threaded DB.  It should create the private part of the DB structure.
- *
- * PUBLIC: int  __ham_hdup __P((DB *, DB *));
- */
-int
-__ham_hdup(orig, new)
-	DB *orig, *new;
-{
-	DBC *curs;
-	HTAB *hashp;
-	int ret;
-
-	if ((hashp = (HTAB *)__db_malloc(sizeof(HTAB))) == NULL)
-		return (ENOMEM);
-
-	new->internal = hashp;
-
-	hashp->dbp = new;
-	hashp->hlock = 0;
-	hashp->hdr = NULL;
-	hashp->hash = ((HTAB *)orig->internal)->hash;
-	if ((hashp->split_buf = (PAGE *)__db_malloc(orig->pgsize)) == NULL)
-		return (ENOMEM);
-	hashp->local_errno = 0;
-	hashp->hash_accesses = 0;
-	hashp->hash_collisions = 0;
-	hashp->hash_expansions = 0;
-	hashp->hash_overflows = 0;
-	hashp->hash_bigpages = 0;
-	/* Initialize the cursor queue. */
-	ret = __ham_c_init(new, NULL, &curs);
-	TAILQ_INSERT_TAIL(&new->curs_queue, curs, links);
-	return (ret);
+	DB_THREAD_UNLOCK(dbp);
 }