Skip to content
Permalink
development
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
#!/usr/bin/env python
"""
@author Peter Zaffetti 2018
"""
import argparse
import cmd
import os
import sys
from operator import itemgetter
from input_parser import parse_micelle_input_file, parse_connection_indices_file, parse_micelle_from_pdb_file
from logger import get_logger
from primatives.micelle import Micelle
from micelle_utility import identify_micelle
from primatives.micelle_connection import cull_non_boundary_indices, write_boundary_indices_to_file,\
convert_boundary_indices_to_points, write_boundary_points_to_file
class MicelleIdentifier(cmd.Cmd):
    """
    Command line driver which identifies the shape of every micelle found in the supplied input files.

    Creating an instance immediately processes the parsed arguments and then terminates the interpreter.
    """

    def __init__(self, args, application_path=None):
        """
        Stores the parsed arguments, runs the identification and then exits the interpreter.
        :param args: the arguments parsed from the command line.
        :param application_path: the folder which contains the "test_files" folder. When omitted, the folder
        containing this module is used.
        """
        super().__init__()
        self.logger = get_logger(__name__)
        if application_path is None:
            # os.path.join(None, ...) raises a TypeError, so fall back to the folder of this module.
            application_path = os.path.dirname(os.path.abspath(__file__))
        self.test_micelle_path = os.path.join(application_path, "test_files")
        self.arguments = args
        self.logger.debug("Test Micelle Path is: %s", self.test_micelle_path)
        self.logger.debug("Arguments are: %s", self.arguments)
        self.parse_cmd_args(self.arguments)
        # Reaching this line means every file pair was processed, so report success to the caller.
        sys.exit(0)

    def parse_cmd_args(self, args):
        """
        Parses the command arguments passed in from the command line and perform the corresponding action.
        :param args: the arguments parsed from the command line
        :raises RuntimeError: if args is None, if no coordinate file or both coordinate formats were supplied, or if
        the number of coordinate files differs from the number of connection files.
        """
        if args is None:
            raise RuntimeError("The args passed in are None, however a Micelle file and Boundary Connection file are required.")
        # Truthiness (rather than "is None") also rejects empty lists, which cannot be processed either.
        if not args.micelle_file and not args.pdb_micelle_file:
            raise RuntimeError("Please input a micelle file in X,Y,Z format or one in PDB format (-m or -p options, respectively.)")
        if args.micelle_file and args.pdb_micelle_file:
            raise RuntimeError("Please input X,Y,Z or PDB format Micelle file but not both at the same time. Mixed mode is not currently supported.")

        is_xyz_format = bool(args.micelle_file)
        coordinate_files = args.micelle_file if is_xyz_format else args.pdb_micelle_file
        connection_files = args.adjacent_bnd_conn_file or []
        if len(coordinate_files) != len(connection_files):
            raise RuntimeError("Error parsing input arguments. Please make sure the number of micelle files and number of connection files match.")

        for coordinate_file, connect_file in zip(coordinate_files, connection_files):
            self._identify_micelles_in_file(args, coordinate_file, connect_file, is_xyz_format)

    def _identify_micelles_in_file(self, args, coordinate_file, connect_file, is_xyz_format):
        """
        Identifies every micelle of one coordinate/connection file pair and writes the corresponding output files.
        :param args: the arguments parsed from the command line.
        :param coordinate_file: the coordinate file exactly as given on the command line.
        :param connect_file: the connection file exactly as given on the command line.
        :param is_xyz_format: True to parse the coordinate file as X,Y,Z input, False to parse it as PDB input.
        """
        micelle_file = self.get_micelle_point_file_path(coordinate_file)
        connection_file = self.get_micelle_connection_file_path(connect_file)

        # create all the names of the files which will store the outputs.
        micelle_point_output_basename = os.path.splitext(coordinate_file)[0]
        if args.num_adjacencies_on_bnd is None:
            num_adjacencies_tag = ""
        else:
            num_adjacencies_tag = "-N" + str(args.num_adjacencies_on_bnd)
        connect_output_filename = micelle_point_output_basename + "-boundary-indices" + num_adjacencies_tag + ".dat"
        point_output_filename = micelle_point_output_basename + "-boundary-pnts" + num_adjacencies_tag + ".dat"
        id_output_filename = micelle_point_output_basename + num_adjacencies_tag + "-output.dat"

        # there can be more than one micelle in a file parse them all and return the list.
        if is_xyz_format:
            micelle_list = parse_micelle_input_file(micelle_file)
        else:
            micelle_list = parse_micelle_from_pdb_file(micelle_file)
        connection_list = parse_connection_indices_file(connection_file)

        # Group the connections by their source index once per file so that every point is a single lookup
        # instead of a scan over all connections. The per-point order of the connections is unchanged.
        # NOTE(review): assumes the indices are hashable (e.g. ints) -- confirm against the primatives module.
        connections_by_source = {}
        for connection in connection_list:
            connections_by_source.setdefault(connection.get_source_index(), []).append(connection)

        micelle_identifications = []
        # iterate over the list of micelles and identify them
        for micelle_cluster_id, micelle in micelle_list:
            point_id_list = micelle.get_point_index_list()
            micelle_connections = []
            for point in micelle.get_points():
                micelle_connections.extend(connections_by_source.get(point.get_index(), ()))

            # might want to change this since num_adjacencies_on_bnd can be implicitly set from the default value below.
            _boundary_indices, micelle_boundary_points = cull_boundary_indices(
                micelle_connections, micelle, args.num_adjacencies_on_bnd,
                indices_output_filename=connect_output_filename,
                points_output_filename=point_output_filename)
            micelle_id = identify_micelle_shape(micelle=micelle,
                                                boundary_points=micelle_boundary_points,
                                                hausdorff_limit=args.hausdorff_limit,
                                                floating_point_tolerance=args.floating_point_tolerance,
                                                plot_output=args.plot_output)
            micelle_identifications.append((point_id_list, micelle_id, micelle_cluster_id))

        # PDZ- this previously just used the boundary indices rather than all of them. Might need to change it back.
        generate_output_file(micelle_identifications, output_filename=id_output_filename,
                             write_micelle_cluster_id_to_output=args.write_micelle_cluster_id_to_output)

    def _resolve_input_file(self, requested_path, description):
        """
        Resolves an input file name, trying the path as given first and the test file folder second.
        :param requested_path: the file name or path from the command line input.
        :param description: the word used in the error messages to name the kind of file ("Micelle" or "connection").
        :return: the first candidate path that exists and is not a folder.
        :raises RuntimeError: if the first existing candidate is a folder or if no candidate exists.
        """
        candidates = (requested_path, os.path.join(self.test_micelle_path, requested_path))
        for candidate in candidates:
            if os.path.exists(candidate):
                # The folder check has to look at the candidate that was actually found, not the raw input.
                if os.path.isdir(candidate):
                    raise RuntimeError("Invalid {} file input. Please pass a file not a folder.".format(description))
                return candidate
        raise RuntimeError("Invalid {} file input. Please provide a valid file.".format(description))

    def get_micelle_point_file_path(self, micelle_file):
        """
        Converts the micelle point file input into an actual file location. This is necessary to convert test files to real
        locations.
        :param micelle_file: the micelle file from the command line input.
        :return: the path of the file.
        :raises RuntimeError: if the input resolves to a folder or does not exist.
        """
        return self._resolve_input_file(micelle_file, "Micelle")

    def get_micelle_connection_file_path(self, connection_file):
        """
        Converts the micelle boundary file input into an actual file location. This is necessary to convert test files to real
        locations.
        :param connection_file: the connection file from the command line input.
        :return: the path of the file.
        :raises RuntimeError: if the input resolves to a folder or does not exist.
        """
        return self._resolve_input_file(connection_file, "connection")
def identify_micelle_shape(hausdorff_limit, floating_point_tolerance, micelle=None, boundary_points=None, plot_output=False):
    """
    Validates the inputs and delegates the shape identification of a micelle to identify_micelle.
    :param hausdorff_limit: the hausdorff metric limit, forwarded unchanged to identify_micelle.
    :param floating_point_tolerance: the floating point comparison tolerance, forwarded unchanged to identify_micelle.
    :param micelle: the micelle object to identify. Must not be None.
    :param boundary_points: the previously selected boundary points of the micelle. Must not be None.
    :param plot_output: the plotting flag, forwarded unchanged to identify_micelle.
    :return: whatever identify_micelle returns for the given micelle.
    :raises RuntimeError: if boundary_points is None or, failing that, if micelle is None.
    """
    # The boundary points are checked first, so they decide the message when both inputs are missing.
    required_inputs = (
        (boundary_points, "The boundary points of the Micelle cannot be none when identifying the Micelle shape."),
        (micelle, "The micelle was not provided. Cannot identify the micelle without the micelle object."),
    )
    for supplied_value, failure_message in required_inputs:
        if supplied_value is None:
            raise RuntimeError(failure_message)

    shape = identify_micelle(
        micelle,
        boundary_points=boundary_points,
        hausdorff_limit=hausdorff_limit,
        floating_point_tolerance=floating_point_tolerance,
        plot_output=plot_output,
    )
    return shape
def cull_boundary_indices(micelle_connections, micelle, num_adjacencies_allowed=None,
                          indices_output_filename="boundary-indices.dat", points_output_filename="boundary-points.dat"):
    """
    Reduces the connections of a micelle to its boundary and writes the boundary indices and points to file.
    :param micelle_connections: the connections of the micelle parsed from the connection file.
    :param micelle: the micelle object which was previously created from the micelle point set.
    :param num_adjacencies_allowed: the adjacency limit handed to cull_non_boundary_indices. None is passed through
    as is, any other value is converted to an int first.
    :param indices_output_filename: the output filename where the boundary (culled) indices will be written to.
    :param points_output_filename: the output filename where the boundary (culled) points will be written to.
    :return: a tuple of the boundary indices and the boundary points.
    :raises RuntimeError: if num_adjacencies_allowed is given but int() rejects it with a ValueError.
    """
    adjacency_limit = num_adjacencies_allowed
    if adjacency_limit is not None:
        # a value was supplied, so make sure it really is a whole number before using it.
        try:
            adjacency_limit = int(adjacency_limit)
        except ValueError:
            failure_message = "The number of adjacencies passed to the program: {} must be an integer.".format(
                num_adjacencies_allowed)
            raise RuntimeError(failure_message)

    kept_indices = cull_non_boundary_indices(micelle_connections, adjacency_limit)
    total_connections = len(micelle_connections)
    write_boundary_indices_to_file(total_connections, kept_indices, indices_output_filename)

    kept_points = convert_boundary_indices_to_points(kept_indices, micelle)
    write_boundary_points_to_file(kept_points, points_output_filename)

    return kept_indices, kept_points
def generate_output_file(micelle_identifications, sort_by_index=True, output_filename="micelle-identifier-output.txt", write_micelle_cluster_id_to_output=True):
    """
    Generates the output file: one line per row holding the point index and the identification, optionally followed
    by the micelle cluster id as a third column.
    :param micelle_identifications: an iterable of (point indices, identification, micelle cluster id) triples.
    :param sort_by_index: when True, every triple is expanded into one row per point index and the rows are sorted
    by point index. When False, the triples are written exactly as supplied, one line per triple.
    :param output_filename: the name of the file where the results will live.
    :param write_micelle_cluster_id_to_output: whether or not to write the micelle cluster id to the output file as a third column or to omit it.
    """
    logger = get_logger(generate_output_file.__name__)

    if sort_by_index:
        rows = [
            [point_index, identification, micelle_cluster_id]
            for point_indices, identification, micelle_cluster_id in micelle_identifications
            for point_index in point_indices
        ]
        # list.sort is stable, so rows sharing a point index keep their input order.
        rows.sort(key=itemgetter(0))
    else:
        # NOTE(review): the triples are written unexpanded here, so the first column is whatever the caller put
        # in the first slot of each triple. Confirm whether callers of this path pass pre-flattened rows.
        rows = micelle_identifications

    # The unused third positional argument is simply ignored by the two column template.
    line_template = "{0} {1} {2}\n" if write_micelle_cluster_id_to_output else "{0} {1}\n"

    try:
        # The context manager closes the file on every exit path, including errors raised while formatting a row.
        with open(output_filename, "w") as output_file:
            for point_index, identification, micelle_cluster_id in rows:
                output_file.write(line_template.format(point_index, identification, micelle_cluster_id))
    except IOError as err:
        # Writing the results is best effort: the failure is logged and the caller carries on.
        logger.debug("IO Error: %s", err)
        logger.error("Error writing to the output file. You may need to remove the file if it already exists.")
def main():
    """
    The main function that runs the Micelle Shape Identifier application.

    Builds the command line parser, parses the process arguments and hands them to MicelleIdentifier, which
    performs the identification and exits the interpreter.
    """
    application_path = os.path.dirname(os.path.abspath(__file__))

    # Default inputs: the "Shape_0" coordinate/connection pair inside the test file folder.
    default_dir, default_shape = "test_files", 0
    default_prefix = ["Shape", str(default_shape)]
    default_connection_file = os.path.join(default_dir, "_".join(default_prefix + ["connect"]) + ".dat")
    default_coordinate_file = os.path.join(default_dir, "_".join(default_prefix + ["coord"]) + ".dat")

    argument_parser = argparse.ArgumentParser(
        description='Identifies the shape (sphere, cylinder, ellipsoid, worm) of a Micelle.')
    # bnd stands for boundary.
    argument_parser.add_argument("-a", "--adjacent_bnd_conn_file", required=False, nargs="+",
                                 default=[default_connection_file],
                                 help="the list of files containing the list of *indices* for connections in the shape file.")
    # TODO: PDZ- figure out a good default limit.
    # The limit is fractional (see the default), so it has to be parsed as a float rather than an int.
    argument_parser.add_argument("-hl", "--hausdorff_limit", required=False, type=float,
                                 help="the Hausdorff metric limit before something is considered non-convex.",
                                 default=1.0001, action="store")
    # The default coordinate file is applied after parsing (see below). Setting it here would make every
    # use of -p look like mixed X,Y,Z/PDB input, which MicelleIdentifier rejects.
    argument_parser.add_argument("-m", "--micelle_file", required=False, nargs="+", default=None,
                                 help="the list of files containing the Micelle in CSV format.")
    argument_parser.add_argument("-na", "--num_adjacencies_on_bnd", required=False, default=None,
                                 help="the number of adjacencies allowed on the boundary. Connections in the supplied connection file that have more than this number will be removed.")
    argument_parser.add_argument("-p", "--pdb_micelle_file", required=False, default=None, nargs="+",
                                 help="the list micelle files in PDB format")
    argument_parser.add_argument("-o", "--plot_output", required=False, default=False, action='store_true',
                                 help="set to plot the output of the micelles to file.")
    # A tolerance such as 0.001 cannot be parsed by int(), so parse it as a float.
    argument_parser.add_argument("-t", "--floating_point_tolerance", required=False, type=float, default=0.0001,
                                 help="the floating point tolerance used when comparing floating point numbers.")
    argument_parser.add_argument("-w", "--write_micelle_cluster_id_to_output", required=False, default=True, action='store_true',
                                 help="toggle whether or not to write the micelle's cluster id to the output file.")
    # -w is on by default and can only ever set True, so provide an explicit way to switch the column off.
    argument_parser.add_argument("--no_write_micelle_cluster_id_to_output", required=False,
                                 dest="write_micelle_cluster_id_to_output", action='store_false',
                                 help="omit the micelle's cluster id from the output file.")
    arguments = argument_parser.parse_args()

    # Keep the previous behaviour of falling back to the bundled test micelle when no input file is given.
    if arguments.micelle_file is None and arguments.pdb_micelle_file is None:
        arguments.micelle_file = [default_coordinate_file]

    get_logger().debug("Micelle Shape Identifier- Starting Up")
    # The constructor runs the whole identification and exits the interpreter, so nothing follows this call.
    MicelleIdentifier(args=arguments, application_path=application_path)


if __name__ == "__main__":
    main()