from __future__ import print_function
__true_print = print

import argparse
import datetime
import docker
import json
import multiprocessing
import numpy
import os
import psutil
import requests
import sys
import threading
import time
import subprocess


def print(*args, **kwargs):
    # Flush after every print so output streams promptly from inside containers
    __true_print(*args, **kwargs)
    sys.stdout.flush()

from ann_benchmarks.datasets import get_dataset, DATASETS
from ann_benchmarks.algorithms.definitions import Definition, instantiate_algorithm, get_algorithm_name
from ann_benchmarks.distance import metrics
from ann_benchmarks.results import store_results
from scipy.sparse import issparse


def run_individual_query(algoname, algo, X_train, X_test, distance, count, run_count, batch, rq):
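    """Time `run_count` passes over the query set and keep the best pass.

    Each query's result ids are converted to (index, distance) pairs
    against X_train (handling sparse train sets). In batch mode the whole
    test set is issued at once and the elapsed time is amortised per
    query; `rq` switches the underlying call into range-query mode.
    Returns an (attrs, results) pair suitable for store_results().
    """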
    best_search_time = float('inf')
    for i in range(run_count):
        print('Run %d/%d...' % (i + 1, run_count))
        # A one-element list rather than a plain int, so the nested closures
        # below can mutate it despite Python's scoping rules
        n_items_processed = [0]

        def single_query(v):
            algo.pre_query(v, count)
            start = time.time()
            if rq:
                candidates = algo.query(v, count, rq)
            else:
                candidates = algo.query(v, count)
            total = (time.time() - start)
            # post_query returns the final candidate ids; the value returned
            # by query() above is discarded in favour of this hook's output
            candidates = algo.post_query(rq)
            if issparse(X_train):
                candidates = [(int(idx), float(metrics[distance]['distance'](v, X_train[idx].toarray()[0])))
                              for idx in candidates]
            else:
                candidates = [(int(idx), float(metrics[distance]['distance'](v, X_train[idx])))
                              for idx in candidates]
            n_items_processed[0] += 1
            if n_items_processed[0] % 1000 == 0:
                print('Processed %d/%d queries...' % (n_items_processed[0], X_test.shape[0]))
            if not rq and len(candidates) > count:
                print('warning: algorithm %s returned %d results, but count is only %d' % (algo, len(candidates), count))
            return (total, candidates)

        def batch_query(X):
            algo.pre_batch_query(X, count)
            start = time.time()
            algo.batch_query(X, count)
            total = (time.time() - start)
            results = algo.get_batch_results()
            if issparse(X_train):
                candidates = [[(int(idx), float(metrics[distance]['distance'](v, X_train[idx].toarray()[0])))
                               for idx in single_results]
                              for v, single_results in zip(X, results)]
            else:
                candidates = [[(int(idx), float(metrics[distance]['distance'](v, X_train[idx])))
                               for idx in single_results]
                              for v, single_results in zip(X, results)]
            # Amortise the batch time evenly over the individual queries
            return [(total / float(X.shape[0]), v) for v in candidates]

        if batch:
            results = batch_query(X_test)
        else:
            results = [single_query(x) for x in X_test]

        total_time = sum(t for t, _ in results)
        total_candidates = sum(len(candidates) for _, candidates in results)
        search_time = total_time / len(X_test)
        avg_candidates = total_candidates / len(X_test)
        best_search_time = min(best_search_time, search_time)

    verbose = hasattr(algo, "query_verbose")
    attrs = {
        "batch_mode": batch,
        "best_search_time": best_search_time,
        "candidates": avg_candidates,
        "expect_extra": verbose,
        "name": str(algo),
        "run_count": run_count,
        "distance": distance,
        "count": int(count)
    }
    return (attrs, results)


def run(definition, dataset, count, run_count, batch, rq):
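    """Instantiate and fit `definition` on `dataset`, then run every query
    argument group and store its results.

    The train set is densified if it is stored sparse, and the index build
    is timed and size-measured. algo.done() is always called, even when
    the fit or a query run fails.
    """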
    algo = instantiate_algorithm(definition)
    assert not definition.query_argument_groups \
        or hasattr(algo, "set_query_arguments"), """\
error: query argument groups have been specified for %s.%s(%s), but the \
algorithm instantiated from it does not implement the set_query_arguments \
function""" % (definition.module, definition.constructor, definition.arguments)

    D = get_dataset(dataset)
    # Chunjiang modified: densify the train set if it is stored sparse
    print('Is the train set a sparse matrix? %s' % issparse(D['train'][()]))
    if issparse(D['train'][()]):
        X_train = D['train'][()].toarray()
    else:
        X_train = D['train'][()]
    X_test = numpy.array(D['test'])
    distance = D.attrs['distance']
    print('got a train set of size (%d * %d)' % X_train.shape)
    print('got %d queries' % len(X_test))

    try:
        print(X_train.shape)
        algo.pre_fit(X_train)
        t0 = time.time()
        index_size_before = algo.get_index_size("self")
        algo.fit(X_train)
        build_time = time.time() - t0
        index_size = algo.get_index_size("self") - index_size_before
        print('Built index in', build_time)
        print('Index size: ', index_size)

        query_argument_groups = definition.query_argument_groups
        # Make sure that algorithms with no query argument groups still get run
        # once by providing them with a single, empty, harmless group
        if not query_argument_groups:
            query_argument_groups = [[]]
        for pos, query_arguments in enumerate(query_argument_groups, 1):
            print("Running query argument group %d of %d..." %
                  (pos, len(query_argument_groups)))
            if query_arguments:
                algo.set_query_arguments(*query_arguments)
            descriptor, results = run_individual_query(definition.algorithm, algo, X_train, X_test,
                                                       distance, count, run_count, batch, rq)
            descriptor["build_time"] = build_time
            descriptor["index_size"] = index_size
            descriptor["algo"] = get_algorithm_name(definition.algorithm, batch)
            descriptor["dataset"] = dataset
            store_results(dataset, count, definition,
                          query_arguments, descriptor, results, batch, rq)
    finally:
        algo.done()


def run_from_cmdline():
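    """Entry point used inside a container: parse the command line,
    rebuild a Definition (no docker/singularity tag is needed at this
    point) and run it. The positional `build` argument and each `queries`
    argument are JSON-encoded; with --rq, the radius replaces the count.
    """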
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset', choices=DATASETS.keys(), required=True)
    parser.add_argument('--algorithm', required=True)
    parser.add_argument('--module', required=True)
    parser.add_argument('--constructor', required=True)
    parser.add_argument('--count', required=True, type=int)
    parser.add_argument('--runs', required=True, type=int)
    parser.add_argument('--batch', action='store_true')
    parser.add_argument('--rq', action='store_true')
    parser.add_argument('--radius', type=float)
    parser.add_argument('build')
    parser.add_argument('queries', nargs='*', default=[])
    args = parser.parse_args()

    algo_args = json.loads(args.build)
    query_args = [json.loads(q) for q in args.queries]
    definition = Definition(
        algorithm=args.algorithm,
        docker_tag=None,  # not needed
        singularity_tag=None,  # not needed
        module=args.module,
        constructor=args.constructor,
        arguments=algo_args,
        query_argument_groups=query_args,
        disabled=False
    )
    if args.rq:
        # For range queries the radius takes the place of the result count
        run(definition, args.dataset, args.radius, args.runs, args.batch, args.rq)
    else:
        run(definition, args.dataset, args.count, args.runs, args.batch, args.rq)


def run_docker(definition, dataset, count, runs, timeout, batch, rq, radius, mem_limit=None):
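    """Run the benchmark inside the definition's Docker image.

    Rebuilds the run_from_cmdline() argument vector, mounts the code,
    data and results directories into the container, restricts CPUs (a
    single CPU unless in batch mode) and memory, then streams the
    container's logs until it exits or the timeout expires.
    """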
    import colors  # imported locally: this package reportedly does not work under Python 2

    cmd = ['--dataset', dataset,
           '--algorithm', definition.algorithm,
           '--module', definition.module,
           '--constructor', definition.constructor,
           '--runs', str(runs),
           '--count', str(count)]
    if batch:
        cmd += ['--batch']
    if rq:
        cmd += ['--rq', '--radius', str(radius)]
    cmd.append(json.dumps(definition.arguments))
    cmd += [json.dumps(qag) for qag in definition.query_argument_groups]
    print('Running command', cmd)

    client = docker.from_env()
    if mem_limit is None:
        mem_limit = psutil.virtual_memory().available
    print('Memory limit:', mem_limit)
    cpu_limit = "0-%d" % (multiprocessing.cpu_count() - 1)
    if not batch:
        # Pin the container to the first CPU when not in batch mode
        cpu_limit = "0"
    print('Running on CPUs:', cpu_limit)

    container = client.containers.run(
        definition.docker_tag,
        cmd,
        volumes={
            os.path.abspath('ann_benchmarks'): {'bind': '/home/app/ann_benchmarks', 'mode': 'ro'},
            os.path.abspath('data'): {'bind': '/home/app/data', 'mode': 'ro'},
            os.path.abspath('results'): {'bind': '/home/app/results', 'mode': 'rw'},
        },
        cpuset_cpus=cpu_limit,
        mem_limit=mem_limit,
        detach=True)

    def stream_logs():
        for line in container.logs(stream=True):
            print(colors.color(line.decode().rstrip(), fg='blue'))

    if sys.version_info >= (3, 0):
        t = threading.Thread(target=stream_logs, daemon=True)
    else:
        t = threading.Thread(target=stream_logs)
        t.daemon = True
    t.start()

    try:
        exit_code = container.wait(timeout=timeout)
        # An exit code of 0 means success; any other non-None code is an error
        if exit_code == 0:
            return
        elif exit_code is not None:
            print(colors.color(container.logs().decode(), fg='red'))
            raise Exception('Child process raised exception %d' % exit_code)
    finally:
        container.remove(force=True)


def run_singularity(definition, dataset, count, runs, timeout, batch, rq, radius, sif_dir, mem_limit=None):
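    """Run the benchmark inside the definition's Singularity (.sif) image
    via `singularity exec`, building the same argument vector as the
    Docker path. mem_limit is accepted for symmetry but not enforced here.
    """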
    cmd = ['--dataset', dataset,
           '--algorithm', definition.algorithm,
           '--module', definition.module,
           '--constructor', definition.constructor,
           '--runs', str(runs),
           '--count', str(count)]
    if batch:
        cmd += ['--batch']
    if rq:
        cmd += ['--rq', '--radius', str(radius)]
    cmd.append(json.dumps(definition.arguments))
    cmd += [json.dumps(qag) for qag in definition.query_argument_groups]
    print('Running command', cmd)
    strCmd = ' '.join(["'" + k + "'" for k in cmd])
    print('String of command', strCmd)
    # Chemfp, Bruteforce and Folding run under Python 2; everything else runs under Python 3
    if definition.algorithm in ['Chemfp', 'Bruteforce', 'Folding']:
        subprocess.check_call('singularity exec %s/%s.sif python run_algorithm.py %s'
                              % (sif_dir, definition.singularity_tag, strCmd), shell=True)
    else:
        subprocess.check_call('singularity exec %s/%s.sif python3 run_algorithm.py %s'
                              % (sif_dir, definition.singularity_tag, strCmd), shell=True)