Skip to content

LazyWriter

Source code in src/msglc/writer.py
Python
class LazyWriter:
    """
    Context manager that writes a single object to a buffer or a file.

    The layout produced by this class, as visible in `__enter__` and `write`, is:
    the magic bytes, a 20-byte header (two 10-byte fields), the packed object,
    and finally the packed table of contents (TOC).
    The header stores the TOC offset (relative to the end of the header)
    and the TOC size.
    """

    # magic bytes, left-padded with NUL bytes up to `max_magic_len`
    magic: bytes = b"msglc-2024".rjust(max_magic_len, b"\0")

    @classmethod
    def magic_len(cls) -> int:
        """
        :return: the length (in bytes) of the padded magic
        """
        return len(cls.magic)

    @classmethod
    def set_magic(cls, magic: bytes):
        """
        Replace the magic bytes used by all subsequently created files.

        :param magic: the new magic, it is left-padded with NUL bytes up to `max_magic_len`
        """
        cls.magic = magic.rjust(max_magic_len, b"\0")

    def __init__(
        self,
        buffer_or_path: str | BufferWriter,
        packer: Packer | None = None,
        *,
        s3fs=None,
    ):
        """
        It is possible to provide a custom packer object to be used for packing the object.
        However, this packer must be compatible with the `msgpack` packer.

        :param buffer_or_path: target buffer or file path
        :param packer: packer object to be used for packing the object,
                       a default `Packer()` is created when `None` is given
        :param s3fs: s3fs object (s3fs.S3FileSystem) to be used for storing,
                     falls back to `config.s3fs` when not given
        """
        self._buffer_or_path: str | BufferWriter = buffer_or_path
        # compare against None explicitly so that a valid but falsy custom packer is not discarded
        self._packer = packer if packer is not None else Packer()
        self._s3fs = s3fs or config.s3fs

        self._buffer: BufferWriter | TemporaryFile = None  # type: ignore
        self._toc_packer: TOC = None  # type: ignore
        self._header_start: int = 0
        self._file_start: int = 0
        self._no_more_writes: bool = False

    def __enter__(self):
        """
        Prepare the target buffer, write the magic and reserve space for the header.

        :raise ValueError: if the target is neither a path nor a supported buffer
        :return: the writer itself
        """
        # paired with `decrement_gc_counter()` in `__exit__`
        increment_gc_counter()

        try:
            if isinstance(self._buffer_or_path, str):
                if self._s3fs:
                    # we need to seek to the beginning and overwrite the header
                    # however, s3 does not allow seek in write mode
                    # thus use a local temp file as cache
                    self._buffer = TemporaryFile()
                else:
                    self._buffer = open(
                        self._buffer_or_path, "wb", buffering=config.write_buffer_size
                    )
            elif isinstance(self._buffer_or_path, (BytesIO, BufferedReader)):
                # NOTE(review): `BufferedReader` is a read-only type in the standard library,
                # confirm whether a writable buffered type is intended here
                self._buffer = self._buffer_or_path
            else:
                raise ValueError("Expecting a buffer or path.")

            self._buffer.write(self.magic)
            self._header_start = self._buffer.tell()
            # placeholder for the header, it is overwritten in `write`
            self._buffer.write(b"\0" * 20)
            self._file_start = self._buffer.tell()

            self._toc_packer = TOC(packer=self._packer, buffer=self._buffer)
        except BaseException:
            # `__exit__` is not invoked when `__enter__` raises,
            # thus undo the counter increment and release the buffer opened above
            decrement_gc_counter()
            if isinstance(self._buffer_or_path, str) and self._buffer is not None:
                self._buffer.close()
                self._buffer = None  # type: ignore
            raise

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Finalise the target.

        Buffers supplied by the caller are left open.
        When a path is given, the content is handed over to `_upsert` and
        the internal buffer is closed afterwards.
        Exceptions raised inside the `with` block are not suppressed.
        """
        decrement_gc_counter()

        if not isinstance(self._buffer_or_path, str):
            return

        try:
            _upsert(self._buffer, self._buffer_or_path, self._s3fs)
        finally:
            # always release the file handle, even if `_upsert` fails
            self._buffer.close()

    def write(self, obj) -> None:
        """
        This function is used to write the object to the file.

        Only one write is allowed. The function raises a `ValueError` if it is called more than once.

        :param obj: the object to be written to the file
        :raise ValueError: if the function is called more than once
        :return: None
        """
        if self._no_more_writes:
            raise ValueError("No more writes allowed.")

        self._no_more_writes = True

        toc: dict = self._toc_packer.pack(obj)
        # the offset of the TOC is measured from the end of the header
        toc_start: int = self._buffer.tell() - self._file_start
        packed_toc: bytes = self._packer.pack(toc)

        self._buffer.write(packed_toc)
        # go back and fill in the header reserved in `__enter__`
        self._buffer.seek(self._header_start)
        self._buffer.write(self._packer.pack(toc_start).rjust(10, b"\0"))
        self._buffer.write(self._packer.pack(len(packed_toc)).rjust(10, b"\0"))

__init__(buffer_or_path, packer=None, *, s3fs=None)

It is possible to provide a custom packer object to be used for packing the object. However, this packer must be compatible with the msgpack packer.

Parameters:

Name Type Description Default
buffer_or_path str | BufferWriter

target buffer or file path

required
packer Packer

packer object to be used for packing the object

None
s3fs

s3fs object (s3fs.S3FileSystem) to be used for storing the output when buffer_or_path is a file path

None
Source code in src/msglc/writer.py
Python
def __init__(
    self,
    buffer_or_path: str | BufferWriter,
    packer: Packer | None = None,
    *,
    s3fs=None,
):
    """
    It is possible to provide a custom packer object to be used for packing the object.
    However, this packer must be compatible with the `msgpack` packer.

    :param buffer_or_path: target buffer or file path
    :param packer: packer object to be used for packing the object,
                   a default `Packer()` is created when `None` is given
    :param s3fs: s3fs object (s3fs.S3FileSystem) to be used for storing,
                 falls back to `config.s3fs` when not given
    """
    self._buffer_or_path: str | BufferWriter = buffer_or_path
    # compare against None explicitly so that a valid but falsy custom packer is not discarded
    self._packer = packer if packer is not None else Packer()
    self._s3fs = s3fs or config.s3fs

    # the attributes below are placeholders at construction time
    self._buffer: BufferWriter | TemporaryFile = None  # type: ignore
    self._toc_packer: TOC = None  # type: ignore
    self._header_start: int = 0
    self._file_start: int = 0
    self._no_more_writes: bool = False

write(obj)

This function is used to write the object to the file.

Only one write is allowed. The function raises a ValueError if it is called more than once.

Parameters:

Name Type Description Default
obj

the object to be written to the file

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

if the function is called more than once

Source code in src/msglc/writer.py
Python
def write(self, obj) -> None:
    """
    Pack the given object into the underlying buffer and finalise the header.

    The object is packed first, followed by its table of contents (TOC).
    The header is then filled in with the TOC offset and the TOC size.
    A writer accepts exactly one object, any further call is rejected.

    :param obj: the object to be written to the file
    :raise ValueError: if the function is called more than once
    :return: None
    """
    if self._no_more_writes:
        raise ValueError("No more writes allowed.")
    self._no_more_writes = True

    table_of_contents: dict = self._toc_packer.pack(obj)

    stream = self._buffer
    pack = self._packer.pack

    # the TOC offset is stored relative to the end of the header
    toc_offset: int = stream.tell() - self._file_start
    toc_bytes: bytes = pack(table_of_contents)
    stream.write(toc_bytes)

    # rewind and emit the two header fields, each left-padded with NUL bytes to 10 bytes
    stream.seek(self._header_start)
    for field in (toc_offset, len(toc_bytes)):
        stream.write(pack(field).rjust(10, b"\0"))