diff --git a/python_bindings/tests/bindings_test.py b/python_bindings/tests/bindings_test.py
index 068b52f..3514341 100644
--- a/python_bindings/tests/bindings_test.py
+++ b/python_bindings/tests/bindings_test.py
@@ -10,9 +10,19 @@
 
 import os, gc, psutil
 
-MEM_TEST_ITER=50
-MEM_GROW_COEFF=8 # This is a bit adhoc but seems to work in practice
+MEM_TEST_CRIT_FAIL_RATE=0.25
+MEM_TEST_REPEAT_QTY1=4
+MEM_TEST_ITER1=10
+
+MEM_TEST_REPEAT_QTY2=4
+MEM_TEST_ITER2=5
+
+# The key to stable memory testing is using a reasonably large number of points
+MEM_TEST_DATA_QTY=25000
+MEM_TEST_QUERY_QTY=200
+MEM_GROW_COEFF=1.5 # This is a bit adhoc but seems to work in practice
+MEM_TEST_DATA_DIM=4
 
 
 def get_exact_cosine(row, data, N=10):
     scores = data.dot(row) / np.linalg.norm(data, axis=-1)
@@ -300,113 +310,7 @@
     def testGlobal(self):
         # this is a one line reproduction of https://github.com/nmslib/nmslib/issues/327
         GlobalTestCase.index = nmslib.init()
-class MemoryLeak1TestCase(TestCaseBase):
-    def testMemoryLeak1(self):
-        process = psutil.Process(os.getpid())
-
-        np.random.seed(23)
-        data = np.random.randn(10000, 10).astype(np.float32)
-        query = np.random.randn(1000, 10).astype(np.float32)
-        space_name = 'l2'
-
-        num_threads=4
-
-        index_time_params = {'M': 20,
-                             'efConstruction': 100,
-                             'indexThreadQty': num_threads,
-                             'post' : 0,
-                             'skip_optimized_index' : 1} # using non-optimized index!
-
-        query_time_params = {'efSearch': 100}
-
-        with tempfile.NamedTemporaryFile() as tmp:
-
-            index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
-            index.addDataPointBatch(data)
-
-            index.createIndex(index_time_params)
-            index.saveIndex(tmp.name, save_data=True)
-
-            init_mem = process.memory_info().rss
-
-            delta1 = None
-
-            for k in range(MEM_TEST_ITER):
-
-                index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
-                index.loadIndex(tmp.name, load_data=True)
-                index.setQueryTimeParams(query_time_params)
-
-                if k == 0:
-                    delta1 = process.memory_info().rss - init_mem
-
-                nbrs = index.knnQueryBatch(query, k = 10, num_threads = num_threads)
-
-                nbrs = None
-                index = None
-
-                gc.collect()
-
-            gc.collect()
-            time.sleep(1)
-            gc.collect()
-            time.sleep(1)
-            delta_last = process.memory_info().rss - init_mem
-
-            # if this check fails a memory leak is possible (but not necessarily 100% certain, memory is random)
-            self.assertTrue(delta_last < delta1 * MEM_GROW_COEFF)
-
-class MemoryLeak2TestCase(TestCaseBase):
-    def testMemoryLeak2(self):
-        process = psutil.Process(os.getpid())
-
-        np.random.seed(23)
-        data = np.random.randn(10000, 10).astype(np.float32)
-        query = np.random.randn(1000, 10).astype(np.float32)
-        space_name = 'l2'
-
-        num_threads=4
-
-        index_time_params = {'M': 20,
-                             'efConstruction': 100,
-                             'indexThreadQty': num_threads,
-                             'post' : 0,
-                             'skip_optimized_index' : 1} # using non-optimized index!
-
-        query_time_params = {'efSearch': 100}
-
-        with tempfile.NamedTemporaryFile() as tmp:
-
-            init_mem = process.memory_info().rss
-
-            delta1 = None
-
-            for k in range(MEM_TEST_ITER):
-
-                index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
-                index.addDataPointBatch(data)
-                index.createIndex(index_time_params)
-
-                if k == 0:
-                    delta1 = process.memory_info().rss - init_mem
-
-                index.setQueryTimeParams(query_time_params)
-                nbrs = index.knnQueryBatch(query, k = 10, num_threads = num_threads)
-
-                nbrs = None
-                index = None
-
-                gc.collect()
-
-            gc.collect()
-            time.sleep(1)
-            gc.collect()
-            time.sleep(1)
-            delta_last = process.memory_info().rss - init_mem
-
-            # if this check fails a memory leak is possible (but not necessarily 100% certain, memory is random)
-            self.assertTrue(delta_last < delta1 * MEM_GROW_COEFF)
-
 
 if __name__ == "__main__":
     unittest.main()
+
diff --git a/python_bindings/tests/mem_test1.py b/python_bindings/tests/mem_test1.py
new file mode 100644
--- /dev/null
+++ b/python_bindings/tests/mem_test1.py
@@ -0,0 +1,100 @@
+import itertools
+import tempfile
+import unittest
+
+import numpy as np
+import numpy.testing as npt
+
+import nmslib
+import time
+
+import os, gc, psutil
+
+from .bindings_test import *
+
+
+class MemoryLeak1TestCase(TestCaseBase):
+    """Check that repeatedly loading a saved HNSW index does not leak memory."""
+
+    def testMemoryLeak1(self):
+        """Load and query a saved index in a loop, tracking RSS growth.
+
+        The scenario is repeated MEM_TEST_REPEAT_QTY1 times. A repeat counts as
+        failed when the RSS growth measured after its MEM_TEST_ITER1 load/query
+        cycles reaches MEM_GROW_COEFF times the growth seen right after the very
+        first load. RSS is noisy, so the test fails only when the share of
+        failed repeats exceeds MEM_TEST_CRIT_FAIL_RATE.
+        """
+        process = psutil.Process(os.getpid())
+
+        np.random.seed(23)
+        data = np.random.randn(MEM_TEST_DATA_QTY, MEM_TEST_DATA_DIM).astype(np.float32)
+        query = np.random.randn(MEM_TEST_QUERY_QTY, MEM_TEST_DATA_DIM).astype(np.float32)
+        space_name = 'l2'
+
+        num_threads = 4
+
+        index_time_params = {'M': 20,
+                             'efConstruction': 100,
+                             'indexThreadQty': num_threads,
+                             'post': 0,
+                             'skip_optimized_index': 1}  # using non-optimized index!
+
+        query_time_params = {'efSearch': 100}
+
+        fail_qty = 0
+        test_qty = 0
+        delta_first = None
+
+        gc.collect()
+        time.sleep(0.25)
+
+        # Baseline RSS; every delta below is measured relative to it.
+        init_mem = process.memory_info().rss
+
+        for tid in range(MEM_TEST_REPEAT_QTY1):
+
+            with tempfile.NamedTemporaryFile() as tmp:
+
+                index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
+                index.addDataPointBatch(data)
+
+                index.createIndex(index_time_params)
+                index.saveIndex(tmp.name, save_data=True)
+
+                # Drop the freshly built index before the load loop starts.
+                index = None
+                gc.collect()
+
+                for iter_id in range(MEM_TEST_ITER1):
+
+                    index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
+                    index.loadIndex(tmp.name, load_data=True)
+                    index.setQueryTimeParams(query_time_params)
+
+                    if iter_id == 0 and tid == 0:
+                        delta_first = process.memory_info().rss - init_mem
+
+                    nbrs = index.knnQueryBatch(query, k=10, num_threads=num_threads)
+
+                    nbrs = None
+                    index = None
+
+                    gc.collect()
+
+                gc.collect()
+                time.sleep(0.25)
+                delta_last = process.memory_info().rss - init_mem
+
+                test_qty += 1
+                if delta_last >= delta_first * MEM_GROW_COEFF:
+                    fail_qty += 1
+
+        print('Fail qty %d out of %d' % (fail_qty, test_qty))
+        # fail_qty counts failed repeats, so the limit scales with test_qty
+        # (the number of repeats), not with the per-repeat iteration count.
+        self.assertLessEqual(fail_qty, test_qty * MEM_TEST_CRIT_FAIL_RATE)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python_bindings/tests/mem_test2.py b/python_bindings/tests/mem_test2.py
new file mode 100644
--- /dev/null
+++ b/python_bindings/tests/mem_test2.py
@@ -0,0 +1,91 @@
+import itertools
+import tempfile
+import unittest
+
+import numpy as np
+import numpy.testing as npt
+
+import nmslib
+import time
+
+import os, gc, psutil
+
+from .bindings_test import *
+
+
+class MemoryLeak2TestCase(TestCaseBase):
+    """Check that repeatedly building an HNSW index does not leak memory."""
+
+    def testMemoryLeak2(self):
+        """Build and query an index in a loop, tracking RSS growth.
+
+        The scenario is repeated MEM_TEST_REPEAT_QTY2 times, each time against a
+        freshly measured RSS baseline. A repeat counts as failed when the RSS
+        growth measured after its MEM_TEST_ITER2 build/query cycles reaches
+        MEM_GROW_COEFF times the growth seen right after the very first build.
+        RSS is noisy, so the test fails only when the share of failed repeats
+        exceeds MEM_TEST_CRIT_FAIL_RATE.
+        """
+        process = psutil.Process(os.getpid())
+
+        np.random.seed(23)
+        data = np.random.randn(MEM_TEST_DATA_QTY, MEM_TEST_DATA_DIM).astype(np.float32)
+        query = np.random.randn(MEM_TEST_QUERY_QTY, MEM_TEST_DATA_DIM).astype(np.float32)
+        space_name = 'l2'
+
+        num_threads = 4
+
+        index_time_params = {'M': 20,
+                             'efConstruction': 100,
+                             'indexThreadQty': num_threads,
+                             'post': 0,
+                             'skip_optimized_index': 1}  # using non-optimized index!
+
+        query_time_params = {'efSearch': 100}
+
+        fail_qty = 0
+        test_qty = 0
+        delta_first = None
+
+        gc.collect()
+        time.sleep(0.25)
+
+        for tid in range(MEM_TEST_REPEAT_QTY2):
+
+            # Each repeat is measured against its own baseline RSS.
+            gc.collect()
+            init_mem = process.memory_info().rss
+
+            for iter_id in range(MEM_TEST_ITER2):
+
+                index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
+                index.addDataPointBatch(data)
+                index.createIndex(index_time_params)
+
+                if iter_id == 0 and tid == 0:
+                    delta_first = process.memory_info().rss - init_mem
+
+                index.setQueryTimeParams(query_time_params)
+                nbrs = index.knnQueryBatch(query, k=10, num_threads=num_threads)
+
+                nbrs = None
+                index = None
+
+                gc.collect()
+
+            gc.collect()
+            time.sleep(0.25)
+            delta_last = process.memory_info().rss - init_mem
+
+            test_qty += 1
+            if delta_last >= delta_first * MEM_GROW_COEFF:
+                fail_qty += 1
+
+        print('Fail qty %d out of %d' % (fail_qty, test_qty))
+        # fail_qty counts failed repeats, so the limit scales with test_qty
+        # (the number of repeats), not with the per-repeat iteration count.
+        self.assertLessEqual(fail_qty, test_qty * MEM_TEST_CRIT_FAIL_RATE)
+
+
+if __name__ == "__main__":
+    unittest.main()