# Lib
from enum import IntEnum, unique
import pandas as pd
import numpy as np
import struct
from pprint import pprint
import logging
# App
from ..utils import (
get_file_object,
read_and_reset,
read_byte,
read_char,
read_int,
read_long,
read_results,
read_short,
read_string,
npread,
)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel( logging.WARNING )
__all__ = ['IdatDataset']
# Constants
# ----------------------------------------------------------------------------
DEFAULT_IDAT_VERSION = 3
DEFAULT_IDAT_FILE_ID = 'IDAT'
@unique
class IdatHeaderLocation(IntEnum):
"""Unique IntEnum defining constant values for byte offsets of IDAT headers.
"""
FILE_TYPE = 0
VERSION = 4
FIELD_COUNT = 12
SECTION_OFFSETS = 16
@unique
class IdatSectionCode(IntEnum):
"""Unique IntEnum defining constant values for byte offsets of IDAT headers.
These values come from the field codes of the Bioconductor illuminaio package.
MM: refer to https://bioconductor.org/packages/release/bioc/vignettes/illuminaio/inst/doc/EncryptedFormat.pdf
and https://bioconductor.org/packages/release/bioc/vignettes/illuminaio/inst/doc/illuminaio.pdf
and source: https://github.com/snewhouse/glu-genetics/blob/master/glu/lib/illumina.py
more on encrypted IDAT format here: https://www.bioconductor.org/packages/release/bioc/vignettes/illuminaio/inst/doc/EncryptedFormat.pdf
"""
ILLUMINA_ID = 102
STD_DEV = 103
MEAN = 104
NUM_BEADS = 107 # how many replicate measurements for each probe
MID_BLOCK = 200
RUN_INFO = 300
RED_GREEN = 400
MOSTLY_NULL = 401 # manifest
BARCODE = 402
CHIP_TYPE = 403 # format
MOSTLY_A = 404 # label
UNKNOWN_1 = 405 # opa
UNKNOWN_2 = 406 # sampleid
UNKNOWN_3 = 407 # descr
UNKNOWN_4 = 408 # plate
UNKNOWN_5 = 409 # well
UNKNOWN_6 = 410
UNKNOWN_7 = 510 # unknown
NUM_SNPS_READ = 1000
"""Constant set of 'Unknown' IDAT sections."""
UNKNOWN_IDAT_SECTIONS = (
IdatSectionCode.MOSTLY_NULL,
IdatSectionCode.MOSTLY_A,
IdatSectionCode.UNKNOWN_1,
IdatSectionCode.UNKNOWN_2,
IdatSectionCode.UNKNOWN_3,
IdatSectionCode.UNKNOWN_4,
IdatSectionCode.UNKNOWN_5,
IdatSectionCode.UNKNOWN_6,
IdatSectionCode.UNKNOWN_7,
)
# Object Definitions
# ----------------------------------------------------------------------------
""" DEPRECATED: use IdatDataset(... verbose=True) instead.
class RunInfo():
'''A dataclass defining IDAT run information.
Arguments:
idat_file {file-like} -- An open IDAT file.'''
__slots__ = [
'run_time',
'block_type',
'block_pars',
'block_code',
'code_version',
]
def __init__(self, idat_file):
# Initializes the RunInfo data class by reading the provided idat_file.
self.run_time = read_string(idat_file)
self.block_type = read_string(idat_file)
self.block_pars = read_string(idat_file)
self.block_code = read_string(idat_file)
self.code_version = read_string(idat_file)
"""
[docs]class IdatDataset():
"""Validates and parses an Illumina IDAT file.
Arguments:
filepath_or_buffer {file-like} -- the IDAT file to parse.
channel {Channel} -- the fluorescent channel (Channel.RED or Channel.GREEN)
that produced the IDAT dataset.
Keyword Arguments:
idat_id {string} -- expected IDAT file identifier (default: {DEFAULT_IDAT_FILE_ID})
idat_version {integer} -- expected IDAT version (default: {DEFAULT_IDAT_VERSION})
bit {string, default 'float32'} -- 'float16' will pre-normalize intensities,
capping max intensity at 32127. This cuts data size in half, but will reduce
precision on ~0.01% of probes. [effectively downscaling fluorescence]
Raises:
ValueError: The IDAT file has an incorrect identifier or version specifier.
"""
def __init__(
self,
filepath_or_buffer,
channel,
idat_id=DEFAULT_IDAT_FILE_ID,
idat_version=DEFAULT_IDAT_VERSION,
verbose=False,
std_dev=False,
nbeads=False,
bit='float32',
):
"""Initializes the IdatDataset, reads and parses the IDAT file."""
self.verbose = verbose
self.channel = channel
self.barcode = None
self.chip_type = None
self.n_beads = 0
self.n_snps_read = 0
self.run_info = []
self.include_std_dev = std_dev
self.include_n_beads = nbeads
self.bit = bit
with get_file_object(filepath_or_buffer) as idat_file:
# assert file is indeed IDAT format
if not self.is_idat_file(idat_file, idat_id):
raise ValueError('Not an IDAT file. Unsupported file type.')
# assert correct IDAT file version
if not self.is_correct_version(idat_file, idat_version):
raise ValueError('Not a version 3 IDAT file. Unsupported IDAT version.')
self.probe_means = self.read(idat_file)
if self.overflow_check() is False:
LOGGER.warning("IDAT: contains negative probe values (uint16 overflow error)")
if self.verbose:
self.meta(idat_file)
@staticmethod
@read_and_reset
def is_idat_file(idat_file, expected):
"""Checks if the provided file has the correct identifier.
Arguments:
idat_file {file-like} -- the IDAT file to check.
expected {string} -- expected IDAT file identifier.
Returns:
[boolean] -- If the IDAT file identifier matches the expected value
"""
idat_file.seek(IdatHeaderLocation.FILE_TYPE.value)
file_type = read_char(idat_file, len(expected))
return file_type.lower() == expected.lower()
@staticmethod
@read_and_reset
def is_correct_version(idat_file, expected):
"""Checks if the provided file has the correct version.
Arguments:
idat_file {file-like} -- the IDAT file to check.
expected {integer} -- expected IDAT version.
Returns:
[boolean] -- If the IDAT file version matches the expected value
"""
idat_file.seek(IdatHeaderLocation.VERSION.value)
idat_version = read_long(idat_file)
return str(idat_version) == str(expected)
@staticmethod
@read_and_reset
def get_section_offsets(idat_file):
"""Parses the IDAT file header to get the byte position
for the start of each section.
Arguments:
idat_file {file-like} -- the IDAT file to process.
Returns:
[dict] -- The byte offset for each file section.
"""
idat_file.seek(IdatHeaderLocation.FIELD_COUNT.value)
num_fields = read_int(idat_file)
idat_file.seek(IdatHeaderLocation.SECTION_OFFSETS.value)
offsets = {}
for _idx in range(num_fields):
key = read_short(idat_file)
offsets[key] = read_long(idat_file)
return offsets
[docs] def read(self, idat_file):
"""Reads the IDAT file and parses the appropriate sections. Joins the
mean probe intensity values with their Illumina probe ID.
Arguments:
idat_file {file-like} -- the IDAT file to process.
Returns:
DataFrame -- mean probe intensity values indexed by Illumina ID.
"""
section_offsets = self.get_section_offsets(idat_file)
def seek_to_section(section_code):
offset = section_offsets[section_code.value]
idat_file.seek(offset)
seek_to_section(IdatSectionCode.BARCODE)
self.barcode = read_string(idat_file)
seek_to_section(IdatSectionCode.CHIP_TYPE)
self.chip_type = read_string(idat_file)
seek_to_section(IdatSectionCode.NUM_SNPS_READ)
self.n_snps_read = read_int(idat_file)
seek_to_section(IdatSectionCode.NUM_BEADS)
self.n_beads = npread(idat_file, '<u1', self.n_snps_read) # was <u1
seek_to_section(IdatSectionCode.ILLUMINA_ID)
illumina_ids = npread(idat_file, '<i4', self.n_snps_read)
seek_to_section(IdatSectionCode.MEAN)
probe_means = npread(idat_file, '<u2', self.n_snps_read) # '<u2' reads data as numpy unsigned-float16
seek_to_section(IdatSectionCode.RUN_INFO)
runinfo_entry_count, = struct.unpack('<L', idat_file.read(4))
for i in range(runinfo_entry_count):
timestamp = read_string(idat_file)
entry_type = read_string(idat_file)
parameters = read_string(idat_file)
codeblock = read_string(idat_file)
code_version = read_string(idat_file)
self.run_info.append( (timestamp, entry_type, parameters, codeblock, code_version) )
if self.include_std_dev and self.include_n_beads:
seek_to_section(IdatSectionCode.STD_DEV)
std_devs = npread(idat_file, '<u2', self.n_snps_read)
# print(len(probe_means), len(std_devs), len(self.n_beads))
data_frame = pd.DataFrame(
data={'mean_value':probe_means, 'std_dev':std_devs, 'n_beads':self.n_beads},
index=illumina_ids,
columns=['mean_value','std_dev','n_beads'],
dtype=self.bit, # int16 could work, and reduce memory by 1/2, but some raw values were > 32127 -- without prenormalization, you get negative values back, which breaks stuff.
)
elif self.include_std_dev:
seek_to_section(IdatSectionCode.STD_DEV)
std_devs = npread(idat_file, '<u2', self.n_snps_read)
# print(len(probe_means), len(std_devs))
data_frame = pd.DataFrame(
data={'mean_value':probe_means, 'std_dev':std_devs},
index=illumina_ids,
columns=['mean_value','std_dev'],
dtype=self.bit, # int16 could work, and reduce memory by 1/2, but some raw values were > 32127 -- without prenormalization, you get negative values back, which breaks stuff.
)
elif self.include_n_beads:
data_frame = pd.DataFrame(
data={'mean_value':probe_means, 'n_beads':self.n_beads},
index=illumina_ids,
columns=['mean_value','n_beads'],
dtype=self.bit, # int16 could work, and reduce memory by 1/2, but some raw values were > 32127 -- without prenormalization, you get negative values back, which breaks stuff.
)
else:
# casting astype(int) fixes pandas bug reading uint16, by coercing to float32-compatible format.
probe_records = dict(zip(illumina_ids, probe_means.astype(int)))
data_frame = pd.DataFrame.from_dict(
data=probe_records,
orient='index',
columns=['mean_value'],
dtype=self.bit, # int16 could work, and reduce memory by 1/2, but some raw values were > 32127 -- without prenormalization, you get negative values back, which breaks stuff.
)
data_frame.index.name = 'illumina_id'
if self.bit == 'float16':
data_frame = data_frame.clip(upper=32127)
data_frame = data_frame.astype('int16')
return data_frame
def overflow_check(self):
if hasattr(self, 'probe_means'):
if (self.probe_means.values < 0).any():
# n_affected = self.probe_means[self.probe_means.mean_value < 0].count().values[0]
return False
return True # passes, no misread probes