Source code for methylprep.download.geo

# Lib
import logging
from ftplib import FTP
import ftplib
import socket
from pathlib import Path, PurePath
import os
from tarfile import ReadError
import re
import zipfile
import gzip
import tarfile
import zlib
import shutil
from bs4 import BeautifulSoup
import dateutil # python-dateutil, non-built-in
import datetime
import pickle
from urllib.request import urlopen
import pandas as pd
from tqdm import tqdm
from collections import Counter
import sys

# unique to find_betas_any_source()...
try:
    import methylcheck # not required generally, but needed for one of these functions; handled within.
    # geo.py uses .load, .read_geo, and .read_geo_processed
except ImportError:
    pass

try:
    from pandas.errors import ParserError
except ImportError:
    try:
        from pandas.io.parsers import ParserError
    except Exception:
        class ParserError(ValueError):
            """
            Exception that is raised by an error encountered in parsing file contents.

            This is a generic error raised for errors encountered when functions like
            `read_csv` or `read_html` are parsing contents of a file.

            See Also
            --------
            read_csv : Read CSV (comma-separated) file into a DataFrame.
            read_html : Read HTML table into a DataFrame."""
            pass
import io
import json
import time
import tempfile
import requests
import zipfile
import random
import subprocess

# app
from .miniml import sample_sheet_from_miniml, sample_sheet_from_idats, convert_miniml
# cannot relative-import here because process_data uses geo.py.
#from .process_data import confirm_dataset_contains_idats, get_attachment_info, run_series
import methylprep.download.process_data

__all__ = [
    'geo_download',
    'pipeline_find_betas_any_source',
    'geo_metadata',
    'download_geo_processed',
    'search',
]


#logging.basicConfig(level=logging.DEBUG) # always verbose
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel( logging.INFO )

def geo_download(geo_id, series_path, geo_platforms, clean=True, decompress=True):
    """Downloads the IDATs and metadata for a GEO series

    Arguments:
        geo_id [required]
            the GEO Accension for the desired series (e.g. GSE134293)
        series_path [required]
            the directory to download the data to
        geo_platforms [required]
            the list of supported GEO platforms
        clean
            whether or not to delete files once they are no longer need (True by default)

    Note about GEO IDs:
        You can use the NIH online search to find data sets, then click "Send to:" at the button of a results page,
        and export a list of unique IDs as text file. These IDs are not GEO_IDs used here. First, remove the first
        three digits from the number, so Series ID: 200134293 is GEO accension ID: 134293, then include the GSE part,
        like "GSE134293" in your CLI parameters.

    This function returns True or False, depending on whether the downloaded data is correct."""
    success = True
    series_dir = Path(series_path)
    raw_filename = f"{geo_id}_RAW.tar"
    miniml_filename = f"{geo_id}_family.xml"

    if not os.path.exists(series_path):
        raise FileNotFoundError(f'{geo_id} directory not found.')

    for platform in geo_platforms:
        if not Path(f"{series_path}/{platform}").exists():
            Path(f"{series_path}/{platform}").mkdir(parents=True, exist_ok=True)

    ftp = FTP('ftp.ncbi.nlm.nih.gov', timeout=120) # 2 mins
    ftp.login()
    ftp.cwd(f"geo/series/{geo_id[:-3]}nnn/{geo_id}")

    try:
        filesize = ftp.size(f"miniml/{miniml_filename}.tgz") # -- gives 550 error because CWD puts it in ASCII mode.
    except Exception as e:
        LOGGER.error(f"ftp.size ERROR: {e}")

    def is_valid_tgz(filepath=f"{series_path}/{miniml_filename}.tgz"):
        if Path(filepath).exists():
            try:
                min_tar = tarfile.open(filepath)
                for file in min_tar.getnames():
                    pass
                return True
            except gzip.BadGzipFile:
                LOGGER.info(f"Found a family series file, but appears to be corrupted, so re-downloading.")
                Path(filepath).unlink()
                return False
            except zlib.error:
                LOGGER.info(f"Found a family series file, but appears to be corrupted (invalid stored block lengths), so re-downloading.")
                Path(filepath).unlink()
                return False
        else:
            return False # file does not exist yet, so download

    if not Path(f"{series_path}/{miniml_filename}").exists():
        # test opening it; may be corrupted
        attempt = 0
        while is_valid_tgz(f"{series_path}/{miniml_filename}.tgz") is False:
            LOGGER.info(f"Downloading {miniml_filename}")
            miniml_file = open(f"{series_path}/{miniml_filename}.tgz", 'wb')
            try:
                #filesize = ftp.size(f"miniml/{miniml_filename}.tgz") -- gives 550 error because CWD puts it in ASCII mode.
                for filename,filestats in ftp.mlsd(path="miniml", facts=["size"]):
                    if filename == miniml_filename:
                        filesize = filestats['size']
                        break
                with tqdm(unit = 'b', unit_scale = True, unit_divisor=1024, leave = False, miniters = 1, desc = geo_id, total = filesize) as tqdm_instance:
                    def tqdm_callback(data):
                        tqdm_instance.update(len(data))
                        miniml_file.write(data)

                        # check to see if download has completed
                        if (miniml_file.tell() == filesize):
                            # close down the progress bar
                            # prevents an extra ftp call which generates a timed out exception
                            tqdm_instance.close()

                    ftp.retrbinary(f"RETR miniml/{miniml_filename}.tgz", tqdm_callback, 8192)
            except Exception as e:
                LOGGER.error(e)
                LOGGER.info('Failed to create a progress bar, but retrying. It should be downloading...')
                ftp.retrbinary(f"RETR miniml/{miniml_filename}.tgz", miniml_file.write, 8192)
            miniml_file.close()
            attempt += 1
            if attempt > 1:
                LOGGER.info(f"[Attempt #{attempt}]")
            if attempt == 5:
                LOGGER.error(f"Could not download after {attempt} attempts. Aborting.")
                break

        #ftp.quit() # instead of 'close()'
        min_tar = tarfile.open(f"{series_path}/{miniml_filename}.tgz")
        for file in min_tar.getnames():
            if file == miniml_filename:
                min_tar.extract(file, path=series_path)
        min_tar.close()
        if clean:
            Path(f"{series_path}/{miniml_filename}.tgz").unlink()
    ftp.quit()

    if list(series_dir.glob('*.idat.gz')) == [] and list(series_dir.glob('**/*.idat')) == []:
        if not Path(f"{series_path}/{raw_filename}").exists():
            ftp = FTP('ftp.ncbi.nlm.nih.gov',
                      timeout=59)  # see issue https://bugs.python.org/issue30956 (must be <60s because of a bug)
            ftp.login()
            ftp.cwd(f"geo/series/{geo_id[:-3]}nnn/{geo_id}")
            raw_file = open(f"{series_path}/{raw_filename}", 'wb')
            filesize = ftp.size(f"suppl/{raw_filename}")

            import requests
            nih_root = 'https://www.ncbi.nlm.nih.gov/geo/download/'
            url = f'{nih_root}?acc={geo_id}&format=file'
            with requests.get(url, stream=True) as ss:
                ss.raise_for_status()
                filesize = int(ss.headers['Content-Length'])
                with tqdm(unit = 'b', unit_scale = True, unit_divisor=1024, leave=False, miniters=1, desc=raw_file.name, total=filesize) as tqdm_instance:
                    for chunk in ss.raw.stream(8192, decode_content=False):
                        if chunk:
                            raw_file.write(chunk)
                            tqdm_instance.update(len(chunk))
            """
            try:
                with tqdm(unit = 'b', unit_scale = True, unit_divisor=1024, leave = False, miniters = 1, desc = geo_id, total = filesize) as tqdm_instance:
                    def tqdm_callback(data):
                        # update progress bar and write out data
                        tqdm_instance.update(len(data))
                        raw_file.write(data)

                        # check to see if download has completed
                        if (raw_file.tell() == filesize):
                            # close down the progress bar
                            # prevents an extra ftp call which generates a timed out exception
                            tqdm_instance.close()

                    # make ftp data request
                    ftp.retrbinary(f"RETR suppl/{raw_filename}", tqdm_callback, 8192)

                # quit the current ftp session
                ftp.quit()

            except ftplib.all_errors as e:
                LOGGER.warning(f"FTP error: {e}")
            except Exception as e:
                LOGGER.warning(f"FTP exception: {e}")
            """
            # check the downloaded file size
            rawsize = raw_file.tell()
            if rawsize != filesize:
                LOGGER.info('geo_download sizes: FTP:{filesize} Raw:{rawsize} Diff:{filesize-rawsize}')
                LOGGER.info(f"geo_download: File {raw_filename} download failed.")
                return False
            raw_file.close()

        LOGGER.info(f"Unpacking {raw_filename}")
        try:
            tar = tarfile.open(f"{series_path}/{raw_filename}")
            # let user know if this lack idats
            if not any([(True if '.idat' in member.name else False) for member in list(tar.getmembers())]):
                file_endings = Counter([tuple(PurePath(member.name).suffixes) for member in list(tar.getmembers())])
                file_endings = [(k,v) for k,v in file_endings.most_common() if v > 1]
                LOGGER.warning(f'No idat files found in {raw_filename}. {len(list(tar.getmembers()))} files found: {file_endings}.')
                success = False
            for member in tar.getmembers():
                if re.match('.*.idat.gz', member.name):
                    tar.extract(member, path=series_path)
        except ReadError as e:
            raise ReadError(f"There appears to be an incomplete download of {geo_id}. Please delete those files and run this again.")
            success = False
        tar.close()
        if clean:
            os.remove(f"{series_path}/{raw_filename}")

    if decompress:
        for gz in series_dir.glob("*.idat.gz"):
            gz_string = str(gz)
            with gzip.open(gz_string, 'rb') as f_in:
                with open(gz_string[:-3], 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            if clean:
                gz.unlink()
        LOGGER.info(f"Downloaded and unpacked {geo_id} idats")
    elif not decompress:
        LOGGER.info(f"Downloaded {geo_id} idats without decompressing")
    return success


def geo_metadata(geo_id, series_path, geo_platforms, path):
    """Reads the metadata for the given series (MINiML file) and creates a metadata dictionary and sample sheet for each platform

    Arguments:
        geo_id [required]
            the GEO Accension for the desired series
        series_path [required]
            the directory containing the series data
        geo_platforms [required]
            the list of supported GEO platforms
        path
            the path to the directory containing dictionaries for each platform

    Returns:
        A list of platforms that the series contains samples of"""
    pipeline_kwargs = {}
    miniml_filename = f"{geo_id}_family.xml"
    with open(f"{series_path}/{miniml_filename}", 'r') as fp:
        soup = BeautifulSoup(fp, "xml")

    meta_dicts = {}
    samples_dict = {}

    for platform in geo_platforms:
        meta_dicts[platform] = {}
        samples_dict[platform] = {}

    samples = soup.MINiML.find_all("Sample")
    for sample in samples:
        platform = sample.find('Platform-Ref')['ref']
        accession = sample.Accession.text
        title = sample.Title.text
        attributes_dir = {}
        attributes_dir['source'] = sample.Source.text
        attributes_dir['platform'] = platform
        attributes_dir['title'] = title
        for char in sample.find_all('Characteristics'):
            attributes_dir[char['tag']] = char.text.strip()
        if sample.find('Description'):
            attributes_dir['description'] = sample.find('Description').text.strip()

        # only some MINiML files have this.
        try:
            split_idat = sample.find('Supplementary-Data').text.split("/")[-1].split("_")
            attributes_dir['Sample_ID'] = f"{split_idat[1]}_{split_idat[2]}" # matches beta_value column names
            attributes_dir['Sentrix_ID'] = f"{split_idat[1]}"
            attributes_dir['Sentrix_Position'] = f"{split_idat[2]}"
        except:
            LOGGER.info( "MINiML file does not provide (Sentrix_ID_R00C00)" )

        if platform in geo_platforms:
            for idat in sample.find_all('Supplementary-Data'):
                if idat['type'] == 'IDAT':
                    file_name = (idat.text.split("/")[-1]).strip()[:-3]
                    if (not(Path(f"{series_path}/{file_name}").is_file())) and Path(f"{series_path}/{file_name}.gz").is_file():
                        file_name = file_name+".gz"
                    try:
                        shutil.move(f"{series_path}/{file_name}", f"{series_path}/{platform}/{file_name}")
                    except FileNotFoundError:
                        # this doesn't throw an error if file is already in the right folder
                        if not Path(f"{series_path}/{platform}/{file_name}").is_file():
                            raise FileNotFoundError (f"Could not move file {series_path}/{file_name} after downloading.")

            meta_dicts[platform][accession] = attributes_dir
            samples_dict[platform][accession] = title
        else:
            # this ought to keep other idat files from being included in the package.
            LOGGER.warning(f'Sample: {title[:40]} has unrecognized platform: {platform}; not moving data file')
    LOGGER.info(f"Found {len(attributes_dir)} tags for {len(samples)} samples: {attributes_dir}")
    fp.close()

    seen_platforms = []

    for platform in geo_platforms:
        if meta_dicts[platform]:
            meta_dict_filename = f"{geo_id}_{platform}_dict.pkl"
            pickle.dump(meta_dicts[platform], open(f"{series_path}/{meta_dict_filename}", 'wb'))
            if not os.path.exists(f"{path}/{platform}_dictionaries/{geo_id}_dict.pkl"):
                shutil.move(f"{series_path}/{meta_dict_filename}", f"{path}/{platform}_dictionaries/{geo_id}_dict.pkl")
            try:
                sample_sheet_from_miniml(geo_id, series_path, platform, samples_dict[platform], meta_dicts[platform], save_df=True)
            except ValueError:
                # in this case, the samplesheet meta data wasn't consistent, so using idat filenames instead
                try:
                    sample_sheet_from_idats(geo_id, series_path, platform, samples_dict[platform])
                except ValueError as e:
                    LOGGER.info(f"{e}; will try run_pipeline's create samplesheet method.")
                    pipeline_kwargs['make_sample_sheet'] = True
            if platform not in seen_platforms:
                seen_platforms.append(platform)

    return seen_platforms, pipeline_kwargs


[docs]def pipeline_find_betas_any_source(**kwargs): """beta_bake: Sets up a script to run methylprep that saves directly to path or S3. The slowest part of processing GEO datasets is downloading, so this handles that. STEPS - uses `methylprep alert -k <keywords>` to curate a list of GEO IDs worth grabbing. note that version 1 will only process idats. also runs methylcheck.load on processed files, if installed. - downloads a zipfile, uncompresses it, - creates a samplesheet, - moves it into foxo-test-pipeline-raw for processing. - You get back a zipfile with all the output data. required kwargs: - project_name: string, like GSE123456, to specify one GEO data set to download. to initialize, specify one GEO id as an input when starting the function. - beforehand, you can use `methylprep alert` to verify the data exists. - OR you can pass in a string of GEO_ID separated by commas without any spaces and it will split them. optional kwargs: - function: 'geo' (optional, ignored; used to specify this pipeline to run from command line) - data_dir: - default is current working directory ('.') if omitted - use to specify where all files will be downloaded, processed, and finally stored, unless `--cleanup=False`. - if using AWS S3 settings below, this will be ignored. - verbose: False, default is minimal logging messages. - save_source: if True, it will retain .idat and/or -tbl-1.txt files used to generate beta_values dataframe pkl files. - compress: if True, it will package everything together in a {geo_id}.zip file, or use gzip if files are too big for zip. - default is False - clean: If True, removes files from folder, except the compressed output zip file. (Requires compress to the True too) It will use local disk by default, but if you want it to run in AWS batch + efs and save to S3, provide these: - bucket (where downloaded files are stored) - efs (AWS elastic file system name, for lambda or AWS batch processing) - processed_bucket (where final files are saved) - clean: default True. If False, does not explicitly remove the tempfolder files at end, or move files into data_dir output filepath/folder. - if you need to keep folders in working/efs folder instead of moving them to the data_dir. - use cleanup=False when embedding this in an AWS/batch/S3 context, then use the `working tempfolder` path and filenames returned to copy these files into S3. returns: - if a single GEO_ID, returns a dict with "error", "filenames", and "tempdir" keys. - if mulitple GEO_IDs, returns a dict with "error", "geo_ids" (nested dict), and "tempdir" keys. Uses same tempdir for everything, so clean should be set to True. - "error" will be None if it worked okay. - "filenames" will be a list of filenames that were created as outputs (type=string) - "tempdir" will be the python tempfile tempory-directory object. Passing this out prevents garbage collector from removing it when the function ends, so you can retrive these files and run tempdir.cleanup() manually. Otherwise, python will remove the tempdir for you when python closes, so copy whatever you want out of it first. This makes it possible to use this function with AWS EFS (elastic file systems) as part of a lambda or aws-batch function where disk space is more limited. NOTE: v1.3.0 does NOT support multiple GEO IDs yet. """ if 'methylcheck' not in sys.modules: assert ImportError("You cannot run this without `methylcheck` installed.") import zipfile # FOR SOME REASON, importing zipfile at top of file doesn't work in this function :( -- prob because I imported this function without loading the whole file. Or I reassigned var 'zipfile' by accident somewhere. BATCH_SIZE=33 # for processing idats if kwargs.get('verbose') == False: LOGGER.setLevel( logging.WARNING ) if not kwargs.get('project_name'): return {"filenames": [], "tempdir": None, "error": "`project_name` is required (to specify one GEO_ID or a list of them as a comma separated string without spaces)"} geo_ids = kwargs['project_name'].split(',') # always a list. if not kwargs.get('compress'): kwargs['compress'] = False LOGGER.info(f"Your command line inputs: {kwargs}") LOGGER.info(f"Starting batch GEO pipeline processor for {geo_ids}.") #1: set working folder, and make sure /mnt/efs exists if used. if kwargs.get('efs') and kwargs['efs'] is not None: EFS = kwargs['efs'] mounted = False try: #efs_files = list(Path(EFS).glob('*')) mounted = Path(EFS).exists() LOGGER.info(f"{EFS} exists: {mounted}") except Exception as e: LOGGER.error(f"{EFS} error: {e}") if not mounted: result = subprocess.run(["df", "-h"], stdout=subprocess.PIPE) LOGGER.warning(f"EFS mount [df -h]: {result.stdout.decode('utf-8')}") raise FileNotFoundError("This batch function has no {EFS} mounted. Cannot proceed.") working = tempfile.TemporaryDirectory(dir=EFS) elif not kwargs.get('efs') and kwargs.get('data_dir') != None: if not Path(kwargs.get('data_dir')).exists(): Path(kwargs.get('data_dir')).mkdir(parents=True, exist_ok=True) working = tempfile.TemporaryDirectory(dir=kwargs['data_dir']) EFS = working.name elif not kwargs.get('efs') and kwargs.get('data_dir') == None: working = tempfile.TemporaryDirectory(dir='.') EFS = working.name if kwargs.get('data_dir') == None: kwargs['data_dir'] = '.' # CLI seems to pass in None and the get() doesn't get it. #2: set output folder(s), if specified. Otherwise, use data_dir or '.' for both. s3 = False if kwargs.get('bucket') and kwargs.get('output_bucket'): s3 = True BUCKET = kwargs['bucket'] PROCESSED_BUCKET = kwargs['output_bucket'] else: BUCKET = kwargs.get('data_dir', '.') PROCESSED_BUCKET = kwargs.get('data_dir', '.') #2b: test outgoing requests work (if, for example, you're on AWS behind a NAT gateway) #test = requests.get('http://google.com') #LOGGER.info(f"testing outgoing request returned: {test.status_code} headers {test.headers['content-type']} encoding {test.encoding} text length {len(test.text)}") #3: run EACH geo series through downloader/processor each_geo_result = {} zipfile_names = [] for geo_id in geo_ids: # this download processed data, or idats, or -tbl-1.txt files inside the _family.xml.tgz files. # also downloads meta_data # download_geo_processed() gets nothing if idats exist. result = download_geo_processed(geo_id, working, verbose=kwargs.get('verbose', False), compress=kwargs.get('compress',False), use_headers=True) if result['found_idats'] == False and result['processed_files'] == False and result['tbl_txt_files'] == False: LOGGER.warning(f"No downloadable methylation data found for {geo_id}.") continue # result dict tells this function what it found. if result['tbl_txt_files'] == True: LOGGER.info(f"Found -tbl-1.txt files with meta data for {geo_id}.") if result['tbl_txt_files'] == True: LOGGER.info(f"Found processed csv files for {geo_id}.") if result['found_idats'] == True: try: # dict_only: should download without processing idats. methylprep.download.process_data.run_series(geo_id, working.name, dict_only=True, batch_size=BATCH_SIZE, clean=True, abort_if_no_idats=True) except Exception as e: LOGGER.error(f"run_series ERROR: {e}") if kwargs.get('compress') == False: # move them out of working dir before it goes away. parent = Path(working.name).parent # data_dir is used to create working as a subdir off it. for _file in Path(working.name).rglob('*'): try: if Path(str(parent), _file.name).exists() or not Path(_file).exists(): if kwargs.get('verbose') == True: LOGGER.info(f"Skipping {str(_file)}") continue shutil.move(str(_file), str(parent)) if kwargs.get('verbose') == True: LOGGER.info(f"Moving {str(_file)} --> {str(parent)}") except Exception as e: LOGGER.error(f"moving: {e}") each_geo_result[geo_id] = result #4: memory check / debug if os.name != 'nt' and kwargs.get('verbose') == True: total_disk_space = subprocess.check_output(['du','-sh', EFS]).split()[0].decode('utf-8') LOGGER.info(f"Tempfolder {EFS} contains {len(list([str(k) for k in Path(EFS).rglob('*')]))} files, {total_disk_space} total.") #LOGGER.info(f"DEBUG {EFS} all files: {list([str(k) for k in Path(working.name).rglob('*')])}") #LOGGER.info(f"Downloaded and extracted: {geo_ids}. Zipping and moving to S3 for processing in pipeline now.") zipfile_names = [] # one for each GSExxx, or one per pkl file found that was too big and used gzip. #5: compress and package files if kwargs.get('compress') is True: zipfile_name = f"{geo_id}.zip" outfile_path = Path(working.name, zipfile_name) # check if any one file is too big, and switch to gzip if so. use_gzip = False for k in Path(working.name).rglob('*'): if k.stat().st_size >= zipfile.ZIP64_LIMIT: # this next line assumes only one GSE ID in function. LOGGER.info(f"Switching to gzip because {str(k)} is greater than 2GB. This probably breaks if processing IDATS. Found_idats = {result['found_idats']}") use_gzip = True break if use_gzip or result['found_idats'] == False: for k in Path(working.name).rglob('*.pkl'): # gets beta_values.pkl and meta_data.pkl # gzip each file in-place, then upload them. These are big pkl files. # this will also catch the GSExxxxx-tbl-1.txt pickled beta_values.pkl dataframe, if it exists. with open(k, 'rb') as file_in: gzip_name = Path(f"{str(k)}.gz") with gzip.open(gzip_name, 'wb') as file_out: shutil.copyfileobj(file_in, file_out) zipfile_names.append(gzip_name.name) #file_size = file_out.seek(0, io.SEEK_END) #LOGGER.info(f"File: {gzip_name.name} -- {round(file_size/1000000)} MB") if os.name != 'nt': result = subprocess.run(["du", "-chs", EFS], stdout=subprocess.PIPE) result = result.stdout.decode('utf-8').split('\t')[0] LOGGER.info(f"{gzip_name.name} -- {result} total") else: with zipfile.ZipFile(outfile_path, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True, compresslevel=9) as zip: for k in Path(working.name).rglob('*'): if k.is_dir(): continue if k.name == zipfile_name: continue # there is an empty file created in the same folder I'm zipping up, so need to skip this guy. zip.write(str(k), k.name) # 2nd arg arcname will drop folder structure in zipfile (the /mnt/efs/tmpfolder stuff) zipinfo = zip.getinfo(k.name) LOGGER.info(f"{k.name} ({round(zipinfo.file_size/1000)} --> {round(zipinfo.compress_size/1000)} KB)") LOGGER.info(f"In ZipFile {outfile_path}: {zip.namelist()}") zipfile_names.append(zipfile_name) """ #6: upload, if s3. -- this won't work inside methylprep because of all the AWS config stuff, so this function passes the data out. #my_s3 = boto3.resource('s3') # using multipart so handle >5GB files, but test with smaller files for now. if s3: for zipfile_name in zipfile_names: if result['found_idats'] == False: s3_key = PurePath(zipfile_name) local_file = Path(working.name, zipfile_name) file_size = round(local_file.stat().st_size/1000000) LOGGER.info(f"S3: Uploading processed file {zipfile_name} ({file_size} MB) to s3://{PROCESSED_BUCKET}/{s3_key}") with open(local_file, 'rb') as f: provider.upload_s3_object(PROCESSED_BUCKET, str(s3_key), f, multipart=True) else: s3_key = PurePath(START_PIPELINE_FOLDER, zipfile_name) #mp_id = ''.join([random.choice('abcdefghjiklmnopqrstuvwxyz1234567890') for i in range(16)]) LOGGER.info(f"S3: Uploading zipfile {zipfile_name} to s3://{BUCKET}/{s3_key}") # multipart needs you to pass in the fileobj, not the fileobj.read() part. with open(Path(working.name, zipfile_name), 'rb') as f: provider.upload_s3_object(BUCKET, str(s3_key), f, multipart=True) if os.name != 'nt': total_disk_space = subprocess.check_output(['du','-sh', EFS]).split()[0].decode('utf-8') LOGGER.info(f"DEBUG {EFS} {len(list([str(k) for k in Path(EFS).rglob('*')]))} files, total: {total_disk_space}") """ #7: delete/move tempfile/efs stuff #efs_files = list(Path(working.name).glob('*')) # NOTE: if running a bunch of GEO_IDs, and clean is False, everything will be in the same temp workdir, so make sure to clean/move each one. # skip this step if you need to keep folders in working/efs folder instead of moving them to the data_dir. if kwargs.get('clean') is True and kwargs.get('compress') is True: # moving important files out of working folder, then returning these as a list. # this would break in lambda/aws-batch because efs is huge but the host drive is not. #zipfiles = [_zipfile for _zipfile in list(Path(working.name).glob('*')) if _zipfile in zipfile_names] for _zipfile in zipfile_names: if kwargs.get('verbose') == True: LOGGER.info(f"Copying {_zipfile} to {kwargs.get('data_dir','.')}") shutil.copy(Path(working.name, _zipfile), kwargs.get('data_dir','.')) efs_exists = Path(working.name).exists() if len(geo_ids) > 1 and geo_id != geo_ids[-1]: # remove files, but don't let folder go away with cleanup(), unless this is the last one. for _file in Path(working.name).rglob('*'): if _file.is_file(): _file.unlink() LOGGER.info(f"Removed temp files; left working dir intact.") else: working.cleanup() final_files = list(Path(kwargs.get('data_dir','.')).glob('*')) if kwargs.get('verbose') == True: LOGGER.info(f"Removing temp_files: (exists: {efs_exists} --> exists: {Path(working.name).exists()})") LOGGER.info(f"Files saved: {final_files} vs zipfile_names: {zipfile_names}") #total_disk_space = subprocess.check_output(['du','-sh', EFS]).split()[0].decode('utf-8') each_geo_result[geo_id]['filenames'] = zipfile_names if len(geo_ids) > 1: LOGGER.warning("Returning a different, nested DICT structure because more than one GEO_ID was processed in this function.") return {"error": None, "geo_ids": each_geo_result, "tempdir": working} return {"error":None, "filenames": zipfile_names, "tempdir": working}
def download_geo_processed(geo_id, working, verbose=False, compress=False, use_headers=False): """Uses methylprep/methylcheck to get processed beta values. use_headers: if True, it will use the series_matrix headers to create a samplesheet instead of MiNiML file, which is faster, but lacks Sentrix_ID and Sentrix_Position data. But this is only needed for methylprep process. In this case, the meta data and betas are keyed using GSM_IDs instead.""" filename_keywords = ['matrix', 'processed', 'signals', 'intensities', 'normalized', 'intensity', 'raw_data', 'mean', 'average', 'beta'] filename_exclude_keywords = ['RNA','Illumina'] if verbose: LOGGER.info(f"Searching GEO NIH for {geo_id}") result_df = search(geo_id, filepath=working.name, verbose=verbose) """ note: each df row columns are: ROW = {'title': 'GSE: 'url': 'samples': 'date': 'platform': 'idats': file_name_1, file_size_1, file_link_1 ... } """ # confirm no idats, then # find the links and download if match pattern. found_idats = False found_headers = False downloaded_files = False tbl_txt_files = False for idx, row in result_df.iterrows(): # first, skip rows if IDATs exist for the series. if verbose: LOGGER.info(f"{idx}: {dict(row)}") if row.get('idats') == '1': if verbose: LOGGER.info(f"IDATs exist for {geo_id}") found_idats = True return {'found_idats': found_idats, 'processed_files': downloaded_files, 'tbl_txt_files': tbl_txt_files} # second, see if the series_matrix file exists, and use that. series_server = "https://ftp.ncbi.nlm.nih.gov" series_path = f"geo/series/{geo_id[:-3]}nnn/{geo_id}/matrix" file_name = f"{geo_id}_series_matrix.txt.gz" series_matrix_path = f"{series_server}/{series_path}/{file_name}" series_matrix_path2 = f"{series_server}/geo/series/{geo_id[:-3]}nnn/matrix/{geo_id}/{file_name}" # some series paths include the GPL platform code too, if multiple platforms are present series_matrix_path3 = f"{series_server}/{series_path}/{geo_id}-GPL21145_series_matrix.txt.gz" series_matrix_path4 = f"{series_server}/{series_path}/{geo_id}-GPL13534_series_matrix.txt.gz" try: saved_file = file_name.replace(' ','_') response = requests.head(series_matrix_path) response2 = requests.head(series_matrix_path2) response3 = requests.head(series_matrix_path3) response4 = requests.head(series_matrix_path4) if response.status_code == 200: url = series_matrix_path elif response2.status_code == 200: url = series_matrix_path2 elif response3.status_code == 200: url = series_matrix_path3 elif response4.status_code4 == 200: url = series_matrix_path4 else: url = None if url != None: if verbose: LOGGER.info(f"Downloading {series_matrix_path}") #with requests.get(url, stream=True) as r: # with open(Path(working.name, saved_file), 'wb') as f: # shutil.copyfileobj(r.raw, f) with requests.get(url, stream=True) as r: total_size_in_bytes= int(r.headers.get('content-length', 0)) block_size = 1024 #1 Kibibyte progress_bar = tqdm(desc=saved_file, total=total_size_in_bytes, unit='iB', unit_scale=True) with open(Path(working.name, saved_file), 'wb') as f: #shutil.copyfileobj(r.raw, f) --- needed when saving to EFS for data in r.iter_content(block_size): progress_bar.update(len(data)) f.write(data) progress_bar.close() if total_size_in_bytes != 0 and progress_bar.n != total_size_in_bytes: LOGGER.error("ERROR, something went wrong") else: # TEST the downloaded file. possibly avoid downloading miniml file because this has the meta data already. saved_file_path = Path(working.name,saved_file) if url == None or saved_file_path.exists() == False: LOGGER.info("no series_matrix file downloaded") elif '.gz' in saved_file_path.suffixes: LOGGER.info("Un-gzipping...") unzipped_file = str(saved_file_path)[:-3] with gzip.open(saved_file_path, 'rb') as f_in: with open(unzipped_file, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) # delete gzip if Path(unzipped_file).exists(): saved_file_path.unlink() import methylcheck data = methylcheck.read_geo_processed.read_series_matrix(unzipped_file, include_headers_df=True) if verbose: LOGGER.info(f"{geo_id} data: {data['df'].shape} headers: {data['headers_df'].shape}") downloaded_files = True if isinstance(data.get('headers_df'),pd.DataFrame): found_headers = True try: samplesheet = samplesheet_from_series_matrix(data['headers_df']) samplesheet.to_csv(Path(Path(unzipped_file).parent, f"{geo_id}_samplesheet.csv"), index=False) except Exception as e: LOGGER.error(f"Could not create samplesheet from series_matrix headers: {e}") if data.get('series_dict'): with open(Path(Path(unzipped_file).parent, f"{geo_id}_series_summary.json"), 'w', encoding='utf8') as f: json.dump(data['series_dict'],f) if isinstance(data.get('df'), pd.DataFrame): # betas data['df'].to_pickle(Path(Path(unzipped_file).parent, f"{geo_id}_beta_values.pkl")) # if not compressing later, move this file out of tempfolder if compress != True: current = Path(unzipped_file).parent parent = Path(unzipped_file).parent.parent # shutil.move(unzipped_file, str(parent)) --- the .txt file is no longer needed. everything is repackaged. shutil.move(str(Path(current, f"{geo_id}_samplesheet.csv")), str(parent)) shutil.move(str(Path(current, f"{geo_id}_series_summary.json")), str(parent)) shutil.move(str(Path(current, f"{geo_id}_beta_values.pkl")), str(parent)) continue except Exception as e: LOGGER.info(f"Series_matrix download failed: {e}, trying other saved files") import traceback LOGGER.info(traceback.format_exc()) pass # third, follow the other file links to processed data from the search DF. for i in range(3): if row.get(f'file_name_{i}') and row.get(f'file_size_{i}') and row.get(f'file_link_{i}'): file_name = row.get(f'file_name_{i}') file_size = row.get(f'file_size_{i}') file_link = row.get(f'file_link_{i}') if (any([keyword.lower() in file_name.lower() for keyword in filename_keywords]) and all([keyword.lower() not in file_name.lower() for keyword in filename_exclude_keywords]) ): if verbose: LOGGER.info(f"Matched {file_name}; size: {file_size}") if 'ftp://' in file_link: raw_filename = file_link series_path = working.name saved_file = file_name.replace(' ','_') ftp = FTP('ftp.ncbi.nlm.nih.gov', timeout=120) # 2 mins ftp.login() ftp.cwd(f"geo/series/{geo_id[:-3]}nnn/{geo_id}") raw_file = open(f"{series_path}/{saved_file}", 'wb') filesize = ftp.size(f"suppl/{file_name}") LOGGER.info(f"FTPing {saved_file} -- {round(filesize/1000000)} MB") # from geo/series/{geo_id[:-3]}nnn/{geo_id} try: #ftp.retrbinary(f"RETR suppl/{raw_filename}", raw_file.write) ftp.retrbinary(f"RETR suppl/{file_name}", raw_file.write) except Exception as e: LOGGER.error("error: {e}, trying {file_link} instead of geo/series/{geo_id[:-3]}nnn/{geo_id}/suppl/{raw_filename}") ftp.retrbinary(f"RETR {file_link}", raw_file.write) raw_file.close() if verbose: LOGGER.info(f"Downloaded {raw_filename}") downloaded_files = True elif 'https://' in file_link: saved_file = file_name.replace(' ','_') if verbose: LOGGER.info(f"Downloading {saved_file} from {file_link}") with requests.get(file_link, stream=True) as r: with open(Path(working.name, saved_file), 'wb') as f: shutil.copyfileobj(r.raw, f) if verbose: LOGGER.info(f"Downloaded {saved_file}") downloaded_files = True else: LOGGER.error(f"Unrecognized protocol in {file_link}") # pandas supports reading zipped files already. """ if '.gz' in PurePath(saved_file).suffixes: gzip.decompress(Path(working.name, saved_file)) if '.zip' in PurePath(saved_file).suffixes: """ this = Path(working.name, saved_file) try: if verbose: LOGGER.info(f"Trying read_geo() on {this}") import methylcheck beta_df = methylcheck.read_geo(this, verbose=verbose) is_df = isinstance(beta_df, pd.DataFrame) if is_df and verbose: LOGGER.info(f"Result is a dataframe.") else: if verbose: LOGGER.info(f"Result is NOT a dataframe.") except Exception as e: import traceback LOGGER.info(f"ERROR: {e}") LOGGER.info(traceback.format_exc()) return {'found_idats': found_idats, 'processed_files': downloaded_files, 'tbl_txt_files': tbl_txt_files} if is_df: # save to disk, then load again. don't overwriting pre-existing file of same name df_file = Path(working.name, f"{geo_id}_beta_values.pkl") if df_file.exists(): # could use a while loop here... if verbose: LOGGER.info(f"{df_file} already exists. Trying an alternate name.") alt_name = Path(saved_file).stem.replace(geo_id,'') df_file = Path(working.name, f"{geo_id}_beta_values_from_{alt_name}.pkl") if verbose: LOGGER.info(f"Alt name: {df_file}") if df_file.exists(): # guaranteed to work, but less informative. random_id = ''.join([random.choice('abcdefghjiklmnopqrstuvwxyz1234567890') for i in range(16)]) df_file = Path(working.name, f"{geo_id}_beta_values_{random_id}.pkl") beta_df.to_pickle(df_file) if verbose: LOGGER.info(f"Saved {df_file.name}") try: if verbose: LOGGER.info(f"reopening {df_file.name}") if 'methylcheck' in sys.modules: beta_df = methylcheck.load(df_file, verbose=verbose, silent=(not verbose)) else: beta_df = pd.read_pickle(df_file) if isinstance(beta_df, pd.DataFrame) and verbose: LOGGER.info(f"df shape: {beta_df.shape}") except Exception as e: import traceback LOGGER.info(f"ERROR: {e}") #LOGGER.info(traceback.format_exc()) else: if verbose: LOGGER.info(f"Skipped {file_name}") if found_headers == True and use_headers == True: pass elif downloaded_files == True and use_headers == False: if verbose: LOGGER.info(f"Getting MINiML meta data") ftp = FTP('ftp.ncbi.nlm.nih.gov', timeout=120) # 2 mins ftp.login() ftp.cwd(f"geo/series/{geo_id[:-3]}nnn/{geo_id}/miniml") # look for additional parts of xml files file_parts = [] for filename,filestats in ftp.mlsd(facts=["size"]): if geo_id in filename: #LOGGER.info(f"DEBUG {filename} -- {round(int(filestats['size'])/1000000)} MB") file_parts.append(filename) if len(file_parts) > 1: LOGGER.info(f"The {geo_id}_family.xml miniml file for this series contains multiple ({len(file_parts)}) files.") try: # methylprep.download.convert_miniml local_files = convert_miniml(geo_id, data_dir=working.name, merge=False, download_it=True, extract_controls=False, require_keyword=None, sync_idats=False, verbose=verbose) # HERE -- copy all of these (.xml files) into the S3 output folder gsm_files = [] for local_file in local_files: if '.tgz' in Path(local_file).suffixes: if verbose == True: LOGGER.info(f"Unpacking: {local_file}") shutil.unpack_archive(local_file) all_files = list(Path(working.name).rglob('*')) gsm_files = list(Path(working.name).rglob('GSM*.txt')) non_gsm_files = [file for file in all_files if file not in gsm_files] if verbose == True: LOGGER.info(f"{len(all_files)} local_files, {len(gsm_files)}, non-GSM: {len(non_gsm_files)} | GSMs: {gsm_files}") else: if verbose == True: LOGGER.info(f"local_file skipped: {local_file}") if len(gsm_files) > 0: tbl_txt_files = True LOGGER.info(f"DEBUG: gsm_files detected: {gsm_files} -- to_df()") beta_df = betas_from_tbl_txt_files(gsm_files) # overwrites each time, but last cycle should be a complete list. beta_file = f"{geo_id}_beta_values-tbl-1.pkl" beta_df.to_pickle(beta_file) if verbose: LOGGER.info(f"{beta_file} written, exists: {Path(working.name,beta_file).exists()}") # uploading to s3 will happen in outside function, because it is a .pkl file in working.name dir. if Path(working.name, f"{geo_id}_family.xml").exists(): # check again, as unpack_archive should create this from multi-part _family files # this should create the meta_data.pkl file that gets auto-saved in pipeline. local_files = convert_miniml(geo_id, data_dir=working.name, merge=False, download_it=False, extract_controls=False, require_keyword=None, sync_idats=False) except Exception as e: LOGGER.error(f"convert_miniml error: {e}") import traceback LOGGER.error(traceback.format_exc()) return {'found_idats': found_idats, 'processed_files': downloaded_files, 'tbl_txt_files': tbl_txt_files} def betas_from_tbl_txt_files(file_list, remove_after=True): """input: list of file paths to be converted into one beta DF and saved, returning saved file path.""" samples = [] # list of dfs to merge, with cols being sample names and probes as index for file in file_list: FILE = Path(file) if FILE.suffix == '.txt' and FILE.name.startswith('GSM') and '-tbl' in FILE.name: sample_id = FILE.name.split('-')[0] sample = pd.read_csv(FILE, sep='\t', header=0, names=['IlmnID', sample_id]) sample = sample.set_index('IlmnID') samples.append(sample) #LOGGER.info(f"{len(samples)}: {sample_id} -> {len(sample)}") df_ok = False try: df = pd.concat(samples, axis=1, sort=False) df_ok = True except Exception as e: LOGGER.error("Could not concat these samples into a dataframe. The probe names in rows don't line up.") LOGGER.error(e) if df_ok == False: return if remove_after == True: for file in file_list: # deleting source files FILE = Path(file) if FILE.suffix == '.txt' and FILE.name.startswith('GSM') and '-tbl' in FILE.name: FILE.unlink() LOGGER.info(f"Found {df.shape[1]} samples with {df.shape[0]} probes from {len(file_list)} GSM-txt files.") return df def samplesheet_from_series_matrix(df): """input: header_df from methylcheck.read_series_matrix. This approach matches meta-data with sample betas without needing sentrix_ids. Key is GSMxxxx. This won't support methylprep process function, but fine for all post-process functions needing phenotype data. This parses multiple Characteristics columns into separate colummns in dataframe.""" missing = ['Sentrix_ID', 'Sentrix_Position'] columns = { '!Sample_geo_accession':'Sample_ID', '!Sample_source_name_ch1': 'source', '!Sample_platform_id':'platform', '!Sample_description':'Description', # may have more than one row in df '!Sample_characteristics_ch1':'Characteristics', # may have more than one row in df } #test: samplesheet = samplesheet_from_series_matrix(data['headers_df']) samplesheet_rows = [] for gsm in df.columns: data = df[gsm] new = {} missing_data = Counter() for column,field in columns.items(): if column == '!Sample_characteristics_ch1': continue # parse each row as a separate column, with : as key-value separator. if column not in data: missing_data[column] += 1 else: if isinstance(data.loc[column], pd.Series): # merge values new[field] = " | ".join(list(data.loc[column].values)) else: new[field] = data.loc[column] if missing_data: LOGGER.warning(f"missing from meta data: {missing_data.most_common()}") column = '!Sample_characteristics_ch1' OVERWRITE_WARNINGS = Counter() if isinstance(data.loc[column], pd.Series): # only detects multi-line data here # parse each key-value as a separate column; only works if the labels in every sample are exactly the same. for item in data.loc[column]: if isinstance(item, str) and ':' in item: try: key,value = item.split(':') except: LOGGER.warning("could not split key-value pair because extra : present") continue if key.strip() in new: # possible that same key appears twice, or matches some other meta data. Data loss. OVERWRITE_WARNINGS[key.strip()] += 1 else: new[key.strip()] = value.strip() else: LOGGER.warning(f"Characteristic '{item}' not understood") if len(OVERWRITE_WARNINGS) > 0: LOGGER.warning(f"These Sample Characteristics had the same labels and were lost: {OVERWRITE_WARNINGS.most_common()}") new['GSM_ID'] = new.get('Sample_ID') for column,value in data.items(): if column in columns.keys(): continue field = column.replace('!Sample_','') # some fields are multiple rows, so represented by multiple index rows in this headers_df if field not in new and isinstance(data.loc[column], pd.Series): value = " | ".join(list(data.loc[column].values)) if field not in new: new[field] = value new.update({field:None for field in missing}) samplesheet_rows.append(new) # finally, make sure Sample_ID (index column) does not appear more than once. Some GEOs have this problem. # Avoid creating files with two Sample_ID columns, or any other duplicate columns: sample_sheet_meta_data = pd.DataFrame(data=samplesheet_rows) if any(sample_sheet_meta_data.columns.duplicated()): sample_sheet_meta_data = sample_sheet_meta_data.loc[:, ~sample_sheet_meta_data.columns.duplicated()] return sample_sheet_meta_data