[Date Prev][Date Next]
[Chronological]
[Thread]
[Top]
Re: [LMDB] getting MDB_CORRUPTED when deleting within a DUPSORT database
On 3/21/17 2:01 PM, Klaus Malorny wrote:
On 3/20/17 3:32 PM, Howard Chu wrote:
Klaus Malorny wrote:
Hi,
I am using version 0.9.20 on Linux (Ubuntu derivatives, uname see [1], [2]).
Version 0.9.20 was never officially released. The release was withdrawn due to
a corruption bug. I suggest you downgrade to 0.9.19.
Hi,
after hitting a known problem with 0.9.18, I went to the Git repository to fetch
the latest release I found there -- this is why I took 0.9.20.
I have repeated my test with 0.9.19, but unfortunately, the very same error
occurred...
I am going to try to write a small C program to see whether I can reproduce the
behaviour.
Regards,
Klaus
Hi,
I am now able to reproduce the problem with a sample program. I guess that the
problem is somewhat deeper and not necessarily directly related to
mdb_cursor_del as it seems to depend on the order the data is inserted into the
database. If I do this in a pre-ordered way, the problem does not occur, but if
I shuffle the data, it does. I also tried a smaller number of records, but then
the problem also disappears.
I have attached the test program to this e-mail. I hope it does not get filtered
out.
Regards,
Klaus
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <endian.h>
#include <string.h>
#include "lmdb.h"
/* Test configuration and shared state for the reproduction program. */

/* Path of the LMDB data file; removed with unlink() and then opened in main(). */
static const char *fileName = "test.lmdb";
/* Name of the named database opened with mdb_dbi_open() in main(). */
static const char *dbName = "test";
/* Map size passed to mdb_env_set_mapsize(). */
static long size = 1500000000;
/* Number of slots in the record array built and written by fill(). */
static int recordCount = 33000000;
/* Number of distinct major ids; also the length of the majorIds array. */
static int majorIdCount = 6000;
/* Number of minor ids generated by fill() (one loop iteration each). */
static int minorIdCount = 1000000;
/* Seed handed to srandom() so that the generated data is reproducible. */
static unsigned int seed = 1;
/* Shuffled list of major ids: allocated in fill(), iterated by testDelete(). */
static long *majorIds;
/*
 * Database key: a (major id, minor id) pair.
 * fill() and testDelete() convert both fields with htobe64() before handing
 * the struct to LMDB as raw key bytes.
 */
typedef struct
{
long majorId;
long minorId;
} KeyType;
/*
 * Database value: a single reference id.
 * fill() assigns increasing refId values and converts them with htobe64()
 * before writing.
 */
typedef struct
{
long refId;
} DataType;
/*
 * In-memory record (key plus value) used by fill() to build and shuffle the
 * data set before it is written to the database.
 */
typedef struct
{
KeyType key;
DataType data;
} KeyDataType;
/*
 * Abort the program if an LMDB call failed.
 *
 * op    - label of the operation, printed as a prefix of the message
 * error - return code of the call; 0 means success
 *
 * On a non-zero code the message is written to stderr and the process exits
 * with status 1; otherwise the function simply returns.
 */
void check (const char *op, int error)
{
  if (error == 0)
    return;

  fprintf (stderr, "%s: unexpected error %d: %s\n", op,
           error, mdb_strerror (error));
  exit (1);
}
/*
 * Randomly permute an array of fixed-size records in place.
 *
 * data        - start of the array
 * recordSize  - size of one record in bytes
 * recordCount - number of records in the array
 *
 * Walking from the second-to-last record down to the first, each record i is
 * swapped with a record chosen via random() from positions i .. recordCount-1.
 * The order and number of random() calls depend only on recordCount, so a
 * fixed srandom() seed yields a reproducible permutation.
 *
 * Arrays with fewer than two records (or a non-positive record size) are left
 * untouched.  If the swap buffer cannot be allocated, a message is printed to
 * stderr and the process exits with status 1.
 */
void shuffle (void *data, int recordSize, int recordCount)
{
  /* Nothing to permute; this also keeps a non-positive size away from malloc. */
  if (data == NULL || recordSize <= 0 || recordCount < 2)
    return;

  const size_t width = (size_t) recordSize;
  char *base = (char *) data;
  char *swapBuf = malloc (width);

  if (swapBuf == NULL)
    {
      fprintf (stderr, "out of memory\n");
      exit (1);
    }

  for (int i = recordCount - 2; i >= 0; i--)
    {
      int j = (int) (random () % (recordCount - i));

      if (j > 0)
        {
          /* Offsets are computed in size_t: index * recordSize may not fit
             into an int for large arrays. */
          char *ptr1 = base + (size_t) i * width;
          char *ptr2 = base + ((size_t) i + (size_t) j) * width;

          memcpy (swapBuf, ptr1, width);
          memcpy (ptr1, ptr2, width);
          memcpy (ptr2, swapBuf, width);
        }
    }

  free (swapBuf);
}
/*
 * Generate the test data set and write it to the database in one transaction.
 *
 * env - opened LMDB environment
 * dbi - handle of the database to fill
 *
 * Steps:
 *   1. Seed the generator with `seed`, then build and shuffle the global
 *      `majorIds` array (0 .. majorIdCount-1); testDelete() later deletes in
 *      that order.
 *   2. For every minor id pick a random major id and emit a varying number of
 *      records with increasing refId values.
 *   3. Shuffle the records and write them with mdb_put(), storing all fields
 *      converted with htobe64().
 *
 * Every failure (allocation or LMDB) terminates the process with status 1.
 */
void fill (MDB_env *env, MDB_dbi dbi)
{
  KeyType key;
  DataType data;
  MDB_val keyRef;
  MDB_val dataRef;
  MDB_txn *txn;

  printf ("generating data\n");
  srandom (seed);

  majorIds = (long *) malloc (majorIdCount * sizeof (long));
  if (!majorIds)
    {
      fprintf (stderr, "out of memory\n");
      exit (1);
    }
  for (int i = 0; i < majorIdCount; i++)
    majorIds[i] = i;
  // now shuffle (for later deletion test)
  shuffle ((void *) majorIds, sizeof (long), majorIdCount);

  /*
   * Each minor id below takes at most remaining / (minorIdCount - i + 1)
   * slots, i.e. at most half of what is left, so a tail of this array is
   * never written by the generator.  calloc() makes those slots
   * deterministically zero instead of leaving them indeterminate, and also
   * guards the size multiplication against overflow.
   */
  KeyDataType *records = calloc ((size_t) recordCount, sizeof *records);
  if (!records)
    {
      fprintf (stderr, "out of memory\n");
      exit (1);
    }

  KeyDataType *ptr = records;
  int remaining = recordCount;
  long refId = 0;

  for (int i = 0; i < minorIdCount; i++)
    {
      long majorId = random () % majorIdCount;
      long minorId = i;
      int max = remaining / (minorIdCount - i + 1);
      int use;

      if (i == minorIdCount - 1 || max < 2)
        {
          use = max;
        }
      else
        {
          long rand1 = random () % max;
          long rand2 = random () % max;
          use = (int) ((rand1 * rand2 / (max - 1))) + 1; // non-linear distribution
        }

      while (use-- > 0)
        {
          ptr -> key.majorId = majorId;
          ptr -> key.minorId = minorId;
          ptr -> data.refId = ++refId;
          ptr++;
          remaining--;
        }
    }

  shuffle ((void *) records, sizeof (KeyDataType), recordCount);

  printf ("writing data\n");
  check ("txn_begin", mdb_txn_begin (env, NULL, 0, &txn));

  /* key/data are reused for every put; the MDB_vals always point at them. */
  keyRef.mv_size = sizeof (key);
  keyRef.mv_data = (void *) &key;
  dataRef.mv_size = sizeof (data);
  dataRef.mv_data = (void *) &data;

  ptr = records;
  for (int i = recordCount; i > 0; i--)
    {
      key.majorId = htobe64 (ptr -> key.majorId);
      key.minorId = htobe64 (ptr -> key.minorId);
      data.refId = htobe64 (ptr -> data.refId);
      /* Re-arm the value descriptors before each put, as the original did. */
      keyRef.mv_size = sizeof (key);
      keyRef.mv_data = (void *) &key;
      dataRef.mv_size = sizeof (data);
      dataRef.mv_data = (void *) &data;
      check ("mdb_put", mdb_put (txn, dbi, &keyRef, &dataRef, 0));
      ptr++;
    }

  check ("txn_commit", mdb_txn_commit (txn));

  /* The staging array is no longer needed once the data is committed. */
  free (records);

  printf ("%d records written\n", recordCount);
}
/*
 * Delete every key in the range starting at *startKey and ending at *endKey.
 *
 * env            - LMDB environment (not used by this function)
 * dbi            - database handle
 * txn            - write transaction the deletions are performed in
 * startKey       - first key of the range (inclusive); the cursor is
 *                  positioned with MDB_SET_RANGE on it
 * endKey         - last key of the range
 * endIsInclusive - non-zero if a key equal to *endKey is deleted as well
 *
 * Keys are compared with mdb_cmp().  Each matching entry is removed with
 * mdb_cursor_del() using the MDB_NODUPDATA flag (per the LMDB API docs this
 * removes all duplicate data items of the key -- confirm against lmdb.h).
 * Any LMDB error other than MDB_NOTFOUND from the cursor terminates the
 * program through check().
 */
void deleteRange (MDB_env *env, MDB_dbi dbi, MDB_txn *txn,
                  KeyType *startKey, KeyType *endKey, int endIsInclusive)
{
  MDB_cursor *cur;
  MDB_val pos   = { .mv_size = sizeof (KeyType), .mv_data = (void *) startKey };
  MDB_val limit = { .mv_size = sizeof (KeyType), .mv_data = (void *) endKey };
  MDB_val value = { .mv_size = 0, .mv_data = NULL };
  int rc;

  check ("cursor_open", mdb_cursor_open (txn, dbi, &cur));

  for (rc = mdb_cursor_get (cur, &pos, &value, MDB_SET_RANGE);
       rc != MDB_NOTFOUND;
       rc = mdb_cursor_get (cur, &pos, &value, MDB_NEXT))
    {
      check ("mdb_cursor_get", rc);

      int order = mdb_cmp (txn, dbi, &pos, &limit);

      /* Past the end of the range. */
      if (order > 0)
        break;
      /* Exactly on the end key, which is excluded for a half-open range. */
      if (order == 0 && !endIsInclusive)
        break;

      check ("mdb_cursor_del", mdb_cursor_del (cur, MDB_NODUPDATA));
    }

  mdb_cursor_close (cur);
}
/*
 * Delete the records of every major id, in the shuffled order stored in the
 * global majorIds array, inside a single write transaction.
 *
 * env - opened LMDB environment
 * dbi - handle of the database filled by fill()
 *
 * For each major id the range from minor id 1 up to and including the
 * largest positive long is passed to deleteRange().
 */
void testDelete (MDB_env *env, MDB_dbi dbi)
{
  MDB_txn *txn;
  KeyType lower;
  KeyType upper;

  printf ("testing\n");
  check ("txn_begin", mdb_txn_begin (env, NULL, 0, &txn));

  /* The minor-id bounds are the same for every major id. */
  lower.minorId = htobe64 (1);
  upper.minorId = htobe64 ((long) (~0UL >> 1));

  for (int idx = 0; idx < majorIdCount; idx++)
    {
      const long current = majorIds[idx];

      lower.majorId = htobe64 (current);
      upper.majorId = htobe64 (current);
      deleteRange (env, dbi, txn, &lower, &upper, 1);
    }

  check ("txn_commit", mdb_txn_commit (txn));
}
/*
 * Program entry point: create a fresh environment, fill the DUPSORT database
 * and run the deletion test.
 *
 * The command-line arguments are not used.  Any LMDB failure terminates the
 * process with status 1 through check().
 */
int main (int argc, char *argv[])
{
  MDB_env *environment;
  MDB_dbi handle;
  MDB_txn *setupTxn;

  printf ("LMDB version: %s\n", MDB_VERSION_STRING);

  /* Start from scratch: remove a data file left over from an earlier run. */
  unlink (fileName);

  check ("env_create", mdb_env_create (&environment));
  check ("env_set_mapsize", mdb_env_set_mapsize (environment, size));
  check ("env_set_maxdbs", mdb_env_set_maxdbs (environment, 2));

  const unsigned int envFlags = MDB_NOSUBDIR | MDB_WRITEMAP;
  check ("env_open", mdb_env_open (environment, fileName, envFlags, 0666));

  /* Open (and create if necessary) the named database in its own transaction. */
  const unsigned int dbFlags = MDB_CREATE | MDB_DUPSORT;
  check ("txn_begin", mdb_txn_begin (environment, NULL, 0, &setupTxn));
  check ("dbi_open", mdb_dbi_open (setupTxn, dbName, dbFlags, &handle));
  check ("txn_commit", mdb_txn_commit (setupTxn));

  fill (environment, handle);
  testDelete (environment, handle);

  mdb_env_close (environment);
  printf ("done.\n");
  return 0;
}