biglist#

Creating a Biglist#

Create a new Biglist object via the classmethod new():

>>> from cloudly.biglist import Biglist
>>> mylist = Biglist.new(batch_size=100)

then add data to it, for example,

>>> for x in range(10_023):
...     mylist.append(x)

This saves a new data file for every 100 elements accumulated. In the end, there are 23 elements in a memory buffer that are not yet persisted to disk. The code has no way to know whether we will append more elements soon, hence it does not save this partial batch. Once we are done adding data, we call flush() to persist the content of the buffer to disk:

>>> mylist.flush()

If, after a while, we decide to append more data to mylist, we just call append() again. We can continue to add more data as long as the disk has space. New data files will be saved. The smaller file containing 23 elements will stay there among larger files with no problem.
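The batching arithmetic described above can be checked with a small stand-alone model. This is only a sketch: `ToyBatcher` is a hypothetical class that keeps its "files" as in-memory lists, and it is not how the library itself is implemented.

```python
class ToyBatcher:
    """Hypothetical in-memory model of batch-and-flush appends."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.files = []   # each entry stands in for one persisted data file
        self.buffer = []  # elements not yet persisted

    def append(self, x):
        self.buffer.append(x)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # Persist whatever is in the buffer, even a partial batch.
        if self.buffer:
            self.files.append(self.buffer)
            self.buffer = []


toy = ToyBatcher(batch_size=100)
for x in range(10_023):
    toy.append(x)

full_files, leftover = len(toy.files), len(toy.buffer)  # 100 full files, 23 buffered
toy.flush()
files_after_flush = len(toy.files)  # 101; the last file holds the 23 leftovers
```

The model reproduces the counts in this section: 100 full batches, then one smaller file of 23 elements after the explicit flush.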

Now let’s take a look at the biglist object:

>>> mylist  
<Biglist at '/tmp/19f88a17-3e78-430f-aad0-a35d39485f80' with 10023 elements in 101 data file(s)>
>>> len(mylist)
10023
>>> mylist.path  
LocalUpath('/tmp/19f88a17-3e78-430f-aad0-a35d39485f80')
>>> mylist.num_data_files
101

The data have been saved in the directory /tmp/19f88a17-3e78-430f-aad0-a35d39485f80, which is a temporary one because we did not tell new() where to save data. When the object mylist gets garbage collected, this directory will be deleted automatically. This has its uses, but often we want to save the data for future use. In that case, just pass a currently non-existent directory to new(), for example,

>>> yourlist = Biglist.new('/tmp/project/data/store-a', batch_size=10_000)

Later, instantiate a Biglist object for reading the existing dataset:

>>> yourlist = Biglist('/tmp/project/data/store-a')

If we want to persist the data in Google Cloud Storage, we would specify a path in the 'gs://bucket-name/path/to/data' format.

The Seq protocol and FileReader class#

Before going further with Biglist, let’s digress a bit and introduce a few helper facilities.

BiglistBase (and its subclasses Biglist and ExternalBiglist) could have implemented the Sequence interface in the standard library. However, that interface contains a few methods that are potentially hugely inefficient for Biglist, and hence are not supposed to be used on a Biglist. These methods include __contains__, count, and index. They require iterating over the entire dataset for the sake of one particular data item. For Biglist, this would require loading and unloading each of a possibly large number of data files. Biglist does not want to give users the illusion that they can use these methods at will and lightly.

For this reason, the protocol Seq is defined, which has three methods: __len__(), __iter__(), and __getitem__(). Therefore, classes that implement this protocol are Sized, Iterable, and support random element access by index. Most classes in the biglist package implement this protocol rather than the standard Sequence.
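To make the three-method contract concrete, here is a minimal sketch of such a protocol using `typing.Protocol`. The names `SeqLike` and `Countdown` are hypothetical and chosen for illustration; the actual Seq definition in the package may differ in detail.

```python
from typing import Iterator, Protocol, TypeVar, runtime_checkable

T = TypeVar("T", covariant=True)


@runtime_checkable
class SeqLike(Protocol[T]):
    """Hypothetical stand-in for the Seq protocol: sized, iterable, indexable."""

    def __len__(self) -> int: ...

    def __iter__(self) -> Iterator[T]: ...

    def __getitem__(self, idx: int) -> T: ...


class Countdown:
    """Satisfies the protocol structurally, without inheriting from it."""

    def __init__(self, n: int):
        self._n = n

    def __len__(self) -> int:
        return self._n

    def __iter__(self) -> Iterator[int]:
        return (self._n - i for i in range(self._n))

    def __getitem__(self, idx: int) -> int:
        if idx < 0:
            idx += self._n
        if not 0 <= idx < self._n:
            raise IndexError(idx)
        return self._n - idx


c = Countdown(3)
is_seq = isinstance(c, SeqLike)             # checks that the three methods exist
list_is_seq = isinstance([1, 2], SeqLike)   # a plain list qualifies too
```

Note that nothing here provides `count` or `index`; a class only has to supply length, iteration, and indexing to qualify.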

Because a biglist manages any number of data files, a basic operation concerns reading one data file. Each subclass of BiglistBase implements its file-reading class as a subclass of FileReader. FileReader implements the Seq protocol, hence the data items in one data file can be used like a list. Importantly, a FileReader instance does not load the data file upon initialization. At that moment, the instance can be pickled. This lends itself to uses in multiprocessing. This point of the design will be showcased later.
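The lazy-loading idea can be sketched without the library. In the example below, `LazyReader` and `load_json` are hypothetical names, and JSON stands in for whatever storage format is actually used; the point is only that construction records a path and a loader, and the file is read on first access.

```python
import json
import os
import tempfile


def load_json(path):
    """Module-level loader; a plain function like this can be pickled by reference."""
    with open(path) as f:
        return json.load(f)


class LazyReader:
    """Hypothetical lazy reader: stores a path and a loader, loads on first use."""

    def __init__(self, path, loader):
        self.path = path
        self.loader = loader
        self._data = None  # nothing is read at construction time

    def load(self):
        if self._data is None:
            self._data = self.loader(self.path)

    def __len__(self):
        self.load()
        return len(self._data)

    def __getitem__(self, idx):
        self.load()
        return self._data[idx]

    def __iter__(self):
        self.load()
        return iter(self._data)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "part-0.json")
    with open(path, "w") as f:
        json.dump([10, 20, 30], f)

    reader = LazyReader(path, load_json)
    loaded_at_init = reader._data is not None  # False: still just a path and a function
    items = list(reader)                       # first access triggers the load
```

Before the first access the object holds only a string and a function reference, which is why an unloaded reader of this shape is cheap to hand to another process.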

The centerpiece of a biglist is a sequence of data files in persistence, or correspondingly, a sequence of FileReaders in memory. The property BiglistBase.files returns a FileSeq to manage the FileReader objects of the biglist.

Finally, BiglistBase implements the Seq protocol for its data items across the data files.

To sum up, a BiglistBase is a Seq of data items across data files; BiglistBase.files is a FileSeq, which in turn is a Seq of FileReaders; a FileReader is a Seq of data items in one data file.

Reading a Biglist#

Random element access#

We can access any element of a Biglist like we do a list:

>>> mylist[18]
18
>>> mylist[-3]
10020

Biglist does not support slicing directly. However, the class Slicer wraps a Seq and enables element access by a single index, by a slice, or by a list of indices:

>>> from cloudly.util.seq import Slicer
>>> v = Slicer(mylist)
>>> len(v)
10023
>>> v  
<Slicer into 10023/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>> v[83]
83
>>> v[100:104]  
<Slicer into 4/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>

Note that slicing the slicer does not return a list of values. Instead, it returns another Slicer object, which, naturally, can be used the same way, including slicing further.

A Slicer object is an Iterable (in fact, it is a Seq), hence we can gather all of its elements in a list:

>>> list(v[100:104])
[100, 101, 102, 103]

Slicer provides a convenience method collect() to do the same:

>>> v[100:104].collect()
[100, 101, 102, 103]

A few more examples:

>>> v[-8:].collect()
[10015, 10016, 10017, 10018, 10019, 10020, 10021, 10022]
>>> v[-8::2].collect()
[10015, 10017, 10019, 10021]
>>> v[[1, 83, 250, -2]]  
<Slicer into 4/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>> v[[1, 83, 250, -2]].collect()
[1, 83, 250, 10021]
>>> v[[1, 83, 250, -2]][-3:].collect()
[83, 250, 10021]
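The behavior in the examples above (slicing returns another lazy view, and values are produced only on request) can be approximated with a short sketch. `LazyView` is a hypothetical class written for illustration; it is not the Slicer implementation, and `range(10_023)` stands in for the biglist.

```python
class LazyView:
    """Hypothetical lazy view over a sequence; slicing composes indices, not data."""

    def __init__(self, seq, indices=None):
        self._seq = seq
        self._idx = range(len(seq)) if indices is None else indices

    def __len__(self):
        return len(self._idx)

    def __getitem__(self, key):
        if isinstance(key, int):
            return self._seq[self._idx[key]]
        if isinstance(key, slice):
            return LazyView(self._seq, self._idx[key])
        # Otherwise treat key as a list of positions within this view.
        return LazyView(self._seq, [self._idx[k] for k in key])

    def __iter__(self):
        return (self._seq[i] for i in self._idx)

    def collect(self):
        return list(self)


v = LazyView(range(10_023))
a = v[100:104].collect()
b = v[-8::2].collect()
c = v[[1, 83, 250, -2]][-3:].collect()
```

Because each view stores only indices into the underlying sequence, chaining slices never touches the data until `collect()` or iteration is requested.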

Iteration#

Don’t be carried away by the many easy and flexible ways of random access. Random element access for Biglist is inefficient. The reason is that it needs to load a data file that contains the element of interest. If the biglist has many data files and we are “jumping around” randomly, it is wasting a lot of time loading entire files just to access a few data elements in them. (However, consecutive random accesses to elements residing in the same file will not load the file repeatedly.)

The preferred way to consume the data of a Biglist is to iterate over it. For example,

>>> for i, x in enumerate(mylist):
...     print(x)
...     if i > 4:
...         break
0
1
2
3
4
5

Conceptually, this loads each data file in turn and yields the elements in each file. The implementation “pre-loads” a few files in background threads to speed up the iteration.
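One way to picture the pre-loading is the sketch below, which keeps a few file loads in flight on a thread pool while yielding items from the oldest one. The function name `iter_with_prefetch` and the choice of three loads ahead are assumptions made for illustration; the library's own iteration code may be organized differently.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def iter_with_prefetch(loaders, n_ahead=3):
    """Yield items file by file while up to n_ahead later files load in threads.

    `loaders` is a list of zero-argument callables, each returning one file's items.
    """
    with ThreadPoolExecutor(max_workers=n_ahead) as pool:
        pending = deque()
        it = iter(loaders)
        # Prime the queue with the first few loads.
        for load in it:
            pending.append(pool.submit(load))
            if len(pending) >= n_ahead:
                break
        while pending:
            # Wait for the oldest file first so that order is preserved.
            items = pending.popleft().result()
            nxt = next(it, None)
            if nxt is not None:
                pending.append(pool.submit(nxt))
            yield from items


# Five pretend "files" of four items each.
chunks = [list(range(i, i + 4)) for i in range(0, 20, 4)]
loaders = [lambda c=c: c for c in chunks]
result = list(iter_with_prefetch(loaders))
```

The items come out in file order even though loading overlaps with consumption.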

Reading from a Biglist in multiple processes#

To collectively consume a Biglist object from multiple processes, we can distribute FileReaders to the processes. The FileReaders of mylist are accessed via its property files, which returns a FileSeq:

>>> files = mylist.files
>>> files  
<BiglistFileSeq at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>
>>> len(files)
101
>>> files.num_data_files
101
>>> files.num_data_items
10023
>>> files[0]  
<BiglistFileReader for '/tmp/cfb39dc0-94bb-4557-a056-c7cea20ea653/store/1669667946.647939_46eb97f6-bdf3-45d2-809c-b90c613d69c7_100.pickle_zstd'>

A FileReader object is light-weight. Upon initialization, it has not loaded the file yet—it merely records the file path along with the function that will be used to load the file. In addition, FileReader objects are friendly to pickling, hence lend themselves to multiprocessing code. Let’s design a small experiment to consume this dataset in multiple processes:

>>> def worker(file_reader):
...     total = 0
...     for x in file_reader:
...         total += x
...     return total
>>> from concurrent.futures import ProcessPoolExecutor
>>> total = 0
>>> with ProcessPoolExecutor(5) as pool:  
...     tasks = [pool.submit(worker, fr) for fr in mylist.files]  
...     for t in tasks:  
...         total += t.result()  
>>> total  
50225253

What is the expected result?

>>> sum(mylist)
50225253

Sure enough, this verifies that the entire biglist is consumed by the processes collectively.
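The total can also be cross-checked without the library. The sketch below splits the same integers into chunks of 100 (mirroring the data files), sums each chunk separately as a worker would, and compares the result with the closed-form sum of 0 through n-1.

```python
n, batch = 10_023, 100

# One range per pretend data file; the last one holds the 23 leftovers.
chunks = [range(i, min(i + batch, n)) for i in range(0, n, batch)]
partial_sums = [sum(c) for c in chunks]  # what each worker would return

total = sum(partial_sums)
closed_form = n * (n - 1) // 2           # sum of 0, 1, ..., n-1
```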

If the file loading is the bottleneck of the task, we can use threads in place of processes.

Similarly, it is possible to read mylist from multiple machines if mylist is stored in the cloud. Since a FileReader object is pickle-able, it works just fine if we pickle it and send it to another machine, provided the file path that is contained in the FileReader object is in the cloud, hence accessible from the other machine. We need a mechanism to distribute these FileReader objects to machines. For that, check out Multiplexer from upathlib.

Writing to a Biglist in multiple workers#

The flip side of distributed reading is distributed writing. If we have a biglist on the local disk, we can append to it from multiple processes or threads. If we have a biglist in the cloud, we can append to it from multiple machines. Let’s use multiprocessing to demo the idea.

First, we create a new Biglist at a storage location of our choosing:

>>> from cloudly.upathlib import LocalUpath
>>> path = LocalUpath('/tmp/a/b/c/d')
>>> path.rmrf()
0
>>> yourlist = Biglist.new(path, batch_size=6)

In each worker process, we will open this biglist by Biglist(path) and append data to it. Now that this has a presence on the disk, Biglist(path) will not complain that the dataset does not exist.

>>> yourlist.info
{'storage_format': 'pickle-zstd', 'datafile_ext': 'pickle_zstd', 'storage_version': 3, 'batch_size': 6, 'data_files_info': []}
>>> yourlist.path
LocalUpath('/tmp/a/b/c/d')
>>> len(yourlist)
0

Then we can tell workers, “here is the location, add data to it.” Let’s design a simple worker:

>>> def worker(path, idx):
...     yourlist = Biglist(path)
...     for i in range(idx):
...         yourlist.append(100 * idx + i)
...     yourlist.flush()

From the main process, let’s instruct the workers to write data to the same Biglist:

>>> import multiprocessing
>>> with ProcessPoolExecutor(10, mp_context=multiprocessing.get_context('spawn')) as pool:  
...     tasks = [pool.submit(worker, path, idx) for idx in range(10)]  
...     for t in tasks:  
...         _ = t.result()  

Let’s see what we’ve got:

>>> yourlist.reload()  
>>> len(yourlist)  
45
>>> yourlist.num_data_files  
12
>>> list(yourlist)  
[400, 401, 402, 403, 500, 501, 502, 503, 504, 600, 601, 602, 603, 604, 605, 700, 701, 702, 703, 704, 705, 706, 900, 901, 902, 903, 904, 905, 906, 907, 908, 100, 800, 801, 802, 803, 804, 805, 806, 807, 200, 201, 300, 301, 302]

Does this look right? It took me a moment to realize that idx = 0 did not append anything. So, the data elements are in the 100, 200, …, 900 ranges; that looks right. But the order of things is confusing.

Well, in a distributed setting, there’s no guarantee of order. It’s not a problem that numbers in the 800 range come after those in the 900 range.
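The counts reported above can be derived by hand. The sketch below recomputes, without the library, what each worker appends and how many files it should produce, assuming every worker writes full batches of 6 followed by one partial file on flush.

```python
import math

batch_size = 6

# Worker idx appends idx elements: 100*idx, 100*idx + 1, ...
per_worker = {idx: [100 * idx + i for i in range(idx)] for idx in range(10)}

n_items = sum(len(v) for v in per_worker.values())
# Each worker produces ceil(count / batch_size) files; worker 0 produces none.
n_files = sum(math.ceil(len(v) / batch_size) for v in per_worker.values())
```

Workers 1 through 6 each write one file and workers 7 through 9 each write two, which accounts for 45 elements in 12 data files.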

We can get more insights if we dive to the file level:

>>> for f in yourlist.files:  
...     print(f)  
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_4.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.973651_63e4ca6d-4e44-49e1-a035-6d60a88f7789_5.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.975576_f59ab2f0-be9c-477d-a95b-70d3dfc00d94_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.982828_3219d2d1-50e2-4b41-b595-2c6df4e63d3c_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.983024_674e57de-66ed-4e3b-bb73-1db36c13fd6f_1.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.985425_78eec966-8139-4401-955a-7b81fb8b47b9_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.985555_752b4975-fbf3-4172-9063-711722a83abc_3.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.012161_3a7620f5-b040-4cec-9018-e8bd537ea98d_1.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.034502_4a340751-fa1c-412e-8f49-13f2ae83fc3a_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.035010_32c58dbe-e3a2-4ba1-9ffe-32c127df11a6_2.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.067370_20a0e926-7a5d-46a1-805d-86d16c346852_2.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.119890_89ae31bc-7c48-488d-8dd1-e22212773d79_3.pickle_zstd'>

The file names do not appear to be totally random. They follow some pattern that facilitates ordering, and they have encoded some useful info. In fact, the part before the first underscore is the date and time of file creation, with a resolution to microseconds. This is followed by a uuid.uuid4 random string. When we iterate the Biglist object, files are read in the order of their paths, hence in the order of creation time. The number in the file name before the suffix is the number of elements in the file.
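As an illustration of the naming pattern just described, the following sketch splits a file name into its parts with a regular expression. The helper `parse_data_file_name` is hypothetical, and the pattern assumes the default layout with no `extra` string inserted into the name.

```python
import re
from datetime import datetime

# timestamp _ uuid4 _ element count . extension
FILE_NAME = re.compile(r"^(\d{14}\.\d{6})_([0-9a-f-]{36})_(\d+)\.(.+)$")


def parse_data_file_name(name):
    """Hypothetical helper: split a data file name into its encoded parts."""
    m = FILE_NAME.match(name)
    if m is None:
        raise ValueError(f"unexpected file name: {name!r}")
    ts, uid, count, ext = m.groups()
    return {
        "created": datetime.strptime(ts, "%Y%m%d%H%M%S.%f"),
        "uuid": uid,
        "count": int(count),
        "ext": ext,
    }


info = parse_data_file_name(
    "20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_4.pickle_zstd"
)
```

Since the timestamp leads the name and has a fixed width, sorting the names as plain strings also sorts the files by creation time.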

We can get similar info in a more readable format:

>>> for v in yourlist.files.data_files_info:  
...     print(v)  
['/tmp/a/b/c/d/store/20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_4.pickle_zstd', 4, 4]
['/tmp/a/b/c/d/store/20230129073410.973651_63e4ca6d-4e44-49e1-a035-6d60a88f7789_5.pickle_zstd', 5, 9]
['/tmp/a/b/c/d/store/20230129073410.975576_f59ab2f0-be9c-477d-a95b-70d3dfc00d94_6.pickle_zstd', 6, 15]
['/tmp/a/b/c/d/store/20230129073410.982828_3219d2d1-50e2-4b41-b595-2c6df4e63d3c_6.pickle_zstd', 6, 21]
['/tmp/a/b/c/d/store/20230129073410.983024_674e57de-66ed-4e3b-bb73-1db36c13fd6f_1.pickle_zstd', 1, 22]
['/tmp/a/b/c/d/store/20230129073410.985425_78eec966-8139-4401-955a-7b81fb8b47b9_6.pickle_zstd', 6, 28]
['/tmp/a/b/c/d/store/20230129073410.985555_752b4975-fbf3-4172-9063-711722a83abc_3.pickle_zstd', 3, 31]
['/tmp/a/b/c/d/store/20230129073411.012161_3a7620f5-b040-4cec-9018-e8bd537ea98d_1.pickle_zstd', 1, 32]
['/tmp/a/b/c/d/store/20230129073411.034502_4a340751-fa1c-412e-8f49-13f2ae83fc3a_6.pickle_zstd', 6, 38]
['/tmp/a/b/c/d/store/20230129073411.035010_32c58dbe-e3a2-4ba1-9ffe-32c127df11a6_2.pickle_zstd', 2, 40]
['/tmp/a/b/c/d/store/20230129073411.067370_20a0e926-7a5d-46a1-805d-86d16c346852_2.pickle_zstd', 2, 42]
['/tmp/a/b/c/d/store/20230129073411.119890_89ae31bc-7c48-488d-8dd1-e22212773d79_3.pickle_zstd', 3, 45]

The values for each entry are file path, number of elements in the file, and cumulative number of elements. The cumulative count is the basis for random access: Biglist uses it to figure out which file contains the element at a specified index.
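A lookup of this kind can be sketched with a binary search over the running totals. The function `locate` below is a hypothetical illustration using the counts from the listing above; the library's own lookup may be implemented differently.

```python
from bisect import bisect_right

# Running totals taken from the third column of the listing above.
cumulative = [4, 9, 15, 21, 22, 28, 31, 32, 38, 40, 42, 45]


def locate(idx, cumulative):
    """Return (file number, offset within that file) for a global index."""
    n = cumulative[-1]
    if idx < 0:
        idx += n
    if not 0 <= idx < n:
        raise IndexError(idx)
    file_no = bisect_right(cumulative, idx)
    start = cumulative[file_no - 1] if file_no else 0
    return file_no, idx - start


first = locate(0, cumulative)    # first element of the first file
fifth = locate(4, cumulative)    # first element of the second file
last = locate(-1, cumulative)    # last element of the last file
```

The search takes time logarithmic in the number of files; the expensive part of a random access is loading the file itself.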

API reference#

class cloudly.biglist.FileReader[source]#

Bases: Seq[Element]

A FileReader is a “lazy” loader of a data file. It keeps track of the path of a data file along with a loader function, but performs the loading only when needed. In particular, upon initiation of a FileReader object, file loading has not happened, and the object is light weight and friendly to pickling.

Once data have been loaded, this class provides various ways to navigate the data. At a minimum, the Seq API is implemented.

With loaded data and associated facilities, this object may no longer be pickle-able, depending on the specifics of a subclass.

One use case of this class is to pass around FileReader objects (that are initiated but not loaded) in multiprocessing code for concurrent data processing.

This class is generic with a parameter indicating the type of the elements in the data sequence contained in the file. For example you can write:

def func(file_reader: FileReader[int]):
    ...

abstract load() None[source]#

This method eagerly loads all the data from the file into memory.

Once this method has been called, subsequent data consumption should all draw upon this in-memory copy. However, if the data file is large, and especially if only part of the data is of interest, calling this method may not be the best approach. This all depends on the specifics of the subclass.

A subclass may allow consuming the data and loading parts of it in an “as-needed” or “streaming” fashion. In that approach, __getitem__() and __iter__() do not require this method to be called (although they may take advantage of the in-memory data if this method has been called).

class cloudly.biglist._base.BiglistBase[source]#

Bases: Seq[Element]

This base class contains code mainly concerning reading. The subclass Biglist adds functionalities for writing. Another subclass ExternalBiglist is read-only. Here, “reading” and “read-only” refer to the data files. This class always needs to write meta info about the data files. In addition, the subclass Biglist also creates and manages the data files, whereas ExternalBiglist provides methods to read existing data files, treating them as read-only.

This class is generic with a parameter indicating the type of the data items, but this is useful only for the subclass Biglist. For the subclass ExternalBiglist, this parameter is essentially Any because the data items (or rows) in Parquet files are composite and flexible.

registered_storage_formats = {'avro': <class 'cloudly.util.serializer.AvroSerializer'>, 'csv': <class 'cloudly.util.serializer.CsvSerializer'>, 'newline-delimited-json': <class 'cloudly.util.serializer.NewlineDelimitedOrjsonSeriealizer'>, 'parquet': <class 'cloudly.util.serializer.ParquetSerializer'>, 'pickle': <class 'cloudly.util.serializer.PickleSerializer'>, 'pickle-zstd': <class 'cloudly.util.serializer.ZstdPickleSerializer'>}#
classmethod register_storage_format(name: str, serializer: type[Serializer]) None[source]#

Register a new serializer to handle data file dumping and loading.

This class has a few serializers registered out of the box. They should be adequate for most applications.

Parameters:
name

Name of the format to be associated with the new serializer.

After registering the new serializer with name “xyz”, one can use storage_format='xyz' in calls to new(). When reading the object back from persistence, make sure this registry is also in place so that the correct deserializer can be found.

serializer

A subclass of cloudly.util.serializer.Serializer.

Although this class needs to provide the Serializer API, it is possible to write data files in text mode. The registered ‘csv’ and ‘newline-delimited-json’ formats do that.

classmethod get_temp_path() Upath[source]#

If user does not specify path when calling new() (in a subclass), this method is used to determine a temporary directory.

This implementation returns a temporary location in the local file system.

Subclasses may want to customize this if they prefer other ways to find temporary locations. For example, they may want to use a temporary location in a cloud storage.

classmethod new(path: str | Path | Upath | None = None, *, init_info: dict | None = None, **kwargs) BiglistBase[source]#

Create a new object of this class (of a subclass, to be precise) and then add data to it.

Parameters:
path

A directory in which this BiglistBase will save whatever it needs to save.

The directory must be non-existent. It is not necessary to pre-create the parent directory of this path.

This path can be either on local disk or in a cloud storage.

If not specified, BiglistBase.get_temp_path() is called to determine a temporary path.

The subclass Biglist saves both data and meta-info in this path. The subclass ExternalBiglist saves meta-info only.

init_info

Initial info that should be written into the info file before __init__ is called. This is in addition to whatever this method internally decides to write.

The info file info.json is written before __init__() is called. In __init__(), this file is read into self.info.

This parameter can be used to write some high-level info that __init__ needs.

If the info is not needed in __init__, then user can always add it to self.info after the object has been instantiated, hence saving general info in info.json is not the intended use of this parameter.

User rarely needs to use this parameter. It is mainly used by the internals of the method new of subclasses.

**kwargs

additional arguments are passed on to __init__().

Notes

A BiglistBase object construction is in either of the two modes below:

  (a) create a new BiglistBase to store new data.

  (b) create a BiglistBase object pointing to storage of existing data, which was created by a previous call to new().

In case (a), one has called new(). In case (b), one has called BiglistBase(..) (i.e. __init__()).

Some settings are applicable only in mode (a), because in mode (b) they can’t be changed and, if needed, should only use the value already set in mode (a). Such settings can be parameters to new() but should not be parameters to __init__(). Examples include storage_format and batch_size for the subclass Biglist. These settings typically should be taken care of in new(), before and/or after the object has been created by a call to __init__() within new().

__init__() should be defined in such a way that it works for both a barebone object that is created in this new(), as well as a fleshed out object that already has data in persistence.

Some settings may be applicable to an existing BiglistBase object, e.g., they control styles of display and not intrinsic attributes persisted along with the BiglistBase. Such settings should be parameters to __init__() but not to new(). If provided in a call to new(), these parameters will be passed on to __init__().

Subclass authors should keep these considerations in mind.
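The division of labor between new() and __init__() can be illustrated with a toy class. `ToyStore` below is hypothetical and far simpler than BiglistBase; it only shows creation-time settings being written to info.json by new() and read back by __init__() in both modes.

```python
import json
import os
import tempfile


class ToyStore:
    """Hypothetical two-mode class: new() creates storage, __init__() opens it."""

    def __init__(self, path):
        # Works for a just-created store as well as one with existing data.
        self.path = path
        with open(os.path.join(path, "info.json")) as f:
            self.info = json.load(f)

    @classmethod
    def new(cls, path, *, batch_size=1000, **kwargs):
        if os.path.exists(path):
            raise FileExistsError(path)
        os.makedirs(path)  # parent directories are created as needed
        # Creation-time settings are persisted before __init__ runs.
        with open(os.path.join(path, "info.json"), "w") as f:
            json.dump({"batch_size": batch_size}, f)
        return cls(path, **kwargs)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "a", "b")
    created = ToyStore.new(target, batch_size=6)  # mode (a)
    reopened = ToyStore(target)                   # mode (b)
    same_setting = reopened.info["batch_size"]
    try:
        ToyStore.new(target)                      # directory already exists
        refused = False
    except FileExistsError:
        refused = True
```

In this sketch `batch_size` is accepted only by new(); an object opened later picks the value up from the stored info and has no way to change it through the constructor.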

__init__(path: str | Path | Upath)[source]#
Parameters:
path

Directory that contains files written by an instance of this class.

path: Upath#

Root directory of the storage space for this object.

info: dict#

Various meta info.

__len__() int[source]#

Number of data items in this biglist.

This is an alias to num_data_items.

destroy(*, concurrent=True) None[source]#
__getitem__(idx: int) Element[source]#

Access a data item by its index; negative index works as expected.

__iter__() Iterator[Element][source]#

Iterate over all the elements.

When there are multiple data files, as the data in one file is being yielded, the next file(s) may be pre-loaded in background threads. For this reason, although the following is equivalent in the final result:

for file in self.files:
    for item in file:
        ... use item ...

it could be less efficient than iterating over self directly, as in

for item in self:
    ... use item ...

property num_data_files: int#

property num_data_items: int#

abstract property files: FileSeq[FileReader[Element]]#

class cloudly.biglist.Biglist[source]#

Bases: BiglistBase[Element]

DEFAULT_STORAGE_FORMAT = 'pickle-zstd'#
classmethod new(path: str | Path | Upath | None = None, *, batch_size: int | None = None, storage_format: str | None = None, datafile_ext: str | None = None, serialize_kwargs: dict | None = None, deserialize_kwargs: dict | None = None, init_info: dict | None = None, **kwargs) Self[source]#
Parameters:
path

Passed on to BiglistBase.new().

batch_size

Max number of data elements in each persisted data file.

There’s no good default value for this parameter, although one is provided (currently the default is 1000), because the code of new() doesn’t know the typical size of the data elements. Users are encouraged to specify the value of this parameter.

In choosing a value for batch_size, the most important consideration is the size of each data file, which is determined by the typical size of the data elements as well as batch_size, which is the upper bound of the number of elements in each file.

There are several considerations about the data file sizes:

  • It should not be so small that the file reading/writing is a large overhead relative to actual processing of the data. This is especially important when path is cloud storage.

  • It should not be so large that it is “unwieldy”, e.g. approaching 1GB.

  • When __iter__()ating over a Biglist object, there can be up to (by default) 4 files-worth of data in memory at any time, where 4 is self._n_read_threads plus 1.

  • When append()ing or extend()ing to a Biglist object at high speed, there can be up to (by default) 9 times batch_size data elements in memory at any time, where 9 is self._n_write_threads plus 1. See _flush() and Dumper.
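The last two bullets translate into simple upper bounds. The sketch below assumes thread counts of 3 for reading and 8 for writing, inferred from the "4" and "9" quoted above; the function name is hypothetical, and the actual defaults should be checked against the installed version.

```python
def peak_elements_in_memory(batch_size, n_read_threads=3, n_write_threads=8):
    """Upper bounds on elements held in memory, per the bullets above.

    The default thread counts are assumptions inferred from the quoted 4 and 9.
    """
    return {
        "iterating": (n_read_threads + 1) * batch_size,
        "appending": (n_write_threads + 1) * batch_size,
    }


bounds = peak_elements_in_memory(batch_size=10_000)
```

Multiplying these element counts by the typical in-memory size of one element gives a rough estimate of the peak memory footprint.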

Another consideration is the access pattern of elements in the Biglist. If there is a lot of “jumping around” with random element access, large data files will lead to very wasteful file loading, because to read any element, its hosting file must be read into memory. (After all, if the application is heavy on random access, then Biglist is not the right tool.)

The performance of iteration is not expected to be highly sensitive to the value of batch_size, as long as it is in a reasonable range.

A rule of thumb: it is recommended to keep the persisted files between 32-128MB in size. (Note: no benchmark was performed to back this recommendation.)
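The rule of thumb can be turned into a rough calculation. The helper below is hypothetical; it assumes the average persisted size per element is known (after serialization and compression) and uses 64 MB, a value inside the suggested range, as the target file size.

```python
def suggest_batch_size(avg_element_bytes, target_file_bytes=64 * 1024**2):
    """Hypothetical helper: elements per file for a file of about target_file_bytes."""
    if avg_element_bytes <= 0:
        raise ValueError("avg_element_bytes must be positive")
    return max(1, target_file_bytes // avg_element_bytes)


# Elements that average 2 KiB on disk.
suggested = suggest_batch_size(2048)
```

Because compression ratios vary with the data, measuring the size of a few sample files is a more reliable way to obtain `avg_element_bytes` than estimating from in-memory sizes.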

storage_format

This should be a key in registered_storage_formats. If not specified, DEFAULT_STORAGE_FORMAT is used.

datafile_ext

Extension of data files. If not provided, an extension based on the storage_format value is used. This is especially useful when storage_format is a long string.

This is used when writing data, and ignored when reading data.

serialize_kwargs

Additional keyword arguments to the serialization function.

deserialize_kwargs

Additional keyword arguments to the deserialization function.

serialize_kwargs and deserialize_kwargs are rarely needed. One use case is schema when storage format is “parquet”. See ParquetSerializer.

serialize_kwargs and deserialize_kwargs, if not None, will be saved in the “info.json” file, hence they must be JSON serializable, meaning they need to be the few simple native Python types that are supported by the standard json library. (However, the few formats “natively” supported by Biglist may get special treatment to relax this requirement.) If this is not possible, the solution is to define a custom serialization class and register it with register_storage_format().

**kwargs

additional arguments are passed on to BiglistBase.new().

Returns:
Biglist

A new Biglist object.

__init__(*args, **kwargs)[source]#

Please see the base class for additional documentation.

property batch_size: int#

The max number of data items in one data file.

property data_path: Upath#
property storage_format: str#

The value of storage_format used in new(), either user-specified or the default value.

property storage_version: int#

The internal format used in persistence. This is a read-only attribute for information only.

__len__() int[source]#

Number of data items in this biglist.

If data is being appended to this biglist, then this method only includes the items that have been “flushed” to storage. Data items in the internal memory buffer are not counted. The buffer is empty upon calling _flush() (internally and automatically) or flush() (explicitly by user).

__getitem__(idx: int) Element[source]#

Access a data item by its index; negative index works as expected.

Items not yet “flushed” are not accessible by this method. They are considered “invisible” to this method. Similarly, negative idx operates in the range of flushed items only.

__iter__() Iterator[Element][source]#

Iterate over all the elements.

Items that are not yet “flushed” are invisible to this iteration.

append(x: Element) None[source]#

Append a single element to the Biglist.

In implementation, this appends to an in-memory buffer. Once the buffer size reaches batch_size, the buffer’s content will be persisted as a new data file, and the buffer will re-start empty. In other words, whenever the buffer is non-empty, its content is not yet persisted.

You can append data to a common biglist from multiple processes. In the processes, use independent Biglist objects that point to the same “path”. Each of the objects will maintain its own in-memory buffer and save its own files once the buffer fills up. Remember to flush() at the end of work in each process.

extend(x: Iterable[Element]) None[source]#

This simply calls append() repeatedly.

make_file_name(buffer_len: int, extra: str = '') str[source]#

This method constructs the file name of a data file. If you need to customize this method for any reason, you should do it via extra and keep the other patterns unchanged. The string extra will appear between other fixed patterns in the file name.

One possible usecase is this: in distributed writing, you want files written by different workers to be distinguishable by the file names. Do something like this:

def worker(datapath: str, worker_id: str, ...):
    out = Biglist(datapath)
    _make_file_name = out.make_file_name
    out.make_file_name = lambda buffer_len: _make_file_name(buffer_len, worker_id)
    ...

flush(*, lock_timeout=300, raise_on_write_error: bool = True, eager: bool = False) None[source]#

The persisted biglist consists of a number of data files and an overall meta info file (info.json in the root of self.path). The latter contains, among others, a listing of the data files so that the data elements have a defined order. Only data files listed in the info file are visible to reading.

When user adds data via append() or extend(), _flush() is called automatically whenever the “append buffer” is full, so as to persist the data and empty the buffer. (The capacity of this buffer is equal to self.batch_size.) If multiple Biglist objects in threads, processes, or machines add data concurrently, each object has its own append buffer and does _flush independently of other objects. A data file has a random name (comprised of datetime accurate to sub-seconds, plus a random string, plus other things); there is essentially no risk of name clash when multiple Biglist objects save data files independently of each other.

However, there are two things that the automatic _flush does not do:

  • First, if the append buffer is only partially filled when the user (of one Biglist object) is done adding elements to the biglist, the data in the buffer will not be persisted.

  • Second, _flush does not add new data files it has created into the meta info file. It does not do so because doing it would need to lock the info file, which adds overhead and harms concurrent independent writing.

These two things are left to the user to do via explicit calls to flush().

A Biglist object should call flush() at the end of its data writing session, regardless of whether all the new data have been persisted in data files. (They would be if their count happens to be a multiple of self.batch_size.) This will flush the append buffer.

By default, flush also adds newly created data files to the meta info file. (Until that point, all the new data files created by the particular Biglist object are recorded in memory.) This operation locks the info file so that concurrent writers will not corrupt it.

The parameter eager (default False) gives this op a twist. If eager is True, the list of new data files created by self–that is, this calling Biglist object–is written to a small “interim” file, and the meta info file is not updated. The interim file has a random name with no risk of name clash between multiple writers. In sum, flush(eager=True) persists all data and info but puts the data structure in an “interim” state. Importantly, this op does not involve locking, because it does not update the meta info file. The parameter eager is provided to manage the lock overhead when we write to a cloud-persisted biglist using many concurrent, distributed writers.

A call to flush() (i.e., flush(eager=False)) does all that flush(eager=True) does, plus it integrates the content of all interim files, if any, into the meta info file, and deletes the interim files. This op does lock the info file. Multiple interim files may have been created by multiple writers. One call to flush() will take care of all the interim files in existence. This call can be made from any Biglist object as long as it points to the same path.
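The interplay between the two flush modes can be modeled in memory. The classes `ToyMeta` and `ToyWriter` below are hypothetical; they track only file names, skip the buffer and the real locking, and are meant solely to show which step makes files visible to readers.

```python
class ToyMeta:
    """Hypothetical shared storage: one meta listing plus interim records."""

    def __init__(self):
        self.listed = []   # data files visible to readers
        self.interim = []  # lists of file names awaiting integration


class ToyWriter:
    def __init__(self, meta, name):
        self.meta = meta
        self.name = name
        self.new_files = []  # files this writer created but has not reported
        self._count = 0

    def write_file(self, n_elements):
        self.new_files.append(f"{self.name}-{self._count}_{n_elements}")
        self._count += 1

    def flush(self, eager=False):
        if self.new_files:
            # Recording an interim list touches no shared file, so no lock is needed.
            self.meta.interim.append(self.new_files)
            self.new_files = []
        if not eager:
            # In the real design this step would hold a lock on the info file.
            for batch in self.meta.interim:
                self.meta.listed.extend(batch)
            self.meta.interim = []
            self.meta.listed.sort()


meta = ToyMeta()
w1, w2 = ToyWriter(meta, "w1"), ToyWriter(meta, "w2")
w1.write_file(6)
w2.write_file(6)
w2.write_file(2)
w1.flush(eager=True)
w2.flush(eager=True)
visible_before = list(meta.listed)  # nothing is listed yet
w1.flush()                          # one full flush integrates every interim record
visible_after = list(meta.listed)
```

In the model, the two eager flushes leave the files persisted but unlisted, and a single full flush from either writer brings all of them into the listing.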

Unless you know what you are doing, don’t use flush(eager=True).

Users should assume that data not yet fully persisted via flush() are not visible to reads via __getitem__() or __iter__(), and are not counted by __len__(), even for the same Biglist object that did the writing. In common use cases, we do not start reading data until we’re done adding data to the biglist (at least “for now”), hence this is not a big inconvenience.

In summary, call flush() when

  • You are done adding data (for this “session”),

  • or you need to start reading data.

After a call to flush(), you can add more elements with append() or extend() without any problem. Data files created by flush() with fewer than batch_size elements will stay as they are among larger files. This is a legitimate case in parallel or distributed writing, or when writing in multiple sessions.

Users are strongly encouraged to call flush() explicitly at the end of their writing session. (See _warn_flush().)

On the other hand, you should not call flush frequently “just to be safe”. It has I/O overhead, and it may create small data files because it flushes the append buffer regardless of whether it is full.
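To make the cost of frequent flushing concrete, here is a small stdlib-only calculation. It models the batching rule described above (one data file per full batch, plus one file each time flush() finds a non-empty buffer); it is not code from cloudly, and the function name count_data_files and the parameter flush_every are made up for illustration.

```python
from typing import Optional


def count_data_files(n_items: int, batch_size: int,
                     flush_every: Optional[int] = None) -> int:
    """Count data files written when appending n_items one at a time.

    A file is written whenever the buffer reaches batch_size, and whenever
    flush is called on a non-empty buffer. In this model, flush is called
    after every `flush_every` appends (if given) and once at the end.
    """
    files = 0
    buffered = 0
    for i in range(1, n_items + 1):
        buffered += 1
        if buffered == batch_size:      # buffer full: a data file is saved
            files += 1
            buffered = 0
        if flush_every and i % flush_every == 0 and buffered:
            files += 1                  # flush writes the partial batch
            buffered = 0
    if buffered:                        # final flush at the end of the session
        files += 1
    return files


once = count_data_files(10_023, 100)                    # flush only at the end
often = count_data_files(10_023, 100, flush_every=30)   # flush "just to be safe"
```

With 10,023 elements and a batch size of 100, a single final flush yields 101 files, whereas flushing every 30 appends yields 335 files, none of which ever reaches the batch size.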

reload() → None[source]#

Reload the meta info.

This is useful in the following scenario. Suppose this object points to a biglist on the local disk, and another object in another process is appending data to the same biglist (that is, it points to the same storage location). After a while, the other process has modified the meta info file on disk, so the current object is outdated. Calling this method brings it up to date. The same idea applies if the storage is in the cloud and another machine is appending data to the same remote biglist.

Creating a new object pointing to the same storage location would achieve the same effect.
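The staleness described here can be pictured with a toy model in which an object caches the meta info when it is constructed. This is only an illustration of the idea, not cloudly code: the class ToyList, the file name info.json, and the key num_items are hypothetical.

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory


class ToyList:
    """Caches the meta info at construction time, like a snapshot."""

    def __init__(self, path: Path):
        self.info_file = path / "info.json"   # hypothetical file name
        self.reload()

    def reload(self) -> None:
        # Re-read the meta info so this object sees other writers' updates.
        if self.info_file.exists():
            self.info = json.loads(self.info_file.read_text())
        else:
            self.info = {"num_items": 0}

    def __len__(self) -> int:
        return self.info["num_items"]


def demo() -> tuple:
    with TemporaryDirectory() as tmp:
        path = Path(tmp)
        reader = ToyList(path)
        # Simulate another process updating the meta info file.
        (path / "info.json").write_text(json.dumps({"num_items": 100}))
        stale = len(reader)               # still the old snapshot
        reader.reload()
        fresh = len(reader)               # up to date after reload()
        new_object = len(ToyList(path))   # a new object sees the same state
        return stale, fresh, new_object
```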

property files#
class cloudly.biglist.BiglistFileReader[source]#

Bases: FileReader[Element]

__init__(path: str | Path | Upath, loader: Callable[[Upath], Any])[source]#
Parameters:
path

Path of a data file.

loader

A function that will be used to load the data file. This must be pickle-able. Usually this is the bound method load of a subclass of cloudly.util.serializer.Serializer. If you customize this, please see the doc of FileReader.

load() → None[source]#

This method eagerly loads all the data from the file into memory.

Once this method has been called, subsequent data consumption should all draw upon this in-memory copy. However, if the data file is large, and especially if only part of the data is of interest, calling this method may not be the best approach. This all depends on the specifics of the subclass.

A subclass may allow consuming the data and loading parts of it in an “as-needed” or “streaming” fashion. In that approach, __getitem__() and __iter__() do not require this method to be called (although they may take advantage of the in-memory data if this method has been called).

data() → list[Element][source]#

Return the data loaded from the file.

__len__() → int[source]#
__getitem__(idx: int) → Element[source]#
__iter__() → Iterator[Element][source]#
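The methods above fit together roughly as in the following sketch. It is a toy reader, not BiglistFileReader itself: the names ToyFileReader and load_json are hypothetical, and the choice to have data() trigger load() on first use is an assumption of this sketch rather than documented behavior.

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Callable, Iterator


def load_json(path: Path) -> list:
    """Hypothetical loader. A module-level function like this one can be
    pickled by reference, whereas a lambda cannot."""
    return json.loads(path.read_text())


class ToyFileReader:
    def __init__(self, path, loader: Callable[[Path], Any]):
        self.path = Path(path)
        self.loader = loader
        self._data = None              # nothing is read until it is needed

    def load(self) -> None:
        # Eagerly read the whole file into memory (only once).
        if self._data is None:
            self._data = self.loader(self.path)

    def data(self) -> list:
        self.load()                    # assumption: load on first use
        return self._data

    def __len__(self) -> int:
        return len(self.data())

    def __getitem__(self, idx: int):
        return self.data()[idx]

    def __iter__(self) -> Iterator:
        return iter(self.data())


def demo() -> tuple:
    with TemporaryDirectory() as tmp:
        f = Path(tmp) / "batch.json"
        f.write_text(json.dumps([10, 20, 30]))
        reader = ToyFileReader(f, load_json)
        loaded_before_use = reader._data is not None
        return loaded_before_use, len(reader), reader[-1], list(reader)
```

Constructing the reader is cheap because no I/O happens until the first access; a streaming subclass could override __getitem__() and __iter__() to avoid holding the whole file in memory.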
class cloudly.biglist.ExternalBiglist[source]#

Bases: BiglistBase

An ExternalBiglist provides read functionality for existing files that were not created by biglist.

As long as you use an ExternalBiglist object to read, it is assumed that the dataset (all the data files) has not changed since the ExternalBiglist object was created by new().

classmethod get_file_meta(p: Upath, storage_format: str) → dict[source]#
classmethod new(data_path: str | Path | Upath | Sequence[str | Path | Upath], path: str | Path | Upath | None = None, *, storage_format: str | None = None, deserialize_kwargs: dict | None = None, datafile_ext: str | None = None, **kwargs) → ExternalBiglist[source]#

This classmethod gathers info of the specified data files and saves the info to facilitate reading the data files. The data files remain “external” to the ExternalBiglist object; the “data” persisted and managed by the ExternalBiglist object are the meta info about the data files.

If the number of data files is small, it’s feasible to create a temporary object of this class (by leaving path at the default value None) “on-the-fly” for one-time use.

Parameters:
data_path

File(s) or folder(s) containing data files.

If this is a single path, then it’s either a file or a directory. If this is a list, each element is either a file or a directory; there can be a mix of files and directories. Directories are traversed recursively for data files. The paths can be local, or in the cloud (specified by cloudly.upathlib.BlobUpath), or a mix of both.

Once the info of all data files is gathered, their order is fixed as far as this ExternalBiglist is concerned. The data sequence represented by this ExternalBiglist follows this order of the data files. The order is determined as follows:

The order of the entries in data_path is preserved; if any entry is a directory, the files therein (recursively) are sorted by the string value of each file’s full path.

path

Passed on to BiglistBase.new(). This is the location where the ExternalBiglist object stores info to facilitate its reading functions.

datafile_ext

Only files with this extension will be included. The default None would include all files.

This is simply the trailing part of the file name. No ‘.’ (dot) is prepended. So if you mean “.parquet”, then pass in “.parquet” rather than “parquet”.

deserialize_kwargs

Additional keyword arguments to the deserialization function.

This is rarely needed.

**kwargs

Additional arguments are passed on to __init__().
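The ordering rule for data_path and the suffix matching of datafile_ext can be sketched for local paths as follows. This is an illustration of the rules as described, not the code used by new(): the function name resolve_data_files is hypothetical, cloud paths are out of scope, and applying the extension filter to explicitly listed files as well is an assumption of this sketch.

```python
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional


def resolve_data_files(data_path, datafile_ext: Optional[str] = None) -> list:
    """Return local data files in the order described for data_path."""
    if isinstance(data_path, (str, Path)):
        entries = [data_path]
    else:
        entries = list(data_path)
    found = []
    for entry in entries:                      # order of entries is preserved
        entry = Path(entry)
        if entry.is_dir():
            # Recursive walk, sorted by the string value of the full path.
            candidates = sorted(
                (p for p in entry.rglob("*") if p.is_file()), key=str
            )
        else:
            candidates = [entry]
        for p in candidates:
            # Plain suffix match: no "." is added in front of datafile_ext.
            # (Assumption: the filter also applies to listed files.)
            if datafile_ext is None or p.name.endswith(datafile_ext):
                found.append(p)
    return found


def demo() -> list:
    with TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "d" / "sub").mkdir(parents=True)
        for rel in ["z.parquet", "d/b.parquet", "d/a.parquet",
                    "d/sub/c.parquet", "d/notes.txt"]:
            (root / rel).touch()
        files = resolve_data_files(
            [root / "z.parquet", root / "d"], datafile_ext=".parquet"
        )
        return [p.relative_to(root).as_posix() for p in files]
```

Because the match is a plain suffix test, passing “parquet” without the dot would also pick up a file whose name merely ends in those letters, such as a hypothetical backup_parquet.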

__init__(*args, **kwargs)[source]#
Parameters:
path

Directory that contains files written by an instance of this class.

__getitem__(idx)[source]#

Access a data item by its index; negative index works as expected.

property storage_format: str#
property storage_version: int#
property files#