Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Final/IgBLAST/score_alignments_beta_delta.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
252 lines (205 sloc)
7.48 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pprint import pprint | |
import csv | |
def remove_OR(gene): | |
''' | |
Removes the OR and number that appears sometimes in the repetoire genes | |
EX: TRBV21/OR9-2*01 | |
We want to remove the /OR9 | |
''' | |
segments = gene.split('/') | |
segment1 = segments[0] | |
segment2 = segments[1] | |
if '-' in segment2: | |
gene = segment1 + '-' + segment2.split('-')[1] | |
else: | |
gene = segment1 + '*' + segment2.split('*')[1] | |
return gene | |
def parse_alignments(f_in, f_out): | |
f = open(f_in, 'r') | |
out_file = open(f_out, 'w') | |
lines = f.readlines() | |
for i in xrange(len(lines)): | |
if " Query= " in lines[i]: | |
name = lines[i].strip() | |
name = name.split(' ')[2] | |
if "Top V gene match" in lines[i]: | |
l = lines[i+1].strip() | |
genes = l.split()[0:3] | |
chain = l.split()[3] | |
if "Yes" in chain or "No" in chain: | |
chain = genes[2] | |
genes[2] = genes[1] | |
genes[1] = "N/A" | |
to_write = "%s %s %s %s %s\n" % (name, genes[0], genes[1], genes[2], chain) | |
out_file.write(to_write) | |
f.close() | |
out_file.close() | |
def score(p_full, c_full, score_array): | |
'''The actual scoring algorithm''' | |
if p_full == "N/A": | |
return score_array | |
if '-' in c_full: | |
temp = c_full.split('-') | |
c_subgroup = temp[0] | |
temp = temp[1].split('*') | |
c_gene = temp[0] | |
c_allele = temp[1] | |
else: | |
temp = c_full.split('*') | |
c_subgroup = temp[0] | |
c_allele = temp[1] | |
c_gene = None | |
if '-' in p_full: | |
temp = p_full.split('-') | |
p_subgroup = temp[0] | |
temp = temp[1].split('*') | |
p_gene = temp[0] | |
p_allele = temp[1] | |
else: | |
temp = p_full.split('*') | |
p_subgroup = temp[0] | |
p_allele = temp[1] | |
p_gene = None | |
if p_subgroup == c_subgroup: | |
score_array[0] += 1 | |
if p_gene == c_gene: | |
score_array[1] += 1 | |
if p_allele == c_allele: | |
score_array[2] += 1 | |
return score_array | |
else: | |
return score_array | |
else: | |
return score_array | |
else: | |
return score_array | |
def score_chain(chain, v_gene_correct): | |
''' | |
Sees if the right chain is predicted | |
''' | |
if chain == "VH" and "TRBV" in v_gene_correct: | |
return True | |
elif chain == "VA" and "TRAV" in v_gene_correct: | |
return True | |
elif chain == "VG" and "TRGV" in v_gene_correct: | |
return True | |
elif chain == "VH" and "TRDV" in v_gene_correct: | |
return True | |
else: | |
return True | |
def score_alignments(results, correct_alignments): | |
''' | |
Scores how well IgBlast did by comparing the parsed VDJ predictions | |
and comparing it to the correct results | |
Returns the score matrix | |
''' | |
# Scores for [subgroup, gene, allele] | |
v_score = [0, 0, 0] | |
d_score = [0, 0, 0] | |
j_score = [0, 0, 0] | |
v_dict = {} | |
d_dict = {} | |
j_dict = {} | |
chain_score = 0 | |
completely_correct_genes = 0 | |
completely_correct_alleles = 0 | |
results = open(results, 'r') | |
correct = open(correct_alignments, 'r') | |
predicted_lines = results.readlines() | |
correct_lines = correct.readlines() | |
# Loop through predicted results | |
for i in xrange(len(predicted_lines)): | |
# Get predicted V, D, and J | |
genes = predicted_lines[i].split()[1:] | |
p_v_gene = genes[0].strip() | |
if p_v_gene != "N/A": | |
p_v_gene = p_v_gene.split('|')[1] | |
p_d_gene = genes[1].strip() | |
if p_d_gene != "N/A": | |
p_d_gene = p_d_gene.split('|')[1] | |
p_j_gene = genes[2].strip() | |
if p_j_gene != "N/A": | |
p_j_gene = p_j_gene.split('|')[1] | |
chain = genes[3].strip() | |
# Get correct V, D, J genes | |
genes = correct_lines[i].split()[1] | |
genes = genes.split(';') | |
c_v_gene = genes[0].strip() | |
c_d_gene = genes[1].strip() | |
c_j_gene = genes[2].strip() | |
# Sometimes theres an OR# in the correct gene, so lets remove it for now | |
if "OR" in c_v_gene: c_v_gene = remove_OR(c_v_gene) | |
if "OR" in c_d_gene: c_d_gene = remove_OR(c_d_gene) | |
if "OR" in c_j_gene: c_j_gene = remove_OR(c_j_gene) | |
# Now score them | |
# If the chain is correct, score the segments | |
# Else, do nothing because the prediction is not correct and no scores should be added | |
if score_chain(chain, c_v_gene): | |
chain_score += 1 | |
v_score_new = score(p_v_gene, c_v_gene, v_score) | |
d_score_new = score(p_d_gene, c_d_gene, d_score) | |
j_score_new = score(p_j_gene, c_j_gene, j_score) | |
v_name = c_v_gene.split("*")[0] | |
d_name = c_d_gene.split("*")[0] | |
j_name = c_j_gene.split("*")[0] | |
if v_name not in v_dict.keys(): | |
v_dict[v_name] = [0, 1] | |
else: | |
v_dict[v_name][1] += 1 | |
if d_name not in d_dict.keys(): | |
d_dict[d_name] = [0, 1] | |
else: | |
d_dict[d_name][1] += 1 | |
if j_name not in j_dict.keys(): | |
j_dict[j_name] = [0, 1] | |
else: | |
j_dict[j_name][1] += 1 | |
# Check to see if it got all the genes correct in a read | |
if v_score_new[1] > v_score[1] and d_score_new[1] > d_score[1] and j_score_new[1] > j_score[1]: | |
completely_correct_genes += 1 | |
# Check to see if it got all the genes and alleles correct in a read | |
# Note this is less likely because Im only returning 1 allele/gene where as it could be equal probability to get two or three alleles | |
if v_score_new[2] > v_score[2] and d_score_new[2] > d_score[2] and j_score_new[2] > j_score[2]: | |
completely_correct_alleles += 1 | |
# I know this could be optimized but fuck it for now | |
if v_score_new[1] > v_score[1]: | |
v_dict[v_name][0] += 1 | |
if d_score_new[1] > d_score[1]: | |
d_dict[d_name][0] += 1 | |
if j_score_new[1] > j_score[1]: | |
j_dict[j_name][0] += 1 | |
v_score = v_score_new | |
d_score = d_score_new | |
j_score = j_score_new | |
# Write csv's of dicts | |
f = open("v_csv", 'w') | |
f_w = csv.writer(f) | |
f_w.writerow(['Name', 'Percent Correct']) | |
for k in v_dict.keys(): | |
percent_correct = (v_dict[k][0] / v_dict[k][1]) * 100 | |
f_w.writerow([k, percent_correct]) | |
f.close() | |
f = open("d_csv", 'w') | |
f_w = csv.writer(f) | |
f_w.writerow(['Name', 'Percent Correct']) | |
for k in d_dict.keys(): | |
percent_correct = (d_dict[k][0] / d_dict[k][1]) * 100 | |
f_w.writerow([k, percent_correct]) | |
f.close() | |
f = open("j_csv", 'w') | |
f_w = csv.writer(f) | |
f_w.writerow(['Name', 'Percent Correct']) | |
for k in j_dict.keys(): | |
percent_correct = (j_dict[k][0] / j_dict[k][1]) * 100 | |
f_w.writerow([k, percent_correct]) | |
f.close() | |
return [v_score, d_score, j_score, chain_score, completely_correct_genes, completely_correct_alleles] | |
def main(): | |
f_in = "output.txt" | |
f_out = "results.txt" | |
correct_alignments = "../Datasets/fasta_seqs/trb/repertoire_vdj_recombination.txt" | |
parse_alignments(f_in, f_out) | |
scores = score_alignments(f_out, correct_alignments) | |
pprint(scores) | |
if __name__ == "__main__": | |
main() |