Source code for methylprep.download.miniml

import re
from pathlib import Path
import shutil
from bs4 import BeautifulSoup
import pandas as pd
import tarfile
from tarfile import ReadError
from tqdm import tqdm
from ftplib import FTP
import logging
import time
# app
from .samplesheet_sync_idat import remove_idats_not_in_samplesheet
from methylprep import run_pipeline
#from methylprep.download.process_data import run_series --- breaks, dunno why!
import methylprep.download.process_data

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel( logging.INFO )

geo_platforms=['GPL13534', 'GPL21145']

### FIX: make extract_controls False by default; add a CLI option to turn it on in meta_data ###

__all__ = [
    'convert_miniml',
    'download_miniml',
    'merge_sample_sheets',
    'meta_from_idat_filenames',
    'sample_sheet_from_miniml',
    'sample_sheet_from_idats',
    'cleanup',
    'build_composite_dataset',
]


def convert_miniml(geo_id, data_dir='.', merge=True, download_it=True, extract_controls=False, require_keyword=None, sync_idats=False, remove_tgz=False, verbose=False):
    """This scans the datadir for an xml file with the geo_id in it.
    Then it parses it and saves the useful stuff to a dataframe called "sample_sheet_meta_data.pkl".
    DOES NOT REQUIRE idats.

    CLI version:
        python -m meta_data -i GSExxxxx -d <my_folder>

    Arguments:
        merge:
            If merge==True and there is a file with 'samplesheet' in the folder, and that sheet has GSM_IDs,
            merge that data into this samplesheet. Useful for when you have idats and want one combined
            samplesheet for the dataset.
        download_it:
            if miniml file not in data_dir path, it will download it from web.
        extract_controls [experimental]:
            if you only want to retain samples from the whole set that have certain keywords,
            such as "control" or "blood", this experimental flag will rewrite the samplesheet with only
            the parts you want, then feed that into run_pipeline with named samples.
        require_keyword [experimental]:
            another way to eliminate samples from samplesheets, before passing into the processor.
            if specified, this keyword needs to appear somewhere in the values of a samplesheet.
        sync_idats:
            If flagged, this will search `data_dir` for idats and remove any of those that are not found
            in the filtered samplesheet. Requires you to run the download function first to get all idats,
            before you run this `meta_data` function.

    Attempts to also read idat filenames, if they exist, but won't fail if they don't.
    """
    file_pattern = f'{geo_id}*.xml'
    local_files = []
    files = list(Path(data_dir).rglob(file_pattern))
    if files:
        file = Path(files[0])
        result = {'local_files': [file]}
    else:
        if download_it == True:
            # option to have it retain the .xml.tgz files, which -- if multipart -- contain each sample's betas
            result = download_miniml(geo_id, data_dir, remove_tgz=remove_tgz, verbose=verbose)
            ### methylprep v1.3.0 will change value return from string to list. Only used here within methylprep.
            file = result['meta_data']
            # also returns a list of local_files found, if there were samples tgz'ed in with meta_data
            local_files = result['meta_data']
            if verbose:
                LOGGER.info(f"Attempting to read xml: {file}")
        else:
            LOGGER.info('Did not find any miniml XML files to convert.')
            return local_files

    with open(file, 'r') as fp:
        soup = BeautifulSoup(fp, "xml")

    meta_dicts = {'GPL13534': {}, 'GPL21145': {}}
    samples_dict = {'GPL13534': {}, 'GPL21145': {}}
    samples = soup.MINiML.find_all("Sample")
    missing_methylprep_names = 0
    warned_once = False
    for sample in samples:
        platform = sample.find('Platform-Ref')['ref']
        accession = sample.Accession.text
        title = sample.Title.text
        attributes_dir = {}
        attributes_dir['source'] = sample.Source.text
        attributes_dir['platform'] = platform
        attributes_dir['title'] = title
        for char in sample.find_all('Characteristics'):
            attributes_dir[char['tag']] = char.text.strip()
        if sample.find('Description'):
            attributes_dir['description'] = sample.find('Description').text.strip() # only some MINiML files have this.
        try:
            split_idat = sample.find('Supplementary-Data').text.split("/")[-1].split("_")
            attributes_dir['Sample_ID'] = f"{split_idat[1]}_{split_idat[2]}" # {accession / GSM} is not used in methylprep data columns
            attributes_dir['Sentrix_ID'] = f"{split_idat[1]}"
            attributes_dir['Sentrix_Position'] = f"{split_idat[2]}"
        except:
            missing_methylprep_names += 1

        protocols = ['Extract-Protocol', 'Label-Protocol', 'Hybridization-Protocol', 'Scan-Protocol', 'Data-Processing']
        for protocol in protocols:
            if sample.find(protocol):
                attributes_dir[protocol] = sample.find(protocol).text.strip()

        if platform in geo_platforms:
            meta_dicts[platform][accession] = attributes_dir
            samples_dict[platform][accession] = title
        elif warned_once is False:
            LOGGER.warning(f"{platform} is not among the supported platforms {geo_platforms}")
            warned_once = True
    if missing_methylprep_names > 0:
        LOGGER.info(f"MINiML file does not provide (Sentrix_id_R00C00) for {missing_methylprep_names}/{len(samples)} samples.")

    for platform in geo_platforms:
        if verbose:
            LOGGER.info(f"Platform {platform} --> {len(meta_dicts[platform])} samples")
        if meta_dicts[platform]:
            #Path(f"{data_dir}/{platform}").mkdir(parents=True, exist_ok=True)
            meta_dicts[platform] = meta_from_idat_filenames(data_dir, meta_dicts[platform])
            if merge == True:
                # find samplesheet.csv with idat ID/Position and merge in here.
                meta_dicts[platform] = merge_sample_sheets(data_dir, meta_dicts[platform])
            sample_sheet_from_miniml(geo_id, data_dir, platform, samples_dict[platform], meta_dicts[platform],
                save_df=True, extract_controls=extract_controls, require_keyword=require_keyword, verbose=verbose)
            if sync_idats:
                paths = list(Path(data_dir).rglob(f'{geo_id}_{platform}_samplesheet.csv'))
                if len(paths) > 0:
                    remove_idats_not_in_samplesheet(paths[0], data_dir)
                else:
                    LOGGER.warning(f"Could not locate file ({f'{geo_id}_{platform}_samplesheet.csv'}) in path of {data_dir}.")
    cleanup(data_dir)
    return result['local_files']
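
# --- Illustrative usage (not part of the original module) ---------------------------
# A minimal sketch of calling convert_miniml() directly from Python, assuming a GEO
# MINiML file has been (or will be) downloaded into a local folder. The GSE id and
# path below are hypothetical placeholders.
#
#   from methylprep.download.miniml import convert_miniml
#   convert_miniml('GSE123456', data_dir='GEO/GSE123456',
#                  merge=True, download_it=True, sync_idats=False)
#
# This writes GSE123456_<platform>_samplesheet.csv and a _meta_data.pkl per platform.
# -------------------------------------------------------------------------------------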

def download_miniml(geo_id, series_path, remove_tgz=False, verbose=False):
    """Downloads the MINiML metadata for a GEO series

    Arguments:
        geo_id [required]
            the GEO Accession for the desired series (e.g. GSE134293)
        series_path [required]
            the directory to download the data to

    Note about GEO IDs:
        You can use the NIH online search to find data sets, then click "Send to:" at the bottom of a
        results page, and export a list of unique IDs as a text file. These IDs are not the GEO_IDs used
        here. First, remove the first three digits from the number, so Series ID: 200134293 is
        GEO accession ID: 134293, then include the GSE part, like "GSE134293", in your CLI parameters.

    Note on alt filenames:
        GSE50660_family.xml was actually three files:
            GSE50660_family.xml.part3.tgz
            GSE50660_family.xml.part2.tgz
            GSE50660_family.xml.tgz
        so v1.3.0 adds support for multi-part miniml files.
        (these often include a lot of additional processed idat data)

    note: returns a LIST instead of a single filename now. update convert_miniml() to use that.
    """
    series_dir = Path(series_path)
    expected_miniml_filename = f"{geo_id}_family.xml"
    if not Path(series_path).exists():
        Path(series_path).mkdir(parents=True, exist_ok=True)

    ftp = FTP('ftp.ncbi.nlm.nih.gov', timeout=120) # 2 mins
    ftp.login()
    ftp.cwd(f"geo/series/{geo_id[:-3]}nnn/{geo_id}")
    # look for additional parts of xml files, so long as each part contains the geo_id.
    file_parts = []
    for filename, filestats in ftp.mlsd(path=f"miniml", facts=["size"]):
        if geo_id in filename:
            file_parts.append(filename)

    local_files = []
    for miniml_filename in file_parts:
        LOGGER.info(f"Downloading {miniml_filename}")
        miniml_ftp_path = f"miniml/{miniml_filename}"
        miniml_local_path = f"{series_path}/{miniml_filename}"
        miniml_file = open(miniml_local_path, 'wb')
        try:
            # Alternatively, as a hacky workaround, you might try downloading a file -- any file -- before
            # sending your SIZE command. With request.UseBinary = true for that request, it should cause your
            # client to send the "TYPE I" command to the FTP server. (And it won't matter if that download
            # request fails; the TYPE command will still have been sent.)
            filesize = ftp.size(miniml_ftp_path)
            with tqdm(unit='b', unit_scale=True, leave=False, miniters=1, desc=geo_id, total=filesize) as tqdm_instance:
                def tqdm_callback(data):
                    tqdm_instance.update(len(data))
                    miniml_file.write(data)
                ftp.retrbinary(f"RETR {miniml_ftp_path}", tqdm_callback)
        except Exception as e:
            if not str(e).startswith('550'):
                LOGGER.info(e)
            if verbose:
                LOGGER.info('Failed to create a progress bar, but it is downloading...')
            ftp.retrbinary(f"RETR {miniml_ftp_path}", miniml_file.write)
        miniml_file.close()
        if not Path(miniml_local_path).exists():
            LOGGER.error(f"{miniml_local_path} missing")

        min_tar = tarfile.open(miniml_local_path)
        for file in min_tar.getnames():
            # this only extracts the one .xml file we need.
            #LOGGER.info(f"DEBUG miniml_filename {expected_miniml_filename} == tar_file {file} | {file == expected_miniml_filename}")
            if file == expected_miniml_filename:
                min_tar.extract(file, path=series_path)
        if Path(miniml_local_path).exists() and remove_tgz:
            Path(miniml_local_path).unlink()
        if verbose:
            LOGGER.info(f"Downloaded and unpacked {miniml_filename}")

    for extracted_file in Path(series_path).glob('*'):
        if verbose:
            LOGGER.info(f"Extracted {extracted_file}")
        if expected_miniml_filename == extracted_file.name:
            miniml_local_path = f"{series_path}/{extracted_file.name}"
            if verbose:
                LOGGER.info(f"*Identified* meta_data: {extracted_file}")
        local_files.append(str(extracted_file))
    ftp.quit()
    return {'meta_data': miniml_local_path, 'local_files': local_files}


def merge_sample_sheets(data_dir, meta_dict):
    # potentially grab Sentrix_ID and Position from samplesheet of idats.
    files = list(Path(data_dir).rglob(f'*samplesheet*.csv'))
    if files:
        for file in files:
            try:
                samplesheet = pd.read_csv(file)
            except:
                LOGGER.warning(f'could not open {file}')
                continue
            if 'GSM_ID' in samplesheet:
                if set(samplesheet['GSM_ID']) & set(meta_dict) != set():
                    # meta_dict is a dict of dicts with GSM_IDs as keys
                    # add lists of Sentrix_ID and Sentrix_Position to meta_dict.
                    gsm_sid = {row.GSM_ID: row.Sentrix_ID for idx, row in samplesheet.iterrows() if row.get('Sentrix_ID')}
                    gsm_spos = {row.GSM_ID: row.Sentrix_Position for idx, row in samplesheet.iterrows() if row.get('Sentrix_Position')}
                    updated = 0
                    not_found = 0
                    for GSM_ID in samplesheet['GSM_ID']:
                        if GSM_ID in meta_dict and GSM_ID in gsm_sid and GSM_ID in gsm_spos:
                            if not meta_dict[GSM_ID].get('Sentrix_ID'):
                                meta_dict[GSM_ID]['Sentrix_ID'] = gsm_sid[GSM_ID]
                                updated += 1
                            if not meta_dict[GSM_ID].get('Sentrix_Position'):
                                meta_dict[GSM_ID]['Sentrix_Position'] = gsm_spos[GSM_ID]
                        else:
                            not_found += 1
                    if updated > 0:
                        LOGGER.info(f'{file}: meta_dict updated with {updated} Sentrix IDs/Positions')
                    if not_found > 0:
                        LOGGER.info(f'{file}: {not_found} GSM_IDs in samplesheet were not found in meta_dict')
    return meta_dict


def meta_from_idat_filenames(data_dir, meta_dict):
    gsm_id_pos = {}
    for idx, idat in enumerate(Path(data_dir).rglob('*Grn.idat')):
        if len(idat.stem.split("_")) == 4: # and 'Grn' in idat.stem:
            gsm_id = idat.stem.split("_")[0]
            sentrix_id = idat.stem.split("_")[1]
            sentrix_position = idat.stem.split("_")[2]
            gsm_id_pos[gsm_id] = {'Sentrix_ID': sentrix_id, 'Sentrix_Position': sentrix_position}
        elif len(idat.stem.split("_")) == 3: # and 'Grn' in idat.stem:
            gsm_id = idx
            sentrix_id = idat.stem.split("_")[0]
            sentrix_position = idat.stem.split("_")[1]
            gsm_id_pos[gsm_id] = {'Sentrix_ID': sentrix_id, 'Sentrix_Position': sentrix_position}

    if len(gsm_id_pos) == len(meta_dict):
        for gsm_id, data in gsm_id_pos.items():
            if gsm_id in meta_dict:
                meta_dict[gsm_id]['Sentrix_ID'] = data['Sentrix_ID']
                meta_dict[gsm_id]['Sentrix_Position'] = data['Sentrix_Position']
        LOGGER.info(f"found {len(gsm_id_pos)} idat files; updated meta_data.")
    return meta_dict


def sample_sheet_from_miniml(geo_id, series_path, platform, samp_dict, meta_dict, save_df=False,
    extract_controls=False, require_keyword=None, verbose=True):
    """Creates a sample_sheet for all samples of a particular platform for a given series

    Arguments:
        geo_id [required]
            the GEO Accession for the desired series
        series_path [required]
            the directory containing the series data
        platform [required]
            the platform to generate a sample sheet for
        samp_dict
            the dictionary of {sample GSM_ID: title} for the given platform
        meta_dict
            the dictionary of {sample GSM_ID: meta_dict} for the given platform

    Notes:
        cleans up redundant columns, including Sample_Name == title and platform
    """
    # Sentrix id and position are NOT in the miniml file; must match with GSM_ID later
    _dict = {'GSM_ID': [], 'Sample_Name': [], 'Sentrix_ID': [], 'Sentrix_Position': []}
    for GSM_ID, sample_title in samp_dict.items():
        _dict['GSM_ID'].append(GSM_ID)
        _dict['Sample_Name'].append(sample_title)
        for key, value in meta_dict[GSM_ID].items():
            if key not in _dict:
                _dict[key] = []
            _dict[key].append(value)

    # arrays must be all same length
    out = _dict.copy()
    for column in _dict.keys():
        # Sentrix_ID and Sentrix_Position may be missing.
        if len(_dict[column]) == 0:
            LOGGER.debug(f"dropped {column} (empty)")
            out.pop(column, None)
        if set(_dict[column]) & set(geo_platforms) != set():
            LOGGER.debug(f"dropped `{column}` ({set(_dict[column]) & set(geo_platforms)})")
            out.pop(column, None) # don't need platform, since saved in folder this way.
        if column in ('title', 'source'): # drop these columns first, if redundant
            for other_column in _dict.keys():
                if set(_dict[column]) == set(_dict[other_column]) and column != other_column:
                    LOGGER.debug(f"{column} == {other_column}; dropping {column}")
                    out.pop(column, None)
    try:
        df = pd.DataFrame(data=out)
    except ValueError as e: # arrays must all be same length
        from collections import Counter
        LOGGER.info(f"Some sample meta data is missing for some samples (and will be filled in with ''): {[(key, len(val)) for key,val in out.items()]}")
        # LOGGER.info(f"Trying another method to save partial sample data into csv/pkl file.")
        ## alt method to salvage it by filling in blanks for missing rows -- but doesn't seem to capture Sentrix_ID / Sentrix_Position ##
        column_counts = {'GSM_ID': [], 'Sample_Name': []} # column_name : [GSM_IDs included]
        out = {} # list of dicts as sample rows, keyed to GSM_IDs
        for GSM_ID, sample_title in samp_dict.items():
            out[GSM_ID] = {'GSM_ID': GSM_ID, 'Sample_Name': sample_title}
            column_counts['GSM_ID'].append(GSM_ID)
            column_counts['Sample_Name'].append(GSM_ID)
            for key, value in meta_dict[GSM_ID].items():
                out[GSM_ID][key] = value
                if key not in column_counts:
                    column_counts[key] = [GSM_ID]
                else:
                    column_counts[key].append(GSM_ID)
        # check if any fields are missing GSM_IDs, and fill in with blanks
        for col_name, gsm_id_list in column_counts.items():
            if len(out) != len(gsm_id_list):
                for GSM_ID in out.keys():
                    if GSM_ID not in gsm_id_list:
                        out[GSM_ID][col_name] = ''
        try:
            df = pd.DataFrame(data=list(out.values()))
        except:
            raise ValueError(f"{e}; this happens when a samplesheet is missing descriptors for one or more samples.")

    # filter: only retain control samples
    if extract_controls:
        keywords = ['control', 'ctrl']
        filtered = df.copy()
        for idx, row in df.iterrows():
            # some column needs to match one of the keywords to be retained
            tested = []
            for keyword in keywords:
                for val in row.values:
                    if re.search(keyword, val, re.I):
                        tested.append(val)
            if tested == []:
                filtered.drop(idx, inplace=True)
        LOGGER.info(f"Filtering controls: Retained {len(filtered.index)}/{len(df.index)} samples matching {keywords}.")
        del df
        df = filtered
    if require_keyword:
        filtered = df.copy()
        for idx, row in df.iterrows():
            # some column value needs to match this keyword for sample to be retained
            tested = []
            for val in row.values:
                if re.search(require_keyword, val, re.I):
                    tested.append(val)
            if tested == []:
                filtered.drop(idx, inplace=True)
        LOGGER.info(f"Filtering keyword: Retained {len(filtered.index)}/{len(df.index)} samples matching `{require_keyword}`.")
        del df
        df = filtered
    #LOGGER.info(df.head())
    LOGGER.info(f"Final samplesheet contains {df.shape[0]} rows and {df.shape[1]} columns")
    if len(df.columns) < 30 and verbose:
        LOGGER.info(f"{list(df.columns)}")
    if Path(series_path, platform).exists():
        df.to_csv(path_or_buf=(Path(series_path, platform, f'{geo_id}_{platform}_samplesheet.csv')), index=False)
    else:
        df.to_csv(path_or_buf=(Path(series_path, f'{geo_id}_{platform}_samplesheet.csv')), index=False)
    if save_df:
        if Path(series_path, platform).exists():
            df.to_pickle(Path(series_path, platform, f'{geo_id}_{platform}_meta_data.pkl'))
        else:
            df.to_pickle(Path(series_path, f'{geo_id}_{platform}_meta_data.pkl'))


def sample_sheet_from_idats(geo_id, series_path, platform, platform_samples_dict, save_df=False):
    """This is a simpler "fallback" parser than miniml.sample_sheet_from_miniml().
    Creates a sample_sheet for all samples of a particular platform for a given series

    Arguments:
        geo_id [required]
            the GEO Accession for the desired series
        series_path [required]
            the directory containing the series data
        platform [required]
            the platform to generate a sample sheet for
        platform_samples_dict
            the dictionary of samples for the given platform"""
    series_dir = Path(f"{series_path}/{platform}")
    idat_files = series_dir.glob('*Grn.idat')
    _dict = {'GSM_ID': [], 'Sample_Name': [], 'Sentrix_ID': [], 'Sentrix_Position': []}
    for idat in idat_files:
        filename = str(idat).split("/")[-1]
        if re.match('(GSM[0-9]+_[0-9a-zA-Z]+_R0[0-9].0[0-9].Grn.idat)', filename):
            split_filename = filename.split("_")
            _dict['GSM_ID'].append(split_filename[0])
            _dict['Sentrix_ID'].append(split_filename[1])
            _dict['Sentrix_Position'].append(split_filename[2])
        else:
            raise ValueError(f"{filename} has an unexpected naming format")

    # confusing logic here, names_dir is unordered, but retrieving by key in correct order alleviates
    for key in _dict['GSM_ID']:
        _dict['Sample_Name'].append(platform_samples_dict[key])

    df = pd.DataFrame(data=_dict)
    df.to_csv(path_or_buf=(Path(f"{series_path}/{platform}", 'samplesheet.csv')), index=False)
    if save_df:
        df.to_pickle(Path(f"{series_path}/{platform}", 'sample_sheet_meta_data.pkl'))


def cleanup(path):
    """removes unused/empty directories

    Arguments:
        path [required]
            the root path to check recursively"""
    if not Path(path).is_dir():
        raise ValueError(f"{path} doesn't exist")
    folders = [p for p in list(Path(path).glob('*')) if p.is_dir()]
    for folder in folders:
        filled_folders = {str(p.parent) for p in Path(folder).rglob('*') if p.is_file()}
        if filled_folders == set():
            Path(folder).rmdir()

def build_composite_dataset(geo_id_list, data_dir, merge=True, download_it=True, extract_controls=False,
    require_keyword=None, sync_idats=True, betas=False, m_value=False, export=False):
    """A wrapper function for convert_miniml() to download a list of GEO datasets
    and process only those samples that meet criteria.

    Specifically - grab the "control" or "normal" samples from a bunch of experiments for one tissue type
    (e.g. "blood"), process them, and put all the resulting beta_values and/or m_values pkl files in one
    place, so that you can run `methylize.load_both()` to create a combined reference dataset for QC,
    analysis, or meta-analysis.

    Arguments:
        geo_id_list (required):
            A list of GEO "GSEnnn" ids. From the command line, pass these in as separate values;
            this function reads them from a text file (named by `geo_id_list`) inside `data_dir`,
            one ID per line.
        data_dir:
            folder to save data
        merge (True):
            If merge==True and there is a file with 'samplesheet' in the folder, and that sheet has GSM_IDs,
            merge that data into this samplesheet. Useful for when you have idats and want one combined
            samplesheet for the dataset.
        download_it (True):
            if miniml file not in data_dir path, it will download it from web.
        extract_controls (False):
            if you only want to retain samples from the whole set that have certain keywords,
            such as "control" or "blood", this experimental flag will rewrite the samplesheet with only the
            parts you want, then feed that into run_pipeline with named samples.
        require_keyword (None):
            another way to eliminate samples from samplesheets, before passing into the processor.
            if specified, the "keyword" string passed in must appear somewhere in the values of a samplesheet
            for a sample to be downloaded, processed, retained.
        sync_idats:
            If flagged, this will search `data_dir` for idats and remove any of those that are not found in
            the filtered samplesheet. Requires you to run the download function first to get all idats,
            before you run this `meta_data` function.
        betas:
            process beta_values
        m_value:
            process m_values

    - Attempts to also read idat filenames, if they exist, but won't fail if they don't.
    - removes unneeded files as it goes, but leaves the xml MINiML file and folder there as a marker if a
      geo dataset fails to download, so it won't try again on resume.
    """
    def remove_unused_files(geo_id, geo_folder):
        if list(Path(data_dir, geo_id).rglob('*.idat')) == []:
            for file in Path(data_dir, geo_id).glob("*_samplesheet.csv"):
                file.unlink()
            for file in Path(data_dir, geo_id).glob("*_meta_data.pkl"):
                file.unlink()
            # the XML and folder is used to mark failed downloads on resume, so it skips them next time
            #for file in Path(data_dir, geo_id).glob("*_family.xml"):
            #    file.unlink()
            #try:
            #    Path(data_dir, geo_id).rmdir()
            #except Exception as e:
            #    LOGGER.error(f"Path {data_dir/geo_id} is not empty. Could not remove.")
            return True
        return False

    start_time = time.process_time()
    # note: parser uses VERBOSE setting to show/suppress INFO and DEBUG level messages. WARNING/ERROR msgs always appear.

    # get the ids from file
    try:
        with open(Path(data_dir, geo_id_list), 'r') as fp:
            geo_ids = [series_id.strip() for series_id in fp]
    except FileNotFoundError:
        LOGGER.error("""File not found: Specify your list of GEO series IDs to download using a text file
        in the folder where data should be saved. Put one ID on each line. """)
        return
    except ValueError as e:
        LOGGER.error(f"Error with {fp.name}: {e}")
        fp.close()
        return
    fp.close()

    geo_folders = {}
    for geo_id in geo_ids:
        # exclude failed folders: if the folder exists and only contains the miniml family.xml file, skip it.
        if Path(data_dir, geo_id).exists() and Path(data_dir, geo_id, f'{geo_id}_family.xml').exists():
            if len(list(Path(data_dir, geo_id).rglob('*'))) == 1:
                LOGGER.info(f"Skipping {geo_id}; appears to be a prior run that didn't match filters, or was missing data.")
                continue
        # exclude geo series whose HTML pages don't say TAR (of idat).
        if methylprep.download.process_data.confirm_dataset_contains_idats(geo_id) == False:
            LOGGER.error(f"[!] Geo data set {geo_id} probably does NOT contain usable raw data (in .idat format). Not downloading.")
            continue

        geo_folder = Path(data_dir, geo_id)
        geo_folders[geo_id] = geo_folder
        # download meta_data
        LOGGER.info(f"Running {geo_id}")
        try:
            convert_miniml(
                geo_id,
                data_dir=geo_folder,
                merge=merge,
                download_it=download_it,
                extract_controls=extract_controls,
                require_keyword=require_keyword,
                sync_idats=False) # no idat files exist yet.
        except Exception as e:
            LOGGER.error(f'Processing meta_data failed: {e}')
            continue

        ## if the samplesheet is empty, stop.
        abort = False
        for platform in geo_platforms:
            if Path(data_dir, geo_id, f"{geo_id}_{platform}_samplesheet.csv").is_file():
                samplesheet = pd.read_csv(Path(data_dir, geo_id, f"{geo_id}_{platform}_samplesheet.csv"))
                if len(samplesheet.index) == 0:
                    LOGGER.warning(f"Aborting {geo_id}: No samples match filters (control:{extract_controls}, keyword: {require_keyword})")
                    # TODO: cleanup this first.
                    abort = True
                    remove_unused_files(geo_id, geo_folder)
                    geo_folders.pop(geo_id)
                    continue
        if abort:
            continue

        # check if no idats yet
        if list(Path(geo_folder).rglob('*.idat')) != []:
            pass
        else:
            try:
                # download idats
                methylprep.download.process_data.run_series(
                    geo_id,
                    geo_folder,
                    dict_only=True,
                    batch_size=200,
                    clean=True, # do later in this function
                    )
            except Exception as e:
                LOGGER.error(f'Downloading IDATs failed: {e}')
                # THIS DOES NOT CLEAN THE FOLDERS IF ERROR
                continue

        if sync_idats:
            for platform in geo_platforms:
                paths = list(Path(data_dir, geo_id).rglob(f'{geo_id}_{platform}_samplesheet.csv'))
                if len(paths) > 0:
                    remove_idats_not_in_samplesheet(paths[0], Path(data_dir, geo_id))
                #else:
                #    LOGGER.error(f"Could not locate file ({geo_id}_{platform}_samplesheet.csv}) in path of {data_dir}/{geo_id}")

        # there are probably two versions of samplesheets. if they're the same, remove one.
        # if they differ, keep the one created by miniml (where filtering happens)
        # this should only proceed this far if both samplesheets contain samples.
        for platform in geo_platforms:
            samp_A = Path(data_dir, geo_id, f"{geo_id}_{platform}_samplesheet.csv")
            samp_B = Path(data_dir, geo_id, platform, f"{geo_id}_{platform}_samplesheet.csv")
            meta_B = Path(data_dir, geo_id, platform, f"{geo_id}_{platform}_meta_data.pkl")
            if (samp_A.is_file() and samp_B.is_file()):
                samplesheet = pd.read_csv(samp_A)
                basic_samplesheet = pd.read_csv(samp_B)
                #if samplesheet == basic_samplesheet: OR in every case, keep the miniml-filtered version.
                #shutil.move(samp_A, samp_B)
                samp_B.unlink()
                if meta_B.is_file():
                    meta_B.unlink()

        # next, remove all files if there are no idats in the folder.
        if remove_unused_files(geo_id, geo_folder):
            geo_folders.pop(geo_id)

    if export == False and betas == False and m_value == False:
        LOGGER.info("Not processing data, because no output types are specified.")
    else:
        for geo_id, geo_folder in geo_folders.items():
            try:
                run_pipeline(geo_folder, # maybe pass in sample_sheet_filepath if it gets confused here.
                    betas=betas,
                    m_value=m_value,
                    batch_size=200,
                    export=export,
                    meta_data_frame=False
                    )
            except Exception as e:
                LOGGER.warning(f'Processing IDATs failed for {geo_id}: {e}')

    LOGGER.info('[!] Consolidating data files [!]')
    # consolidate data into one folder and remove the rest.
    file_patterns = [
        'beta_values_*.pkl' if betas else None,
        'm_values_*.pkl' if m_value else None
        ]
    for pattern in file_patterns:
        if pattern == None:
            continue
        datas = []
        samples = 0
        for file in Path(data_dir).rglob(pattern):
            if file.parts[-2] in geo_ids:
                data = pd.read_pickle(file)
                # data should have probes in rows, samples in columns.
                if data.shape[1] > data.shape[0]:
                    data = data.transpose()
                datas.append(data)
                samples += data.shape[1]
                print(f"-- {len(datas)} {file.parts[-2]} {data.shape}")
        if datas:
            big_data = pd.concat(datas, axis=1, ignore_index=False, sort=True, copy=False)
            del datas
            LOGGER.info(f"[!] saved {samples} samples to disk from {pattern}.")
            if 'beta' in pattern:
                big_data.to_pickle(Path(data_dir, 'beta_values.pkl'))
            if 'm_value' in pattern:
                big_data.to_pickle(Path(data_dir, 'm_values.pkl'))

    end_time = time.process_time()
    LOGGER.info(f"[*] Process time: {round((end_time - start_time)/60.0,1)} min")
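
# --- Illustrative usage (not part of the original module) ---------------------------
# A minimal sketch of build_composite_dataset(). Here 'geo_ids.txt' is a hypothetical
# text file inside data_dir listing one GSE id per line, and 'composite_data' is a
# hypothetical output folder; betas=True consolidates every processed sample into
# <data_dir>/beta_values.pkl.
#
#   build_composite_dataset('geo_ids.txt', 'composite_data',
#                           extract_controls=True, require_keyword='blood',
#                           betas=True)
# -------------------------------------------------------------------------------------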