from __future__ import annotations
from math import floor, sqrt
from pathlib import Path
from typing import Union
from numpy import empty, float32, float64, uint8, zeros
from cbgen.typing import CData, DtypeLike, Genotype
from ._ffi import ffi, lib
__all__ = ["bgen_file"]
[docs]class bgen_file:
"""
BGEN file handler.
>>> import cbgen
>>>
>>> bgen = cbgen.bgen_file(cbgen.example.get("haplotypes.bgen"))
>>> print(bgen.nvariants)
4
>>> print(bgen.nsamples)
4
>>> print(bgen.contain_samples)
True
>>> print(bgen.read_samples())
[b'sample_0' b'sample_1' b'sample_2' b'sample_3']
>>> mf = cbgen.bgen_metafile(cbgen.example.get("haplotypes.bgen.metafile"))
>>> part = mf.read_partition(0)
>>> gt = bgen.read_genotype(part.variants.offset[0])
>>> print(gt.probability)
[[1. 0. 1. 0.]
[0. 1. 1. 0.]
[1. 0. 0. 1.]
[0. 1. 0. 1.]]
>>> mf.close()
>>> bgen.close()
Use `with`-statement context to guarantee file closing at the end.
>>> with cbgen.bgen_file(cbgen.example.get("haplotypes.bgen")) as bgen:
... print(bgen.nvariants)
4
Parameters
----------
filepath
BGEN file path.
"""
def __init__(self, filepath: Union[str, Path]):
self._filepath = Path(filepath)
self._bgen_file: CData = ffi.NULL
self._bgen_file = lib.bgen_file_open(bytes(self._filepath))
if self._bgen_file == ffi.NULL:
raise RuntimeError(f"Failed to open {filepath}.")
@property
def filepath(self) -> Path:
"""
File path.
Returns
-------
File path.
"""
return self._filepath
@property
def nvariants(self) -> int:
"""
Number of variants.
Returns
-------
Number of variants.
"""
return lib.bgen_file_nvariants(self._bgen_file)
@property
def nsamples(self) -> int:
"""
Number of samples.
Returns
-------
Number of samples.
"""
return lib.bgen_file_nsamples(self._bgen_file)
@property
def contain_samples(self) -> bool:
"""
Check if it contains samples.
Returns
-------
``True`` if it does contain samples; ``False`` otherwise.
"""
return lib.bgen_file_contain_samples(self._bgen_file)
[docs] def read_samples(self) -> DtypeLike:
"""
Read samples.
Returns
-------
Samples.
Raises
------
RuntimeError
If samples are not stored or a file stream reading error occurs.
"""
nsamples = self.nsamples
bgen_samples: CData = lib.bgen_file_read_samples(self._bgen_file)
if bgen_samples == ffi.NULL:
raise RuntimeError("Could not fetch samples from the bgen file.")
try:
samples_max_len = ffi.new("uint32_t[]", 1)
lib.read_samples_part1(bgen_samples, nsamples, samples_max_len)
samples = zeros(nsamples, dtype=f"S{samples_max_len[0]}")
lib.read_samples_part2(
bgen_samples,
nsamples,
ffi.from_buffer("char[]", samples),
samples_max_len[0],
)
finally:
lib.bgen_samples_destroy(bgen_samples)
return samples
[docs] def read_genotype(self, offset: int, precision: int = 64) -> Genotype:
"""
Read genotype.
Parameters
----------
offset
Variant offset.
precision
Probability precision in bits: 64 (default) or 32.
Returns
-------
Genotype.
Raises
------
RuntimeError
If invalid offset of or a file stream reading error occurs.
"""
gt: CData = lib.bgen_file_open_genotype(self._bgen_file, offset)
if gt == ffi.NULL:
raise RuntimeError(f"Could not open genotype (offset {offset}).")
if precision not in [64, 32]:
raise ValueError("Precision should be either 64 or 32.")
nsamples = self.nsamples
ncombs = lib.bgen_genotype_ncombs(gt)
err: int = 0
if precision == 64:
probs = empty((nsamples, ncombs), dtype=float64)
err = lib.bgen_genotype_read64(gt, ffi.cast("double *", probs.ctypes.data))
else:
probs = empty((nsamples, ncombs), dtype=float32)
err = lib.bgen_genotype_read32(gt, ffi.cast("float *", probs.ctypes.data))
if err != 0:
msg = f"Could not read genotype probabilities (offset {offset})."
raise RuntimeError(msg)
phased = lib.bgen_genotype_phased(gt)
ploidy = empty(nsamples, dtype=uint8)
lib.read_ploidy(gt, ffi.cast("uint8_t *", ploidy.ctypes.data), nsamples)
missing = empty(nsamples, dtype=bool)
lib.read_missing(gt, ffi.cast("bool *", missing.ctypes.data), nsamples)
lib.bgen_genotype_close(gt)
return Genotype(probs, phased, ploidy, missing)
[docs] def read_probability(self, offset: int, precision: int = 64) -> DtypeLike:
"""
Read genotype probability.
Parameters
----------
offset
Variant offset.
precision
Probability precision in bits: 64 (default) or 32.
Returns
-------
Probabilities.
Raises
------
RuntimeError
If invalid offset of or a file stream reading error occurs.
"""
gt: CData = lib.bgen_file_open_genotype(self._bgen_file, offset)
if gt == ffi.NULL:
raise RuntimeError(f"Could not open genotype (offset {offset}).")
if precision not in [64, 32]:
raise ValueError("Precision should be either 64 or 32.")
nsamples = self.nsamples
ncombs = lib.bgen_genotype_ncombs(gt)
err: int = 0
if precision == 64:
probs = empty((nsamples, ncombs), dtype=float64)
err = lib.bgen_genotype_read64(gt, ffi.cast("double *", probs.ctypes.data))
else:
probs = empty((nsamples, ncombs), dtype=float32)
err = lib.bgen_genotype_read32(gt, ffi.cast("float *", probs.ctypes.data))
if err != 0:
msg = f"Could not read genotype probabilities (offset {offset})."
raise RuntimeError(msg)
lib.bgen_genotype_close(gt)
return probs
[docs] def close(self):
"""
Close file stream.
"""
if self._bgen_file != ffi.NULL:
lib.bgen_file_close(self._bgen_file)
self._bgen_file = ffi.NULL
def __del__(self):
self.close()
def __enter__(self) -> bgen_file:
return self
def __exit__(self, *_):
self.close()
def estimate_best_npartitions(nvariants: int) -> int:
min_variants = 128
m = max(min(min_variants, nvariants), floor(sqrt(nvariants)))
return nvariants // m