Skip to content

Utility Functions

The following are the main utility functions used to create archives.

FileInfo

Wrap the file path or in memory buffer and name into a FileInfo object. The name is optional and is only used when the file is combined in the dictionary (key-value) mode.

The s3fs can be different for each FileInfo object, meaning it is possible to combine files from different sources. It is not affected by the global s3fs object stored in config.

Parameters:

Name Type Description Default
path str | BinaryIO

a string representing the file path or an in memory buffer

required
name str | None

key name of the content in the combined dict

None
s3fs

s3fs object (s3fs.S3FileSystem) to read the object from

None
Source code in src/msglc/__init__.py
Python
class FileInfo:
    """
    Wrap the file path or in memory buffer and name into a FileInfo object.
    The `name` is optional and is only used when the file is combined in the dictionary (key-value) mode.

    The `s3fs` can be different for each `FileInfo` object, meaning it is possible to combine files from different sources.
    It is not affected by the global `s3fs` object stored in `config`.

    :param path: a string representing the file path or an in memory buffer
    :param name: key name of the content in the combined dict
    :param s3fs: s3fs object (s3fs.S3FileSystem) to read the object from
    """

    def __init__(self, path: str | BinaryIO, name: str | None = None, *, s3fs=None):
        self.path = path
        self.name = name
        self._s3fs = s3fs

    def exists(self):
        if not isinstance(self.path, str):
            return True

        if self._s3fs:
            return self._s3fs.exists(self.path)

        return os.path.exists(self.path)

    def open(self):
        if not isinstance(self.path, str):
            return nullcontext(self.path)

        return self._s3fs.open(self.path) if self._s3fs else open(self.path, "rb")

dump

This function is used to write the object to the file.

Parameters:

Name Type Description Default
file str | BytesIO

a string representing the file path

required
obj

the object to be written to the file

required
kwargs

additional keyword arguments to be passed to the LazyWriter

{}

Returns:

Type Description

None

Source code in src/msglc/__init__.py
Python
def dump(file: str | BytesIO, obj, **kwargs):
    """
    Serialize `obj` and write it to `file` via a `LazyWriter`.

    :param file: a string representing the file path
    :param obj: the object to be written to the file
    :param kwargs: additional keyword arguments to be passed to the `LazyWriter`
    :return: None
    """
    # The context manager takes care of flushing/closing the underlying target.
    with LazyWriter(file, **kwargs) as writer:
        writer.write(obj)

combine

This function is used to combine the multiple serialized files into a single archive. If s3fs is given, the combined archive will be uploaded to S3.

The files to be combined must exist in local filesystem regardless of whether s3fs is given. In other words, only local files can be combined.

Parameters:

Name Type Description Default
archive str | BytesIO

a string representing the file path of the archive

required
files FileInfo | list[FileInfo]

a list of FileInfo objects

required
mode Literal['a', 'w']

a string representing the combination mode, 'w' for write and 'a' for append

'w'
validate bool

switch on to validate the files before combining

True
s3fs

s3fs object (s3fs.S3FileSystem) to be used for storing

None

Returns:

Type Description

None

Source code in src/msglc/__init__.py
Python
def combine(
    archive: str | BytesIO,
    files: FileInfo | list[FileInfo],
    *,
    mode: Literal["a", "w"] = "w",
    validate: bool = True,
    s3fs=None,
):
    """
    Merge several already-serialized files into a single archive.
    When `s3fs` is given, the combined archive will be uploaded to S3.

    The files to be combined must exist in local filesystem regardless of whether `s3fs` is given.
    In other words, only local files can be combined.

    :param archive: a string representing the file path of the archive
    :param files: a list of FileInfo objects
    :param mode: a string representing the combination mode, 'w' for write and 'a' for append
    :param validate: switch on to validate the files before combining
    :param s3fs: s3fs object (s3fs.S3FileSystem) to be used for storing
    :return: None
    """
    if isinstance(files, FileInfo):
        files = [files]

    # Either every file carries a name (dict mode) or none does (list mode).
    named_count = sum(1 for item in files if item.name is not None)
    if 0 < named_count < len(files):
        raise ValueError("Files must either all have names or all not have names.")

    # Duplicate names are only acceptable when all names are None (list mode).
    names = {item.name for item in files}
    if len(names) != len(files) and not (len(names) == 1 and None in names):
        raise ValueError("Files must have unique names.")

    def _check_magic(info: FileInfo):
        # Verify the source starts with the expected magic header.
        if isinstance(info.path, str):
            if not info.exists():
                raise ValueError(f"File {info.path} does not exist.")
            with info.open() as stream:
                if stream.read(LazyWriter.magic_len()) != LazyWriter.magic:
                    raise ValueError(f"Invalid file format: {info.path}.")
            return
        # In-memory buffer: peek at the header, then restore the position
        # so the later copy starts from where the caller left it.
        with info.open() as stream:
            start = stream.tell()
            header = stream.read(LazyWriter.magic_len())
            stream.seek(start)
            if header != LazyWriter.magic:
                raise ValueError("Invalid file format.")

    if validate:
        for item in files:
            _check_magic(item)

    def _chunks(info: FileInfo):
        # Stream the source in fixed-size pieces to bound memory usage.
        with info.open() as stream:
            while True:
                piece = stream.read(config.copy_chunk_size)
                if not piece:
                    break
                yield piece

    with LazyCombiner(archive, mode=mode, s3fs=s3fs) as sink:
        for item in files:
            sink.write(_chunks(item), item.name)

append

This function is used to append the multiple serialized files to an existing single archive. If s3fs is given, the target will be downloaded first if it exists in the bucket. The final archive will be uploaded to S3.

The files to be appended must exist in local filesystem regardless of whether s3fs is given.

Parameters:

Name Type Description Default
archive str | BytesIO

a string representing the file path of the archive

required
files FileInfo | list[FileInfo]

a list of FileInfo objects

required
validate bool

switch on to validate the files before combining

True
s3fs

s3fs object (s3fs.S3FileSystem) to be used for storing

None

Returns:

Type Description

None

Source code in src/msglc/__init__.py
Python
def append(
    archive: str | BytesIO,
    files: FileInfo | list[FileInfo],
    *,
    validate: bool = True,
    s3fs=None,
):
    """
    Append serialized files to an existing archive.

    This is a thin wrapper that delegates to `combine` in append ('a') mode.
    If `s3fs` is given, the target will be downloaded first if it exists in the bucket,
    and the final archive will be uploaded to S3.

    The files to be appended must exist in local filesystem regardless of whether `s3fs` is given.

    :param archive: a string representing the file path of the archive
    :param files: a list of FileInfo objects
    :param validate: switch on to validate the files before combining
    :param s3fs: s3fs object (s3fs.S3FileSystem) to be used for storing
    :return: None
    """
    combine(archive, files, mode="a", validate=validate, s3fs=s3fs)

configure

This function is used to configure the settings. It accepts any number of keyword arguments. The function updates the values of the configuration parameters if they are provided in the arguments.

Parameters:

Name Type Description Default
small_obj_optimization_threshold int | None

The threshold (in bytes) for small object optimization. Objects smaller than this threshold are not indexed.

None
write_buffer_size int | None

The size (in bytes) for the write buffer.

None
read_buffer_size int | None

The size (in bytes) for the read buffer.

None
fast_loading bool | None

Flag to enable or disable fast loading. If enabled, the container will be read in one go, instead of reading each child separately.

None
fast_loading_threshold int | float | None

The threshold (0 to 1) for fast loading. With the fast loading flag turned on, fast loading will be performed if the number of already read children over the total number of children is smaller than this threshold.

None
trivial_size int | None

The size (in bytes) considered trivial, around a dozen bytes. Objects smaller than this size are considered trivial. For a list of trivial objects, the container will be indexed in a blocked fashion.

None
disable_gc bool | None

Flag to enable or disable garbage collection.

None
simple_repr bool | None

Flag to enable or disable simple representation used in the repr method. If turned on, repr will not incur any disk I/O.

None
copy_chunk_size int | None

The size (in bytes) for the copy chunk.

None
numpy_encoder bool | None

Flag to enable or disable the numpy support. If enabled, the numpy arrays will be encoded using the dumps method provided by numpy. The arrays are stored as binary data directly. If disabled, the numpy arrays will be converted to lists before encoding.

None
numpy_fast_int_pack bool | None

If enabled, the integer numpy array will be packed assigning each element has identical size (4 or 8 bytes). This improves the performance of packing by avoiding the overhead of checking the size of each element. However, depending on the backend, for example, messagepack C implementation packs unsigned long long or long long. But its python implementation packs integer of various lengths (1, 2, 3, 5, 9 bytes).

None
magic bytes | None

Magic bytes (max length: 30) to set, used to identify the file format version.

None
s3fs

The global S3FileSystem object that will be used by default so that there is no need to provide this for every function call. It is used to 1) read data by readers, 2) write output by writers/combiners. To specify where combiners read input files from, assign a specific S3FileSystem object to each FileInfo.

None
Source code in src/msglc/config.py
Python
def configure(
    *,
    small_obj_optimization_threshold: int | None = None,
    write_buffer_size: int | None = None,
    read_buffer_size: int | None = None,
    fast_loading: bool | None = None,
    fast_loading_threshold: int | float | None = None,
    trivial_size: int | None = None,
    disable_gc: bool | None = None,
    simple_repr: bool | None = None,
    copy_chunk_size: int | None = None,
    numpy_encoder: bool | None = None,
    numpy_fast_int_pack: bool | None = None,
    magic: bytes | None = None,
    s3fs=None,
):
    """
    Update the global configuration settings.

    Only arguments that pass their validity check are applied; every other
    setting keeps its current value.

    :param small_obj_optimization_threshold:
            The threshold (in bytes) for small object optimization.
            Objects smaller than this threshold are not indexed.
    :param write_buffer_size:
            The size (in bytes) for the write buffer.
    :param read_buffer_size:
            The size (in bytes) for the read buffer.
    :param fast_loading:
            Flag to enable or disable fast loading.
            If enabled, the container will be read in one go, instead of reading each child separately.
    :param fast_loading_threshold:
            The threshold (0 to 1) for fast loading.
            With the fast loading flag turned on, fast loading will be performed if the number of
            already read children over the total number of children is smaller than this threshold.
    :param trivial_size:
            The size (in bytes) considered trivial, around a dozen bytes.
            Objects smaller than this size are considered trivial.
            For a list of trivial objects, the container will be indexed in a blocked fashion.
    :param disable_gc:
            Flag to enable or disable garbage collection.
    :param simple_repr:
            Flag to enable or disable simple representation used in the __repr__ method.
            If turned on, __repr__ will not incur any disk I/O.
    :param copy_chunk_size:
            The size (in bytes) for the copy chunk.
    :param numpy_encoder:
            Flag to enable or disable the `numpy` support.
            If enabled, the `numpy` arrays will be encoded using the `dumps` method provided by `numpy`.
            The arrays are stored as binary data directly.
            If disabled, the `numpy` arrays will be converted to lists before encoding.
    :param numpy_fast_int_pack:
            If enabled, the integer numpy array will be packed assigning each element has identical size (4 or 8 bytes).
            This improves the performance of packing by avoiding the overhead of checking the size of each element.
            However, depending on the backend, for example, `messagepack` C implementation packs unsigned long long or long long.
            But its python implementation packs integer of various lengths (1, 2, 3, 5, 9 bytes).
    :param magic:
            Magic bytes (max length: 30) to set, used to identify the file format version.
    :param s3fs:
            The global `S3FileSystem` object that will be used by default so that there is no need to provide this for every function call.
            It is used to 1) read data by readers, 2) write output by writers/combiners.
            To specify where combiners read input files from, assign a specific `S3FileSystem` object to each `FileInfo`.
    """

    def _usable_size(value) -> bool:
        # A size/threshold setting is applied only when it is a strictly
        # positive integer; anything else leaves the current value untouched.
        return isinstance(value, int) and value > 0

    if _usable_size(small_obj_optimization_threshold):
        config.small_obj_optimization_threshold = small_obj_optimization_threshold
        # Maintain the invariant trivial_size <= small_obj_optimization_threshold.
        if config.trivial_size > config.small_obj_optimization_threshold:
            config.trivial_size = config.small_obj_optimization_threshold

    if _usable_size(write_buffer_size):
        config.write_buffer_size = write_buffer_size

    if _usable_size(read_buffer_size):
        config.read_buffer_size = read_buffer_size

    if isinstance(fast_loading, bool):
        config.fast_loading = fast_loading

    if (
        isinstance(fast_loading_threshold, (int, float))
        and 0 <= fast_loading_threshold <= 1
    ):
        config.fast_loading_threshold = fast_loading_threshold

    if _usable_size(trivial_size):
        config.trivial_size = trivial_size
        # Growing trivial_size may push the optimization threshold up with it.
        if config.trivial_size > config.small_obj_optimization_threshold:
            config.small_obj_optimization_threshold = config.trivial_size

    if _usable_size(copy_chunk_size):
        config.copy_chunk_size = copy_chunk_size

    # The remaining boolean switches share one validation rule.
    for flag_name, flag_value in (
        ("disable_gc", disable_gc),
        ("simple_repr", simple_repr),
        ("numpy_encoder", numpy_encoder),
        ("numpy_fast_int_pack", numpy_fast_int_pack),
    ):
        if isinstance(flag_value, bool):
            setattr(config, flag_name, flag_value)

    # NOTE(review): unlike every other setting, s3fs is assigned
    # unconditionally, so calling configure() without it resets the global
    # handle to None — confirm this is the intended behavior.
    config.s3fs = s3fs

    if isinstance(magic, bytes) and 0 < len(magic) <= max_magic_len:
        from msglc import LazyWriter

        LazyWriter.set_magic(magic)