Module pydsphtools.io
A module which handles input and output operations for DualSPHysics files.
Classes
class Array (name: str, hide: bool, array_type: DataType, count: int, array_size: int, data: np.ndarray)-
Class that represents a bi4 array.
Attributes
name:str- Name of the array.
hide:bool- Whether or not the array is hidden.
array_type:DataType- The type of the array data.
count:int- The number of elements in the array.
array_size:int- The size of the array in bytes.
data:np.ndarray- The data of the array.
Expand source code
class Array: """Class that represents the a bi4 array. Attributes ---------- name : str Name of the array. hide : bool Wheither or not the array is hidden. array_type : DataType The type of the array data. count : int The number of elements in the array. array_size : int The size of the array in bytes. data : np.ndarray The data of the array. """ name: str hide: bool array_type: DataType count: int array_size: int data: np.ndarray __slots__ = ( "_name", "_hide", "_array_type", "_count", "_array_size", "_data", ) def __init__( self, name: str, hide: bool, array_type: DataType, count: int, array_size: int, data: np.ndarray, ) -> None: self._name = name self._hide = hide self._array_type = array_type self._count = count self._array_size = array_size self._data = data @classmethod def from_stream( cls: Array, byte_stream: io.BytesIO, endianness: Endianness ) -> Array: """Constructor from bytes. Parameters ---------- bytes : bytes The byte stream. endianness : Endianness The endianness of the bytes. Returns ------- Array The new object. """ array_def_size: int = int.from_bytes( byte_stream.read(UINT_SIZE), endianness.name ) buf = byte_stream.read(array_def_size) stream = io.BytesIO(buf) array_str_size: int = int.from_bytes(stream.read(UINT_SIZE), endianness.name) assert array_str_size == 6, f"Expected 6 but found {array_str_size}" array_str: str = stream.read(array_str_size).decode("utf-8") assert array_str == "\nARRAY", f"Expected '\nARRAY' but found'{array_str}'" name_size: int = int.from_bytes(stream.read(UINT_SIZE), endianness.name) name: str = stream.read(name_size).decode("utf-8") hide: bool = bool.from_bytes(stream.read(UINT_SIZE), endianness.name) array_type: DataType = DataType.from_bytes(stream.read(INT_SIZE), endianness) count: int = int.from_bytes(stream.read(UINT_SIZE), endianness.name) array_size: int = int.from_bytes(stream.read(UINT_SIZE), endianness.name) assert stream.read() == b"", "Array definition buffer is not empty." 
stream.close() typefmt = cls._get_numpy_fmt(array_type, endianness) data: np.ndarray = np.frombuffer( byte_stream.read(array_size), dtype=np.dtype(typefmt), count=count * 3 if array_type.is_vector() else count, ) if array_type.is_vector(): data = data.reshape((-1, 3)) return cls(name, hide, array_type, count, array_size, data) @classmethod def from_bytes(cls: Array, bytes: bytes, endianness: Endianness) -> Array: """Constructor from bytes. Parameters ---------- bytes : bytes The byte array. endianness : Endianness The endianness of the bytes. Returns ------- Array The new object. """ return cls.from_stream(io.BytesIO(bytes), endianness) @property def name(self) -> str: return self._name @property def hide(self) -> bool: return self._hide @property def array_type(self) -> DataType: return self._array_type @property def count(self) -> int: return self._count @property def array_size(self) -> int: return self._array_size @property def data(self) -> np.ndarray: return self._data def __str__(self) -> str: return self.pretty_print() def __repr__(self) -> str: return self.pretty_print() def __getitem__(self, key: NDIndex): """Allows indexing/slicing directly on the Array object.""" return self._data[key] def __len__(self) -> int: """Returns the number of elements (count).""" return self._count def pretty_print(self, indent=0, indent_str=" ") -> str: ret = ( f"{indent_str * indent}Array(\n" f"{indent_str * (indent + 1)}name = {self._name},\n" f"{indent_str * (indent + 1)}hide = {self._hide},\n" f"{indent_str * (indent + 1)}array_type = {self._array_type},\n" f"{indent_str * (indent + 1)}count = {self._count},\n" f"{indent_str * (indent + 1)}array_size = {self._array_size},\n" f"{indent_str * (indent + 1)}data = [\n" ) if self._count <= 10: for el in self._data: ret += f"{indent_str * (indent + 2)}{el},\n" ret += f"{indent_str * (indent + 1)}]\n" else: for el in self._data[:5]: ret += f"{indent_str * (indent + 2)}{el},\n" ret += f"{indent_str * (indent + 2)}...\n" for el in 
self._data[-5:]: ret += f"{indent_str * (indent + 2)}{el},\n" ret += f"{indent_str * (indent + 1)}]\n" ret += f"{indent_str * indent})" return ret @staticmethod def _get_numpy_fmt(data_type: DataType, endianness: Endianness) -> str: bo = "<" if endianness == Endianness.little else ">" match data_type: case DataType.int | DataType.int3: return f"{bo}i{INT_SIZE}" case DataType.uint | DataType.uint3: return f"{bo}u{UINT_SIZE}" case DataType.short: return f"{bo}i{SHORT_SIZE}" case DataType.ushort: return f"{bo}u{SHORT_SIZE}" case DataType.llong: return f"{bo}i{LONG_SIZE}" case DataType.ullong: return f"{bo}i{ULONG_SIZE}" case DataType.float | DataType.float3: return f"{bo}f{FLOAT_SIZE}" case DataType.double | DataType.double3: return f"{bo}f{DOUBLE_SIZE}" case _: raise NotImplementedError( f"Can't parse array of data type {data_type}" " because it is not implemented." )Static methods
def from_bytes(bytes: bytes, endianness: Endianness) ‑> pydsphtools._io.Array-
Constructor from bytes.
Parameters
bytes:bytes- The byte array.
endianness:Endianness- The endianness of the bytes.
Returns
Array- The new object.
def from_stream(byte_stream: io.BytesIO, endianness: Endianness) ‑> pydsphtools._io.Array-
Constructor from a byte stream.
Parameters
byte_stream:io.BytesIO- The byte stream.
endianness:Endianness- The endianness of the bytes.
Returns
Array- The new object.
Instance variables
prop array_size : int-
Expand source code
@property def array_size(self) -> int: return self._array_size prop array_type : pydsphtools._io.DataType-
Expand source code
@property def array_type(self) -> DataType: return self._array_type prop count : int-
Expand source code
@property def count(self) -> int: return self._count prop data : numpy.ndarray-
Expand source code
@property def data(self) -> np.ndarray: return self._data prop hide : bool-
Expand source code
@property def hide(self) -> bool: return self._hide prop name : str-
Expand source code
@property def name(self) -> str: return self._name
Methods
def pretty_print(self, indent=0, indent_str=' ') ‑> str
class Bi4File (filepath: str | os.PathLike)-
Expand source code
class Bi4File(Item):
    """A bi4 file read from disk.

    The constructor reads a 60-byte title, a one-byte byte-order flag and
    three ignored header bytes, then parses the root ``Item`` from the rest
    of the file. The root item's fields are copied onto this object, so a
    ``Bi4File`` can be used wherever an ``Item`` is expected.

    Attributes
    ----------
    filepath : str | os.PathLike
        The path the file was read from.
    title : str
        The title stored in the file header.
    """

    filepath: str | os.PathLike
    title: str

    def __init__(self, filepath: str | os.PathLike) -> None:
        with open(filepath, "rb") as file:
            title = file.read(60).strip(b"\0").strip().decode("utf-8")
            byteorder = Endianness.from_bytes(file.read(1))
            file.read(3)  # ignore extra header bytes
            main_item = Item.from_stream(file, byteorder)

        self._filepath = filepath
        self._title = title
        # Keep the parsed root item so the ``main_item`` property can return it.
        self._main_item = main_item
        super().__init__(
            item_size=main_item.item_size,
            name=main_item.name,
            hide=main_item.hide,
            hide_values=main_item.hide_values,
            fmt_float=main_item.fmt_float,
            fmt_double=main_item.fmt_double,
            num_arrays=main_item.num_arrays,
            num_items=main_item.num_items,
            size_values=main_item.size_values,
            values=main_item.values,
            items=main_item.items,
            arrays=main_item.arrays,
        )

    @property
    def filepath(self) -> str | os.PathLike:
        return self._filepath

    @property
    def title(self) -> str:
        return self._title

    @property
    def main_item(self) -> Item:
        """The root ``Item`` parsed from the file."""
        return self._main_item

    def __str__(self) -> str:
        header = f"File: {self.filepath}\nTitle: {self.title}\n"
        # Positional count: the ``count=`` keyword is rejected before Python 3.13.
        return header + super().pretty_print().replace("Item", "Bi4File", 1)

    def __repr__(self) -> str:
        return str(self)
# Ancestors  <- documentation-page section heading that follows this listing
- pydsphtools._io.Item
Instance variables
prop filepath : str | os.PathLike-
Expand source code
@property def filepath(self) -> str | os.PathLike: return self._filepath prop main_item : Item-
Expand source code
@property def main_item(self) -> Item: return self._main_item prop title : str-
Expand source code
@property def title(self) -> str: return self._title
class DataType (*values)-
Enum that handles the data type
Expand source code
class DataType(Enum):
    """Enum that handles the data type"""

    # bytes
    null = 0
    text = 1
    bool = 2
    char = 3
    uchar = 4
    # ints
    short = 5
    ushort = 6
    int = 7
    uint = 8
    llong = 9
    ullong = 10
    # floats
    float = 11
    double = 12
    # vectors
    int3 = 20
    uint3 = 21
    float3 = 22
    double3 = 23

    @classmethod
    def from_bytes(cls: DataType, bytes: bytes, endianness: Endianness) -> DataType:
        """Build the member whose value is encoded in ``bytes``.

        Parameters
        ----------
        bytes : bytes
            The byte array holding the integer code.
        endianness : Endianness
            The endianness of the bytes.

        Returns
        -------
        DataType
            The matching member.
        """
        code = int.from_bytes(bytes, endianness.name)
        return cls(code)

    def to_python_type(self) -> type:
        """Return the Python type used to hold a value of this data type.

        Returns
        -------
        type
            - "null" => ``type(None)``
            - "text", "char", "uchar" => ``str``
            - "bool" => ``bool``
            - "short", "ushort", "int", "uint", "llong", "ullong" => ``int``
            - "float", "double" => ``float``
            - "int3", "uint3", "float3", "double3" => ``tuple``

        Raises
        ------
        ValueError
            If the member's value falls outside every group above.
        """
        code = self.value
        if code == 0:
            return type(None)
        if code == 2:
            return bool
        if code in (1, 3, 4):
            return str
        if 5 <= code <= 10:
            return int
        if code in (11, 12):
            return float
        if 20 <= code <= 23:
            return tuple
        raise ValueError(f"Unsupported DataType: {self}")

    def is_scalar(self) -> bool:
        """Return True for the integer and floating-point members (values 5-12)."""
        return 5 <= self.value <= 12

    def is_vector(self) -> bool:
        """Return True for the three-component members (values 20-23)."""
        return 20 <= self.value <= 23
# Ancestors  <- documentation-page section heading that follows this listing
- enum.Enum
Class variables
var bool-
Enum member with value 2.
var char-
Enum member with value 3.
var double-
Enum member with value 12.
var double3-
Enum member with value 23.
var float-
Enum member with value 11.
var float3-
Enum member with value 22.
var int-
Enum member with value 7.
var int3-
Enum member with value 20.
var llong-
Enum member with value 9.
var null-
Enum member with value 0.
var short-
Enum member with value 5.
var text-
Enum member with value 1.
var uchar-
Enum member with value 4.
var uint-
Enum member with value 8.
var uint3-
Enum member with value 21.
var ullong-
Enum member with value 10.
var ushort-
Enum member with value 6.
Static methods
def from_bytes(bytes: bytes, endianness: Endianness) ‑> pydsphtools._io.DataType-
Constructor from bytes.
Parameters
bytes:bytes- The byte array.
endianness:Endianness- The endianness of the bytes.
Returns
DataType- The new object.
Methods
def is_scalar(self) ‑> booldef is_vector(self) ‑> booldef to_python_type(self) ‑> type-
Converts the enum value to a python type
Returns
type- The following mapping is used:
- "null" => None
- "text", "char", "uchar" => str
- "bool" => bool
- "short", "ushort", "int", "uint", "llong", "ullong" => int
- "float", "double" => float
- "int3", "uint3", "float3", "double3" => tuple
class Endianness (*values)-
Enum that represents the endianness.
Expand source code
class Endianness(Enum):
    """Enum that represents the endianness."""

    little = 0
    big = 1

    @classmethod
    def from_bytes(cls, bytes: bytes) -> Endianness:
        """Constructor from bytes arrays

        Parameters
        ----------
        bytes : bytes
            The byte array

        Returns
        -------
        Endianness
            The new object

        Raises
        ------
        ValueError
            If the decoded integer is neither 0 nor 1.
        """
        # "big" is the default byteorder on Python 3.11+; passing it explicitly
        # keeps the same result and also works on versions without that default.
        return cls(int.from_bytes(bytes, "big"))
# Ancestors  <- documentation-page section heading that follows this listing
- enum.Enum
Class variables
var big-
Enum member with value 1.
var little-
Enum member with value 0.
Static methods
def from_bytes(bytes: bytes) ‑> pydsphtools._io.Endianness-
Constructor from bytes arrays
Parameters
bytes:bytes- The byte array
Returns
Endianness- The new object
class Item (item_size: int, name: str, hide: bool, hide_values: bool, fmt_float: str, fmt_double: str, num_arrays: int, num_items: int, size_values: int, values: list[Value], items: list[Item], arrays: list[Array])-
Expand source code
class Item:
    """A node of a bi4 file: named values, child items and arrays.

    Attributes
    ----------
    item_size : int
        Size in bytes of the item definition, as read from the stream.
    name : str
        Name of the item.
    hide : bool
        Hide flag read from the definition.
    hide_values : bool
        Hide-values flag read from the definition.
    fmt_float : str
        Format string for floats read from the definition.
    fmt_double : str
        Format string for doubles read from the definition.
    num_arrays : int
        Number of arrays owned by the item.
    num_items : int
        Number of child items.
    size_values : int
        Size in bytes of the values section.
    values : list[Value]
        The values of the item.
    items : list[Item]
        The child items.
    arrays : list[Array]
        The arrays of the item.
    """

    item_size: int
    name: str
    hide: bool
    hide_values: bool
    fmt_float: str
    fmt_double: str
    num_arrays: int
    num_items: int
    size_values: int
    values: list[Value]
    items: list[Item]
    arrays: list[Array]

    __slots__ = (
        "_item_size",
        "_name",
        "_hide",
        "_hide_values",
        "_fmt_float",
        "_fmt_double",
        "_num_arrays",
        "_num_items",
        "_size_values",
        "_values",
        "_items",
        "_arrays",
    )

    def __init__(
        self,
        item_size: int,
        name: str,
        hide: bool,
        hide_values: bool,
        fmt_float: str,
        fmt_double: str,
        num_arrays: int,
        num_items: int,
        size_values: int,
        values: list[Value],
        items: list[Item],
        arrays: list[Array],
    ) -> None:
        self._item_size = item_size
        self._name = name
        self._hide = hide
        self._hide_values = hide_values
        self._fmt_float = fmt_float
        self._fmt_double = fmt_double
        self._num_arrays = num_arrays
        self._num_items = num_items
        self._size_values = size_values
        self._values = values
        self._items = items
        self._arrays = arrays

    @classmethod
    def from_stream(
        cls: Item, bytes_stream: io.BytesIO, endianness: Endianness
    ) -> Item:
        """Parse an item, its values, child items and arrays from a stream.

        Parameters
        ----------
        bytes_stream : io.BytesIO
            The byte stream, positioned at the start of the item definition.
        endianness : Endianness
            The endianness of the bytes.

        Returns
        -------
        Item
            The new object.

        Raises
        ------
        AssertionError
            If a section marker does not match or a section buffer contains
            unread bytes after its last field.
        """
        # Item definition: a size prefix followed by that many bytes.
        item_size = int.from_bytes(bytes_stream.read(UINT_SIZE), endianness.name)
        buff = bytes_stream.read(item_size)
        stream = io.BytesIO(buff)
        item_str_size = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        assert item_str_size == 6, f"Expected 6 but found {item_str_size}"
        item_str = stream.read(item_str_size).decode("utf-8")
        assert item_str == "\nITEM\n", f"Expected '\\nITEM\\n' but found '{item_str}'"
        name_size = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        name = stream.read(name_size).decode("utf-8")
        hide = bool.from_bytes(stream.read(UINT_SIZE), endianness.name)
        hide_values = bool.from_bytes(stream.read(UINT_SIZE), endianness.name)
        fmt_float_size = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        fmt_float = stream.read(fmt_float_size).decode("utf-8")
        fmt_double_size = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        fmt_double = stream.read(fmt_double_size).decode("utf-8")
        num_arrays = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        num_items = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        size_values = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        assert stream.read() == b"", "Item buffer is not empty."
        stream.close()

        # The item definition is finished; the values section comes next.
        buff = bytes_stream.read(size_values)
        stream = io.BytesIO(buff)
        values_str_size = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        assert values_str_size == 7, f"Expected 7 but found {values_str_size}"
        values_str = stream.read(values_str_size).decode("utf-8")
        assert values_str == "\nVALUES", (
            f"Expected '\\nVALUES' but found '{values_str}'"
        )
        num_values = int.from_bytes(stream.read(UINT_SIZE), endianness.name)
        values = [Value.from_stream(stream, endianness) for _ in range(num_values)]
        assert stream.read() == b"", "Values buffer is not empty."
        stream.close()

        # TODO: Can save file pointer to arrays instead temporarily until we try to read them.
        items = [Item.from_stream(bytes_stream, endianness) for _ in range(num_items)]
        arrays = [
            Array.from_stream(bytes_stream, endianness) for _ in range(num_arrays)
        ]

        return cls(
            item_size,
            name,
            hide,
            hide_values,
            fmt_float,
            fmt_double,
            num_arrays,
            num_items,
            size_values,
            values,
            items,
            arrays,
        )

    @classmethod
    def from_bytes(cls: Item, bytes: bytes, endianness: Endianness) -> Item:
        """Parse an item from a bytes object (see ``from_stream``)."""
        return cls.from_stream(io.BytesIO(bytes), endianness)

    @property
    def item_size(self) -> int:
        return self._item_size

    @property
    def name(self) -> str:
        return self._name

    @property
    def hide(self) -> bool:
        return self._hide

    @property
    def hide_values(self) -> bool:
        return self._hide_values

    @property
    def fmt_float(self) -> str:
        return self._fmt_float

    @property
    def fmt_double(self) -> str:
        return self._fmt_double

    @property
    def num_arrays(self) -> int:
        return self._num_arrays

    @property
    def num_items(self) -> int:
        return self._num_items

    @property
    def size_values(self) -> int:
        return self._size_values

    @property
    def values(self) -> list[Value]:
        return self._values

    @property
    def items(self) -> list[Item]:
        return self._items

    @property
    def arrays(self) -> list[Array]:
        return self._arrays

    def __str__(self) -> str:
        return self.pretty_print()

    def __repr__(self) -> str:
        return self.pretty_print()

    def pretty_print(self, indent=0, indent_str=" ") -> str:
        """Return a multi-line description of the item and everything below it.

        Parameters
        ----------
        indent : int
            Nesting level of the opening ``Item(`` line.
        indent_str : str
            String repeated once per nesting level; it is also passed down to
            nested values, items and arrays.

        Returns
        -------
        str
            The formatted text.
        """
        pad_outer = indent_str * indent
        pad_field = indent_str * (indent + 1)

        parts = [
            f"{pad_outer}Item(\n",
            f"{pad_field}item_size = {self._item_size},\n",
            f"{pad_field}name = {self._name},\n",
            f"{pad_field}hide = {self._hide},\n",
            f"{pad_field}hide_values = {self._hide_values},\n",
            f"{pad_field}fmt_float = {self._fmt_float},\n",
            f"{pad_field}fmt_double = {self._fmt_double},\n",
            f"{pad_field}num_arrays = {self._num_arrays},\n",
            f"{pad_field}num_items = {self._num_items},\n",
            f"{pad_field}size_values = {self._size_values},\n",
            f"{pad_field}values = [\n",
        ]
        parts.extend(
            f"{value.pretty_print(indent=indent + 2, indent_str=indent_str)}\n"
            for value in self._values
        )
        parts.append(f"{pad_field}]\n")

        if self._num_items:
            parts.append(f"{pad_field}items = [\n")
            parts.extend(
                f"{item.pretty_print(indent + 2, indent_str)},\n"
                for item in self._items
            )
            parts.append(f"{pad_field}]\n")
        else:
            parts.append(f"{pad_field}items = []\n")

        if self._num_arrays:
            parts.append(f"{pad_field}arrays = [\n")
            parts.extend(
                f"{array.pretty_print(indent + 2, indent_str)},\n"
                for array in self._arrays
            )
            parts.append(f"{pad_field}]\n")
        else:
            parts.append(f"{pad_field}arrays = []\n")

        parts.append(f"{pad_outer})")
        return "".join(parts)

    def get_value_by_name(self, name: str) -> Value | None:
        """Return the first value called ``name``, searching own values first
        and then each child item depth-first; ``None`` if there is none."""
        for value in self.values:
            if value.name == name:
                return value
        for item in self.items:
            value = item.get_value_by_name(name)
            if value is not None:
                return value
        return None

    def get_array_by_name(self, name: str) -> Array | None:
        """Return the first array called ``name``, searching own arrays first
        and then each child item depth-first; ``None`` if there is none."""
        for array in self.arrays:
            if array.name == name:
                return array
        for item in self.items:
            array = item.get_array_by_name(name)
            if array is not None:
                return array
        return None

    def get_item_by_name(self, name: str) -> Item | None:
        """Return the first descendant item called ``name``, checking direct
        children first and then recursing into each child; ``None`` if there
        is none."""
        for item in self.items:
            if item.name == name:
                return item
        for item in self.items:
            found = item.get_item_by_name(name)
            if found is not None:
                return found
        return None

    def _pretty_print_dict(self, d: dict, indent=0, indent_str=" ") -> str:
        """Format a possibly nested dict, one ``key: value`` line per entry."""
        pad = indent_str * (indent + 1)
        parts = []
        for key, value in d.items():
            parts.append(f"{pad}{key}: ")
            if isinstance(value, dict):
                parts.append("{\n")
                parts.append(self._pretty_print_dict(value, indent + 1, indent_str))
                parts.append(f"{pad}}}\n")
            else:
                parts.append(f"{value}\n")
        return "".join(parts)
# Subclasses  <- documentation-page section heading that follows this listing
- pydsphtools._io.Bi4File
Static methods
def from_bytes(bytes: bytes, endianness: Endianness) ‑> pydsphtools._io.Itemdef from_stream(bytes_stream: io.BytesIO, endianness: Endianness) ‑> pydsphtools._io.Item
Instance variables
prop arrays : list[pydsphtools._io.Array]-
Expand source code
@property def arrays(self) -> list[Array]: return self._arrays prop fmt_double : str-
Expand source code
@property def fmt_double(self) -> str: return self._fmt_double prop fmt_float : str-
Expand source code
@property def fmt_float(self) -> str: return self._fmt_float prop hide : bool-
Expand source code
@property def hide(self) -> bool: return self._hide prop hide_values : bool-
Expand source code
@property def hide_values(self) -> bool: return self._hide_values prop item_size : int-
Expand source code
@property def item_size(self) -> int: return self._item_size prop items : list[pydsphtools._io.Item]-
Expand source code
@property def items(self) -> list[Item]: return self._items prop name : str-
Expand source code
@property def name(self) -> str: return self._name prop num_arrays : int-
Expand source code
@property def num_arrays(self) -> int: return self._num_arrays prop num_items : int-
Expand source code
@property def num_items(self) -> int: return self._num_items prop size_values : int-
Expand source code
@property def size_values(self) -> int: return self._size_values prop values : list[pydsphtools._io.Value]-
Expand source code
@property def values(self) -> list[Value]: return self._values
Methods
def get_array_by_name(self, name: str) ‑> Optional[pydsphtools._io.Array]def get_item_by_name(self, name: str) ‑> Optional[pydsphtools._io.Item]def get_value_by_name(self, name: str) ‑> Optional[pydsphtools._io.Value]def pretty_print(self, indent=0, indent_str=' ') ‑> str
class Value (name: str, value_type: DataType, value: None | bool | str | int | float | tuple[float, float, float] | tuple[int, int, int])-
Expand source code
class Value: name: str value_type: DataType value: ( None | bool | str | int | float | tuple[float, float, float] | tuple[int, int, int] ) __slots__ = ("_name", "_value_type", "_value") def __init__( self, name: str, value_type: DataType, value: ( None | bool | str | int | float | tuple[float, float, float] | tuple[int, int, int] ), ) -> None: self._name = name self._value_type = value_type self._value = value @classmethod def from_stream( cls: Value, stream: io.BytesIO, endianness: Endianness, ) -> Value: name_size = int.from_bytes(stream.read(UINT_SIZE), endianness.name) name = stream.read(name_size).decode("utf-8") data_type = DataType(int.from_bytes(stream.read(INT_SIZE), endianness.name)) edn_fmt = "<" if endianness == Endianness.little else ">" match data_type: case DataType.null: data = None case DataType.char | DataType.uchar: data = stream.read(CHAR_SIZE).decode("utf-8") case DataType.text: str_size = int.from_bytes(stream.read(UINT_SIZE), endianness.name) data = stream.read(str_size).decode("utf-8") case DataType.short | DataType.ushort: data = int.from_bytes(stream.read(SHORT_SIZE), endianness.name) case DataType.bool: data = bool.from_bytes(stream.read(INT_SIZE), endianness.name) case DataType.int | DataType.uint: data = int.from_bytes(stream.read(INT_SIZE), endianness.name) case DataType.llong | DataType.ullong: data = int.from_bytes(stream.read(LONG_SIZE), endianness.name) case DataType.float: data = struct.unpack(f"{edn_fmt}f", stream.read(FLOAT_SIZE))[0] case DataType.double: data = struct.unpack(f"{edn_fmt}d", stream.read(DOUBLE_SIZE))[0] case DataType.int3 | DataType.uint3: data = ( int.from_bytes(stream.read(INT_SIZE), endianness.name), int.from_bytes(stream.read(INT_SIZE), endianness.name), int.from_bytes(stream.read(INT_SIZE), endianness.name), ) case DataType.float3: data = struct.unpack(f"{edn_fmt}3f", stream.read(FLOAT3_SIZE)) case DataType.double3: data = struct.unpack(f"{edn_fmt}3d", stream.read(DOUBLE3_SIZE)) case _: raise 
NotImplementedError( f"Can't parse value of data type {data_type}" " because it is not implemented" ) return cls(name, data_type, data) @classmethod def from_bytes(cls: Value, bytes: bytes, endianness: Endianness) -> Value: return cls.from_stream(io.BytesIO(bytes), endianness) @property def name(self) -> str: return self._name @property def value_type(self) -> DataType: return self._value_type @property def value( self, ) -> ( None | bool | str | int | float | tuple[float, float, float] | tuple[int, int, int] ): return self._value def pretty_print(self, indent=0, indent_str=" ") -> str: return f"{indent_str * indent}{self}" def __str__(self) -> str: return f"{self.name}: {self.value}"Static methods
def from_bytes(bytes: bytes, endianness: Endianness) ‑> pydsphtools._io.Valuedef from_stream(stream: io.BytesIO, endianness: Endianness) ‑> pydsphtools._io.Value
Instance variables
prop name : str-
Expand source code
@property def name(self) -> str: return self._name prop value : None | bool | str | int | float | tuple[float, float, float] | tuple[int, int, int]-
Expand source code
@property def value( self, ) -> ( None | bool | str | int | float | tuple[float, float, float] | tuple[int, int, int] ): return self._value prop value_type : pydsphtools._io.DataType-
Expand source code
@property def value_type(self) -> DataType: return self._value_type
Methods
def pretty_print(self, indent=0, indent_str=' ') ‑> str