Skip to content

LazyWriter

Bases: LazyBuffer

Source code in src/msglc/writer.py
class LazyWriter(LazyBuffer):
    """Writer that serializes one object into a msglc container.

    On-disk layout produced by this class: the magic prefix, a 20-byte
    header placeholder (overwritten later, see the note in ``__enter__``),
    then the packed payload produced via the TOC packer.

    Intended usage: open as a context manager and call :meth:`write`
    exactly once.
    """

    # Format identifier, left-padded with NUL bytes to a fixed width so a
    # reader can always consume exactly `max_magic_len` bytes up front.
    magic: bytes = b"msglc-2024".rjust(max_magic_len, b"\0")

    @classmethod
    def magic_len(cls) -> int:
        """Return the (padded) length of the magic prefix in bytes."""
        return len(cls.magic)

    @classmethod
    def set_magic(cls, magic: bytes) -> None:
        """Override the magic prefix; it is NUL-padded to `max_magic_len`."""
        cls.magic = magic.rjust(max_magic_len, b"\0")

    def __init__(
        self,
        buffer_or_path: str | UPath | BufferWriter,
        *,
        packer: type[LazyCodec] | LazyCodec | None = None,
        fs: FileSystem | None = None,
        toc_cls: type[TOC] | None = None,
    ):
        """
        It is possible to provide a custom packer object to be used for packing the object.
        However, this packer must implement `encode` and `decode` methods.

        The `buffer_or_path` can be
        1. a plain `str` pointing to a file on local filesystem, or a file on target filesystem if `fs` is provided,
        2. a `UPath` object that points to a file on supported filesystem (either local or remote),
        3. an `IO` object that has `.tell()`, `.seek()`, `.write()` methods, this object must have random write access.

        It is possible to provide a customized TOC packer via the `toc_cls` parameter to customize how TOC is generated.
        It will be initialized with two keyword arguments: `packer=self._packer, buffer=self._buffer`
        where `packer` is the packer object to be used for encoding `msgpack` data and
        `buffer` is a file-like `IO` object where serialized data will be written to.
        It needs to have a public method `def pack(self, obj) -> dict: ...` that will be called by the writer.
        See the implementation of `TOC` class for further details.

        Warning:
        Some backend filesystems only support sequential writes rather than random writes, thus not all filesystems
        supported by `UPath` can be used.
        One must always check if the target filesystem supports random write access.
        If not, the most generic approach is to provide a plain string for `buffer_or_path` and explicitly assign a `fs` object.
        In this case, a local cache will be used to temporarily handle the serialization and the binary blob
        will be uploaded to the remote once everything is processed.

        :param buffer_or_path: target buffer or file path
        :param packer: packer object to be used for packing the object
        :param fs: `FileSystem` object to be used for storing
        :param toc_cls: a `TOC` packer class
        """
        super().__init__(buffer_or_path, packer, fs)

        # Created lazily in __enter__ once the target buffer exists.
        self._toc_packer: TOC = None  # type: ignore
        self._toc_cls = toc_cls or TOC
        # Guards against calling write() more than once.
        self._no_more_writes: bool = False

    def __enter__(self):
        # Paired with decrement_gc_counter() in __exit__.
        increment_gc_counter()

        if isinstance(self._buffer_or_path, str):
            if self._fs:
                # we need to seek to the beginning and overwrite the header
                # however, s3 does not allow seek in write mode
                # thus use a local temp file as cache
                self._buffer = TemporaryFile()
            else:
                self._buffer = open(
                    self._buffer_or_path, "wb", config.write_buffer_size
                )
        elif isinstance(self._buffer_or_path, UPath):
            self._buffer = self._buffer_or_path.open("wb", config.write_buffer_size)
            if not self._buffer.seekable():
                # Same issue as above: fall back to a local temp file and
                # upload the finished blob afterwards.
                self._unseekable_upath = True
                self._buffer.close()
                self._buffer = TemporaryFile()
        # NOTE(review): this accepts BufferedReader although the writer needs
        # write access — confirm whether BufferedWriter was intended here.
        elif isinstance(self._buffer_or_path, (BytesIO, BufferedReader)):
            self._buffer = self._buffer_or_path
        else:
            raise ValueError("Expecting a buffer or path.")

        # Layout: [magic][20-byte header placeholder][payload...]
        self._buffer.write(self.magic)
        self._header_start = self._buffer.tell()
        # Reserve space for the header; it is overwritten once sizes are known
        # (see the seek/overwrite note above).
        self._buffer.write(b"\0" * 20)
        self._file_start = self._buffer.tell()

        self._toc_packer = self._toc_cls(packer=self._packer, buffer=self._buffer)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        decrement_gc_counter()

        # Flush/close the buffer and, if a local cache was used, upload it.
        self.cleanup()

    def write(self, obj) -> None:
        """
        This function is used to write the object to the file.

        Only one write is allowed. The function raises a `ValueError` if it is called more than once.

        :param obj: the object to be written to the file
        :raise ValueError: if the function is called more than once
        :return: None
        """
        if self._no_more_writes:
            raise ValueError("No more writes allowed.")

        self._no_more_writes = True

        self._finalize(self._toc_packer.pack(obj))

__init__(buffer_or_path, *, packer=None, fs=None, toc_cls=None)

It is possible to provide a custom packer object to be used for packing the object. However, this packer must implement encode and decode methods.

The buffer_or_path can be 1. a plain str pointing to a file on local filesystem, or a file on target filesystem if fs is provided, 2. a UPath object that points to a file on supported filesystem (either local or remote), 3. an IO object that has .tell(), .seek(), .write() methods, this object must have random write access.

It is possible to provide a customized TOC packer via the toc_cls parameter to customize how TOC is generated. It will be initialized with two keyword arguments: packer=self._packer, buffer=self._buffer where packer is the packer object to be used for encoding msgpack data and buffer is a file-like IO object where serialized data will be written to. It needs to have a public method def pack(self, obj) -> dict: ... that will be called by the writer. See the implementation of TOC class for further details.

Warning: Some backend filesystems only support sequential writes rather than random writes; thus not all filesystems supported by UPath can be used. One must always check if the target filesystem supports random write access. If not, the most generic approach is to provide a plain string for buffer_or_path and explicitly assign a fs object. In this case, a local cache will be used to temporarily handle the serialization and the binary blob will be uploaded to the remote once everything is processed.

Parameters:

Name Type Description Default
buffer_or_path str | UPath | BufferWriter

target buffer or file path

required
packer type[LazyCodec] | LazyCodec | None

packer object to be used for packing the object

None
fs FileSystem | None

FileSystem object to be used for storing

None
toc_cls type[TOC] | None

a TOC packer class

None
Source code in src/msglc/writer.py
def __init__(
    self,
    buffer_or_path: str | UPath | BufferWriter,
    *,
    packer: type[LazyCodec] | LazyCodec | None = None,
    fs: FileSystem | None = None,
    toc_cls: type[TOC] | None = None,
):
    """
    It is possible to provide a custom packer object to be used for packing the object.
    However, this packer must implement `encode` and `decode` methods.

    The `buffer_or_path` can be
    1. a plain `str` pointing to a file on local filesystem, or a file on target filesystem if `fs` is provided,
    2. a `UPath` object that points to a file on supported filesystem (either local or remote),
    3. an `IO` object that has `.tell()`, `.seek()`, `.write()` methods, this object must have random write access.

    It is possible to provide a customized TOC packer via the `toc_cls` parameter to customize how TOC is generated.
    It will be initialized with two keyword arguments: `packer=self._packer, buffer=self._buffer`
    where `packer` is the packer object to be used for encoding `msgpack` data and
    `buffer` is a file-like `IO` object where serialized data will be written to.
    It needs to have a public method `def pack(self, obj) -> dict: ...` that will be called by the writer.
    See the implementation of `TOC` class for further details.

    Warning:
    Some backend filesystems only support sequential writes rather than random writes, thus not all filesystems
    supported by `UPath` can be used.
    One must always check if the target filesystem supports random write access.
    If not, the most generic approach is to provide a plain string for `buffer_or_path` and explicitly assign a `fs` object.
    In this case, a local cache will be used to temporarily handle the serialization and the binary blob
    will be uploaded to the remote once everything is processed.

    :param buffer_or_path: target buffer or file path
    :param packer: packer object to be used for packing the object
    :param fs: `FileSystem` object to be used for storing
    :param toc_cls: a `TOC` packer class
    """
    super().__init__(buffer_or_path, packer, fs)

    # Created lazily in __enter__ once the target buffer exists.
    self._toc_packer: TOC = None  # type: ignore
    self._toc_cls = toc_cls or TOC
    # Guards against calling write() more than once.
    self._no_more_writes: bool = False

write(obj)

This function is used to write the object to the file.

Only one write is allowed. The function raises a ValueError if it is called more than once.

Parameters:

Name Type Description Default
obj

the object to be written to the file

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

if the function is called more than once

Source code in src/msglc/writer.py
def write(self, obj) -> None:
    """
    Serialize the given object and emit it to the underlying file.

    A writer instance accepts a single call to this method; any subsequent
    call raises a `ValueError`.

    :param obj: the object to be written to the file
    :raise ValueError: if the function is called more than once
    :return: None
    """
    if self._no_more_writes:
        raise ValueError("No more writes allowed.")
    self._no_more_writes = True
    packed_toc = self._toc_packer.pack(obj)
    self._finalize(packed_toc)