diff -U3 -r -N no-tests/tests/crash_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/crash_test.py --- no-tests/tests/crash_test.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/crash_test.py 2017-05-29 16:44:06.615481956 +0200 @@ -0,0 +1,237 @@ +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +# This is not a test suite! More like a collection of triggers for previously +# observed crashes. Want to contribute to py-lmdb? Please write a test suite! +# +# what happens when empty keys/ values passed to various funcs +# incorrect types +# try to break cpython arg parsing - too many/few/incorrect args +# Various efforts to cause Python-level leaks. +# + +from __future__ import absolute_import +from __future__ import with_statement + +import itertools +import os +import random +import unittest + +import lmdb +import testlib + +from testlib import B +from testlib import O + + +try: + next(iter([1])) +except NameError: # Python2.5. + def next(it): + return it.next() + + +class CrashTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + # Various efforts to cause segfaults. + + def setUp(self): + self.path, self.env = testlib.temp_env() + with self.env.begin(write=True) as txn: + txn.put(B('dave'), B('')) + txn.put(B('dave2'), B('')) + + def testOldCrash(self): + txn = self.env.begin() + dir(iter(txn.cursor())) + + def testCloseWithTxn(self): + txn = self.env.begin(write=True) + self.env.close() + self.assertRaises(Exception, (lambda: list(txn.cursor()))) + + def testDoubleClose(self): + self.env.close() + self.env.close() + + def testDbDoubleClose(self): + db = self.env.open_db(key=B('dave3')) + #db.close() + #db.close() + + def testTxnCloseActiveIter(self): + with self.env.begin() as txn: + it = txn.cursor().iternext() + self.assertRaises(Exception, (lambda: list(it))) + + def testDbCloseActiveIter(self): + db = self.env.open_db(key=B('dave3')) + with self.env.begin(write=True) as txn: + txn.put(B('a'), B('b'), db=db) + it = txn.cursor(db=db).iternext() + self.assertRaises(Exception, (lambda: list(it))) + + +class IteratorTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def setUp(self): + self.path, self.env = testlib.temp_env() + self.txn = self.env.begin(write=True) + self.c = self.txn.cursor() + + def testEmpty(self): + self.assertEqual([], list(self.c)) + self.assertEqual([], list(self.c.iternext())) + self.assertEqual([], list(self.c.iterprev())) + + def testFilled(self): + testlib.putData(self.txn) + self.assertEqual(testlib.ITEMS, list(self.c)) + self.assertEqual(testlib.ITEMS, list(self.c)) + self.assertEqual(testlib.ITEMS, list(self.c.iternext())) + self.assertEqual(testlib.ITEMS[::-1], list(self.txn.cursor().iterprev())) + self.assertEqual(testlib.ITEMS[::-1], list(self.c.iterprev())) + self.assertEqual(testlib.ITEMS, list(self.c)) + + def testFilledSkipForward(self): + testlib.putData(self.txn) + self.c.set_range(B('b')) + self.assertEqual(testlib.ITEMS[1:], list(self.c)) + + def testFilledSkipReverse(self): + testlib.putData(self.txn) + self.c.set_range(B('b')) + self.assertEqual(testlib.REV_ITEMS[-2:], list(self.c.iterprev())) + + def testFilledSkipEof(self): + testlib.putData(self.txn) + self.assertEqual(False, self.c.set_range(B('z'))) + self.assertEqual(testlib.REV_ITEMS, list(self.c.iterprev())) + + +class BigReverseTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + # Test for issue with MDB_LAST+MDB_PREV skipping chunks of database. + def test_big_reverse(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + keys = [B('%05d' % i) for i in range(0xffff)] + for k in keys: + txn.put(k, k, append=True) + assert list(txn.cursor().iterprev(values=False)) == list(reversed(keys)) + + +class MultiCursorDeleteTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def setUp(self): + self.path, self.env = testlib.temp_env() + + def test1(self): + """Ensure MDB_NEXT is ignored on `c1' when it was previously positioned + on the key that `c2' just deleted.""" + txn = self.env.begin(write=True) + cur = txn.cursor() + while cur.first(): + cur.delete() + + for i in range(1, 10): + cur.put(O(ord('a') + i) * i, B('')) + + c1 = txn.cursor() + c1f = c1.iternext(values=False) + while next(c1f) != B('ddd'): + pass + c2 = txn.cursor() + assert c2.set_key(B('ddd')) + c2.delete() + assert next(c1f) == B('eeee') + + def test_monster(self): + # Generate predictable sequence of sizes. + rand = random.Random() + rand.seed(0) + + txn = self.env.begin(write=True) + keys = [] + for i in range(20000): + key = B('%06x' % i) + val = B('x' * rand.randint(76, 350)) + assert txn.put(key, val) + keys.append(key) + + deleted = 0 + for key in txn.cursor().iternext(values=False): + assert txn.delete(key), key + deleted += 1 + + assert deleted == len(keys), deleted + + +class TxnFullTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_17bf75b12eb94d9903cd62329048b146d5313bad(self): + """ + me_txn0 previously cached MDB_TXN_ERROR permanently. Fixed by + 17bf75b12eb94d9903cd62329048b146d5313bad. + """ + path, env = testlib.temp_env(map_size=4096*9, sync=False, max_spare_txns=0) + for i in itertools.count(): + try: + with env.begin(write=True) as txn: + txn.put(B(str(i)), B(str(i))) + except lmdb.MapFullError: + break + + # Should not crash with MDB_BAD_TXN: + with env.begin(write=True) as txn: + txn.delete(B('1')) + + +class EmptyIterTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_python3_iternext_segfault(self): + # https://github.com/dw/py-lmdb/issues/105 + _, env = testlib.temp_env() + txn = env.begin() + cur = txn.cursor() + ite = cur.iternext() + nex = getattr(ite, 'next', + getattr(ite, '__next__', None)) + assert nex is not None + self.assertRaises(StopIteration, nex) + + +if __name__ == '__main__': + unittest.main() diff -U3 -r -N no-tests/tests/cursor_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/cursor_test.py --- no-tests/tests/cursor_test.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/cursor_test.py 2017-05-29 16:44:06.615481956 +0200 @@ -0,0 +1,222 @@ +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +# test delete(dupdata) + +from __future__ import absolute_import +from __future__ import with_statement +import unittest + +import testlib +from testlib import B +from testlib import BT + + +class ContextManagerTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_ok(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + with txn.cursor() as curs: + curs.put(B('foo'), B('123')) + self.assertRaises(Exception, lambda: curs.get(B('foo'))) + + def test_crash(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + + try: + with txn.cursor() as curs: + curs.put(123, 123) + except: + pass + self.assertRaises(Exception, lambda: curs.get(B('foo'))) + + +class CursorTestBase(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def setUp(self): + self.path, self.env = testlib.temp_env() + self.txn = self.env.begin(write=True) + self.c = self.txn.cursor() + + +class CursorTest(CursorTestBase): + def testKeyValueItemEmpty(self): + self.assertEqual(B(''), self.c.key()) + self.assertEqual(B(''), self.c.value()) + self.assertEqual(BT('', ''), self.c.item()) + + def testFirstLastEmpty(self): + self.assertEqual(False, self.c.first()) + self.assertEqual(False, self.c.last()) + + def testFirstFilled(self): + testlib.putData(self.txn) + self.assertEqual(True, self.c.first()) + self.assertEqual(testlib.ITEMS[0], self.c.item()) + + def testLastFilled(self): + testlib.putData(self.txn) + self.assertEqual(True, self.c.last()) + self.assertEqual(testlib.ITEMS[-1], self.c.item()) + + def testSetKey(self): + self.assertRaises(Exception, (lambda: self.c.set_key(B('')))) + self.assertEqual(False, self.c.set_key(B('missing'))) + testlib.putData(self.txn) + self.assertEqual(True, self.c.set_key(B('b'))) + self.assertEqual(False, self.c.set_key(B('ba'))) + + def testSetRange(self): + self.assertEqual(False, self.c.set_range(B('x'))) + testlib.putData(self.txn) + self.assertEqual(False, self.c.set_range(B('x'))) + self.assertEqual(True, self.c.set_range(B('a'))) + self.assertEqual(B('a'), self.c.key()) + self.assertEqual(True, self.c.set_range(B('ba'))) + self.assertEqual(B('baa'), self.c.key()) + self.c.set_range(B('')) + self.assertEqual(B('a'), self.c.key()) + + def testDeleteEmpty(self): + self.assertEqual(False, self.c.delete()) + + def testDeleteFirst(self): + testlib.putData(self.txn) + self.assertEqual(False, self.c.delete()) + self.c.first() + self.assertEqual(BT('a', ''), self.c.item()) + self.assertEqual(True, self.c.delete()) + self.assertEqual(BT('b', ''), self.c.item()) + self.assertEqual(True, self.c.delete()) + self.assertEqual(BT('baa', ''), self.c.item()) + self.assertEqual(True, self.c.delete()) + self.assertEqual(BT('d', ''), self.c.item()) + self.assertEqual(True, self.c.delete()) + self.assertEqual(BT('', ''), self.c.item()) + self.assertEqual(False, self.c.delete()) + self.assertEqual(BT('', ''), self.c.item()) + + def testDeleteLast(self): + testlib.putData(self.txn) + self.assertEqual(True, self.c.last()) + self.assertEqual(BT('d', ''), self.c.item()) + self.assertEqual(True, self.c.delete()) + self.assertEqual(BT('', ''), self.c.item()) + self.assertEqual(False, self.c.delete()) + self.assertEqual(BT('', ''), self.c.item()) + + def testCount(self): + self.assertRaises(Exception, (lambda: self.c.count())) + testlib.putData(self.txn) + self.c.first() + # TODO: complete dup key support. + #self.assertEqual(1, self.c.count()) + + def testPut(self): + pass + + +class PutmultiTest(CursorTestBase): + def test_empty_seq(self): + consumed, added = self.c.putmulti(()) + assert consumed == added == 0 + + def test_2list(self): + l = [BT('a', ''), BT('a', '')] + consumed, added = self.c.putmulti(l) + assert consumed == added == 2 + + li = iter(l) + consumed, added = self.c.putmulti(li) + assert consumed == added == 2 + + def test_2list_preserve(self): + l = [BT('a', ''), BT('a', '')] + consumed, added = self.c.putmulti(l, overwrite=False) + assert consumed == 2 + assert added == 1 + + assert self.c.set_key(B('a')) + assert self.c.delete() + + li = iter(l) + consumed, added = self.c.putmulti(li, overwrite=False) + assert consumed == 2 + assert added == 1 + + def test_bad_seq1(self): + self.assertRaises(Exception, + lambda: self.c.putmulti(range(2))) + + +class ReplaceTest(CursorTestBase): + def test_replace(self): + assert None is self.c.replace(B('a'), B('')) + assert B('') == self.c.replace(B('a'), B('x')) + assert B('x') == self.c.replace(B('a'), B('y')) + + +class ContextManagerTest(CursorTestBase): + def test_enter(self): + with self.c as c: + assert c is self.c + c.put(B('a'), B('a')) + assert c.get(B('a')) == B('a') + self.assertRaises(Exception, + lambda: c.get(B('a'))) + + def test_exit_success(self): + with self.txn.cursor() as c: + c.put(B('a'), B('a')) + self.assertRaises(Exception, + lambda: c.get(B('a'))) + + def test_exit_failure(self): + try: + with self.txn.cursor() as c: + c.put(B('a'), B('a')) + raise ValueError + except ValueError: + pass + self.assertRaises(Exception, + lambda: c.get(B('a'))) + + def test_close(self): + self.c.close() + self.assertRaises(Exception, + lambda: c.get(B('a'))) + + def test_double_close(self): + self.c.close() + self.c.close() + self.assertRaises(Exception, + lambda: self.c.put(B('a'), B('a'))) + + +if __name__ == '__main__': + unittest.main() diff -U3 -r -N no-tests/tests/env_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py --- no-tests/tests/env_test.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/env_test.py 2017-06-12 23:54:54.737797526 +0200 @@ -0,0 +1,845 @@ +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +from __future__ import absolute_import +from __future__ import with_statement +import os +import signal +import sys +import unittest +import weakref + +import testlib +from testlib import B +from testlib import BT +from testlib import OCT +from testlib import INT_TYPES +from testlib import UnicodeType + +import lmdb + + +NO_READERS = UnicodeType('(no active readers)\n') + +try: + PAGE_SIZE = os.sysconf(os.sysconf_names['SC_PAGE_SIZE']) +except (AttributeError, KeyError, OSError): + PAGE_SIZE = 4096 + + +class VersionTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_version(self): + ver = lmdb.version() + assert len(ver) == 3 + assert all(isinstance(i, INT_TYPES) for i in ver) + assert all(i >= 0 for i in ver) + + +class OpenTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_bad_paths(self): + self.assertRaises(Exception, + lambda: lmdb.open('/doesnt/exist/at/all')) + self.assertRaises(Exception, + lambda: lmdb.open(testlib.temp_file())) + + def test_ok_path(self): + path, env = testlib.temp_env() + assert os.path.exists(path) + assert os.path.exists(os.path.join(path, 'data.mdb')) + assert os.path.exists(os.path.join(path, 'lock.mdb')) + assert env.path() == path + + def test_bad_size(self): + self.assertRaises(OverflowError, + lambda: testlib.temp_env(map_size=-123)) + + def test_tiny_size(self): + _, env = testlib.temp_env(map_size=10) + def txn(): + with env.begin(write=True) as txn: + txn.put(B('a'), B('a')) + self.assertRaises(lmdb.MapFullError, txn) + + def test_subdir_false_junk(self): + path = testlib.temp_file() + fp = open(path, 'wb') + fp.write(B('A' * 8192)) + fp.close() + self.assertRaises(lmdb.InvalidError, + lambda: lmdb.open(path, subdir=False)) + + def test_subdir_false_ok(self): + path = testlib.temp_file(create=False) + _, env = testlib.temp_env(path, subdir=False) + assert os.path.exists(path) + assert os.path.isfile(path) + assert os.path.isfile(path + '-lock') + assert not env.flags()['subdir'] + + def test_subdir_true_noexist_nocreate(self): + path = testlib.temp_dir(create=False) + self.assertRaises(lmdb.Error, + lambda: testlib.temp_env(path, subdir=True, create=False)) + assert not os.path.exists(path) + + def test_subdir_true_noexist_create(self): + path = testlib.temp_dir(create=False) + path_, env = testlib.temp_env(path, subdir=True, create=True) + assert path_ == path + assert env.path() == path + + def test_subdir_true_exist_nocreate(self): + path, env = testlib.temp_env() + assert lmdb.open(path, subdir=True, create=False).path() == path + + def test_subdir_true_exist_create(self): + path, env = testlib.temp_env() + assert lmdb.open(path, subdir=True, create=True).path() == path + + def test_readonly_false(self): + path, env = testlib.temp_env(readonly=False) + with env.begin(write=True) as txn: + txn.put(B('a'), B('')) + with env.begin() as txn: + assert txn.get(B('a')) == B('') + assert not env.flags()['readonly'] + + def test_readonly_true_noexist(self): + path = testlib.temp_dir(create=False) + # Open readonly missing store should fail. + self.assertRaises(lmdb.Error, + lambda: lmdb.open(path, readonly=True, create=True)) + # And create=True should not have mkdir'd it. + assert not os.path.exists(path) + + def test_readonly_true_exist(self): + path, env = testlib.temp_env() + env2 = lmdb.open(path, readonly=True) + assert env2.path() == path + # Attempting a write txn should fail. + self.assertRaises(lmdb.ReadonlyError, + lambda: env2.begin(write=True)) + # Flag should be set. + assert env2.flags()['readonly'] + + def test_metasync(self): + for flag in True, False: + path, env = testlib.temp_env(metasync=flag) + assert env.flags()['metasync'] == flag + + def test_lock(self): + for flag in True, False: + path, env = testlib.temp_env(lock=flag) + lock_path = os.path.join(path, 'lock.mdb') + assert env.flags()['lock'] == flag + assert flag == os.path.exists(lock_path) + + def test_sync(self): + for flag in True, False: + path, env = testlib.temp_env(sync=flag) + assert env.flags()['sync'] == flag + + def test_map_async(self): + for flag in True, False: + path, env = testlib.temp_env(map_async=flag) + assert env.flags()['map_async'] == flag + + def test_mode_subdir_create(self): + if sys.platform == 'win32': + # Mode argument is ignored on Windows; see lmdb.h + return + + oldmask = os.umask(0) + try: + for mode in OCT('777'), OCT('755'), OCT('700'): + path = testlib.temp_dir(create=False) + env = lmdb.open(path, subdir=True, create=True, mode=mode) + fmode = mode & ~OCT('111') + assert testlib.path_mode(path) == mode + assert testlib.path_mode(path+'/data.mdb') == fmode + assert testlib.path_mode(path+'/lock.mdb') == fmode + finally: + os.umask(oldmask) + + def test_mode_subdir_nocreate(self): + if sys.platform == 'win32': + # Mode argument is ignored on Windows; see lmdb.h + return + + oldmask = os.umask(0) + try: + for mode in OCT('777'), OCT('755'), OCT('700'): + path = testlib.temp_dir() + env = lmdb.open(path, subdir=True, create=False, mode=mode) + fmode = mode & ~OCT('111') + assert testlib.path_mode(path+'/data.mdb') == fmode + assert testlib.path_mode(path+'/lock.mdb') == fmode + finally: + os.umask(oldmask) + + def test_readahead(self): + for flag in True, False: + path, env = testlib.temp_env(readahead=flag) + assert env.flags()['readahead'] == flag + + def test_writemap(self): + for flag in True, False: + path, env = testlib.temp_env(writemap=flag) + assert env.flags()['writemap'] == flag + + def test_meminit(self): + for flag in True, False: + path, env = testlib.temp_env(meminit=flag) + assert env.flags()['meminit'] == flag + + def test_max_readers(self): + self.assertRaises(lmdb.InvalidParameterError, + lambda: testlib.temp_env(max_readers=0)) + for val in 123, 234: + _, env = testlib.temp_env(max_readers=val) + assert env.info()['max_readers'] == val + + def test_max_dbs(self): + self.assertRaises(OverflowError, + lambda: testlib.temp_env(max_dbs=-1)) + for val in 0, 10, 20: + _, env = testlib.temp_env(max_dbs=val) + dbs = [env.open_db(B('db%d' % i)) for i in range(val)] + self.assertRaises(lmdb.DbsFullError, + lambda: env.open_db(B('toomany'))) + + +class SetMapSizeTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_invalid(self): + _, env = testlib.temp_env() + env.close() + self.assertRaises(Exception, + lambda: env.set_mapsize(999999)) + + def test_negative(self): + _, env = testlib.temp_env() + self.assertRaises(OverflowError, + lambda: env.set_mapsize(-2015)) + + def test_applied(self): + _, env = testlib.temp_env(map_size=PAGE_SIZE * 8) + assert env.info()['map_size'] == PAGE_SIZE * 8 + + env.set_mapsize(PAGE_SIZE * 16) + assert env.info()['map_size'] == PAGE_SIZE * 16 + + +class CloseTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_close(self): + _, env = testlib.temp_env() + # Attempting things should be ok. + txn = env.begin(write=True) + txn.put(B('a'), B('')) + cursor = txn.cursor() + list(cursor) + cursor.first() + it = iter(cursor) + + env.close() + # Repeated calls are ignored: + env.close() + # Attempting to use invalid objects should crash. + self.assertRaises(Exception, lambda: txn.cursor()) + self.assertRaises(Exception, lambda: txn.commit()) + self.assertRaises(Exception, lambda: cursor.first()) + self.assertRaises(Exception, lambda: list(it)) + # Abort should be OK though. + txn.abort() + # Attempting to start new txn should crash. + self.assertRaises(Exception, + lambda: env.begin()) + + +class ContextManagerTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_ok(self): + path, env = testlib.temp_env() + with env as env_: + assert env_ is env + with env.begin() as txn: + txn.get(B('foo')) + self.assertRaises(Exception, lambda: env.begin()) + + def test_crash(self): + path, env = testlib.temp_env() + try: + with env as env_: + assert env_ is env + with env.begin() as txn: + txn.get(123) + except: + pass + self.assertRaises(Exception, lambda: env.begin()) + + +class InfoMethodsTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_path(self): + path, env = testlib.temp_env() + assert path == env.path() + assert isinstance(env.path(), UnicodeType) + + env.close() + self.assertRaises(Exception, + lambda: env.path()) + + def test_stat(self): + _, env = testlib.temp_env() + stat = env.stat() + for k in 'psize', 'depth', 'branch_pages', 'overflow_pages',\ + 'entries': + assert isinstance(stat[k], INT_TYPES), k + assert stat[k] >= 0 + + assert stat['entries'] == 0 + txn = env.begin(write=True) + txn.put(B('a'), B('b')) + txn.commit() + stat = env.stat() + assert stat['entries'] == 1 + + env.close() + self.assertRaises(Exception, + lambda: env.stat()) + + def test_info(self): + _, env = testlib.temp_env() + info = env.info() + for k in 'map_addr', 'map_size', 'last_pgno', 'last_txnid', \ + 'max_readers', 'num_readers': + assert isinstance(info[k], INT_TYPES), k + assert info[k] >= 0 + + assert info['last_txnid'] == 0 + txn = env.begin(write=True) + txn.put(B('a'), B('')) + txn.commit() + info = env.info() + assert info['last_txnid'] == 1 + + env.close() + self.assertRaises(Exception, + lambda: env.info()) + + def test_flags(self): + _, env = testlib.temp_env() + info = env.flags() + for k in 'subdir', 'readonly', 'metasync', 'sync', 'map_async',\ + 'readahead', 'writemap': + assert isinstance(info[k], bool) + + env.close() + self.assertRaises(Exception, + lambda: env.flags()) + + def test_max_key_size(self): + _, env = testlib.temp_env() + mks = env.max_key_size() + assert isinstance(mks, INT_TYPES) + assert mks > 0 + + env.close() + self.assertRaises(Exception, + lambda: env.max_key_size()) + + def test_max_readers(self): + _, env = testlib.temp_env() + mr = env.max_readers() + assert isinstance(mr, INT_TYPES) + assert mr > 0 and mr == env.info()['max_readers'] + + env.close() + self.assertRaises(Exception, + lambda: env.max_readers()) + + def test_readers(self): + _, env = testlib.temp_env(max_spare_txns=0) + r = env.readers() + assert isinstance(r, UnicodeType) + assert r == NO_READERS + + rtxn = env.begin() + r2 = env.readers() + assert isinstance(env.readers(), UnicodeType) + assert env.readers() != r + + env.close() + self.assertRaises(Exception, + lambda: env.readers()) + + +class OtherMethodsTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_copy(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + txn.put(B('a'), B('b')) + txn.commit() + + dest_dir = testlib.temp_dir() + env.copy(dest_dir) + assert os.path.exists(dest_dir + '/data.mdb') + + cenv = lmdb.open(dest_dir) + ctxn = cenv.begin() + assert ctxn.get(B('a')) == B('b') + + env.close() + self.assertRaises(Exception, + lambda: env.copy(testlib.temp_dir())) + + def test_copy_compact(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + txn.put(B('a'), B('b')) + txn.commit() + + dest_dir = testlib.temp_dir() + env.copy(dest_dir, compact=True) + assert os.path.exists(dest_dir + '/data.mdb') + + cenv = lmdb.open(dest_dir) + ctxn = cenv.begin() + assert ctxn.get(B('a')) == B('b') + + env.close() + self.assertRaises(Exception, + lambda: env.copy(testlib.temp_dir())) + + def test_copyfd(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + txn.put(B('a'), B('b')) + txn.commit() + + dst_path = testlib.temp_file(create=False) + fp = open(dst_path, 'wb') + env.copyfd(fp.fileno()) + + dstenv = lmdb.open(dst_path, subdir=False) + dtxn = dstenv.begin() + assert dtxn.get(B('a')) == B('b') + + env.close() + self.assertRaises(Exception, + lambda: env.copyfd(fp.fileno())) + fp.close() + + def test_copyfd_compact(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + txn.put(B('a'), B('b')) + txn.commit() + + dst_path = testlib.temp_file(create=False) + fp = open(dst_path, 'wb') + env.copyfd(fp.fileno(), compact=True) + + dstenv = lmdb.open(dst_path, subdir=False) + dtxn = dstenv.begin() + assert dtxn.get(B('a')) == B('b') + + env.close() + self.assertRaises(Exception, + lambda: env.copyfd(fp.fileno())) + fp.close() + + def test_sync(self): + _, env = testlib.temp_env() + env.sync(False) + env.sync(True) + env.close() + self.assertRaises(Exception, + lambda: env.sync(False)) + + @staticmethod + def _test_reader_check_child(path): + """Function to run in child process since we can't use fork() on + win32.""" + env = lmdb.open(path, max_spare_txns=0) + txn = env.begin() + os._exit(0) + + def test_reader_check(self): + if sys.platform == 'win32': + # Stale writers are cleared automatically on Windows, see lmdb.h + return + + path, env = testlib.temp_env(max_spare_txns=0) + rc = env.reader_check() + assert rc == 0 + + # We need to open a separate env since Transaction.abort() always calls + # reset for a read-only txn, the actual abort doesn't happen until + # __del__, when Transaction discovers there is no room for it on the + # freelist. + env1 = lmdb.open(path) + txn1 = env1.begin() + assert env.readers() != NO_READERS + assert env.reader_check() == 0 + + # Start a child, open a txn, then crash the child. + rc = os.spawnl(os.P_WAIT, sys.executable, sys.executable, + __file__, 'test_reader_check_child', path) + + assert rc == 0 + assert env.reader_check() == 1 + assert env.reader_check() == 0 + assert env.readers() != NO_READERS + + txn1.abort() + env1.close() + assert env.readers() == NO_READERS + + env.close() + self.assertRaises(Exception, + lambda: env.reader_check()) + + +class BeginTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_begin_closed(self): + _, env = testlib.temp_env() + env.close() + self.assertRaises(Exception, + lambda: env.begin()) + + def test_begin_readonly(self): + _, env = testlib.temp_env() + txn = env.begin() + # Read txn can't write. + self.assertRaises(lmdb.ReadonlyError, + lambda: txn.put(B('a'), B(''))) + txn.abort() + + def test_begin_write(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + # Write txn can write. + assert txn.put(B('a'), B('')) + txn.commit() + + def test_bind_db(self): + _, env = testlib.temp_env() + main = env.open_db(None) + sub = env.open_db(B('db1')) + + txn = env.begin(write=True, db=sub) + assert txn.put(B('b'), B('')) # -> sub + assert txn.put(B('a'), B(''), db=main) # -> main + txn.commit() + + txn = env.begin() + assert txn.get(B('a')) == B('') + assert txn.get(B('b')) is None + assert txn.get(B('a'), db=sub) is None + assert txn.get(B('b'), db=sub) == B('') + txn.abort() + + def test_parent_readonly(self): + _, env = testlib.temp_env() + parent = env.begin() + # Nonsensical. + self.assertRaises(lmdb.InvalidParameterError, + lambda: env.begin(parent=parent)) + + def test_parent(self): + _, env = testlib.temp_env() + parent = env.begin(write=True) + parent.put(B('a'), B('a')) + + child = env.begin(write=True, parent=parent) + assert child.get(B('a')) == B('a') + assert child.put(B('a'), B('b')) + child.abort() + + # put() should have rolled back + assert parent.get(B('a')) == B('a') + + child = env.begin(write=True, parent=parent) + assert child.put(B('a'), B('b')) + child.commit() + + # put() should be visible + assert parent.get(B('a')) == B('b') + + def test_buffers(self): + _, env = testlib.temp_env() + txn = env.begin(write=True, buffers=True) + assert txn.put(B('a'), B('a')) + b = txn.get(B('a')) + assert b is not None + assert len(b) == 1 + assert not isinstance(b, type(B(''))) + txn.commit() + + txn = env.begin(buffers=False) + b = txn.get(B('a')) + assert b is not None + assert len(b) == 1 + assert isinstance(b, type(B(''))) + txn.abort() + + +class OpenDbTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_main(self): + _, env = testlib.temp_env() + # Start write txn, so we cause deadlock if open_db attempts txn. + txn = env.begin(write=True) + # Now get main DBI, we should already be open. + db = env.open_db(None) + # w00t, no deadlock. + + flags = db.flags(txn) + assert not flags['reverse_key'] + assert not flags['dupsort'] + txn.abort() + + def test_unicode(self): + _, env = testlib.temp_env() + assert env.open_db(B('myindex')) is not None + self.assertRaises(TypeError, + lambda: env.open_db(UnicodeType('myindex'))) + + def test_sub_notxn(self): + _, env = testlib.temp_env() + assert env.info()['last_txnid'] == 0 + db1 = env.open_db(B('subdb1')) + assert env.info()['last_txnid'] == 1 + db2 = env.open_db(B('subdb2')) + assert env.info()['last_txnid'] == 2 + + env.close() + self.assertRaises(Exception, + lambda: env.open_db('subdb3')) + + def test_sub_rotxn(self): + _, env = testlib.temp_env() + txn = env.begin(write=False) + self.assertRaises(lmdb.ReadonlyError, + lambda: env.open_db(B('subdb'), txn=txn)) + + def test_sub_txn(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + db1 = env.open_db(B('subdb1'), txn=txn) + db2 = env.open_db(B('subdb2'), txn=txn) + for db in db1, db2: + assert db.flags(txn) == { + 'dupfixed': False, + 'dupsort': False, + 'integerdup': False, + 'integerkey': False, + 'reverse_key': False, + } + txn.commit() + + def test_reopen(self): + path, env = testlib.temp_env() + db1 = env.open_db(B('subdb1')) + env.close() + env = lmdb.open(path, max_dbs=10) + db1 = env.open_db(B('subdb1')) + + FLAG_SETS = [ + (flag, val) + for flag in ( + 'reverse_key', 'dupsort', 'integerkey', 'integerdup', 'dupfixed' + ) + for val in (True, False) + ] + + def test_flags(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + + for flag, val in self.FLAG_SETS: + key = B('%s-%s' % (flag, val)) + db = env.open_db(key, txn=txn, **{flag: val}) + assert db.flags(txn)[flag] == val + + txn.commit() + # Test flag persistence. + env.close() + env = lmdb.open(path, max_dbs=10) + txn = env.begin(write=True) + + for flag, val in self.FLAG_SETS: + key = B('%s-%s' % (flag, val)) + db = env.open_db(key, txn=txn) + assert db.flags(txn)[flag] == val + + def test_readonly_env_main(self): + path, env = testlib.temp_env() + env.close() + + env = lmdb.open(path, readonly=True) + db = env.open_db(None) + + def test_readonly_env_sub_noexist(self): + # https://github.com/dw/py-lmdb/issues/109 + path, env = testlib.temp_env() + env.close() + + env = lmdb.open(path, max_dbs=10, readonly=True) + self.assertRaises(lmdb.NotFoundError, + lambda: env.open_db(B('node_schedules'), create=False)) + + def test_readonly_env_sub_eperm(self): + # https://github.com/dw/py-lmdb/issues/109 + path, env = testlib.temp_env() + env.close() + + env = lmdb.open(path, max_dbs=10, readonly=True) + self.assertRaises(lmdb.ReadonlyError, + lambda: env.open_db(B('node_schedules'), create=True)) + + def test_readonly_env_sub(self): + # https://github.com/dw/py-lmdb/issues/109 + path, env = testlib.temp_env() + assert env.open_db(B('node_schedules')) is not None + env.close() + + env = lmdb.open(path, max_dbs=10, readonly=True) + db = env.open_db(B('node_schedules'), create=False) + assert db is not None + + +reader_count = lambda env: env.readers().count('\n') - 1 + +class SpareTxnTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_none(self): + _, env = testlib.temp_env(max_spare_txns=0) + assert 0 == reader_count(env) + + t1 = env.begin() + assert 1 == reader_count(env) + + t2 = env.begin() + assert 2 == reader_count(env) + + t1.abort() + del t1 + assert 1 == reader_count(env) + + t2.abort() + del t2 + assert 0 == reader_count(env) + + def test_one(self): + _, env = testlib.temp_env(max_spare_txns=1) + # 1 here, since CFFI uses a temporary reader during init. + assert 1 >= reader_count(env) + + t1 = env.begin() + assert 1 == reader_count(env) + + t2 = env.begin() + assert 2 == reader_count(env) + + t1.abort() + del t1 + assert 2 == reader_count(env) # 1 live, 1 cached + + t2.abort() + del t2 + assert 1 == reader_count(env) # 1 cached + + t3 = env.begin() + assert 1 == reader_count(env) # 1 live + + t3.abort() + del t3 + assert 1 == reader_count(env) # 1 cached + + +class LeakTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_open_unref_does_not_leak(self): + temp_dir = testlib.temp_dir() + env = lmdb.open(temp_dir) + ref = weakref.ref(env) + env = None + testlib.debug_collect() + assert ref() is None + + def test_open_close_does_not_leak(self): + temp_dir = testlib.temp_dir() + env = lmdb.open(temp_dir) + env.close() + ref = weakref.ref(env) + env = None + testlib.debug_collect() + assert ref() is None + + def test_weakref_callback_invoked_once(self): + temp_dir = testlib.temp_dir() + env = lmdb.open(temp_dir) + env.close() + count = [0] + def callback(ref): + count[0] += 1 + ref = weakref.ref(env, callback) + env = None + testlib.debug_collect() + assert ref() is None + assert count[0] == 1 + + +if __name__ == '__main__': + if len(sys.argv) > 1 and sys.argv[1] == 'test_reader_check_child': + OtherMethodsTest._test_reader_check_child(sys.argv[2]) + else: + unittest.main() diff -U3 -r -N no-tests/tests/iteration_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/iteration_test.py --- no-tests/tests/iteration_test.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/iteration_test.py 2017-05-29 16:44:06.615481956 +0200 @@ -0,0 +1,257 @@ +#! /usr/bin/env python +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +# test delete(dupdata) + +from __future__ import absolute_import +from __future__ import with_statement +import unittest + +import testlib +from testlib import B +from testlib import BT +from testlib import KEYS, ITEMS, KEYS2, ITEMS2 +from testlib import putData, putBigData + + +class IterationTestBase(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def setUp(self): + self.path, self.env = testlib.temp_env() # creates 10 databases + self.txn = self.env.begin(write=True) + putData(self.txn) + self.c = self.txn.cursor() + self.empty_entry = (B(''), B('')) + + def matchList(self, ls_a, ls_b): + return all(map(lambda x, y: x == y, ls_a, ls_b)) + + +class IterationTestBase2(unittest.TestCase): + """ This puts more data than its predecessor""" + + def tearDown(self): + testlib.cleanup() + + def setUp(self): + self.path, self.env = testlib.temp_env() # creates 10 databases + self.txn = self.env.begin(write=True) + putBigData(self.txn) # HERE! + self.c = self.txn.cursor() + self.empty_entry = ('', '') + + def matchList(self, ls_a, ls_b): + return all(map(lambda x, y: x == y, ls_a, ls_b)) + + +class IterationTest(IterationTestBase): + def testFromStart(self): + # From start + self.c.first() + self.assertEqual(self.c.key(), KEYS[0]) # start of db + test_list = [i for i in iter(self.c)] + self.assertEqual(self.matchList(test_list, ITEMS), True) + self.assertEqual(self.c.item(), self.empty_entry) # end of db + + def testFromStartWithIternext(self): + # From start with iternext + self.c.first() + self.assertEqual(self.c.key(), KEYS[0]) # start of db + test_list = [i for i in self.c.iternext()] + # remaining elements in db + self.assertEqual(self.matchList(test_list, ITEMS), True) + self.assertEqual(self.c.item(), self.empty_entry) # end of db + + def testFromStartWithNext(self): + # From start with next + self.c.first() + self.assertEqual(self.c.key(), KEYS[0]) # start of db + test_list = [] + while 1: + test_list.append(self.c.item()) + if not self.c.next(): + break + self.assertEqual(self.matchList(test_list, ITEMS), True) + + def testFromExistentKeySetKey(self): + self.c.first() + self.c.set_key(KEYS[1]) + self.assertEqual(self.c.key(), KEYS[1]) + test_list = [i for i in self.c.iternext()] + self.assertEqual(self.matchList(test_list, ITEMS[1:]), True) + + def testFromExistentKeySetRange(self): + self.c.first() + self.c.set_range(KEYS[1]) + self.assertEqual(self.c.key(), KEYS[1]) + test_list = [i for i in self.c.iternext()] + self.assertEqual(self.matchList(test_list, ITEMS[1:]), True) + + def testFromNonExistentKeySetRange(self): + self.c.first() + self.c.set_range(B('c')) + self.assertEqual(self.c.key(), B('d')) + test_list = [i for i in self.c.iternext()] + test_items = [i for i in ITEMS if i[0] > B('c')] + self.assertEqual(self.matchList(test_list, test_items), True) + + def testFromLastKey(self): + self.c.last() + self.assertEqual(self.c.key(), KEYS[-1]) + test_list = [i for i in self.c.iternext()] + self.assertEqual(self.matchList(test_list, ITEMS[-1:]), True) + + def testFromNonExistentKeyPastEnd(self): + self.c.last() + self.assertEqual(self.c.key(), KEYS[-1]) + # next() fails, leaving iterator in an unpositioned state. + self.c.next() + self.assertEqual(self.c.item(), self.empty_entry) + # iternext() from an unpositioned state proceeds from start of DB. + test_list = list(self.c.iternext()) + self.assertEqual(test_list, ITEMS) + + +class ReverseIterationTest(IterationTestBase): + def testFromStartRev(self): + # From start + self.c.first() + self.assertEqual(self.c.key(), KEYS[0]) # start of db + test_list = [i for i in self.c.iterprev()] + self.assertEqual(self.matchList(test_list, ITEMS[:1][::-1]), True) + self.assertEqual(self.c.item(), self.empty_entry) # very start of db + + def testFromExistentKeySetKeyRev(self): + self.c.first() + self.c.set_key(KEYS[2]) + self.assertEqual(self.c.key(), KEYS[2]) + test_list = [i for i in self.c.iterprev()] + self.assertEqual(self.matchList(test_list, ITEMS[:3][::-1]), True) + + def testFromExistentKeySetRangeRev(self): + self.c.first() + self.c.set_range(KEYS[2]) + self.assertEqual(self.c.key(), KEYS[2]) + test_list = [i for i in self.c.iterprev()] + self.assertEqual(self.matchList(test_list, ITEMS[:3][::-1]), True) + + def testFromNonExistentKeySetRangeRev(self): + self.c.first() + self.c.set_range(B('c')) + self.assertEqual(self.c.key(), B('d')) + test_list = [i for i in self.c.iterprev()] + test_items = [i for i in ITEMS if i[0] <= B('d')] + test_items = test_items[::-1] + self.assertEqual(self.matchList(test_list, test_items), True) + + def testFromLastKeyRev(self): + self.c.last() + self.assertEqual(self.c.key(), KEYS[-1]) + test_list = [i for i in self.c.iterprev()] + self.assertEqual(self.matchList(test_list, ITEMS[::-1]), True) + + def testFromLastKeyWithPrevRev(self): + self.c.last() + self.assertEqual(self.c.key(), KEYS[-1]) # end of db + test_list = [] + while 1: + test_list.append(self.c.item()) + if not self.c.prev(): + break + self.assertEqual(self.matchList(test_list, ITEMS[::-1]), True) + + def testFromNonExistentKeyPastEndRev(self): + self.c.first() + self.assertEqual(self.c.key(), KEYS[0]) + # prev() fails, leaving iterator in an unpositioned state. + self.c.prev() + self.assertEqual(self.c.item(), self.empty_entry) + # iterprev() from an unpositioned state proceeds from end of DB. + test_list = list(self.c.iterprev()) + self.assertEqual(test_list, ITEMS[::-1]) + +class IterationTestWithDupsBase(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def setUp(self): + self.path, self.env = testlib.temp_env() + db = self.env.open_db(B('db1'), dupsort=True) + self.txn = self.env.begin(db, write=True) + for _ in range(2): + putData(self.txn) + self.c = self.txn.cursor() + self.empty_entry = ('', '') + + def matchList(self, ls_a, ls_b): + return all(map(lambda x, y: x == y, ls_a, ls_b)) + + +class IterationTestWithDups(IterationTestWithDupsBase): + pass + + +class SeekIterationTest(IterationTestBase2): + def testForwardIterationSeek(self): + self.c.first() + test_list = [] + for i in self.c.iternext(): + test_list.append(i) + # skips d and e + if self.c.key() == B('baa'): + self.c.set_key(B('e')) + test_item = [i for i in ITEMS2 if i[0] not in (B('d'), B('e'))] + self.assertEqual(test_list, test_item) + + def testPutDuringIteration(self): + self.c.first() + test_list = [] + c = self.txn.cursor() + for i in c.iternext(): + test_list.append(i) + # adds 'i' upon seeing 'e' + if c.key() == B('e'): + self.c.put(B('i'), B('')) + test_item = ITEMS2 + [(B('i'), B(''))] + self.assertEqual(test_list, test_item) + + def testDeleteDuringIteration(self): + self.c.first() + test_list = [] + for i in self.c.iternext(): + # deletes 'e' upon seeing it + if self.c.key() == B('e'): + # Causes 'e' to be deleted, and advances cursor to next + # element. + self.c.delete() + i = self.c.item() + test_list.append(i) + + test_item = [i for i in ITEMS2 if i[0] != B('e')] + self.assertEqual(test_list, test_item) + + +if __name__ == '__main__': + unittest.main() diff -U3 -r -N no-tests/tests/package_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/package_test.py --- no-tests/tests/package_test.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/package_test.py 2017-05-29 16:44:06.615481956 +0200 @@ -0,0 +1,68 @@ +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +from __future__ import absolute_import +import unittest + +import lmdb + + +class PackageExportsTest(unittest.TestCase): + """ + Ensure the list of exported names matches a predefined list. Designed to + ensure future interface changes to cffi.py and cpython.c don't break + consistency of "from lmdb import *". + """ + def test_exports(self): + assert sorted(lmdb.__all__) == [ + 'BadDbiError', + 'BadRslotError', + 'BadTxnError', + 'BadValsizeError', + 'CorruptedError', + 'Cursor', + 'CursorFullError', + 'DbsFullError', + 'DiskError', + 'Environment', + 'Error', + 'IncompatibleError', + 'InvalidError', + 'InvalidParameterError', + 'KeyExistsError', + 'LockError', + 'MapFullError', + 'MapResizedError', + 'MemoryError', + 'NotFoundError', + 'PageFullError', + 'PageNotFoundError', + 'PanicError', + 'ReadersFullError', + 'ReadonlyError', + 'TlsFullError', + 'Transaction', + 'TxnFullError', + 'VersionMismatchError', + 'enable_drop_gil', + 'version', + ] diff -U3 -r -N no-tests/tests/testlib.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/testlib.py --- no-tests/tests/testlib.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/testlib.py 2017-05-29 16:44:06.615481956 +0200 @@ -0,0 +1,154 @@ +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +from __future__ import absolute_import +import atexit +import gc +import os +import shutil +import stat +import sys +import tempfile +import traceback + +try: + import __builtin__ +except ImportError: + import builtins as __builtin__ + +import lmdb + + +_cleanups = [] + +def cleanup(): + while _cleanups: + func = _cleanups.pop() + try: + func() + except Exception: + traceback.print_exc() + +atexit.register(cleanup) + + +def temp_dir(create=True): + path = tempfile.mkdtemp(prefix='lmdb_test') + assert path is not None, 'tempfile.mkdtemp failed' + if not create: + os.rmdir(path) + _cleanups.append(lambda: shutil.rmtree(path, ignore_errors=True)) + if hasattr(path, 'decode'): + path = path.decode(sys.getfilesystemencoding()) + return path + + +def temp_file(create=True): + fd, path = tempfile.mkstemp(prefix='lmdb_test') + assert path is not None, 'tempfile.mkstemp failed' + os.close(fd) + if not create: + os.unlink(path) + _cleanups.append(lambda: os.path.exists(path) and os.unlink(path)) + pathlock = path + '-lock' + _cleanups.append(lambda: os.path.exists(pathlock) and os.unlink(pathlock)) + if hasattr(path, 'decode'): + path = path.decode(sys.getfilesystemencoding()) + return path + + +def temp_env(path=None, max_dbs=10, **kwargs): + if not path: + path = temp_dir() + env = lmdb.open(path, max_dbs=max_dbs, **kwargs) + _cleanups.append(env.close) + return path, env + + +def path_mode(path): + return stat.S_IMODE(os.stat(path).st_mode) + + +def debug_collect(): + if hasattr(gc, 'set_debug') and hasattr(gc, 'get_debug'): + old = gc.get_debug() + gc.set_debug(gc.DEBUG_LEAK) + gc.collect() + gc.set_debug(old) + else: + for x in range(10): + # PyPy doesn't collect objects with __del__ on first attempt. + gc.collect() + + +# Handle moronic Python >=3.0 <3.3. +UnicodeType = getattr(__builtin__, 'unicode', str) +BytesType = getattr(__builtin__, 'bytes', str) + + +try: + INT_TYPES = (int, long) +except NameError: + INT_TYPES = (int,) + +# B(ascii 'string') -> bytes +try: + bytes('') # Python>=2.6, alias for str(). + B = lambda s: s +except TypeError: # Python3.x, requires encoding parameter. + B = lambda s: bytes(s, 'ascii') +except NameError: # Python<=2.5. + B = lambda s: s + +# BL('s1', 's2') -> ['bytes1', 'bytes2'] +BL = lambda *args: list(map(B, args)) +# TS('s1', 's2') -> ('bytes1', 'bytes2') +BT = lambda *args: tuple(B(s) for s in args) +# O(int) -> length-1 bytes +O = lambda arg: B(chr(arg)) +# OCT(s) -> parse string as octal +OCT = lambda s: int(s, 8) + + +KEYS = BL('a', 'b', 'baa', 'd') +ITEMS = [(k, B('')) for k in KEYS] +REV_ITEMS = ITEMS[::-1] +VALUES = [B('') for k in KEYS] + +KEYS2 = BL('a', 'b', 'baa', 'd', 'e', 'f', 'g', 'h') +ITEMS2 = [(k, B('')) for k in KEYS2] +REV_ITEMS2 = ITEMS2[::-1] +VALUES2 = [B('') for k in KEYS2] + +def putData(t, db=None): + for k, v in ITEMS: + if db: + t.put(k, v, db=db) + else: + t.put(k, v) + +def putBigData(t, db=None): + for k, v in ITEMS2: + if db: + t.put(k, v, db=db) + else: + t.put(k, v) diff -U3 -r -N no-tests/tests/tool_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/tool_test.py --- no-tests/tests/tool_test.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/tool_test.py 2017-05-29 16:44:06.616481955 +0200 @@ -0,0 +1,37 @@ +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +from __future__ import absolute_import +import unittest + +import lmdb +import lmdb.tool + + +class ToolTest(unittest.TestCase): + def test_ok(self): + # For now, simply ensure the module can be compiled (3.x compat). + pass + + +if __name__ == '__main__': + unittest.main() diff -U3 -r -N no-tests/tests/txn_test.py 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/txn_test.py --- no-tests/tests/txn_test.py 1970-01-01 01:00:00.000000000 +0100 +++ 4651bb3a865c77008ac261443899fe25f88173f2-tests/tests/txn_test.py 2017-05-29 16:44:06.616481955 +0200 @@ -0,0 +1,538 @@ +# +# Copyright 2013 The py-lmdb authors, all rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted only as authorized by the OpenLDAP +# Public License. +# +# A copy of this license is available in the file LICENSE in the +# top-level directory of the distribution or, alternatively, at +# . +# +# OpenLDAP is a registered trademark of the OpenLDAP Foundation. +# +# Individual files and/or contributed packages may be copyright by +# other parties and/or subject to additional restrictions. +# +# This work also contains materials derived from public sources. +# +# Additional information about OpenLDAP can be obtained at +# . +# + +from __future__ import absolute_import +from __future__ import with_statement +import struct +import unittest +import weakref + +import testlib +from testlib import B +from testlib import BT +from testlib import OCT +from testlib import INT_TYPES +from testlib import BytesType +from testlib import UnicodeType + +import lmdb + + +UINT_0001 = struct.pack('I', 1) +UINT_0002 = struct.pack('I', 2) +ULONG_0001 = struct.pack('L', 1) # L != size_t +ULONG_0002 = struct.pack('L', 2) # L != size_t + + +class InitTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_closed(self): + _, env = testlib.temp_env() + env.close() + self.assertRaises(Exception, + lambda: lmdb.Transaction(env)) + + def test_readonly(self): + _, env = testlib.temp_env() + txn = lmdb.Transaction(env) + # Read txn can't write. + self.assertRaises(lmdb.ReadonlyError, + lambda: txn.put(B('a'), B(''))) + txn.abort() + + def test_begin_write(self): + _, env = testlib.temp_env() + txn = lmdb.Transaction(env, write=True) + # Write txn can write. + assert txn.put(B('a'), B('')) + txn.commit() + + def test_bind_db(self): + _, env = testlib.temp_env() + main = env.open_db(None) + sub = env.open_db(B('db1')) + + txn = lmdb.Transaction(env, write=True, db=sub) + assert txn.put(B('b'), B('')) # -> sub + assert txn.put(B('a'), B(''), db=main) # -> main + txn.commit() + + txn = lmdb.Transaction(env) + assert txn.get(B('a')) == B('') + assert txn.get(B('b')) is None + assert txn.get(B('a'), db=sub) is None + assert txn.get(B('b'), db=sub) == B('') + txn.abort() + + def test_bind_db_methods(self): + _, env = testlib.temp_env() + maindb = env.open_db(None) + db1 = env.open_db(B('d1')) + txn = lmdb.Transaction(env, write=True, db=db1) + assert txn.put(B('a'), B('d1')) + assert txn.get(B('a'), db=db1) == B('d1') + assert txn.get(B('a'), db=maindb) is None + assert txn.replace(B('a'), B('d11')) == B('d1') + assert txn.pop(B('a')) == B('d11') + assert txn.put(B('a'), B('main'), db=maindb, overwrite=False) + assert not txn.delete(B('a')) + txn.abort() + + def test_parent_readonly(self): + _, env = testlib.temp_env() + parent = lmdb.Transaction(env) + # Nonsensical. + self.assertRaises(lmdb.InvalidParameterError, + lambda: lmdb.Transaction(env, parent=parent)) + + def test_parent(self): + _, env = testlib.temp_env() + parent = lmdb.Transaction(env, write=True) + parent.put(B('a'), B('a')) + + child = lmdb.Transaction(env, write=True, parent=parent) + assert child.get(B('a')) == B('a') + assert child.put(B('a'), B('b')) + child.abort() + + # put() should have rolled back + assert parent.get(B('a')) == B('a') + + child = lmdb.Transaction(env, write=True, parent=parent) + assert child.put(B('a'), B('b')) + child.commit() + + # put() should be visible + assert parent.get(B('a')) == B('b') + + def test_buffers(self): + _, env = testlib.temp_env() + txn = lmdb.Transaction(env, write=True, buffers=True) + assert txn.put(B('a'), B('a')) + b = txn.get(B('a')) + assert b is not None + assert len(b) == 1 + assert not isinstance(b, type(B(''))) + txn.commit() + + txn = lmdb.Transaction(env, buffers=False) + b = txn.get(B('a')) + assert b is not None + assert len(b) == 1 + assert isinstance(b, type(B(''))) + txn.abort() + + +class ContextManagerTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_ok(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + with txn as txn_: + assert txn is txn_ + txn.put(B('foo'), B('123')) + + self.assertRaises(Exception, lambda: txn.get(B('foo'))) + with env.begin() as txn: + assert txn.get(B('foo')) == B('123') + + def test_crash(self): + path, env = testlib.temp_env() + txn = env.begin(write=True) + + try: + with txn as txn_: + txn.put(B('foo'), B('123')) + txn.put(123, 123) + except: + pass + + self.assertRaises(Exception, lambda: txn.get(B('foo'))) + with env.begin() as txn: + assert txn.get(B('foo')) is None + + +class IdTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_readonly_new(self): + _, env = testlib.temp_env() + with env.begin() as txn: + assert txn.id() == 0 + + def test_write_new(self): + _, env = testlib.temp_env() + with env.begin(write=True) as txn: + assert txn.id() == 1 + + def test_readonly_after_write(self): + _, env = testlib.temp_env() + with env.begin(write=True) as txn: + txn.put(B('a'), B('a')) + with env.begin() as txn: + assert txn.id() == 1 + + def test_invalid_txn(self): + _, env = testlib.temp_env() + txn = env.begin() + txn.abort() + self.assertRaises(Exception, lambda: txn.id()) + + +class StatTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_stat(self): + _, env = testlib.temp_env() + db1 = env.open_db(B('db1')) + db2 = env.open_db(B('db2')) + + txn = lmdb.Transaction(env) + for db in db1, db2: + stat = txn.stat(db) + for k in 'psize', 'depth', 'branch_pages', 'overflow_pages',\ + 'entries': + assert isinstance(stat[k], INT_TYPES), k + assert stat[k] >= 0 + assert stat['entries'] == 0 + + txn = lmdb.Transaction(env, write=True) + txn.put(B('a'), B('b'), db=db1) + txn.commit() + + txn = lmdb.Transaction(env) + stat = txn.stat(db1) + assert stat['entries'] == 1 + + stat = txn.stat(db2) + assert stat['entries'] == 0 + + txn.abort() + self.assertRaises(Exception, + lambda: env.stat(db1)) + env.close() + self.assertRaises(Exception, + lambda: env.stat(db1)) + + +class DropTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_empty(self): + _, env = testlib.temp_env() + db1 = env.open_db(B('db1')) + txn = env.begin(write=True) + txn.put(B('a'), B('a'), db=db1) + assert txn.get(B('a'), db=db1) == B('a') + txn.drop(db1, False) + assert txn.get(B('a')) is None + txn.drop(db1, False) # should succeed. + assert txn.get(B('a')) is None + + def test_delete(self): + _, env = testlib.temp_env() + db1 = env.open_db(B('db1')) + txn = env.begin(write=True) + txn.put(B('a'), B('a'), db=db1) + txn.drop(db1) + self.assertRaises(lmdb.InvalidParameterError, + lambda: txn.get(B('a'), db=db1)) + self.assertRaises(lmdb.InvalidParameterError, + lambda: txn.drop(db1)) + + +class CommitTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_bad_txn(self): + _, env = testlib.temp_env() + txn = env.begin() + txn.abort() + self.assertRaises(Exception, + lambda: txn.commit()) + + def test_bad_env(self): + _, env = testlib.temp_env() + txn = env.begin() + env.close() + self.assertRaises(Exception, + lambda: txn.commit()) + + def test_commit_ro(self): + _, env = testlib.temp_env() + txn = env.begin() + txn.commit() + self.assertRaises(Exception, + lambda: txn.commit()) + + def test_commit_rw(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + assert txn.put(B('a'), B('a')) + txn.commit() + self.assertRaises(Exception, + lambda: txn.commit()) + txn = env.begin() + assert txn.get(B('a')) == B('a') + txn.abort() + + +class AbortTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_abort_ro(self): + _, env = testlib.temp_env() + txn = env.begin() + assert txn.get(B('a')) is None + txn.abort() + self.assertRaises(Exception, + lambda: txn.get(B('a'))) + env.close() + self.assertRaises(Exception, + lambda: txn.get(B('a'))) + + def test_abort_rw(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + assert txn.put(B('a'), B('a')) + txn.abort() + txn = env.begin() + assert txn.get(B('a')) is None + + +class GetTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_bad_txn(self): + _, env = testlib.temp_env() + txn = env.begin() + txn.abort() + self.assertRaises(Exception, + lambda: txn.get(B('a'))) + + def test_bad_env(self): + _, env = testlib.temp_env() + txn = env.begin() + env.close() + self.assertRaises(Exception, + lambda: txn.get(B('a'))) + + def test_missing(self): + _, env = testlib.temp_env() + txn = env.begin() + assert txn.get(B('a')) is None + assert txn.get(B('a'), default='default') is 'default' + + def test_empty_key(self): + _, env = testlib.temp_env() + txn = env.begin() + self.assertRaises(lmdb.BadValsizeError, + lambda: txn.get(B(''))) + + def test_db(self): + _, env = testlib.temp_env() + maindb = env.open_db(None) + db1 = env.open_db(B('db1')) + + txn = env.begin() + assert txn.get(B('a'), db=db1) is None + txn.abort() + + txn = env.begin(write=True) + txn.put(B('a'), B('a'), db=db1) + txn.commit() + + txn = env.begin() + assert txn.get(B('a')) is None + txn.abort() + + txn = env.begin(db=db1) + assert txn.get(B('a')) == B('a') + assert txn.get(B('a'), db=maindb) is None + + def test_buffers_no(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + assert txn.put(B('a'), B('a')) + assert type(txn.get(B('a'))) is BytesType + + def test_buffers_yes(self): + _, env = testlib.temp_env() + txn = env.begin(write=True, buffers=True) + assert txn.put(B('a'), B('a')) + assert type(txn.get(B('a'))) is not BytesType + + def test_dupsort(self): + _, env = testlib.temp_env() + db1 = env.open_db(B('db1'), dupsort=True) + txn = env.begin(write=True, db=db1) + assert txn.put(B('a'), B('a')) + assert txn.put(B('a'), B('b')) + assert txn.get(B('a')) == B('a') + + def test_integerkey(self): + _, env = testlib.temp_env() + db1 = env.open_db(B('db1'), integerkey=True) + txn = env.begin(write=True, db=db1) + assert txn.put(UINT_0001, B('a')) + assert txn.put(UINT_0002, B('b')) + assert txn.get(UINT_0001) == B('a') + assert txn.get(UINT_0002) == B('b') + + def test_integerdup(self): + _, env = testlib.temp_env() + db1 = env.open_db(B('db1'), dupsort=True, integerdup=True) + txn = env.begin(write=True, db=db1) + assert txn.put(UINT_0001, UINT_0002) + assert txn.put(UINT_0001, UINT_0001) + assert txn.get(UINT_0001) == UINT_0001 + + def test_dupfixed(self): + _, env = testlib.temp_env() + db1 = env.open_db(B('db1'), dupsort=True, dupfixed=True) + txn = env.begin(write=True, db=db1) + assert txn.put(B('a'), B('a')) + assert txn.put(B('a'), B('b')) + assert txn.get(B('a')) == B('a') + + +class PutTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_bad_txn(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + txn.abort() + self.assertRaises(Exception, + lambda: txn.put(B('a'), B('a'))) + + def test_bad_env(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + env.close() + self.assertRaises(Exception, + lambda: txn.put(B('a'), B('a'))) + + def test_ro_txn(self): + _, env = testlib.temp_env() + txn = env.begin() + self.assertRaises(lmdb.ReadonlyError, + lambda: txn.put(B('a'), B('a'))) + + def test_empty_key_value(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + self.assertRaises(lmdb.BadValsizeError, + lambda: txn.put(B(''), B('a'))) + + def test_dupsort(self): + _, env = testlib.temp_env() + + def test_dupdata_no_dupsort(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + assert txn.put(B('a'), B('a'), dupdata=True) + assert txn.put(B('a'), B('b'), dupdata=True) + txn.get(B('a')) + + +class ReplaceTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_bad_txn(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + txn.abort() + self.assertRaises(Exception, + lambda: txn.replace(B('a'), B('a'))) + + def test_bad_env(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + env.close() + self.assertRaises(Exception, + lambda: txn.replace(B('a'), B('a'))) + + def test_ro_txn(self): + _, env = testlib.temp_env() + txn = env.begin() + self.assertRaises(lmdb.ReadonlyError, + lambda: txn.replace(B('a'), B('a'))) + + def test_empty_key_value(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + self.assertRaises(lmdb.BadValsizeError, + lambda: txn.replace(B(''), B('a'))) + + def test_dupsort_noexist(self): + _, env = testlib.temp_env() + db = env.open_db(B('db1'), dupsort=True) + txn = env.begin(write=True, db=db) + assert None == txn.replace(B('a'), B('x')) + assert B('x') == txn.replace(B('a'), B('y')) + assert B('y') == txn.replace(B('a'), B('z')) + cur = txn.cursor() + assert cur.set_key(B('a')) + assert [B('z')] == list(cur.iternext_dup()) + + def test_dupdata_no_dupsort(self): + _, env = testlib.temp_env() + txn = env.begin(write=True) + assert txn.put(B('a'), B('a'), dupdata=True) + assert txn.put(B('a'), B('b'), dupdata=True) + txn.get(B('a')) + + +class LeakTest(unittest.TestCase): + def tearDown(self): + testlib.cleanup() + + def test_open_close(self): + temp_dir = testlib.temp_dir() + env = lmdb.open(temp_dir) + with env.begin() as txn: + pass + env.close() + r1 = weakref.ref(env) + r2 = weakref.ref(txn) + env = None + txn = None + testlib.debug_collect() + assert r1() is None + assert r2() is None + + +if __name__ == '__main__': + unittest.main()