
Commit

Merge pull request #8 from lrm22005/Luis
Preprocessing updates to ensure recommendations in final file format
lrm22005 authored Aug 20, 2024
2 parents 3a7b4f1 + 29efc80 commit 3a51804
Showing 11 changed files with 934 additions and 60 deletions.
code/step2_data_preprocessing.py (237 changes: 180 additions & 57 deletions)
@@ -1,63 +1,186 @@
# -*- coding: utf-8 -*-
"""
Created on Sun Jun 21 00:16:25 2020
@author: Jihye Moon
"""

import os
import pathlib
import sys

import unicodedata
import traceback
import json
import re
sys.path.append('lib')
import lib.Literature_Data_Preprocessing as ldp

base = sys.argv[1]
output = sys.argv[2]
batch_dir = base # os.path.join(base, 'literature_data')
comb_dir = os.path.join(base, 'arranged')
preprocessed_dir = os.path.join(output, 'preprocessed')
pathlib.Path(comb_dir).mkdir(parents=True, exist_ok=True)
pathlib.Path(preprocessed_dir).mkdir(parents=True, exist_ok=True)

lp=ldp.preprocessing(base, batch_dir, comb_dir, preprocessed_dir)

### Extracting only abstracts and combining all collected files into one file (Gene name based documents)
file_names, data_list=lp.batch_data_matching(batch_dir, ['gene2document'])
arr_list = lp.combining_files(file_names, data_list, ['FullText'], 3)

for i in range(len(file_names)):
    lp.Indexing(os.path.join(comb_dir, file_names[i]), arr_list[file_names[i]])

gene2doc = lp.gene2doc_mapping(arr_list[file_names[0]])


### Extracting only abstracts and combining all collected files into one file (Word name based documents)
file_names_doc, data_list_doc = lp.batch_data_matching(batch_dir, ['baseline_doc'])
arr_list2 = lp.combining_query2doc(file_names_doc, data_list_doc, ['pubmed'], 4)


### Literature Data Preprocessing
total_FullText = ''; total_meta = ''
total_size=len(arr_list2[file_names_doc[0]])
full_handle = open(os.path.join(comb_dir, file_names_doc[0]+'.FullText.txt'), "w")
meta_handle = open(os.path.join(comb_dir, file_names_doc[0]+'.meta.txt'), "w")

total_FullText=[]
for i in range(total_size):
    FullText, Meta = lp.Medine_mapping(arr_list2[file_names_doc[0]][i])
    #print(i, '/', total_size, round(i/total_size,2)*100)
    total_FullText.append(FullText)
    full_handle.write(FullText)
    meta_handle.write(Meta)
full_handle.close()
meta_handle.close()

doc_gene=list(gene2doc.keys())

print('----- preprocessing --- for gene name based documents')
lp.making_doc_data(doc_gene, file_names[0], gene2doc)

print('----- preprocessing --- for word name based documents')
lp.making_doc_data(None, file_names_doc[0], total_FullText)
def check_directory(path):
    """Check if a directory exists."""
    if not os.path.exists(path):
        print(f"Directory does not exist: {path}")
        return False
    return True

def safe_read(file_path):
    """Safely read a file with different encodings."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        try:
            with open(file_path, 'r', encoding='latin-1') as f:
                return f.read()
        except Exception as e:
            print(f"Error reading file {file_path}: {e}")
            return None
    except Exception as e:
        print(f"Unexpected error reading file {file_path}: {e}")
        return None

def process_gene_docs_in_chunks(file_path, chunk_size=10000):
    """Process gene documents in chunks."""
    gene2doc = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        current_gene = None
        current_doc = []
        for line in f:
            if line.startswith("#GENENAME-"):
                if current_gene and current_doc:
                    gene2doc[current_gene] = ''.join(current_doc)
                    if len(gene2doc) % chunk_size == 0:
                        yield gene2doc
                        gene2doc = {}
                current_gene = line.split("-", 1)[1].strip()
                current_doc = []
            else:
                current_doc.append(line)

    if current_gene and current_doc:
        gene2doc[current_gene] = ''.join(current_doc)

    if gene2doc:
        yield gene2doc
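
As an aside, the generator above can be smoke-tested on its own; a minimal sketch, assuming a consolidated file already exists at the illustrative path below (the path and chunk size are examples, not part of this commit):

total_genes = 0
for chunk in process_gene_docs_in_chunks('arranged/consolidated_gene_docs.txt', chunk_size=5000):
    total_genes += len(chunk)  # each chunk maps gene name -> concatenated document text
print(total_genes, 'genes parsed')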

class CustomPreprocessing(ldp.preprocessing):
    def making_doc_data(self, gene_list, name, dic, mode='w'):
        """Create preprocessed document data."""
        preprocessed_dir = self.preprocessed_dir
        counting = 0
        handle = open(os.path.join(preprocessed_dir, name + '.data.doc.txt'), mode, encoding='utf-8')
        if gene_list is None:
            for i in range(len(dic)):
                if counting == 10000:
                    print(i, '/', len(dic))
                    counting = 0
                buffer = dic[i].split('\t')
                if buffer[0] != '\n':
                    buffer = buffer[3] + buffer[4]
                    if buffer != '':
                        buffer = self.doc_preprocessor(buffer)
                        handle.write('-1' + '\t' + buffer + '\n')
                counting += 1
        else:
            for i in range(len(gene_list)):
                if counting == 10000:
                    print(i, '/', len(gene_list))
                    counting = 0
                gene_name = gene_list[i]
                data = dic[gene_name]
                buffer = self.doc_preprocessor(data)
                if buffer != '':
                    # Extract PMID from the buffer
                    pmid_match = re.search(r'#PMID-\s*(\d+)', buffer)
                    if pmid_match:
                        pmid = pmid_match.group(1)
                        # Add gene name before PMID
                        modified_buffer = re.sub(r'(#PMID-\s*\d+)', f'#GENENAME- {gene_name} \\1', buffer)
                        handle.write('#' + gene_name + '\t' + modified_buffer + '\n')
                    else:
                        # If PMID is not found, just prepend the gene name
                        handle.write('#' + gene_name + '\t#GENENAME- ' + gene_name + ' ' + buffer + '\n')
                counting += 1
        handle.close()
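
For reference, the PMID tagging used above can be checked in isolation; a minimal sketch with made-up values (the gene symbol and PMID are hypothetical), showing the per-line format this commit standardizes on:

import re

gene_name = 'BRCA1'                                  # hypothetical gene symbol
buffer = '#PMID- 12345 some preprocessed abstract'   # hypothetical preprocessed text
modified_buffer = re.sub(r'(#PMID-\s*\d+)', f'#GENENAME- {gene_name} \\1', buffer)
print('#' + gene_name + '\t' + modified_buffer)
# prints: #BRCA1<TAB>#GENENAME- BRCA1 #PMID- 12345 some preprocessed abstract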

def main():
    base = sys.argv[1]
    output = sys.argv[2]

    # Update paths to match your new structure
    batch_dir = base
    gene_based_dir = os.path.join(batch_dir, 'results', 'gene_based_records')
    baseline_doc_dir = os.path.join(batch_dir, 'results', 'baseline_doc')
    comb_dir = os.path.join(output, 'arranged')
    preprocessed_dir = os.path.join(output, 'gene_based_preprocessed')

    print(f"Checking directories...")
    print(f"batch_dir: {batch_dir}")
    print(f"gene_based_dir: {gene_based_dir}")
    print(f"baseline_doc_dir: {baseline_doc_dir}")
    print(f"comb_dir: {comb_dir}")
    print(f"preprocessed_dir: {preprocessed_dir}")

    if not all(map(check_directory, [batch_dir, gene_based_dir, baseline_doc_dir])):
        sys.exit("One or more required directories do not exist. Please check the paths and try again.")

    pathlib.Path(comb_dir).mkdir(parents=True, exist_ok=True)
    pathlib.Path(preprocessed_dir).mkdir(parents=True, exist_ok=True)

    lp = CustomPreprocessing(base, batch_dir, comb_dir, preprocessed_dir)

    # Process gene-based documents
    consolidated_file = os.path.join(comb_dir, 'consolidated_gene_docs.txt')

    if not os.path.exists(consolidated_file):
        print("Consolidated file not found. Starting consolidation process...")
        gene_files = [f for f in os.listdir(gene_based_dir) if f.endswith('.txt')]
        print(f"Found {len(gene_files)} gene-based documents.")

        if not gene_files:
            print("No gene-based documents found. Skipping this step.")
        else:
            with open(consolidated_file, 'w', encoding='utf-8') as outfile:
                for i, file in enumerate(gene_files, 1):
                    try:
                        gene_name = os.path.splitext(file)[0]  # Get filename without extension
                        content = safe_read(os.path.join(gene_based_dir, file))
                        if content is not None:
                            content = unicodedata.normalize('NFKD', content).encode('ascii', 'ignore').decode('ascii')
                            outfile.write(f"#GENENAME- {gene_name}\n{content}\n\n")
                        else:
                            print(f"Skipping file {file} due to reading error.")
                    except Exception as e:
                        print(f"Error processing file {file}: {e}")
                        print(traceback.format_exc())
                    if i % 1000 == 0:
                        print(f"Consolidating file {i}/{len(gene_files)}")
            print("All gene-based documents consolidated.")
    else:
        print("Consolidated file found. Skipping consolidation process.")

    print("Processing consolidated gene-based document...")
    processed_genes_file = os.path.join(preprocessed_dir, 'processed_genes.json')
    processed_genes = set()

    if os.path.exists(processed_genes_file):
        with open(processed_genes_file, 'r') as f:
            processed_genes = set(json.load(f))
        print(f"Resuming from {len(processed_genes)} previously processed genes.")

    output_file = os.path.join(preprocessed_dir, 'consolidated_gene_docs.data.doc.txt')
    mode = 'a' if os.path.exists(output_file) else 'w'

    for gene2doc_chunk in process_gene_docs_in_chunks(consolidated_file):
        print(f"Processing chunk with {len(gene2doc_chunk)} genes...")
        new_genes = set(gene2doc_chunk.keys()) - processed_genes
        if new_genes:
            doc_gene = list(new_genes)
            lp.making_doc_data(doc_gene, 'consolidated_gene_docs', {g: gene2doc_chunk[g] for g in new_genes}, mode)
            processed_genes.update(new_genes)

            # Update the processed genes file
            with open(processed_genes_file, 'w') as f:
                json.dump(list(processed_genes), f)
        else:
            print("All genes in this chunk have been processed already. Moving to next chunk.")

        # Change mode to 'a' after first write
        mode = 'a'

    print("All processing completed.")

if __name__ == "__main__":
    main()
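
The rewritten script keeps the original two positional arguments: the collection base directory and the output directory. An example invocation (paths are illustrative only):

python code/step2_data_preprocessing.py ./literature_collection ./literature_output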
File renamed without changes.
code/step2_data_preprocessing_Luis_genebased_new.py (121 changes: 121 additions & 0 deletions)
@@ -0,0 +1,121 @@
import os
import pathlib
import sys
import unicodedata
import json
import re
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
sys.path.append('lib')
import lib.Literature_Data_Preprocessing as ldp

def check_directory(path):
    if not os.path.exists(path):
        print(f"Directory does not exist: {path}")
        return False
    return True

def safe_read(file_path):
    encodings = ['utf-8', 'latin-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
        except Exception as e:
            print(f"Unexpected error reading file {file_path}: {e}")
    print(f"Error reading file {file_path}: Unable to decode with available encodings")
    return None

def load_gene_info(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return {line.strip(): line.strip() for line in f}

class CustomPreprocessing(ldp.preprocessing):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pmid_pattern = re.compile(r'#PMID-\s*(\d+)')

    def process_gene_file(self, file_path, gene_symbol, gene_name):
        content = safe_read(file_path)
        if content is not None:
            content = unicodedata.normalize('NFKD', content).encode('ascii', 'ignore').decode('ascii')
            buffer = self.doc_preprocessor(content)
            if buffer:
                pmid_match = self.pmid_pattern.search(buffer)
                if pmid_match:
                    modified_buffer = re.sub(r'(#PMID-\s*\d+)', f'#GENESYMBOL- {gene_symbol} #GENENAME- {gene_name} \\1', buffer)
                    return f'#{gene_symbol}\t{modified_buffer}\n'
                else:
                    return f'#{gene_symbol}\t#GENESYMBOL- {gene_symbol} #GENENAME- {gene_name} {buffer}\n'
        return None

def process_gene(args):
    lp, file_path, gene_symbol, gene_name = args
    return lp.process_gene_file(file_path, gene_symbol, gene_name)

def main(base, output):
    batch_dir = base
    gene_based_dir = os.path.join(batch_dir, 'results', 'gene_based_records')
    gene_info_dir = os.path.join(batch_dir, 'data', 'gene_name_info')
    preprocessed_dir = os.path.join(output, 'gene_based_preprocessed')

    print("Checking directories...")
    for dir_path in [batch_dir, gene_based_dir, gene_info_dir]:
        if not check_directory(dir_path):
            sys.exit(f"Directory does not exist: {dir_path}")

    pathlib.Path(preprocessed_dir).mkdir(parents=True, exist_ok=True)

    lp = CustomPreprocessing(base, batch_dir, None, preprocessed_dir)

    gene_symbols = load_gene_info(os.path.join(gene_info_dir, 'query_symbol.txt'))
    gene_names = load_gene_info(os.path.join(gene_info_dir, 'query_full_name.txt'))

    output_file = os.path.join(preprocessed_dir, 'consolidated_gene_docs.data.doc.txt')
    processed_genes_file = os.path.join(preprocessed_dir, 'processed_genes.json')

    processed_genes = set()
    if os.path.exists(processed_genes_file):
        with open(processed_genes_file, 'r') as f:
            processed_genes = set(json.load(f))
        print(f"Resuming from {len(processed_genes)} previously processed genes.")

    gene_files = [f for f in os.listdir(gene_based_dir) if f.endswith('.txt')]
    total_files = len(gene_files)
    print(f"Found {total_files} gene-based documents.")

    num_processes = multiprocessing.cpu_count()

    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = []
        for file in gene_files:
            gene_symbol = os.path.splitext(file)[0]
            if gene_symbol not in processed_genes:
                gene_name = gene_names.get(gene_symbol, gene_symbol)
                file_path = os.path.join(gene_based_dir, file)
                futures.append(executor.submit(process_gene, (lp, file_path, gene_symbol, gene_name)))

        with open(output_file, 'a', encoding='utf-8') as outfile:
            for future in tqdm(as_completed(futures), total=len(futures), desc="Processing genes"):
                result = future.result()
                if result:
                    outfile.write(result)
                    gene_symbol = result.split('\t')[0][1:]  # Extract gene symbol from the result
                    processed_genes.add(gene_symbol)

                    # Save progress every 1000 genes
                    if len(processed_genes) % 1000 == 0:
                        with open(processed_genes_file, 'w') as pf:
                            json.dump(list(processed_genes), pf)

    # Final save of processed genes
    with open(processed_genes_file, 'w') as pf:
        json.dump(list(processed_genes), pf)

    print("All processing completed.")

if __name__ == "__main__":
    main(sys.argv[1], sys.argv[2])
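
This variant reads gene symbols and full names from data/gene_name_info/query_symbol.txt and data/gene_name_info/query_full_name.txt under the base directory, and it requires tqdm for the progress bar. An example invocation (paths are illustrative only):

python code/step2_data_preprocessing_Luis_genebased_new.py ./literature_collection ./literature_output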