From 56aa99c63e5de1a151be831e6ba8b2534ea2a55e Mon Sep 17 00:00:00 2001 From: Greg Friedland Date: Thu, 21 Feb 2019 13:45:27 -0800 Subject: [PATCH] update unit test --- python_bindings/tests/bindings_test.py | 144 ++++++--------- python_bindings/tests/jaccard_comparison.py | 167 ------------------ python_bindings/tests/jaccard_comparison.sh | 11 -- .../tests/jaccard_comparison_plot.py | 39 ---- 4 files changed, 55 insertions(+), 306 deletions(-) delete mode 100644 python_bindings/tests/jaccard_comparison.py delete mode 100644 python_bindings/tests/jaccard_comparison.sh delete mode 100644 python_bindings/tests/jaccard_comparison_plot.py diff --git a/python_bindings/tests/bindings_test.py b/python_bindings/tests/bindings_test.py index f812160..4c78875 100644 --- a/python_bindings/tests/bindings_test.py +++ b/python_bindings/tests/bindings_test.py @@ -218,94 +218,60 @@ class BitVectorIndexTestMixin(object): def _get_index(self, space='bit_jaccard'): raise NotImplementedError() - def testKnnQuery(self): - for num_elems in [30000, 100000, 300000, 1000000]: - for nbits in [512, 2048]: - self._testKnnQuery(nbits, num_elems) + def _get_batches(self, index, nbits, num_elems, chunk_size): + if "bit_" in str(index): + self.bit_vector_str_func = bit_vector_to_str + else: + self.bit_vector_str_func = bit_vector_sparse_str + + batches = [] + for i in range(0, num_elems, chunk_size): + strs = [] + for j in range(chunk_size): + a = np.random.rand(nbits) > 0.5 + strs.append(self.bit_vector_str_func(a)) + batches.append([np.arange(i, i + chunk_size), strs]) + return batches - def _testKnnQuery(self, nbits, num_elems): - chunk_size = 10000 + def testKnnQuery(self): + np.random.seed(23) - ps_proc = psutil.Process() - # print(f"\n{ps_proc.memory_info()}") index = self._get_index() - if "bit_jaccard" in str(index): - bit_vector_str_func = bit_vector_to_str - else: - bit_vector_str_func = bit_vector_sparse_str - - # logging.basicConfig(level=logging.INFO) - # with PsUtil(interval=2, proc_attr=["memory_info"]): - with PeakMemoryUsage(f"AddData: vector={nbits}-bit elems={num_elems}"): - np.random.seed(23) - for i in range(0, num_elems, chunk_size): - strs = [] - for j in range(chunk_size): - a = np.random.rand(nbits) > 0.5 - strs.append(bit_vector_str_func(a)) - index.addDataPointBatch(ids=np.arange(i, i + chunk_size), data=strs) - - # print(f"\n{ps_proc.memory_info()}") - with PeakMemoryUsage(f"CreateIndex: vector={nbits}-bit of elems={num_elems}"): - index.createIndex() - # print(f"\n{ps_proc.memory_info()}") - - a = np.ones(nbits) - ids, distances = index.knnQuery(bit_vector_str_func(a), k=10) - # print(ids) - print(distances) - # self.assertTrue(get_hitrate(get_exact_cosine(row, data), ids) >= 5) - # def testKnnQueryBatch(self): - # np.random.seed(23) - # data = np.random.randn(1000, 10).astype(np.float32) - # - # index = self._get_index() - # index.addDataPointBatch(data) - # index.createIndex() - # - # queries = data[:10] - # results = index.knnQueryBatch(queries, k=10) - # for query, (ids, distances) in zip(queries, results): - # self.assertTrue(get_hitrate(get_exact_cosine(query, data), ids) >= 5) - # - # # test col-major arrays - # queries = np.asfortranarray(queries) - # results = index.knnQueryBatch(queries, k=10) - # for query, (ids, distances) in zip(queries, results): - # self.assertTrue(get_hitrate(get_exact_cosine(query, data), ids) >= 5) - # - # # test custom ids (set id to square of each row) - # index = self._get_index() - # index.addDataPointBatch(data, ids=np.arange(data.shape[0]) ** 2) - # 
index.createIndex() - # - # queries = data[:10] - # results = index.knnQueryBatch(queries, k=10) - # for query, (ids, distances) in zip(queries, results): - # # convert from square back to row id - # ids = np.sqrt(ids).astype(int) - # self.assertTrue(get_hitrate(get_exact_cosine(query, data), ids) >= 5) - - # def testReloadIndex(self): - # np.random.seed(23) - # data = np.random.randn(1000, 10).astype(np.float32) - # - # original = self._get_index() - # original.addDataPointBatch(data) - # original.createIndex() - # - # # test out saving/reloading index - # with tempfile.NamedTemporaryFile() as tmp: - # original.saveIndex(tmp.name + ".index") - # - # reloaded = self._get_index() - # reloaded.addDataPointBatch(data) - # reloaded.loadIndex(tmp.name + ".index") - # - # original_results = original.knnQuery(data[0]) - # reloaded_results = reloaded.knnQuery(data[0]) - # npt.assert_allclose(original_results, - # reloaded_results) + batches = self._get_batches(index, 512, 2000, 1000) + for ids, data in batches: + index.addDataPointBatch(ids=ids, data=data) + + index.createIndex() + + s = self.bit_vector_str_func(np.ones(512)) + index.knnQuery(s, k=10) + + def testReloadIndex(self): + np.random.seed(23) + + original = self._get_index() + batches = self._get_batches(original, 512, 2000, 1000) + for ids, data in batches: + original.addDataPointBatch(ids=ids, data=data) + original.createIndex() + + # test out saving/reloading index + with tempfile.NamedTemporaryFile() as tmp: + original.saveIndex(tmp.name + ".index") + + reloaded = self._get_index() + for ids, data in batches: + reloaded.addDataPointBatch(ids=ids, data=data) + reloaded.loadIndex(tmp.name + ".index") + + s = self.bit_vector_str_func(np.ones(512)) + original_results = original.knnQuery(s) + reloaded_results = reloaded.knnQuery(s) + original_results = list(zip(list(original_results[0]), list(original_results[1]))) + original_results = sorted(original_results, key=lambda x: x[1]) + reloaded_results = list(zip(list(reloaded_results[0]), list(reloaded_results[1]))) + reloaded_results = sorted(reloaded_results, key=lambda x: x[1]) + npt.assert_allclose(original_results, reloaded_results) class HNSWTestCase(unittest.TestCase, DenseIndexTestMixin): @@ -325,10 +291,10 @@ class SparseJaccardTestCase(unittest.TestCase, BitVectorIndexTestMixin): dtype=nmslib.DistType.FLOAT) -# class BitHammingTestCase(unittest.TestCase, BitVectorIndexTestMixin): -# def _get_index(self, space='bit_hamming'): -# return nmslib.init(method='hnsw', space='bit_hamming', data_type=nmslib.DataType.OBJECT_AS_STRING, -# dtype=nmslib.DistType.INT) +class BitHammingTestCase(unittest.TestCase, BitVectorIndexTestMixin): + def _get_index(self, space='bit_hamming'): + return nmslib.init(method='hnsw', space='bit_hamming', data_type=nmslib.DataType.OBJECT_AS_STRING, + dtype=nmslib.DistType.INT) class SWGraphTestCase(unittest.TestCase, DenseIndexTestMixin): diff --git a/python_bindings/tests/jaccard_comparison.py b/python_bindings/tests/jaccard_comparison.py deleted file mode 100644 index 26d402e..0000000 --- a/python_bindings/tests/jaccard_comparison.py +++ /dev/null @@ -1,167 +0,0 @@ -import sys -import numpy as np -import nmslib -import psutil -import logging -import multiprocessing -import time -import os -import threading - -MB = 1024 * 1024 -CHUNK_SIZE = 10000 - - -class StoppableThread(threading.Thread): - """Thread class with a stop() method. 
The thread itself has to check - regularly for the stopped() condition.""" - - def __init__(self, *args, **kwargs): - super().__init__() - self._stop_event = threading.Event() - - def stop(self): - self._stop_event.set() - - def stopped(self): - return self._stop_event.is_set() - - -class Timer: - """ Context manager for timing named blocks of code """ - def __init__(self, name, logger=None): - self.name = name - self.logger = logger if logger else logging.getLogger() - - def __enter__(self): - self.start = time.time() - self.logger.debug("Starting {}".format(self.name)) - - def __exit__(self, type, value, trace): - self.logger.info("{}: {:0.2f}s".format(self.name, time.time() - self.start)) - - -class PeakMemoryUsage: - class Worker(StoppableThread): - def __init__(self, interval, *args, **kwargs): - super().__init__(*args, **kwargs) - self.interval = interval - self.max_rss = self.max_vms = 0 - - def run(self): - process = psutil.Process() - while not self.stopped(): - mem = process.memory_info() - self.max_rss = max(self.max_rss, mem.rss) - self.max_vms = max(self.max_vms, mem.vms) - time.sleep(self.interval) - - """ Context manager to calculate peak memory usage in a statement block """ - def __init__(self, name, logger=None, interval=1): - self.name = name - self.logger = logger if logger else logging.getLogger() - self.interval = interval - self.start = time.time() - self.worker = None - - def __enter__(self): - if self.interval > 0: - pid = os.getpid() - mem = psutil.Process(pid).memory_info() - self.start_rss, self.start_vms = mem.rss, mem.vms - - self.worker = PeakMemoryUsage.Worker(self.interval) - self.worker.start() - return self - - def __exit__(self, _, value, trace): - if self.worker: - self.worker.stop() - self.worker.join() - self.logger.warning("Peak memory usage for '{}' in MBs: orig=(rss={:0.1f} vms={:0.1f}) " - "peak=(rss={:0.1f} vms={:0.1f}) in {:0.2f}s" - .format(self.name, self.start_rss / MB, self.start_vms / MB, - self.worker.max_rss / MB, - self.worker.max_vms / MB, time.time() - self.start)) - - -class PsUtil(object): - def __init__(self, attr=('virtual_memory',), proc_attr=None, - logger=None, interval=60): - """ attr can be multiple methods of psutil (e.g. 
attr=['virtual_memory', 'cpu_times_percent']) """ - self.ps_mon = None - self.attr = attr - self.proc_attr = proc_attr - self.logger = logger if logger else logging.getLogger() - self.interval = interval - - def psutil_worker(self, pid): - root_proc = psutil.Process(pid) - while True: - for attr in self.attr: - self.logger.warning("PSUTIL {}".format(getattr(psutil, attr)())) - if self.proc_attr: - procs = set(root_proc.children(recursive=True)) - procs.add(root_proc) - procs = sorted(procs, key=lambda p: p.pid) - - for proc in procs: - self.logger.warning("PSUTIL process={}: {}" - .format(proc.pid, proc.as_dict(self.proc_attr))) - - time.sleep(self.interval) - - def __enter__(self): - if self.interval > 0: - self.ps_mon = multiprocessing.Process(target=self.psutil_worker, args=(os.getpid(),)) - self.ps_mon.start() - time.sleep(1) # sleep so the first iteration doesn't include statements in the PsUtil context - return self - - def __exit__(self, type, value, trace): - if self.ps_mon is not None: - self.ps_mon.terminate() - - -def bit_vector_to_str(bit_vect): - return " ".join(["1" if e else "0" for e in bit_vect]) - - -def bit_vector_sparse_str(bit_vect): - return " ".join([str(k) for k, b in enumerate(bit_vect) if b]) - - -def run(space, num_elems, nbits): - np.random.seed(23) - if space == "bit_jaccard": - bit_vector_str_func = bit_vector_to_str - index = nmslib.init(method='hnsw', space=space, data_type=nmslib.DataType.OBJECT_AS_STRING, - dtype=nmslib.DistType.FLOAT) - else: - bit_vector_str_func = bit_vector_sparse_str - index = nmslib.init(method='hnsw', space=space, data_type=nmslib.DataType.OBJECT_AS_STRING, - dtype=nmslib.DistType.FLOAT) - - with PeakMemoryUsage(f"All: space={space} nbits={nbits} elems={num_elems}"): - for i in range(0, num_elems, CHUNK_SIZE): - strs = [] - for j in range(CHUNK_SIZE): - a = np.random.rand(nbits) > 0.5 - strs.append(bit_vector_str_func(a)) - index.addDataPointBatch(ids=np.arange(i, i + CHUNK_SIZE), data=strs) - - index.createIndex() - - a = np.ones(nbits) - ids, distances = index.knnQuery(bit_vector_str_func(a), k=10) - print(distances) - - -if __name__ == "__main__": - np.set_printoptions(linewidth=500) - - logging.basicConfig(level=logging.WARNING) - space = sys.argv[1] - num_elems = int(sys.argv[2]) - nbits = int(sys.argv[3]) - run(space, num_elems, nbits) diff --git a/python_bindings/tests/jaccard_comparison.sh b/python_bindings/tests/jaccard_comparison.sh deleted file mode 100644 index 3bdf8b7..0000000 --- a/python_bindings/tests/jaccard_comparison.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash - -set -e - -for space in jaccard_sparse; do - for num_elems in 30000 100000 300000 1000000 3000000 10000000 30000000; do - for nbits in 512 2048; do - python jaccard_comparison.py $space $num_elems $nbits - done - done -done diff --git a/python_bindings/tests/jaccard_comparison_plot.py b/python_bindings/tests/jaccard_comparison_plot.py deleted file mode 100644 index 02a8117..0000000 --- a/python_bindings/tests/jaccard_comparison_plot.py +++ /dev/null @@ -1,39 +0,0 @@ -import pandas as pd -import sys -import statsmodels.api as sm -from plotnine import ggplot, geom_point, aes, stat_smooth, geom_line, scale_x_log10, \ - scale_y_log10, theme, element_text, ylim - - -if __name__ == "__main__": - df = pd.read_csv(sys.argv[1]) - df["space_nbits"] = df.space.astype(str) + "_" + df.nbits.astype(str) - df.memory = df.memory.astype(float) - df.time = df.time.astype(float) - print(df.info()) - - for col in ["time", "memory"]: - funcs = [] - for space_nbits in 
df.space_nbits.unique(): - sub_df = df.loc[df.space_nbits == space_nbits] - model = sm.OLS(sub_df.num_elems, sm.add_constant(sub_df[col])) - params = model.fit().params - func = lambda x: params.const + x * getattr(params, col) - funcs.append(func) - - p = (ggplot(df, aes("num_elems", col, color="space_nbits")) - + geom_point() + geom_line() - + scale_x_log10(limits=[10000,10000000]) - + scale_y_log10(limits=[3,10000]) - + theme(axis_text_x=element_text(rotation=90, hjust=1))) - p.save(filename=col + ".png", height=5, width=5, units='in', dpi=300) - - # p = ggplot(aes(x="num_elems", y=col, color="space_nbits"), data=df) + geom_line() + geom_point() + stat_function(fun=funcs[0]) - # p.make() - - # fig = plt.gcf() - # ax = plt.gca() - # plt.gca().set_xscale('log') - # plt.gca().set_yscale('log') - # - # ggsave(plot=p, filename=col + ".png")
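
For reference, a minimal standalone sketch of the index/query round-trip that the rewritten testKnnQuery now drives through _get_batches. This is a sketch, not part of the patch: it assumes an nmslib build that includes the experimental bit_jaccard space, and bit_vector_to_str is copied from the deleted jaccard_comparison.py. The test now uses a fixed 2000-element, 512-bit workload instead of looping over element counts up to 1,000,000, presumably to keep the unit tests fast now that the large-scale benchmarking scripts are removed.

    import numpy as np
    import nmslib

    def bit_vector_to_str(bit_vect):
        # Encode a boolean vector as the space-separated "0"/"1" string
        # format the OBJECT_AS_STRING bit-vector spaces expect.
        return " ".join(["1" if e else "0" for e in bit_vect])

    np.random.seed(23)
    index = nmslib.init(method='hnsw', space='bit_jaccard',
                        data_type=nmslib.DataType.OBJECT_AS_STRING,
                        dtype=nmslib.DistType.FLOAT)

    # 2000 random 512-bit vectors in two batches of 1000,
    # mirroring _get_batches(index, 512, 2000, 1000).
    for i in range(0, 2000, 1000):
        strs = [bit_vector_to_str(np.random.rand(512) > 0.5)
                for _ in range(1000)]
        index.addDataPointBatch(ids=np.arange(i, i + 1000), data=strs)

    index.createIndex()
    ids, distances = index.knnQuery(bit_vector_to_str(np.ones(512)), k=10)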
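Similarly, a sketch of the save/reload round-trip checked by the new testReloadIndex, under the same bit_jaccard assumption:

    import tempfile
    import numpy as np
    import numpy.testing as npt
    import nmslib

    def bit_vector_to_str(bit_vect):
        return " ".join(["1" if e else "0" for e in bit_vect])

    np.random.seed(23)
    batches = []
    for i in range(0, 2000, 1000):
        strs = [bit_vector_to_str(np.random.rand(512) > 0.5)
                for _ in range(1000)]
        batches.append((np.arange(i, i + 1000), strs))

    def build_index():
        # loadIndex still requires the data points to be re-added first,
        # which is why the test feeds the same batches to both indexes.
        index = nmslib.init(method='hnsw', space='bit_jaccard',
                            data_type=nmslib.DataType.OBJECT_AS_STRING,
                            dtype=nmslib.DistType.FLOAT)
        for ids, data in batches:
            index.addDataPointBatch(ids=ids, data=data)
        return index

    original = build_index()
    original.createIndex()

    with tempfile.NamedTemporaryFile() as tmp:
        original.saveIndex(tmp.name + ".index")
        reloaded = build_index()
        reloaded.loadIndex(tmp.name + ".index")

    query = bit_vector_to_str(np.ones(512))
    # Sort each (id, distance) result set by distance so that neighbours
    # with tied distances returned in a different order do not fail the check.
    orig_res = sorted(zip(*original.knnQuery(query)), key=lambda x: x[1])
    rel_res = sorted(zip(*reloaded.knnQuery(query)), key=lambda x: x[1])
    npt.assert_allclose(orig_res, rel_res)

Sorting before npt.assert_allclose is the same trick the new test uses: knnQuery's raw result ordering is not guaranteed to be stable across a save/load cycle when distances tie.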