Preprocessing updates to ensure recommendations in final file format #8

Merged
merged 1 commit on Aug 20, 2024
237 changes: 180 additions & 57 deletions code/step2_data_preprocessing.py
@@ -1,63 +1,186 @@
# -*- coding: utf-8 -*-
"""
Created on Sun Jun 21 00:16:25 2020
@author: Jihye Moon
"""

import os
import pathlib
import sys

import unicodedata
import traceback
import json
import re
sys.path.append('lib')
import lib.Literature_Data_Preprocessing as ldp

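# Usage (both the original top-level script and the rewritten main() read the same two positional arguments):
#   python code/step2_data_preprocessing.py <base_dir> <output_dir>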
base = sys.argv[1]
output = sys.argv[2]
batch_dir = base # os.path.join(base, 'literature_data')
comb_dir = os.path.join(base, 'arranged')
preprocessed_dir = os.path.join(output, 'preprocessed')
pathlib.Path(comb_dir).mkdir(parents=True, exist_ok=True)
pathlib.Path(preprocessed_dir).mkdir(parents=True, exist_ok=True)

lp=ldp.preprocessing(base, batch_dir, comb_dir, preprocessed_dir)

### Extracting only abstracts and combining all collected files into one file (Gene name based documents)
file_names, data_list=lp.batch_data_matching(batch_dir, ['gene2document'])
arr_list = lp.combining_files(file_names, data_list, ['FullText'], 3)

for i in range(len(file_names)):
    lp.Indexing(os.path.join(comb_dir, file_names[i]), arr_list[file_names[i]])

gene2doc = lp.gene2doc_mapping(arr_list[file_names[0]])


### Extracting only abstracts and combining all collected files into one file (Word name based documents)
file_names_doc, data_list_doc = lp.batch_data_matching(batch_dir, ['baseline_doc'])
arr_list2 = lp.combining_query2doc(file_names_doc, data_list_doc, ['pubmed'], 4)


### Literature Data Preprocessing
total_FullText = ''; total_meta = ''
total_size=len(arr_list2[file_names_doc[0]])
full_handle = open(os.path.join(comb_dir, file_names_doc[0]+'.FullText.txt'), "w")
meta_handle = open(os.path.join(comb_dir, file_names_doc[0]+'.meta.txt'), "w")

total_FullText=[]
for i in range(total_size):
    FullText, Meta = lp.Medine_mapping(arr_list2[file_names_doc[0]][i])
    #print(i, '/', total_size, round(i/total_size,2)*100)
    total_FullText.append(FullText)
    full_handle.write(FullText)
    meta_handle.write(Meta)
full_handle.close()
meta_handle.close()

doc_gene=list(gene2doc.keys())

print('----- preprocessing --- for gene name based documents')
lp.making_doc_data(doc_gene, file_names[0], gene2doc)

print('----- preprocessing --- for word name based documents')
lp.making_doc_data(None, file_names_doc[0], total_FullText)
def check_directory(path):
    """Check if a directory exists."""
    if not os.path.exists(path):
        print(f"Directory does not exist: {path}")
        return False
    return True

def safe_read(file_path):
    """Safely read a file with different encodings."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        try:
            with open(file_path, 'r', encoding='latin-1') as f:
                return f.read()
        except Exception as e:
            print(f"Error reading file {file_path}: {e}")
            return None
    except Exception as e:
        print(f"Unexpected error reading file {file_path}: {e}")
        return None

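# The consolidated file (as written by main() below) interleaves '#GENENAME- <symbol>' header lines
# with the raw text of that gene's records; the generator below regroups the text under each gene and
# yields dictionaries of up to chunk_size genes at a time, so the whole file never sits in memory at once.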
def process_gene_docs_in_chunks(file_path, chunk_size=10000):
    """Process gene documents in chunks."""
    gene2doc = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        current_gene = None
        current_doc = []
        for line in f:
            if line.startswith("#GENENAME-"):
                if current_gene and current_doc:
                    gene2doc[current_gene] = ''.join(current_doc)
                    if len(gene2doc) % chunk_size == 0:
                        yield gene2doc
                        gene2doc = {}
                current_gene = line.split("-", 1)[1].strip()
                current_doc = []
            else:
                current_doc.append(line)

    if current_gene and current_doc:
        gene2doc[current_gene] = ''.join(current_doc)

    if gene2doc:
        yield gene2doc

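# Subclass of the library's preprocessing class: making_doc_data is overridden so every preprocessed
# document line carries its gene name ('#<gene>\t#GENENAME- <gene> ...'), which is what keeps the
# gene/PMID association in the final file format.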
class CustomPreprocessing(ldp.preprocessing):
    def making_doc_data(self, gene_list, name, dic, mode='w'):
        """Create preprocessed document data."""
        preprocessed_dir = self.preprocessed_dir
        counting = 0
        handle = open(os.path.join(preprocessed_dir, name + '.data.doc.txt'), mode, encoding='utf-8')
        if gene_list is None:
            for i in range(len(dic)):
                if counting == 10000:
                    print(i, '/', len(dic))
                    counting = 0
                buffer = dic[i].split('\t')
                if buffer[0] != '\n':
                    buffer = buffer[3] + buffer[4]
                    if buffer != '':
                        buffer = self.doc_preprocessor(buffer)
                        handle.write('-1' + '\t' + buffer + '\n')
                        counting += 1
        else:
            for i in range(len(gene_list)):
                if counting == 10000:
                    print(i, '/', len(gene_list))
                    counting = 0
                gene_name = gene_list[i]
                data = dic[gene_name]
                buffer = self.doc_preprocessor(data)
                if buffer != '':
                    # Extract PMID from the buffer
                    pmid_match = re.search(r'#PMID-\s*(\d+)', buffer)
                    if pmid_match:
                        pmid = pmid_match.group(1)
                        # Add gene name before PMID
                        modified_buffer = re.sub(r'(#PMID-\s*\d+)', f'#GENENAME- {gene_name} \\1', buffer)
                        handle.write('#' + gene_name + '\t' + modified_buffer + '\n')
                    else:
                        # If PMID is not found, just prepend the gene name
                        handle.write('#' + gene_name + '\t#GENENAME- ' + gene_name + ' ' + buffer + '\n')
                    counting += 1
        handle.close()

def main():
    base = sys.argv[1]
    output = sys.argv[2]

    # Update paths to match your new structure
    batch_dir = base
    gene_based_dir = os.path.join(batch_dir, 'results', 'gene_based_records')
    baseline_doc_dir = os.path.join(batch_dir, 'results', 'baseline_doc')
    comb_dir = os.path.join(output, 'arranged')
    preprocessed_dir = os.path.join(output, 'gene_based_preprocessed')

    print("Checking directories...")
    print(f"batch_dir: {batch_dir}")
    print(f"gene_based_dir: {gene_based_dir}")
    print(f"baseline_doc_dir: {baseline_doc_dir}")
    print(f"comb_dir: {comb_dir}")
    print(f"preprocessed_dir: {preprocessed_dir}")

    if not all(map(check_directory, [batch_dir, gene_based_dir, baseline_doc_dir])):
        sys.exit("One or more required directories do not exist. Please check the paths and try again.")

    pathlib.Path(comb_dir).mkdir(parents=True, exist_ok=True)
    pathlib.Path(preprocessed_dir).mkdir(parents=True, exist_ok=True)

    lp = CustomPreprocessing(base, batch_dir, comb_dir, preprocessed_dir)

    # Process gene-based documents
    consolidated_file = os.path.join(comb_dir, 'consolidated_gene_docs.txt')

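    # Consolidation step: merge every per-gene .txt under results/gene_based_records into a single
    # file, prefixing each document with a '#GENENAME- <symbol>' header so gene boundaries survive the merge.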
    if not os.path.exists(consolidated_file):
        print("Consolidated file not found. Starting consolidation process...")
        gene_files = [f for f in os.listdir(gene_based_dir) if f.endswith('.txt')]
        print(f"Found {len(gene_files)} gene-based documents.")

        if not gene_files:
            print("No gene-based documents found. Skipping this step.")
        else:
            with open(consolidated_file, 'w', encoding='utf-8') as outfile:
                for i, file in enumerate(gene_files, 1):
                    try:
                        gene_name = os.path.splitext(file)[0]  # Get filename without extension
                        content = safe_read(os.path.join(gene_based_dir, file))
                        if content is not None:
                            content = unicodedata.normalize('NFKD', content).encode('ascii', 'ignore').decode('ascii')
                            outfile.write(f"#GENENAME- {gene_name}\n{content}\n\n")
                        else:
                            print(f"Skipping file {file} due to reading error.")
                    except Exception as e:
                        print(f"Error processing file {file}: {e}")
                        print(traceback.format_exc())
                    if i % 1000 == 0:
                        print(f"Consolidating file {i}/{len(gene_files)}")
            print("All gene-based documents consolidated.")
    else:
        print("Consolidated file found. Skipping consolidation process.")

    print("Processing consolidated gene-based document...")
    processed_genes_file = os.path.join(preprocessed_dir, 'processed_genes.json')
    processed_genes = set()

    if os.path.exists(processed_genes_file):
        with open(processed_genes_file, 'r') as f:
            processed_genes = set(json.load(f))
        print(f"Resuming from {len(processed_genes)} previously processed genes.")

    output_file = os.path.join(preprocessed_dir, 'consolidated_gene_docs.data.doc.txt')
    mode = 'a' if os.path.exists(output_file) else 'w'

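    # Stream the consolidated file chunk by chunk; genes already recorded in processed_genes.json are
    # skipped, so an interrupted run can resume and append to the existing output instead of rewriting it.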
    for gene2doc_chunk in process_gene_docs_in_chunks(consolidated_file):
        print(f"Processing chunk with {len(gene2doc_chunk)} genes...")
        new_genes = set(gene2doc_chunk.keys()) - processed_genes
        if new_genes:
            doc_gene = list(new_genes)
            lp.making_doc_data(doc_gene, 'consolidated_gene_docs', {g: gene2doc_chunk[g] for g in new_genes}, mode)
            processed_genes.update(new_genes)

            # Update the processed genes file
            with open(processed_genes_file, 'w') as f:
                json.dump(list(processed_genes), f)
        else:
            print("All genes in this chunk have been processed already. Moving to next chunk.")

        # Change mode to 'a' after first write
        mode = 'a'

    print("All processing completed.")

if __name__ == "__main__":
    main()
121 changes: 121 additions & 0 deletions code/step2_data_preprocessing_Luis_genebased_new.py
@@ -0,0 +1,121 @@
import os
import pathlib
import sys
import unicodedata
import json
import re
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
sys.path.append('lib')
import lib.Literature_Data_Preprocessing as ldp

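# Usage: python code/step2_data_preprocessing_Luis_genebased_new.py <base_dir> <output_dir>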
def check_directory(path):
    if not os.path.exists(path):
        print(f"Directory does not exist: {path}")
        return False
    return True

def safe_read(file_path):
    encodings = ['utf-8', 'latin-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
        except Exception as e:
            print(f"Unexpected error reading file {file_path}: {e}")
    print(f"Error reading file {file_path}: Unable to decode with available encodings")
    return None

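# The gene info files (query_symbol.txt, query_full_name.txt) are assumed to hold one entry per line;
# each line is mapped to itself, and main() falls back to the gene symbol when a lookup misses.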
def load_gene_info(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return {line.strip(): line.strip() for line in f}

class CustomPreprocessing(ldp.preprocessing):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pmid_pattern = re.compile(r'#PMID-\s*(\d+)')

    def process_gene_file(self, file_path, gene_symbol, gene_name):
        content = safe_read(file_path)
        if content is not None:
            content = unicodedata.normalize('NFKD', content).encode('ascii', 'ignore').decode('ascii')
            buffer = self.doc_preprocessor(content)
            if buffer:
                pmid_match = self.pmid_pattern.search(buffer)
                if pmid_match:
                    modified_buffer = re.sub(r'(#PMID-\s*\d+)', f'#GENESYMBOL- {gene_symbol} #GENENAME- {gene_name} \\1', buffer)
                    return f'#{gene_symbol}\t{modified_buffer}\n'
                else:
                    return f'#{gene_symbol}\t#GENESYMBOL- {gene_symbol} #GENENAME- {gene_name} {buffer}\n'
        return None

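# Module-level wrapper so the work can be submitted to ProcessPoolExecutor: the submitted callable and
# its arguments (including the CustomPreprocessing instance) must be picklable.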
def process_gene(args):
    lp, file_path, gene_symbol, gene_name = args
    return lp.process_gene_file(file_path, gene_symbol, gene_name)

def main(base, output):
    batch_dir = base
    gene_based_dir = os.path.join(batch_dir, 'results', 'gene_based_records')
    gene_info_dir = os.path.join(batch_dir, 'data', 'gene_name_info')
    preprocessed_dir = os.path.join(output, 'gene_based_preprocessed')

    print("Checking directories...")
    for dir_path in [batch_dir, gene_based_dir, gene_info_dir]:
        if not check_directory(dir_path):
            sys.exit(f"Directory does not exist: {dir_path}")

    pathlib.Path(preprocessed_dir).mkdir(parents=True, exist_ok=True)

    lp = CustomPreprocessing(base, batch_dir, None, preprocessed_dir)

    gene_symbols = load_gene_info(os.path.join(gene_info_dir, 'query_symbol.txt'))
    gene_names = load_gene_info(os.path.join(gene_info_dir, 'query_full_name.txt'))

    output_file = os.path.join(preprocessed_dir, 'consolidated_gene_docs.data.doc.txt')
    processed_genes_file = os.path.join(preprocessed_dir, 'processed_genes.json')

    processed_genes = set()
    if os.path.exists(processed_genes_file):
        with open(processed_genes_file, 'r') as f:
            processed_genes = set(json.load(f))
        print(f"Resuming from {len(processed_genes)} previously processed genes.")

    gene_files = [f for f in os.listdir(gene_based_dir) if f.endswith('.txt')]
    total_files = len(gene_files)
    print(f"Found {total_files} gene-based documents.")

    num_processes = multiprocessing.cpu_count()

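    # Fan out one task per not-yet-processed gene file; results are written back to the consolidated
    # output on the main process as the workers complete.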
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = []
        for file in gene_files:
            gene_symbol = os.path.splitext(file)[0]
            if gene_symbol not in processed_genes:
                gene_name = gene_names.get(gene_symbol, gene_symbol)
                file_path = os.path.join(gene_based_dir, file)
                futures.append(executor.submit(process_gene, (lp, file_path, gene_symbol, gene_name)))

        with open(output_file, 'a', encoding='utf-8') as outfile:
            for future in tqdm(as_completed(futures), total=len(futures), desc="Processing genes"):
                result = future.result()
                if result:
                    outfile.write(result)
                    gene_symbol = result.split('\t')[0][1:]  # Extract gene symbol from the result
                    processed_genes.add(gene_symbol)

                    # Save progress every 1000 genes
                    if len(processed_genes) % 1000 == 0:
                        with open(processed_genes_file, 'w') as pf:
                            json.dump(list(processed_genes), pf)

    # Final save of processed genes
    with open(processed_genes_file, 'w') as pf:
        json.dump(list(processed_genes), pf)

    print("All processing completed.")

if __name__ == "__main__":
    main(sys.argv[1], sys.argv[2])