Skip to content

LazyWriter

Source code in src/msglc/writer.py
Python
class LazyWriter:
    """
    Context manager that serializes a single object into a buffer or file.

    The layout produced by this class is: the `magic` bytes, a 20-byte header
    (two 10-byte slots holding the packed TOC offset and the packed TOC length),
    the payload written by the `TOC` packer, and finally the packed TOC itself.
    """

    # File signature, left-padded with NUL bytes up to `max_magic_len`.
    magic: bytes = b"msglc-2024".rjust(max_magic_len, b"\0")

    @classmethod
    def magic_len(cls) -> int:
        """
        :return: the length in bytes of the current (padded) magic signature
        """
        return len(cls.magic)

    @classmethod
    def set_magic(cls, magic: bytes):
        """
        Replace the class-wide magic signature.

        The given bytes are left-padded with NUL bytes up to `max_magic_len`.
        NOTE(review): a value longer than `max_magic_len` is stored unpadded and
        untruncated; confirm readers tolerate that.

        :param magic: the new signature
        """
        cls.magic = magic.rjust(max_magic_len, b"\0")

    def __init__(
        self,
        buffer_or_path: str | UPath | BufferWriter,
        packer: Packer | None = None,
        *,
        fs: FileSystem | None = None,
    ):
        """
        It is possible to provide a custom packer object to be used for packing the object.
        However, this packer must be compatible with the `msgpack` packer.

        The `buffer_or_path` can be
        1. a plain `str` pointing to a file on the local filesystem, or a file on the target filesystem if `fs` is provided,
        2. a `UPath` object that points to a file on a supported filesystem (either local or remote),
        3. an `IO` object that has `.tell()`, `.seek()`, `.write()` methods; this object must have random write access.

        Warning:
        Some backend filesystems only support sequential writes rather than random writes, thus not all filesystems
        supported by `UPath` can be used.
        One must always check if the target filesystem supports random write access.
        If not, the most generic approach is to provide a plain string for `buffer_or_path` and explicitly assign a `fs` object.
        In this case, a local cache will be used to temporarily handle the serialization and the binary blob
        will be uploaded to the remote once everything is processed.

        :param buffer_or_path: target buffer or file path
        :param packer: packer object to be used for packing the object
        :param fs: `FileSystem` object to be used for storing
        """
        self._buffer_or_path: str | UPath | BufferWriter = buffer_or_path
        self._packer: Packer = packer or Packer()
        self._fs: FileSystem | None = fs or config.fs

        # The attributes below are populated by `__enter__`.
        self._buffer: BufferWriter | TemporaryFile = None  # type: ignore
        self._toc_packer: TOC = None  # type: ignore
        self._header_start: int = 0
        self._file_start: int = 0
        self._no_more_writes: bool = False

    def _open_buffer(self):
        """
        Resolve `buffer_or_path` into the buffer that will receive the data.

        :raise ValueError: if the target is of an unsupported type, or is a `UPath`
            whose opened handle is not seekable
        :return: the buffer to write to
        """
        target = self._buffer_or_path

        if isinstance(target, str):
            if self._fs:
                # we need to seek to the beginning and overwrite the header
                # however, s3 does not allow seek in write mode
                # thus use a local temp file as cache
                return TemporaryFile()
            return open(target, "wb", config.write_buffer_size)

        if isinstance(target, UPath):
            handle = target.open("wb")
            if not handle.seekable():
                # do not leak the handle we have just opened
                handle.close()
                raise ValueError(
                    f"The underlying filesystem of the given UPath ({self._buffer_or_path}) does not support random write."
                )
            return handle

        # NOTE(review): `BufferedReader` is a read-side type; confirm this is the intended check.
        if isinstance(target, (BytesIO, BufferedReader)):
            return target

        raise ValueError("Expecting a buffer or path.")

    def __enter__(self):
        """
        Open the target, write the magic signature and reserve the header.

        :raise ValueError: if `buffer_or_path` cannot be used as a target
        :return: this writer
        """
        increment_gc_counter()

        buffer = None
        try:
            buffer = self._open_buffer()
            self._buffer = buffer

            self._buffer.write(self.magic)
            self._header_start = self._buffer.tell()
            # placeholder for two 10-byte slots that `write` fills in afterwards
            self._buffer.write(b"\0" * 20)
            self._file_start = self._buffer.tell()

            self._toc_packer = TOC(packer=self._packer, buffer=self._buffer)
        except BaseException:
            # `__exit__` is not invoked when `__enter__` raises, so undo here
            # what `__exit__` would otherwise have undone.
            if buffer is not None and isinstance(
                self._buffer_or_path, (str, UPath)
            ):
                buffer.close()
            decrement_gc_counter()
            raise

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Hand the data over to `_upsert` (string targets only) and close the
        buffer if this writer opened it. Caller-supplied buffers are left open.
        """
        decrement_gc_counter()

        try:
            # NOTE(review): this runs even if the `with` body raised; confirm that
            # storing a partially written blob is intended.
            if isinstance(self._buffer_or_path, str):
                _upsert(self._buffer, self._buffer_or_path, self._fs)
        finally:
            # always release the handle we own, even if the step above fails
            if isinstance(self._buffer_or_path, (str, UPath)):
                self._buffer.close()

    def write(self, obj) -> None:
        """
        This function is used to write the object to the file.

        Only one write is allowed. The function raises a `ValueError` if it is called more than once.

        :param obj: the object to be written to the file
        :raise ValueError: if the function is called more than once
        :return: None
        """
        if self._no_more_writes:
            raise ValueError("No more writes allowed.")

        # set before packing, so a failed attempt also blocks further writes
        self._no_more_writes = True

        toc: dict = self._toc_packer.pack(obj)
        # offset of the TOC, relative to the end of the header
        toc_start: int = self._buffer.tell() - self._file_start
        packed_toc: bytes = self._packer.pack(toc)

        self._buffer.write(packed_toc)
        # go back and fill in the two 10-byte header slots reserved in `__enter__`
        self._buffer.seek(self._header_start)
        self._buffer.write(self._packer.pack(toc_start).rjust(10, b"\0"))
        self._buffer.write(self._packer.pack(len(packed_toc)).rjust(10, b"\0"))

__init__(buffer_or_path, packer=None, *, fs=None)

It is possible to provide a custom packer object to be used for packing the object. However, this packer must be compatible with the msgpack packer.

The buffer_or_path can be: 1. a plain str pointing to a file on the local filesystem, or to a file on the target filesystem if fs is provided; 2. a UPath object that points to a file on a supported filesystem (either local or remote); 3. an IO object that has .tell(), .seek() and .write() methods; this object must have random write access.

Warning: Some backend filesystems only support sequential writes rather than random writes, so not all filesystems supported by UPath can be used. Always check whether the target filesystem supports random write access. If it does not, the most generic approach is to pass a plain string for buffer_or_path and explicitly provide an fs object. In this case, a local cache is used to temporarily hold the serialized data, and the binary blob is uploaded to the remote once everything is processed.

Parameters:

Name Type Description Default
buffer_or_path str | UPath | BufferWriter

target buffer or file path

required
packer Packer

packer object to be used for packing the object

None
fs FileSystem | None

FileSystem object to be used for storing

None
Source code in src/msglc/writer.py
Python
def __init__(
    self,
    buffer_or_path: str | UPath | BufferWriter,
    packer: Packer = None,
    *,
    fs: FileSystem | None = None,
):
    """
    Record the target and the packing configuration; nothing is opened here.

    A custom packer may be supplied, provided it is compatible with the
    `msgpack` packer. When omitted, a default `Packer()` is created.

    Accepted values for `buffer_or_path`:
    1. a plain `str` path, on the local filesystem or on the target filesystem when `fs` is given,
    2. a `UPath` pointing to a file on a supported filesystem (local or remote),
    3. an `IO` object exposing `.tell()`, `.seek()` and `.write()`, with random write access.

    Warning:
    Not every filesystem supported by `UPath` can be used, because some backends
    only allow sequential writes. Always check that the target supports random writes.
    If it does not, pass a plain string as `buffer_or_path` together with an explicit `fs`:
    the data is then serialized into a local cache and the resulting binary blob is
    uploaded to the remote once everything is processed.

    :param buffer_or_path: target buffer or file path
    :param packer: packer object to be used for packing the object
    :param fs: `FileSystem` object to be used for storing
    """
    # Runtime state, initialised to placeholder values.
    self._no_more_writes: bool = False
    self._file_start: int = 0
    self._header_start: int = 0
    self._toc_packer: TOC = None  # type: ignore
    self._buffer: BufferWriter | TemporaryFile = None  # type: ignore

    # User-supplied configuration, with fallbacks for whatever is falsy.
    self._buffer_or_path: str | UPath | BufferWriter = buffer_or_path
    self._packer: Packer = packer if packer else Packer()
    self._fs: FileSystem | None = fs if fs else config.fs

write(obj)

This function is used to write the object to the file.

Only one write is allowed. The function raises a ValueError if it is called more than once.

Parameters:

Name Type Description Default
obj

the object to be written to the file

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

if the function is called more than once

Source code in src/msglc/writer.py
Python
def write(self, obj) -> None:
    """
    Serialize `obj` into the buffer and fill in the header.

    The writer accepts exactly one object; any later call is rejected.

    :param obj: the object to be written to the file
    :raise ValueError: if the function is called more than once
    :return: None
    """
    if self._no_more_writes:
        raise ValueError("No more writes allowed.")

    # Flip the flag up front, before any packing is attempted.
    self._no_more_writes = True

    table: dict = self._toc_packer.pack(obj)
    # Offset of the table of contents, measured from the end of the header.
    table_offset: int = self._buffer.tell() - self._file_start
    table_bytes: bytes = self._packer.pack(table)
    self._buffer.write(table_bytes)

    # Rewind to the header and emit its two NUL-padded 10-byte fields:
    # first the table offset, then the table length.
    self._buffer.seek(self._header_start)
    for field in (table_offset, len(table_bytes)):
        self._buffer.write(self._packer.pack(field).rjust(10, b"\0"))