Source code for cbgen._bgen_metafile
from __future__ import annotations
from pathlib import Path
from typing import Union
from numpy import empty, uint16, uint32, uint64, zeros
from cbgen.typing import CData, Partition, Variants
from ._ffi import ffi, lib
__all__ = ["bgen_metafile"]
[docs]class bgen_metafile:
"""
BGEN metafile file handler.
>>> import cbgen
>>>
>>> bgen = cbgen.bgen_file(cbgen.example.get("haplotypes.bgen"))
>>> mf = cbgen.bgen_metafile(cbgen.example.get("haplotypes.bgen.metafile"))
>>> print(mf.npartitions)
1
>>> print(mf.nvariants)
4
>>> print(mf.partition_size)
4
>>> part = mf.read_partition(0)
>>> gt = bgen.read_genotype(part.variants.offset[0])
>>> print(gt.probability)
[[1. 0. 1. 0.]
[0. 1. 1. 0.]
[1. 0. 0. 1.]
[0. 1. 0. 1.]]
>>> mf.close()
>>> bgen.close()
Use `with`-statement context to guarantee file closing at the end.
>>> with cbgen.bgen_metafile(cbgen.example.get("haplotypes.bgen.metafile")) as mf:
... print(mf.npartitions)
1
Parameters
----------
filepath
BGEN metafile file path.
Raises
------
RuntimeError
If a file stream reading error occurs.
"""
def __init__(self, filepath: Union[str, Path]):
self._filepath = Path(filepath)
self._bgen_metafile: CData = ffi.NULL
self._bgen_metafile = lib.bgen_metafile_open(bytes(self._filepath))
if self._bgen_metafile == ffi.NULL:
raise RuntimeError(f"Failed to open {filepath}.")
@property
def filepath(self) -> Path:
"""
File path.
Returns
-------
File path.
"""
return self._filepath
@property
def npartitions(self) -> int:
"""
Number of partitions.
Returns
-------
Number of partitions.
"""
return lib.bgen_metafile_npartitions(self._bgen_metafile)
@property
def nvariants(self) -> int:
"""
Number of variants.
Returns
-------
Number of variants.
"""
return lib.bgen_metafile_nvariants(self._bgen_metafile)
@property
def partition_size(self) -> int:
"""
Number of variants per partition.
The last partition might have less variants than the partition size.
Every other partition is guaranteed to have ``partition_size``
variants.
Returns
-------
Partition size.
"""
return ceildiv(self.nvariants, self.npartitions)
[docs] def read_partition(self, index: int) -> Partition:
"""
Read partition.
Parameters
----------
index
Partition index.
Returns
-------
Partition.
Raises
------
RuntimeError
If index is invalid or a file stream reading error occurs.
"""
partition = lib.bgen_metafile_read_partition(self._bgen_metafile, index)
if partition == ffi.NULL:
raise RuntimeError(f"Could not read partition {partition}.")
nvariants = lib.bgen_partition_nvariants(partition)
position = empty(nvariants, dtype=uint32)
nalleles = empty(nvariants, dtype=uint16)
var_offset = empty(nvariants, dtype=uint64)
vid_max_len = ffi.new("uint32_t[]", 1)
rsid_max_len = ffi.new("uint32_t[]", 1)
chrom_max_len = ffi.new("uint32_t[]", 1)
allele_ids_max_len = ffi.new("uint32_t[]", 1)
position_ptr = ffi.cast("uint32_t *", ffi.from_buffer(position))
nalleles_ptr = ffi.cast("uint16_t *", ffi.from_buffer(nalleles))
offset_ptr = ffi.cast("uint64_t *", ffi.from_buffer(var_offset))
lib.read_partition_part1(
partition,
position_ptr,
nalleles_ptr,
offset_ptr,
vid_max_len,
rsid_max_len,
chrom_max_len,
allele_ids_max_len,
)
vid = zeros(nvariants, dtype=f"S{vid_max_len[0]}")
rsid = zeros(nvariants, dtype=f"S{rsid_max_len[0]}")
chrom = zeros(nvariants, dtype=f"S{chrom_max_len[0]}")
allele_ids = zeros(nvariants, dtype=f"S{allele_ids_max_len[0]}")
lib.read_partition_part2(
partition,
ffi.from_buffer("char[]", vid),
vid_max_len[0],
ffi.from_buffer("char[]", rsid),
rsid_max_len[0],
ffi.from_buffer("char[]", chrom),
chrom_max_len[0],
ffi.from_buffer("char[]", allele_ids),
allele_ids_max_len[0],
)
lib.bgen_partition_destroy(partition)
part_offset = self.partition_size * index
v = Variants(vid, rsid, chrom, position, nalleles, allele_ids, var_offset)
return Partition(part_offset, v)
[docs] def close(self):
"""
Close file stream.
"""
if self._bgen_metafile != ffi.NULL:
lib.bgen_metafile_close(self._bgen_metafile)
self._bgen_metafile = ffi.NULL
def __del__(self):
self.close()
def __enter__(self) -> bgen_metafile:
return self
def __exit__(self, *_):
self.close()
def ceildiv(a: int, b: int) -> int:
return -(-a // b)