Skip to content
Permalink
01adfa3b25
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
71 lines (59 sloc) 2.26 KB
from __future__ import absolute_import
import chemfp
from ann_benchmarks.algorithms.base import BaseANN
from scipy.sparse import csr_matrix
import numpy
import os
from bitarray import bitarray
class Chemfp(BaseANN):
    """ann-benchmarks wrapper around chemfp's Tanimoto similarity searches.

    Only the "jaccard" metric is accepted.  Data rows are packed into
    big-endian bit strings and loaded into chemfp fingerprint arenas; the
    search methods store their raw results on the instance so that the
    ``post_query`` / ``get_batch_results`` steps can decode them afterwards.
    """

    def __init__(self, metric):
        """Create the wrapper.

        Raises:
            NotImplementedError: if ``metric`` is anything but "jaccard".
        """
        if metric != "jaccard":
            raise NotImplementedError("Chemfp doesn't support metric %s, only jaccard metric is supported." % metric)
        self._metric = metric
        self.name = "Chemfp()"
        # Filled by pre_fit(); fit() falls back to encoding X itself when
        # pre_fit() was never called.
        self._fps = None

    @staticmethod
    def _rows_to_fingerprints(X):
        """Return ``[(row_index, packed_bytes), ...]`` for every row of ``X``.

        ``X`` must expose ``shape[0]`` and row indexing (``X[row]``); each row
        is fed to ``bitarray.extend`` and serialised with ``tobytes()``.
        """
        fps = []
        for row in range(X.shape[0]):
            bits = bitarray(endian='big')
            bits.extend(X[row])
            fps.append((row, bits.tobytes()))
        return fps

    @staticmethod
    def matrToArena(X, reorder=True):
        """Convert matrix ``X`` to an in-memory chemfp fingerprint arena.

        The fingerprint ids are the row indices of ``X``; ``reorder`` is
        passed straight through to ``chemfp.load_fingerprints``.
        """
        return chemfp.load_fingerprints(
            Chemfp._rows_to_fingerprints(X),
            chemfp.Metadata(num_bits=X.shape[1]),
            reorder=reorder,
        )

    def pre_fit(self, X):
        """Encode the rows of ``X`` as fingerprints and keep them for ``fit``."""
        self._fps = Chemfp._rows_to_fingerprints(X)

    def fit(self, X):
        """Build the target arena from the fingerprints prepared by ``pre_fit``.

        If ``pre_fit`` has not been called, the fingerprints are derived from
        ``X`` here instead of failing on a missing attribute.
        """
        if self._fps is None:
            self._fps = Chemfp._rows_to_fingerprints(X)
        self._target = chemfp.load_fingerprints(
            self._fps,
            chemfp.Metadata(num_bits=X.shape[1]),
            reorder=True,
        )

    def pre_query(self, v, n):
        """Wrap the single query vector ``v`` in a one-row query arena."""
        self._queries = Chemfp.matrToArena(numpy.array([v]))

    def query(self, v, n, rq=False):
        """Search the target arena with the arena built by ``pre_query``.

        With ``rq`` false this is a k-nearest search with ``k=n``.  With
        ``rq`` true it is a threshold search using ``1.0 - n`` as the
        similarity threshold (presumably ``n`` is then a distance radius --
        confirm against the caller).  The result is stored, not returned;
        use ``post_query`` to read it.
        """
        if rq:
            self._results = chemfp.threshold_tanimoto_search(
                self._queries, self._target, threshold=1.0 - n)
        else:
            self._results = chemfp.knearest_tanimoto_search(
                self._queries, self._target, k=n, threshold=0.0)

    def post_query(self, rq=False):
        """Return the hit ids for the first stored result.

        Always returns a list-like value: an empty list is returned both when
        the first result has no hits and when there are no results at all
        (the latter previously fell through and returned ``None``).
        """
        first = next(iter(self._results), None)
        if first is None:
            return []
        _query_id, hits = first
        return hits.get_ids() if hits else []

    def pre_batch_query(self, X, n):
        """Convert every row of ``X`` into the query arena for a batch search."""
        self._queries = Chemfp.matrToArena(X)

    def batch_query(self, X, n):
        """Run a k-nearest search (``k=n``) for the arena built by ``pre_batch_query``."""
        self._results = chemfp.knearest_tanimoto_search(
            self._queries, self._target, k=n, threshold=0.0)

    def get_batch_results(self):
        """Return one list of hit ids per query, ordered by query id.

        Results are sorted on the query id alone so ties can never fall
        through to comparing the hit objects themselves.
        NOTE(review): the sort is presumably needed because the query arena
        is loaded with reorder=True, so results may not arrive in row
        order -- confirm against the chemfp documentation.
        """
        ordered = sorted(self._results, key=lambda pair: pair[0])
        return [hits.get_ids() if hits else [] for _query_id, hits in ordered]