Source code for methylprep.processing.pipeline

# Lib
import logging
import numpy as np
import pandas as pd
from ..utils.progress_bar import * # checks environment and imports tqdm appropriately.
from collections import Counter
from pathlib import Path
import pickle
import sys
# App
from ..files import Manifest, get_sample_sheet, create_sample_sheet
from ..models import (
    Channel,
    #MethylationDataset,
    SigSet,
    ArrayType,
    #get_raw_datasets,
    get_array_type,
    parse_sample_sheet_into_idat_datasets,
)
from .postprocess import (
    calculate_beta_value,
    calculate_m_value,
    calculate_copy_number,
    consolidate_values_for_sheet,
    one_sample_control_snp,
    consolidate_mouse_probes,
    merge_batches,
)
from ..utils import ensure_directory_exists, is_file_like
from .preprocess import preprocess_noob, _apply_sesame_quality_mask
from .p_value_probe_detection import _pval_sesame_preprocess, _pval_neg_ecdf
from .infer_channel_switch import infer_type_I_probes
from .dye_bias import nonlinear_dye_bias_correction
from .multi_array_idat_batches import check_array_folders


__all__ = ['SampleDataContainer', 'run_pipeline', 'consolidate_values_for_sheet', 'make_pipeline']

LOGGER = logging.getLogger(__name__)


def run_pipeline(data_dir, array_type=None, export=False, manifest_filepath=None,
                 sample_sheet_filepath=None, sample_name=None, betas=False, m_value=False,
                 make_sample_sheet=False, batch_size=None, save_uncorrected=False,
                 save_control=True, meta_data_frame=True, bit='float32', poobah=False,
                 export_poobah=False, poobah_decimals=3, poobah_sig=0.05, low_memory=True,
                 sesame=True, quality_mask=None, pneg_ecdf=False, file_format='pickle',
                 **kwargs):
    """The main CLI processing pipeline. This runs every processing step and returns a data set.

    Required Arguments:
        data_dir [required]
            path where idat files can be found, along with a samplesheet csv.

    Optional file and sub-sampling inputs:
        manifest_filepath [optional]
            if you want to provide a custom manifest, provide the path. Otherwise, it will
            download the appropriate one for you.
        sample_sheet_filepath [optional]
            it will autodetect if omitted.
        make_sample_sheet [optional]
            if True, generates a sample sheet from idat files called 'samplesheet.csv', so that
            processing will work. From the CLI, pass in "--no_sample_sheet" to trigger sample
            sheet auto-generation.
        sample_name [optional, list]
            if you don't want to process all samples, you can specify individual samples as a list.
            If sample_names are specified, batch_size will not also be applied (large batches must
            process all samples).

    Optional processing arguments:
        sesame [default: True]
            If True, applies offsets, poobah, noob, infer_channel_switch, nonlinear-dye-bias
            correction, and qualityMask to imitate the output of the openSesame function.
            If False, outputs will closely match minfi's processing output.
            Prior to version 1.4.0, file processing matched minfi.
        array_type [default: autodetect]
            27k, 450k, EPIC, EPIC+
            If omitted, this will autodetect it.
        batch_size [optional]
            if set to any integer, samples will be processed and saved in batches no greater than
            the specified batch size. This will yield multiple output files in the format of
            "beta_values_1.pkl ... beta_values_N.pkl".
        bit [default: float32]
            You can change the processed output files to one of: {float16, float32, float64}.
            This will make files & memory usage smaller, often with no loss in precision.
            However, using float16 may cause an overflow error, resulting in "inf" appearing
            instead of numbers, and numpy/pandas functions do not universally support float16.
        low_memory [default: True]
            If False, the pipeline will not remove intermediate objects and data sets during
            processing. This provides access to probe subsets, foreground, and background probe
            sets in the SampleDataContainer object returned when this is run in a notebook (not CLI).
        quality_mask [default: None]
            If False, processing will NOT remove sesame's list of unreliable probes. If True,
            removes those probes. The default None defers to sesame, which defaults to True;
            but if set explicitly, it overrides the sesame setting.

    Optional export files:
        meta_data_frame [default: True]
            if True, saves a file, "sample_sheet_meta_data.pkl", with samplesheet info.
        export [default: False]
            if True, exports a CSV of the processed data for each idat file in the sample.
        file_format [default: pickle; optional: parquet]
            Matrix-style files are faster to load and process than CSVs, and python supports two
            binary formats: pickle and parquet. Parquet is readable by other languages, so it is
            an option starting in v1.7.0.
        save_uncorrected [default: False]
            if True, adds two additional columns to the processed.csv per sample (meth and unmeth),
            representing the raw fluorescence intensities for all probes.
            It does not apply NOOB correction to values in these columns.
        save_control [default: True]
            if True, adds all Control and SnpI type probe values to a separate pickled dataframe,
            with probes in rows and sample_name in the first column. These non-CpG probe names are
            excluded from processed data and must be stored separately.
        poobah [default: False]
            If True, the pipeline will run Sesame's p-value probe detection method (poobah) on
            samples to remove probes that fail the signal/noise ratio on their fluorescence
            channels. These will appear as NaNs in the resulting dataframes (beta_values.pkl or
            m_values.pkl). All probes, regardless of p-value cutoff, will be retained in CSVs, but
            there will be a 'poobah_pval' column in CSV files that methylcheck.load uses to exclude
            failed probes upon import at a later step.
        poobah_sig [default: 0.05]
            the p-value level of significance; probes above this value are excluded from output
            (typical range of 0.001 to 0.1).
        poobah_decimals [default: 3]
            the number of decimal places to round the p-value column in the processed CSV output files.
        mouse probes
            Mouse-specific probes will be saved if processing a mouse array.

    Optional final estimators:
        betas
            if True, saves a pickle (beta_values.pkl) of beta values for all samples.
        m_value
            if True, saves a pickle (m_values.pkl) of M-values for all samples.
        Note on meth/unmeth:
            if either betas or m_value is True, this will also save two additional files:
            'meth_values.pkl' and 'unmeth_values.pkl' with the same dataframe structure,
            representing raw, uncorrected meth probe intensities for all samples. These are useful
            in some methylcheck functions and load/produce results 100X faster than loading from
            processed CSV output.

    Returns:
        By default, if called as a function, a list of SampleDataContainer objects is returned,
        with the following exceptions:

        betas
            if True, will return a single data frame of beta values instead of a list of
            SampleDataContainer objects. Format is a "wide matrix": columns contain probes and
            rows contain samples.
        m_value
            if True, will return a single data frame of M-values instead of a list of
            SampleDataContainer objects. Format is a "wide matrix": columns contain probes and
            rows contain samples.

        If batch_size is set to more than ~600 samples, nothing is returned but all the files are
        saved. You can recreate/merge output files by loading them with methylcheck.load().

    Processing notes:
        The sample_sheet parser will ensure every sample has a unique name and assign one
        (e.g. Sample1) if missing, or append a number (e.g. _1) if not unique. This may cause
        sample_sheets and processed data in dataframes to not match up. Will fix in a future version.

        pipeline steps:
            1. make a sample sheet, or read the sample sheet into a list of samples' data
            2. split large projects into batches, if necessary, and ensure unique sample names
            3. read idats
            4. select and read the manifest
            5. put everything into SampleDataContainer class objects
            6. process everything, using the pipeline steps specified
               idats -> channel_swaps -> poobah -> quality_mask -> noob -> dye_bias
            7. apply the final estimator function (beta, m_value, or copy number) to all data
            8. export all the data into multiple files, as defined by the pipeline
    """
    #local_vars = list(locals().items())
    #print([(key,val) for key,val in local_vars])
    # support for the make_pipeline wrapper function here; a more structured way to pass in args, like sklearn.
# unexposed flags all start with 'do_': (None will retain default settings) do_infer_channel_switch = None # defaults to sesame(True) do_noob = None # defaults to True do_nonlinear_dye_bias = True # defaults to sesame(True), but can be False (linear) or None (omit step) do_save_noob = None do_mouse = True hidden_kwargs = ['pipeline_steps', 'pipeline_exports', 'debug'] if kwargs != {}: for kwarg in kwargs: if kwarg not in hidden_kwargs: if sys.stdin.isatty() is False: raise SystemExit(f"One of your parameters ({kwarg}) was not recognized. Did you misspell it?") else: raise KeyError(f"One of your parameters ({kwarg}) was not recognized. Did you misspell it?") if sesame == True: poobah = True # if sesame is True and poobah is False, it hangs forever. if sesame == False and 'pipeline_steps' not in kwargs: do_nonlinear_dye_bias = False # FORCE minfi to do linear if kwargs != {} and 'pipeline_steps' in kwargs: pipeline_steps = kwargs.get('pipeline_steps') if 'all' in pipeline_steps: do_infer_channel_switch = True poobah = True quality_mask = True do_noob = True do_nonlinear_dye_bias = True sesame = None # prevent this from overriding elsewhere else: do_infer_channel_switch = True if 'infer_channel_switch' in pipeline_steps else False poobah = True if 'poobah' in pipeline_steps else False quality_mask = True if 'quality_mask' in pipeline_steps else False do_noob = True if 'noob' in pipeline_steps else False if 'dye_bias' in pipeline_steps: do_nonlinear_dye_bias = True elif 'linear_dye_bias' in pipeline_steps: do_nonlinear_dye_bias = False else: do_nonlinear_dye_bias = None # omit step sesame = None if sesame == True else sesame if kwargs != {} and 'pipeline_exports' in kwargs: pipeline_exports = kwargs.get('pipeline_exports') if 'all' in pipeline_exports: export = True # csv save_uncorrected = True # meth, unmeth do_save_noob = True # noob_meth, noob_unmeth export_poobah = True # poobah meta_data_frame = True # sample_sheet_meta_data do_mouse = True # 'mouse' -- only if array_type matches; False will suppress export save_control = True # control else: export = True if 'csv' in pipeline_exports else False save_uncorrected = True if ('meth' in pipeline_exports or 'unmeth' in pipeline_exports) else False do_save_noob = True if ('noob_meth' in pipeline_exports or 'noob_unmeth' in pipeline_exports) else False export_poobah = True if 'poobah' in pipeline_exports else False meta_data_frame = True if 'sample_sheet_meta_data' in pipeline_exports else False # mouse is determined by the array_type match, but you can suppress creating this file here do_mouse = True if 'mouse' in pipeline_exports else False save_control = True if 'control' in pipeline_exports else False if file_format == 'parquet': try: pd.DataFrame().to_parquet() except AttributeError(): LOGGER.error("parquet is not installed in your environment; reverting to pickle format") file_format = 'pickle' suffix = 'parquet' if file_format == 'parquet' else 'pkl' LOGGER.info('Running pipeline in: %s', data_dir) if bit not in ('float64','float32','float16'): raise ValueError("Input 'bit' must be one of ('float64','float32','float16') or ommitted.") if sample_name: LOGGER.info('Sample names: {0}'.format(sample_name)) if make_sample_sheet: create_sample_sheet(data_dir) try: sample_sheet = get_sample_sheet(data_dir, filepath=sample_sheet_filepath) except Exception as e: # e will be 'Too many sample sheets in this directory.' instructions = check_array_folders(data_dir, verbose=True) # prints instructions for GEO multi-array data packages. 
if instructions != []: instructions = '\n'.join(instructions) print(f"This folder contains idats for multiple types of arrays. Run each array separately:\n{instructions}") sys.exit(0) raise Exception(e) samples = sample_sheet.get_samples() if sample_sheet.renamed_fields != {}: show_fields = [] for k,v in sample_sheet.renamed_fields.items(): if v != k: show_fields.append(f"{k} --> {v}") else: show_fields.append(f"{k}") LOGGER.info(f"Found {len(show_fields)} additional fields in sample_sheet:\n{' | '.join(show_fields)}") if sample_name is not None: if not isinstance(sample_name,(list,tuple)): raise SystemExit(f"sample_name must be a list of sample_names") matched_samples = [sample.name for sample in samples if sample.name in sample_name] if set(matched_samples) != set(sample_name): possible_sample_names = [sample.name for sample in samples] unmatched_samples = [_sample for _sample in sample_name if _sample not in possible_sample_names] raise SystemExit(f"Your sample_name filter does not match the samplesheet; these samples were not found: {unmatched_samples}") batches = [] batch = [] sample_id_counter = 1 if batch_size: if type(batch_size) != int or batch_size < 1: raise ValueError('batch_size must be an integer greater than 0') for sample in samples: if sample_name and sample.name not in sample_name: continue # batch uses Sample_Name, so ensure these exist if sample.name in (None,''): sample.name = f'Sample_{sample_id_counter}' sample_id_counter += 1 # and are unique. if Counter((s.name for s in samples)).get(sample.name) > 1: sample.name = f'{sample.name}_{sample_id_counter}' sample_id_counter += 1 if len(batch) < batch_size: batch.append(sample.name) else: batches.append(batch) batch = [] batch.append(sample.name) batches.append(batch) else: for sample in samples: if sample_name and sample.name not in sample_name: continue # batch uses Sample_Name, so ensure these exist if sample.name in (None,''): sample.name = f'Sample_{sample_id_counter}' sample_id_counter += 1 # and are unique. if Counter((s.name for s in samples)).get(sample.name) > 1: sample.name = f'{sample.name}_{sample_id_counter}' sample_id_counter += 1 batch.append(sample.name) batches.append(batch) temp_data_pickles = [] control_snps = {} #data_containers = [] # returned when this runs in interpreter, and < 200 samples # v1.3.0 memory fix: save each batch_data_containers object to disk as temp, then load and combine at end. # 200 samples still uses 4.8GB of memory/disk space (float64) missing_probe_errors = {'noob': [], 'raw':[]} for batch_num, batch in enumerate(batches, 1): idat_datasets = parse_sample_sheet_into_idat_datasets(sample_sheet, sample_name=batch, from_s3=None, meta_only=False, bit=bit) # replaces get_raw_datasets # idat_datasets are a list; each item is a dict of {'green_idat': ..., 'red_idat':..., 'array_type', 'sample'} to feed into SigSet #--- pre v1.5 --- raw_datasets = get_raw_datasets(sample_sheet, sample_name=batch) if array_type is None: # use must provide either the array_type or manifest_filepath. array_type = get_array_type(idat_datasets) manifest = Manifest(array_type, manifest_filepath) # this allows each batch to be a different array type; but not implemented yet. common with older GEO sets. 
batch_data_containers = [] export_paths = set() # inform CLI user where to look for idat_dataset_pair in tqdm(idat_datasets, total=len(idat_datasets), desc="Processing samples"): data_container = SampleDataContainer( idat_dataset_pair=idat_dataset_pair, manifest=manifest, retain_uncorrected_probe_intensities=save_uncorrected, bit=bit, switch_probes=(do_infer_channel_switch or sesame), # this applies all sesame-specific options quality_mask= (quality_mask or sesame or False), # this applies all sesame-specific options (beta / noob offsets too) do_noob=(do_noob if do_noob != None else True), # None becomes True, but make_pipeline can override with False pval=poobah, #defaults to False as of v1.4.0 poobah_decimals=poobah_decimals, poobah_sig=poobah_sig, do_nonlinear_dye_bias=do_nonlinear_dye_bias, # start of run_pipeline sets this to True, False, or None debug=kwargs.get('debug',False), sesame=sesame, pneg_ecdf=pneg_ecdf, file_format=file_format, ) data_container.process_all() if export: # as CSV or parquet suffix = 'parquet' if file_format == 'parquet' else 'csv' output_path = data_container.sample.get_export_filepath(extension=suffix) data_container.export(output_path) export_paths.add(output_path) # this tidies-up the tqdm by moving errors to end of batch warning. if data_container.noob_processing_missing_probe_errors != []: missing_probe_errors['noob'].extend(data_container.noob_processing_missing_probe_errors) if data_container.raw_processing_missing_probe_errors != []: missing_probe_errors['raw'].extend(data_container.raw_processing_missing_probe_errors) if save_control: # Process and consolidate now. Keep in memory. These files are small. sample_id = f"{data_container.sample.sentrix_id}_{data_container.sample.sentrix_position}" control_df = one_sample_control_snp(data_container) control_snps[sample_id] = control_df # now I can drop all the unneeded stuff from each SampleDataContainer (400MB per sample becomes 92MB) # these are stored in SampleDataContainer.__data_frame for processing. if low_memory is True: # use data_frame values instead of these class objects, because they're not in sesame SigSets. del data_container.man del data_container.snp_man del data_container.ctl_man del data_container.green_idat del data_container.red_idat del data_container.data_channel del data_container.methylated del data_container.unmethylated del data_container.oobG del data_container.oobR del data_container.ibG del data_container.ibR batch_data_containers.append(data_container) #if str(data_container.sample) == '200069280091_R01C01': # print(f"200069280091_R01C01 -- cg00035864 -- meth -- {data_container._SampleDataContainer__data_frame['meth']['cg00035864']}") # print(f"200069280091_R01C01 -- cg00035864 -- unmeth -- {data_container._SampleDataContainer__data_frame['unmeth']['cg00035864']}") if kwargs.get('debug'): LOGGER.info('[finished SampleDataContainer processing]') def _prepare_save_out_file(df, file_stem, uint16=False): out_name = f"{file_stem}_{batch_num}" if batch_size else file_stem if uint16 and file_format != 'parquet': df = df.astype('float32') if df.isna().sum().sum() > 0 else df.astype('uint16') else: df = df.astype('float32') if df.shape[1] > df.shape[0]: df = df.transpose() # put probes as columns for faster loading. 
# sort sample names df = df.sort_index().reindex(sorted(df.columns), axis=1) if file_format == 'parquet': # put probes in rows; format is optimized for same-type storage so it won't really matter df.to_parquet(Path(data_dir,f"{out_name}.parquet")) else: df.to_pickle(Path(data_dir, f"{out_name}.pkl")) LOGGER.info(f"saved {out_name}") if betas: df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='beta_value', bit=bit, poobah=poobah, exclude_rs=True) _prepare_save_out_file(df, 'beta_values') if m_value: df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='m_value', bit=bit, poobah=poobah, exclude_rs=True) _prepare_save_out_file(df, 'm_values') if (do_save_noob is not False) or betas or m_value: df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='noob_meth', bit=bit, poobah=poobah, exclude_rs=True) _prepare_save_out_file(df, 'noob_meth_values', uint16=True) df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='noob_unmeth', bit=bit, poobah=poobah, exclude_rs=True) _prepare_save_out_file(df, 'noob_unmeth_values', uint16=True) if save_uncorrected: df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='meth', bit=bit, poobah=False, exclude_rs=True) _prepare_save_out_file(df, 'meth_values', uint16=True) df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='unmeth', bit=bit, poobah=False, exclude_rs=True) _prepare_save_out_file(df, 'unmeth_values', uint16=True) if manifest.array_type == ArrayType.ILLUMINA_MOUSE and do_mouse: # save mouse specific probes if not batch_size: mouse_probe_filename = f'mouse_probes.{suffix}' else: mouse_probe_filename = f'mouse_probes_{batch_num}.{suffix}' consolidate_mouse_probes(batch_data_containers, Path(data_dir, mouse_probe_filename), file_format) LOGGER.info(f"saved {mouse_probe_filename}") if export: export_path_parents = list(set([str(Path(e).parent) for e in export_paths])) LOGGER.info(f"[!] Exported results ({file_format}) to: {export_path_parents}") if export_poobah: if all(['poobah_pval' in e._SampleDataContainer__data_frame.columns for e in batch_data_containers]): # this option will save pvalues for all samples, with sample_ids in the column headings and probe names in index. # this sets poobah to false in kwargs, otherwise some pvalues would be NaN I think. df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='poobah_pval', bit=bit, poobah=False, poobah_sig=poobah_sig, exclude_rs=True) _prepare_save_out_file(df, 'poobah_values') if all(['pNegECDF_pval' in e._SampleDataContainer__data_frame.columns for e in batch_data_containers]): # this option will save negative control based pvalues for all samples, with # sample_ids in the column headings and probe names in index. df = consolidate_values_for_sheet(batch_data_containers, postprocess_func_colname='pNegECDF_pval', bit=bit, poobah=False, poobah_sig=poobah_sig, exclude_rs=True) _prepare_save_out_file(df, 'pNegECDF_values') # v1.3.0 fixing mem problems: pickling each batch_data_containers object then reloading it later. # consolidating data_containers this will break with really large sample sets, so skip here. 
#if batch_size and batch_size >= 200: # continue #data_containers.extend(batch_data_containers) pkl_name = f"_temp_data_{batch_num}.pkl" with open(Path(data_dir,pkl_name), 'wb') as temp_data: pickle.dump(batch_data_containers, temp_data) temp_data_pickles.append(pkl_name) del batch_data_containers if meta_data_frame == True: meta_frame = sample_sheet.build_meta_data(samples) if file_format == 'parquet': meta_frame_filename = f'sample_sheet_meta_data.parquet' meta_frame.to_parquet(Path(data_dir, meta_frame_filename)) else: meta_frame_filename = f'sample_sheet_meta_data.pkl' meta_frame.to_pickle(Path(data_dir, meta_frame_filename)) LOGGER.info(f"saved {meta_frame_filename}") # FIXED in v1.3.0 if save_control: if file_format == 'parquet': control_filename = f'control_probes.parquet' control = pd.concat(control_snps) # creates multiindex (control.reset_index() .rename(columns={'level_0': 'Sentrix_ID', 'level_1': 'IlmnID'}) .astype({'IlmnID':str}) .to_parquet('control_probes.parquet') ) else: control_filename = f'control_probes.pkl' with open(Path(data_dir, control_filename), 'wb') as control_file: pickle.dump(control_snps, control_file) LOGGER.info(f"saved {control_filename}") # summarize any processing errors if missing_probe_errors['noob'] != []: avg_missing_per_sample = int(round(sum([item[1] for item in missing_probe_errors['noob']])/len(missing_probe_errors['noob']))) samples_affected = len(set([item[0] for item in missing_probe_errors['noob']])) LOGGER.warning(f"{samples_affected} samples were missing (or had infinite values) NOOB meth/unmeth probe values (average {avg_missing_per_sample} per sample)") if missing_probe_errors['raw'] != []: avg_missing_per_sample = int(round(sum([item[1] for item in missing_probe_errors['raw']])/len(missing_probe_errors['raw']))) samples_affected = len(set([item[0] for item in missing_probe_errors['raw']])) LOGGER.warning(f"{samples_affected} samples were missing (or had infinite values) RAW meth/unmeth probe values (average {avg_missing_per_sample} per sample)") # batch processing done; consolidate and return data. This uses much more memory, but not called if in batch mode. if batch_size and batch_size >= 200: LOGGER.warning("Because the batch size was >=200 samples, files are saved but no data objects are returned.") del batch_data_containers for temp_data in temp_data_pickles: temp_file = Path(data_dir, temp_data) temp_file.unlink(missing_ok=True) # delete it return # consolidate batches and delete parts, if possible for file_type in ['beta_values', 'm_values', 'meth_values', 'unmeth_values', 'noob_meth_values', 'noob_unmeth_values', 'mouse_probes', 'poobah_values']: # control_probes.pkl not included yet test_parts = list([str(temp_file) for temp_file in Path(data_dir).rglob(f'{file_type}*.{suffix}')]) num_batches = len(test_parts) # ensures that only the file_types that appear to be selected get merged. #print(f"DEBUG num_batches {num_batches}, batch_size {batch_size}, file_type {file_type}") if batch_size and num_batches >= 1: #--- if the batch size was larger than the number of total samples, this will still drop the _1 merge_batches(num_batches, data_dir, file_type, file_format) # reload all the big stuff -- after everything important is done. # attempts to consolidate all the batch_files below, if they'll fit in memory. 
data_containers = [] for temp_data in temp_data_pickles: temp_file = Path(data_dir, temp_data) if temp_file.exists(): #possibly user deletes file while processing, since these are big with open(temp_file,'rb') as _file: batch_data_containers = pickle.load(_file) data_containers.extend(batch_data_containers) del batch_data_containers temp_file.unlink() # delete it after loading. if betas: return consolidate_values_for_sheet(data_containers, postprocess_func_colname='beta_value', poobah=poobah, exclude_rs=True) elif m_value: return consolidate_values_for_sheet(data_containers, postprocess_func_colname='m_value', poobah=poobah, exclude_rs=True) else: return data_containers
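# ---------------------------------------------------------------------------
# Usage sketch (editorial illustration, not part of the module): a minimal call of
# run_pipeline() as a library function, per the docstring above. Assumes ``./data``
# contains paired .idat files plus a samplesheet CSV; the path and keyword choices
# shown are illustrative only.
#
#   >>> from methylprep import run_pipeline
#   >>> betas_df = run_pipeline('data', betas=True, poobah=True, export=True)
#   >>> betas_df.shape        # "wide matrix": rows are samples, columns are probes
#
# With betas=False and m_value=False, the same call returns a list of
# SampleDataContainer objects instead of a DataFrame.
# ---------------------------------------------------------------------------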
class SampleDataContainer(SigSet):
    """Wrapper that provides easy access to red+green idat datasets, the sample, manifest, and processing params.

    Arguments:
        raw_dataset {RawDataset} -- A sample's RawDataset for a single well on the processed array.
        manifest {Manifest} -- The Manifest for the correlated RawDataset's array type.
        bit (default: float32) -- option to store data as float16 or float32 to save space.
        pval (default: False) -- whether to apply the p-value-detection algorithm to remove
            unreliable probes (based on the signal/noise ratio of fluorescence);
            uses the sesame method (pOOBAH) based on out-of-band background levels.

    Jan 2020: added .snp_(un)methylated property. used in postprocess.consolidate_control_snp()
    Mar 2020: added p-value detection option
    Mar 2020: added mouse probe post-processing separation
    June 2020: major refactor to use SigSet, like sesame. Removed raw_dataset and MethylationDataset.
        - SigSet is now a super-class of SampleDataContainer.
    """

    __data_frame = None
    __quality_mask_excluded_probes = None
    noob_processing_missing_probe_errors = []
    raw_processing_missing_probe_errors = []

    def __init__(self, idat_dataset_pair, manifest=None, retain_uncorrected_probe_intensities=False,
                 bit='float32', pval=False, poobah_decimals=3, poobah_sig=0.05, do_noob=True,
                 quality_mask=True, switch_probes=True, do_nonlinear_dye_bias=True, debug=False,
                 sesame=True, pneg_ecdf=False, file_format='csv'):
        self.debug = debug
        self.do_noob = do_noob
        self.pval = pval
        self.poobah_decimals = poobah_decimals
        self.poobah_sig = poobah_sig
        self.quality_mask = quality_mask # if True, filters sesame's standard sketchy probes out of 450k, EPIC, EPIC+ arrays.
        self.switch_probes = switch_probes
        self.do_nonlinear_dye_bias = do_nonlinear_dye_bias
        self.green_idat = idat_dataset_pair['green_idat']
        self.red_idat = idat_dataset_pair['red_idat']
        self.sample = idat_dataset_pair['sample']
        self.retain_uncorrected_probe_intensities = retain_uncorrected_probe_intensities
        self.sesame = sesame # defines offsets in functions
        # pneg_ecdf defines whether the negative-control-based p-value is calculated; it uses poobah_decimals for rounding
        self.pneg_ecdf = pneg_ecdf
        self.data_type = 'float32' if bit is None else bit # options: (float64, float32, or float16)
        self.file_format = file_format
        if debug:
            print(f'DEBUG SDC: sesame {self.sesame} switch {self.switch_probes} noob {self.do_noob} poobah {self.pval} mask {self.quality_mask}, dye {self.do_nonlinear_dye_bias}')

        self.manifest = manifest # used by infer_channel_switch only.
        if self.switch_probes:
            # apply infer_channel_switch here; uses raw_dataset and manifest only; then updates self.raw_dataset
            # these are read from idats directly, not SigSet, so they need to be modified at the source.
            infer_type_I_probes(self, debug=self.debug)

        super().__init__(self.sample, self.green_idat, self.red_idat, self.manifest, self.debug)
        # SigSet defines all probe-subsets, then SampleDataContainer adds them with super(); no need to re-define below.
        # mouse probes are processed within the normal meth/unmeth sets, then split out at the end of the preprocessing step.
        del self.manifest
        del manifest

        if self.data_type not in ('float64', 'float32', 'float16'):
            raise ValueError(f"invalid data_type: {self.data_type} should be one of ('float64','float32','float16')")

        if self.debug:
            print(f"DEBUG SDC params:")
            lists = ['red_switched', 'green_switched']
            exclude = ['data_channel', 'man', 'snp_man', 'ctl_man', 'address_code',
                'ctrl_green', 'ctrl_red', 'II', 'IG', 'IR', 'oobG', 'oobR',
                'methylated', 'unmethylated', 'snp_methylated', 'snp_unmethylated',
                'ibG', 'ibR', 'mouse_probes_mask',
            ]
            for key, value in self.__dict__.items():
                if key in exclude:
                    try:
                        print(f"-- {key}: {value.shape}")
                    except:
                        print(f"-- skipping {key}")
                elif key in lists:
                    print(f"-- {key}: {len(value)}")
                else:
                    print(f"-- {key}: {value}")

        self.check_for_probe_loss()
[docs] def process_all(self): """Runs all pre and post-processing calculations for the dataset. Combines the SigSet methylated and unmethylated parts of SampleDataContainer, and modifies them, whilst creating self.__data_frame with noob/dye processed data. Order: - poobah - quality_mask - noob (background correction) - build data_frame - nonlinear dye-bias correction - reduce memory/bit-depth of data - copy over uncorrected values - split out mouse probes """ # self.preprocess -- applies BG_correction and NOOB to .methylated, .unmethylated # also creates a self.mouse_data_frame for mouse specific probes with 'noob_meth' and 'noob_unmeth' columns here. if self.__data_frame: return self.__data_frame pval_probes_df = _pval_sesame_preprocess(self) if self.pval == True else None pneg_ecdf_probes_df = _pval_neg_ecdf(self) if self.pneg_ecdf == True else None # output: df with one column named 'poobah_pval' quality_mask_df = _apply_sesame_quality_mask(self) if self.quality_mask == True else None # output: df with one column named 'quality_mask' | if not supported array / custom array: returns nothing. if self.do_noob == True: # apply corrections: bg subtract, then noob (in preprocess.py) preprocess_noob(self, pval_probes_df=pval_probes_df, quality_mask_df=quality_mask_df, nonlinear_dye_correction=self.do_nonlinear_dye_bias, debug=self.debug) #if self.sesame in (None,True): #preprocess_noob(self, pval_probes_df=pval_probes_df, quality_mask_df=quality_mask_df, nonlinear_dye_correction=self.do_nonlinear_dye_bias, debug=self.debug) #if container.__dye_bias_corrected is False: # process failed, so fallback is linear-dye # print(f'ERROR preprocess_noob sesame-dye: linear_dye_correction={self.do_nonlinear_dye_bias}') # preprocess_noob(self, linear_dye_correction = True) #if self.sesame is False: # match minfi legacy settings #preprocess_noob(self, pval_probes_df=pval_probes_df, quality_mask_df=quality_mask_df, nonlinear_dye_correction=self.do_nonlinear_dye_bias, debug=self.debug) if self._SigSet__preprocessed is False: raise ValueError("preprocessing did not run") # nonlinear_dye_correction is done below, but if sesame if false, revert to previous linear dye method here. 
self.methylated = self.methylated.rename(columns={'noob_Meth':'noob'}).drop(columns=['used','Unmeth', 'noob_Unmeth']) self.unmethylated = self.unmethylated.rename(columns={'noob_Unmeth':'noob'}).drop(columns=['used','Meth', 'noob_Meth']) else: # renaming will make dye_bias work later; or I track here and pass in kwargs to dye bias for column names self.methylated = self.methylated[['Meth']].astype('float32').round(0) #.rename(columns={'mean_value':'noob'}) self.unmethylated = self.unmethylated[['Unmeth']].astype('float32').round(0) #.rename(columns={'mean_value':'noob'}) if self.debug: LOGGER.info('SDC data_frame already exists.') #--- happens with make_pipeline('.',steps=[]) if set(self.unmethylated.index) - set(self.methylated.index) != set(): LOGGER.warning(f"Dropping mismatched probes: {set(self.unmethylated.index) - set(self.methylated.index)}") if set(self.methylated.index) - set(self.unmethylated.index) != set(): LOGGER.warning(f"Dropping mismatched probes: {set(self.methylated.index) - set(self.unmethylated.index)}") try: # index: IlmnID | has A | B | Unmeth | Meth | noob_meth | noob_unmeth -- no control or snp probes included self.__data_frame = self.methylated.join( self.unmethylated.drop(columns=['AddressA_ID','AddressB_ID']), lsuffix='_meth', rsuffix='_unmeth', how='inner').drop(columns=['AddressA_ID','AddressB_ID']) # 'inner' join is necessary to avoid dye-bias getting duplicate probes if mismatched data. except KeyError: # for steps=[] self.__data_frame = self.methylated.join( self.unmethylated, lsuffix='_meth', rsuffix='_unmeth', how='inner') # noob did not run, but copying data into new column so steps won't break self.__data_frame['noob_meth'] = self.__data_frame['Meth'] self.__data_frame['noob_unmeth'] = self.__data_frame['Unmeth'] if self.pval == True and isinstance(pval_probes_df, pd.DataFrame): pval_probes_df = pval_probes_df.loc[ ~pval_probes_df.index.duplicated() ] self.__data_frame = self.__data_frame.join(pval_probes_df, how='inner') if self.pneg_ecdf == True and isinstance(pneg_ecdf_probes_df, pd.DataFrame): pneg_ecdf_probes_df = pneg_ecdf_probes_df.loc[ ~pneg_ecdf_probes_df.index.duplicated() ] self.__data_frame = self.__data_frame.join(pneg_ecdf_probes_df, how='inner') self.check_for_probe_loss(f"preprocess_noob sesame={self.sesame} --> {self.methylated.shape} {self.unmethylated.shape}") if self.quality_mask == True and isinstance(quality_mask_df, pd.DataFrame): self.__data_frame = self.__data_frame.join(quality_mask_df, how='inner') if self.do_nonlinear_dye_bias == True: nonlinear_dye_bias_correction(self, debug=self.debug) # this step ensures that failed probes are not included in the NOOB calculations. # but they MUST be included in CSV exports, so I move the failed probes to another df for storage until pipeline.export() needs them. 
if self.quality_mask == True and 'quality_mask' in self.__data_frame.columns: self.__quality_mask_excluded_probes = self.__data_frame.loc[self.__data_frame['quality_mask'].isna(), ['noob_meth','noob_unmeth']] self.__data_frame.loc[self.__data_frame['quality_mask'].isna(), 'noob_meth'] = np.nan self.__data_frame.loc[self.__data_frame['quality_mask'].isna(), 'noob_unmeth'] = np.nan # Downcasting to 'unsigned' uses the smallest possible integer that can hold the values, but need to retain dtypes if self.__data_frame['Meth'].isna().sum() == 0 and self.__data_frame['Unmeth'].isna().sum() == 0: self.__data_frame['Meth'] = self.__data_frame['Meth'].apply(pd.to_numeric, downcast='unsigned') self.__data_frame['Unmeth'] = self.__data_frame['Unmeth'].apply(pd.to_numeric, downcast='unsigned') for column in self.__data_frame.columns: if column in ['Meth','Unmeth']: continue if self.__data_frame[column].isna().sum() == 0: # and self.__data_frame[column].dtype self.__data_frame[column] = self.__data_frame[column].apply(pd.to_numeric, downcast='unsigned') if self.retain_uncorrected_probe_intensities == False: self.__data_frame = self.__data_frame.drop(columns=['Meth','Unmeth']) else: self.__data_frame = self.__data_frame.rename(columns={'Meth':'meth', 'Unmeth':'unmeth'}) # reduce to float32 during processing. final output may be 16,32,64 in _postprocess() + export() #self.__data_frame = self.__data_frame.astype('float32') --- downcast takes care of this #if self.poobah_decimals != 3 and 'poobah_pval' in self.__data_frame.columns: # other_columns = list(self.__data_frame.columns) # other_columns.remove('poobah_pval') # other_columns = {column:3 for column in other_columns} # #self.__data_frame = self.__data_frame.round(other_columns) # self.__data_frame = self.__data_frame.round({'poobah_pval': self.poobah_decimals}) #else: # self.__data_frame = self.__data_frame.round(3) self.__data_frame = self.__data_frame.round({'poobah_pval': self.poobah_decimals}) # here, separate the mouse from normal probes and store mouse experimental probes separately. # normal_probes_mask = (self.manifest.data_frame.index.str.startswith('cg', na=False)) | (self.manifest.data_frame.index.str.startswith('ch', na=False)) # v2_mouse_probes_mask = (self.manifest.data_frame.index.str.startswith('mu', na=False)) | (self.manifest.data_frame.index.str.startswith('rp', na=False)) # v4 mouse_probes_mask pre-v1.4.6: ( (self.manifest.data_frame['Probe_Type'] == 'mu') | (self.manifest.data_frame['Probe_Type'] == 'rp') | self.manifest.data_frame.index.str.startswith('uk', na=False) ) if self.array_type == ArrayType.ILLUMINA_MOUSE: mouse_probes = self.man[self.mouse_probes_mask] mouse_probe_count = mouse_probes.shape[0] else: mouse_probes = pd.DataFrame() mouse_probe_count = 0 self.mouse_data_frame = self.__data_frame[self.__data_frame.index.isin(mouse_probes.index)] if mouse_probe_count > 0: if self.debug: LOGGER.info(f"{mouse_probe_count} mouse probes ->> {self.mouse_data_frame.shape[0]} in idat") # add 'design' column to mouse_data_frame, so it appears in the output. -- needed for 'Random' and 'Multi' filter # matches manifest [IlmnID] to df.index # NOTE: other manifests have no 'design' column, so avoiding this step with them. probe_designs = self.man[['design']] self.mouse_data_frame = self.mouse_data_frame.join(probe_designs, how='inner') # now remove these from normal list. confirmed they appear in the processed.csv if this line is not here. 
self.__data_frame = self.__data_frame[~self.__data_frame.index.isin(mouse_probes.index)] # finally, sort probes -- note: uncommenting this step breaks beta/m_value calcs in testing. Some downstream function depends on the probe_order staying same. # --- must fix all unit tests using .iloc[ before this will work | fixed. self.__data_frame.sort_index(inplace=True) ###### end preprocessing ###### if hasattr(self, '_SampleDataContainer__quality_mask_excluded_probes') and isinstance(self._SampleDataContainer__quality_mask_excluded_probes, pd.DataFrame): # these probes are not used in processing, but should appear in the final CSV. # and if quality_mask is off, it should skip this step. # later: consoldate() should exclude these probes from pickles self.__data_frame.update({ 'noob_meth': self.__quality_mask_excluded_probes['noob_meth'], 'noob_unmeth': self.__quality_mask_excluded_probes['noob_unmeth'] }) self.__data_frame = self.process_beta_value(self.__data_frame) self.__data_frame = self.process_m_value(self.__data_frame) if self.debug: self.check_for_probe_loss(f"816 self.check_for_probe_loss(): self.__data_frame = {self.__data_frame.shape}") if self.array_type == ArrayType.ILLUMINA_MOUSE: self.mouse_data_frame = self.process_beta_value(self.mouse_data_frame) self.mouse_data_frame = self.process_m_value(self.mouse_data_frame) self.mouse_data_frame = self.process_copy_number(self.mouse_data_frame) return # self.__data_frame
    def process_m_value(self, input_dataframe):
        """Calculate M value from methylation data"""
        return self._postprocess(input_dataframe, calculate_m_value, 'm_value')
    def process_beta_value(self, input_dataframe, quality_mask_probes=None):
        """Calculate Beta value from methylation data"""
        if self.sesame == False:
            offset = 100 # minfi code suggests an offset of 100, though empirically the unit tests match 0 instead.
        else:
            offset = 0 # make_pipeline uses sesame=None
        return self._postprocess(input_dataframe, calculate_beta_value, 'beta_value', offset)
    def process_copy_number(self, input_dataframe):
        """Calculate copy number value from methylation data"""
        return self._postprocess(input_dataframe, calculate_copy_number, 'cm_value')
    def export(self, output_path):
        """Saves a CSV for each sample with all processing intermediate data"""
        ensure_directory_exists(output_path)
        # ensure smallest possible csv files
        self.__data_frame = self.__data_frame.round({'noob_meth':0, 'noob_unmeth':0,
            'm_value':3, 'beta_value':3, 'meth':0, 'unmeth':0, 'poobah_pval':self.poobah_decimals})
        this = self.__data_frame.copy(deep=True)
        if hasattr(self, '_SampleDataContainer__quality_mask_excluded_probes') and isinstance(self._SampleDataContainer__quality_mask_excluded_probes, pd.DataFrame):
            # copy over these failed probes to a dataframe for export
            this.update({
                'noob_meth': self.__quality_mask_excluded_probes['noob_meth'],
                'noob_unmeth': self.__quality_mask_excluded_probes['noob_unmeth']
            })
        if 'quality_mask' in this.columns:
            this['quality_mask'] = this['quality_mask'].fillna(1)
        # noob columns contain NaNs now because of sesame (v1.4.0 to v1.4.5); v1.4.6+ CSVs contain all data, but pickles are filtered.
        #try:
        #    self.__data_frame['noob_meth'] = self.__data_frame['noob_meth'].astype(int, copy=False)
        #    self.__data_frame['noob_unmeth'] = self.__data_frame['noob_unmeth'].astype(int, copy=False)
        #except ValueError as e:
        #    if 'quality_mask' in self.__data_frame.columns:
        #        num_missing = self.__data_frame[ ~self.__data_frame['quality_mask'].isna() ]['noob_unmeth'].isna().sum() + self.__data_frame[ ~self.__data_frame['quality_mask'].isna() ]['noob_meth'].isna().sum()
        #    else:
        #        num_missing = self.__data_frame['noob_unmeth'].isna().sum() + self.__data_frame['noob_meth'].isna().sum()
        #    if num_missing > 0:
        #        self.noob_processing_missing_probe_errors.append((output_path, num_missing))
        # these are the raw, uncorrected values, replaced by sesame quality_mask as NaNs
        #if 'meth' in self.__data_frame.columns and 'unmeth' in self.__data_frame.columns:
        #    try:
        #        self.__data_frame['meth'] = self.__data_frame['meth'] # .astype('float16', copy=False) --- float16 was changing these values, so not doing this step.
        #        self.__data_frame['unmeth'] = self.__data_frame['unmeth'] # .astype('float16', copy=False)
        #    except ValueError as e:
        #        if 'quality_mask' in self.__data_frame.columns:
        #            num_missing = self.__data_frame[ ~self.__data_frame['quality_mask'].isna() ]['unmeth'].isna().sum() + self.__data_frame[ ~self.__data_frame['quality_mask'].isna() ]['meth'].isna().sum()
        #        else:
        #            num_missing = self.__data_frame['meth'].isna().sum() + self.__data_frame['unmeth'].isna().sum()
        #        self.raw_processing_missing_probe_errors.append((output_path, num_missing))
        if self.file_format == 'parquet':
            this.to_parquet(output_path)
        else:
            this.to_csv(output_path)
    def _postprocess(self, input_dataframe, postprocess_func, header, offset=None):
        if offset is not None:
            input_dataframe[header] = postprocess_func(
                input_dataframe['noob_meth'].values,
                input_dataframe['noob_unmeth'].values,
                offset=offset
            )
        else:
            input_dataframe[header] = postprocess_func(
                input_dataframe['noob_meth'].values,
                input_dataframe['noob_unmeth'].values,
            )

        if self.data_type != 'float32':
            #np.seterr(over='raise', divide='raise')
            try:
                LOGGER.debug('Converting %s to %s: %s', header, self.data_type, self.sample)
                input_dataframe[header] = input_dataframe[header].astype(self.data_type)
            except Exception as e:
                LOGGER.warning(f'._postprocess: {e}')
                LOGGER.info('%s failed for %s, using float32 instead: %s', self.data_type, header, self.sample)
                input_dataframe[header] = input_dataframe[header].astype('float32')

        return input_dataframe
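# ---------------------------------------------------------------------------
# Worked example (editorial sketch): _postprocess() above feeds noob_meth and
# noob_unmeth into calculate_beta_value and calculate_m_value. Assuming those
# helpers implement the conventional definitions (which the offset argument in
# process_beta_value suggests), the arithmetic is:
#
#   beta_value = noob_meth / (noob_meth + noob_unmeth + offset)   # offset 0 (sesame) or 100 (minfi-style)
#   m_value    = log2(noob_meth / noob_unmeth)
#
# e.g. noob_meth=3000, noob_unmeth=1000 gives beta ~= 0.75 (offset 0) and m_value ~= 1.58.
# ---------------------------------------------------------------------------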
def make_pipeline(data_dir='.', steps=None, exports=None, estimator='beta', **kwargs):
    """Specify a list of processing steps for run_pipeline, then instantiate and run that pipeline.

    steps: list of processing steps
        ['all', 'infer_channel_switch', 'poobah', 'quality_mask', 'noob', 'dye_bias']
    exports: list of files to be saved; anything not specified is not saved; ['all'] saves everything.
        ['all', 'csv', 'poobah', 'meth', 'unmeth', 'noob_meth', 'noob_unmeth', 'sample_sheet_meta_data', 'mouse', 'control']
    estimator: which final format? [beta | m_value | copy_number | None (returns containers instead)]

    This feeds a Class that runs the run_pipeline function of transforms with a final estimator.
    It replaces all of the kwargs that are in run_pipeline() and adds a few more options:

    [steps] -- you can set all of these with ['all'] or any combination of these in a list of steps:
        Note that adding "sesame=True" to kwargs will enable:
        infer_channel_switch, poobah, quality_mask, noob, dye_bias
        'infer_channel_switch'
        'poobah'
        'quality_mask'
        'noob'
        'dye_bias' -- specifying this selects sesame's nonlinear-dye-bias correction. Omitting it
            causes NOOB to use minfi's linear-dye correction, unless NOOB is missing.
    [exports]
        export=False,
        make_sample_sheet=False,
        export_poobah=False,
        save_uncorrected=False,
        save_control=False,
        meta_data_frame=True,
    [final estimator] -- default: return a list of sample data containers.
        betas=False,
        m_value=False,
        -copy_number-
        You may override that by specifying `estimator`= ('betas' or 'm_value').
    [how it works]
        make_pipeline calls run_pipeline(), which has a **kwargs final keyword that maps many
        additional esoteric settings that you can define here. These are used for more granular
        unit testing on methylsuite, but could allow you to change how data is processed in very
        fine-tuned ways.

    The rest of these are additional optional kwargs you can include:

    [inputs] -- omitting these kwargs will assume the defaults, as shown below
        data_dir,
        array_type=None,
        manifest_filepath=None,
        sample_sheet_filepath=None,
        sample_name=None,
    [processing] -- omitting these kwargs will assume the defaults, as shown below
        batch_size=None, --- if you have low RAM memory or >500 samples, you might need to process the batch in chunks.
        bit='float32', --- float16 or float64 are also supported, for lower/higher memory and disk usage
        low_memory=True, --- If True, processing deletes intermediate objects. But you can save them in the SampleDataContainer by setting this to False.
        poobah_decimals=3 --- in csv file output
        poobah_sig=0.05
    [logging] -- how much information do you want on the screen? Default is minimal information.
        verbose=False (True for more)
        debug=False (True for a LOT more info)
    """
    allowed_steps = ['all', 'infer_channel_switch', 'poobah', 'quality_mask', 'noob', 'dye_bias']
    allowed_exports = ['all', 'csv', 'poobah', 'meth', 'unmeth', 'noob_meth', 'noob_unmeth', 'sample_sheet_meta_data', 'mouse', 'control']
    allowed_estimators = ['betas', 'beta', 'm_value', 'copy_number', None]
    if steps is None:
        steps = []
    if exports is None:
        exports = []
    if not isinstance(steps, (tuple, list)) or not set(steps).issubset(set(allowed_steps)):
        raise ValueError(f"steps, the first argument, must be a list or tuple of names of allowed processing steps: {allowed_steps} or ['all']; you said {steps}")
    if not isinstance(exports, (tuple, list)) or not set(exports).issubset(set(allowed_exports)):
        raise ValueError(f"[exports] must be a list or tuple of names of allowed processing steps: {allowed_exports}, or ['all']; you said {exports}")
    if estimator not in set(allowed_estimators):
        raise ValueError(f"Your chosen final estimator must be one of these: {allowed_estimators}; you said {estimator}")
    if estimator == 'copy_number':
        raise ValueError("copy_number is not yet supported. (You can get it in the code, but not with make_pipeline.)")
    if estimator in ('betas', 'beta'):
        kwargs['betas'] = True
    if estimator == 'm_value':
        kwargs['m_value'] = True
    return run_pipeline(data_dir, pipeline_steps=steps, pipeline_exports=exports, **kwargs)
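# ---------------------------------------------------------------------------
# Usage sketch (editorial illustration, not part of the module): make_pipeline with an
# explicit step/export list drawn from allowed_steps / allowed_exports above. Assumes
# IDATs and a samplesheet live in ``./data``; the values shown are illustrative only.
#
#   >>> from methylprep.processing.pipeline import make_pipeline
#   >>> betas_df = make_pipeline('data',
#   ...                          steps=['infer_channel_switch', 'poobah', 'quality_mask', 'noob', 'dye_bias'],
#   ...                          exports=['csv', 'poobah'],
#   ...                          estimator='beta')
#
# Internally this forwards to run_pipeline(data_dir, pipeline_steps=steps,
# pipeline_exports=exports, betas=True).
# ---------------------------------------------------------------------------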