Skip to content

LazyCombiner

Source code in src/msglc/writer.py
Python
class LazyCombiner:
    """
    Context manager that concatenates several serialized payloads into one
    target and records where each payload starts in a table of contents (TOC).

    Layout written to the target, starting where writing begins:
    ``LazyWriter.magic``, a 20-byte header (two 10-byte, zero-padded packed
    integers: TOC offset and TOC size), the payloads, and finally the TOC
    packed as ``{"t": toc}``. Offsets are measured from the end of the header.
    """

    def __init__(
        self,
        buffer_or_path: str | BufferWriter,
        *,
        mode: Literal["a", "w"] = "w",
        s3fs=None,
    ):
        """
        The mode resembles typical file-mode designations and carries the same meaning.
        If the mode is 'w', the target is overwritten.
        If the mode is 'a', new content is appended to the existing target;
        a path that does not exist yet is created.

        :param buffer_or_path: target buffer or file path
        :param mode: mode of operation, 'w' for write and 'a' for append
        :param s3fs: s3fs object (s3fs.S3FileSystem) to be used for storing
        """
        self._buffer_or_path: str | BufferWriter = buffer_or_path
        self._mode: str = mode
        self._s3fs = s3fs or config.s3fs

        self._buffer: BufferWriter | TemporaryFile = None  # type: ignore

        self._toc: dict | list = None  # type: ignore
        self._header_start: int = 0
        self._file_start: int = 0

    def _open_path(self, path: str) -> bool:
        """
        Open the working buffer for a path target.

        :param path: local path, or S3 path when an s3fs object is configured
        :return: True when a new combined file must be started, False when
            existing content has to be loaded for appending
        """
        if self._s3fs:
            fresh: bool = self._mode == "w" or not self._s3fs.exists(path)
            self._buffer = TemporaryFile()
            if not fresh:
                # Only append mode needs the old content. Downloading it in
                # write mode would leave stale bytes behind the new TOC
                # because the temporary file is never truncated.
                with self._s3fs.open(path, "rb") as s3_file:
                    while chunk := s3_file.read(config.read_buffer_size):
                        self._buffer.write(chunk)
                self._buffer.seek(0)
            return fresh

        fresh = self._mode == "w" or not os.path.exists(path)
        self._buffer = open(  # type: ignore
            path, "wb" if fresh else "r+b", buffering=config.write_buffer_size
        )
        return fresh

    def _start_fresh(self) -> None:
        """Write the magic bytes and reserve the 20-byte header placeholder."""
        self._buffer.write(LazyWriter.magic)
        self._header_start = self._buffer.tell()
        self._buffer.write(b"\0" * 20)
        self._file_start = self._buffer.tell()

    def _load_existing(self) -> None:
        """
        Read header and TOC of an existing combined file and position the
        buffer at the old TOC so that new payloads overwrite it.

        :raises ValueError: if the magic bytes or the TOC are not as expected
        """
        sep_a, sep_b, sep_c = (
            LazyWriter.magic_len(),
            LazyWriter.magic_len() + 10,
            LazyWriter.magic_len() + 20,
        )

        ini_position: int = self._buffer.tell()
        header: bytes = self._buffer.read(sep_c)

        def _raise_invalid(msg: str):
            # restore the position so a caller-provided buffer is left untouched
            self._buffer.seek(ini_position)
            raise ValueError(msg)

        def _decode(field: bytes) -> int:
            # Stripping the zero padding also removes the payload of a packed
            # zero (presumably a single NUL byte in msgpack), so put it back.
            return unpackb(field.lstrip(b"\0") or b"\0")

        if header[:sep_a] != LazyWriter.magic:
            _raise_invalid("Invalid file format, cannot append to the current file.")

        toc_start: int = _decode(header[sep_a:sep_b])
        toc_size: int = _decode(header[sep_b:sep_c])

        self._buffer.seek(ini_position + sep_c + toc_start)
        self._toc = unpackb(self._buffer.read(toc_size)).get("t", None)

        if isinstance(self._toc, list):
            entries = self._toc
        elif isinstance(self._toc, dict):
            entries = self._toc.values()
        else:
            entries = None

        # a combined file's TOC holds plain integer offsets only
        if entries is None or any(not isinstance(i, int) for i in entries):
            _raise_invalid("The given file is not a valid combined file.")

        self._header_start = ini_position + sep_a
        self._file_start = ini_position + sep_c
        self._buffer.seek(ini_position + sep_c + toc_start)

    def __enter__(self):
        """
        Open (or adopt) the target buffer and prepare it for writing.

        :return: this combiner
        :raises ValueError: if the target is neither a path nor a supported
            buffer, or if existing content is not a valid combined file
        """
        target = self._buffer_or_path
        if isinstance(target, str):
            try:
                if self._open_path(target):
                    self._start_fresh()
                else:
                    self._load_existing()
            except BaseException:
                # __exit__ is not invoked when __enter__ raises, so the handle
                # opened here must be released now to avoid leaking it.
                if self._buffer is not None:
                    self._buffer.close()
                    self._buffer = None  # type: ignore
                raise
        elif isinstance(target, (BytesIO, BufferedReader)):
            self._buffer = target
            if self._mode == "a":
                # need to read the header anyway
                self._buffer.seek(0)
                self._load_existing()
            else:
                self._start_fresh()
        else:
            raise ValueError("Expecting a buffer or path.")

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Append the TOC, fill in the header and, for path targets, store the
        result via ``_upsert`` and close the working buffer.

        Caller-provided buffers are left open.
        """
        owns_buffer: bool = isinstance(self._buffer_or_path, str)
        try:
            toc_start: int = self._buffer.tell() - self._file_start
            packed_toc: bytes = packb({"t": self._toc})

            self._buffer.write(packed_toc)
            # go back and fill in the header placeholder
            self._buffer.seek(self._header_start)
            self._buffer.write(packb(toc_start).rjust(10, b"\0"))
            self._buffer.write(packb(len(packed_toc)).rjust(10, b"\0"))

            if owns_buffer:
                _upsert(self._buffer, self._buffer_or_path, self._s3fs)
        finally:
            # close the handle we opened even if finalizing or storing failed
            if owns_buffer:
                self._buffer.close()

    def write(self, obj: Generator, name: str | None = None) -> None:
        """
        Write a number of objects to the file.

        The first call decides the combining mode: list mode when no name is
        given, dict mode otherwise. Later calls must stay in that mode.

        :param obj: a generator of objects to be written to the file
        :param name: a name to be assigned to the object, only required when combining in dict mode
        :raises ValueError: if the name does not fit the combining mode, or
            the name is already present
        """
        if self._toc is None:
            self._toc = [] if name is None else {}

        if name is None:
            if not isinstance(self._toc, list):
                raise ValueError("Need a name when combining in dict mode.")
        else:
            if not isinstance(self._toc, dict):
                raise ValueError("Cannot assign a name when combining in list mode.")
            if name in self._toc:
                raise ValueError(f"File {name} already exists.")

        # offsets are stored relative to the end of the header
        start: int = self._buffer.tell() - self._file_start
        for chunk in obj:
            self._buffer.write(chunk)

        if name is None:
            self._toc.append(start)
        else:
            self._toc[name] = start

__init__(buffer_or_path, *, mode='w', s3fs=None)

The mode resembles typical file-mode designations and carries the same meaning. If the mode is 'w', the file is overwritten. If the mode is 'a', new content is appended to the existing file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `buffer_or_path` | `str \| BufferWriter` | target buffer or file path | required |
| `mode` | `Literal['a', 'w']` | mode of operation, 'w' for write and 'a' for append | `'w'` |
| `s3fs` | | s3fs object (s3fs.S3FileSystem) to be used for storing | `None` |
Source code in src/msglc/writer.py
Python
def __init__(
    self,
    buffer_or_path: str | BufferWriter,
    *,
    mode: Literal["a", "w"] = "w",
    s3fs=None,
):
    """
    Record the target and the mode of operation; the remaining state is
    initialised to empty placeholders.

    The mode follows the usual file-mode letters: 'w' overwrites the file,
    'a' appends to it.

    :param buffer_or_path: target buffer or file path
    :param mode: mode of operation, 'w' for write and 'a' for append
    :param s3fs: s3fs object (s3fs.S3FileSystem) to be used for storing;
        ``config.s3fs`` is used when this is not given
    """
    # bookkeeping, filled in later
    self._header_start: int = 0
    self._file_start: int = 0
    self._toc: dict | list = None  # type: ignore
    self._buffer: BufferWriter | TemporaryFile = None  # type: ignore

    # caller-supplied settings
    self._s3fs = s3fs if s3fs else config.s3fs
    self._mode: str = mode
    self._buffer_or_path: str | BufferWriter = buffer_or_path

write(obj, name=None)

Write a number of objects to the file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `obj` | `Generator` | a generator of objects to be written to the file | required |
| `name` | `str \| None` | a name to be assigned to the object, only required when combining in dict mode | `None` |
Source code in src/msglc/writer.py
Python
def write(self, obj: Generator, name: str | None = None) -> None:
    """
    Write a number of objects to the file.

    :param obj: a generator of objects to be written to the file
    :param name: a name to be assigned to the object, only required when combining in dict mode
    """
    if self._toc is None:
        self._toc = [] if name is None else {}

    if name is None:
        if not isinstance(self._toc, list):
            raise ValueError("Need a name when combining in dict mode.")
    else:
        if not isinstance(self._toc, dict):
            raise ValueError("Cannot assign a name when combining in list mode.")
        if name in self._toc:
            raise ValueError(f"File {name} already exists.")

    start: int = self._buffer.tell() - self._file_start
    for chunk in obj:
        self._buffer.write(chunk)

    if name is None:
        self._toc.append(start)
    else:
        self._toc[name] = start