multiplexer

Multiplexer is a utility for distributing data elements to multiple concurrent or distributed workers. Its implementation relies on the “locking” capability of Upath.

Suppose we perform a brute-force search on a cluster of machines. There are 1000 grids, and the algorithm processes one grid at a time. In other words, the grid is a “hyper-parameter” or “control parameter” that takes 1000 possible values, and we want to distribute these 1000 values to the workers. This is the kind of use case targeted by Multiplexer.
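The guarantee that each value reaches exactly one worker can be pictured as workers taking turns on a lock-protected counter. The following is a minimal local sketch under that assumption, not the actual Upath or Multiplexer implementation. A lock file created with `os.O_EXCL` plays the role of the lock, threads play the role of workers, and the helper names `claim_next` and `distribute` are hypothetical.

```python
import os
import tempfile
import threading
import time


def claim_next(session_dir, n):
    """Return the next unclaimed index in range(n), or None when all are taken.

    Hypothetical helper: a lock file guards a shared counter file.
    """
    lock_path = os.path.join(session_dir, "lock")
    counter_path = os.path.join(session_dir, "counter")
    # Acquire the lock: O_EXCL makes creation fail while another worker holds it.
    while True:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            time.sleep(0.001)
    try:
        try:
            with open(counter_path) as f:
                i = int(f.read())
        except FileNotFoundError:
            i = 0  # first claim in this session
        if i >= n:
            return None
        with open(counter_path, "w") as f:
            f.write(str(i + 1))
        return i
    finally:
        # Release the lock so the next worker can claim an index.
        os.close(fd)
        os.remove(lock_path)


def distribute(values, n_workers=5):
    """Run n_workers threads that collectively consume values; return what they got."""
    got = []
    got_lock = threading.Lock()
    with tempfile.TemporaryDirectory() as session_dir:

        def worker():
            while True:
                i = claim_next(session_dir, len(values))
                if i is None:
                    return  # nothing left to claim in this session
                with got_lock:
                    got.append(values[i])

        threads = [threading.Thread(target=worker) for _ in range(n_workers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    return got


grids = [f"grid-{k:03d}" for k in range(20)]  # stand-ins for the 1000 grids
got = distribute(grids)
print(len(got), len(set(got)))  # 20 20: every grid claimed exactly once
```

A real implementation would also have to cope with stale locks left behind by crashed workers, which this sketch ignores.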

Let’s demonstrate its usage with local data and multiprocessing. (For real work, we would use cloud storage and a cluster of machines.) First, create a Multiplexer to hold the values to be distributed:

>>> from cloudly.upathlib import LocalUpath
>>> from cloudly.util.multiplexer import Multiplexer
>>> p = LocalUpath('/tmp/abc/mux')
>>> p.rmrf()
0
>>> hyper = Multiplexer.new(range(20), p)
>>> len(hyper)
20

Next, define a worker function that simulates some work on each element:

>>> import multiprocessing, random, time
>>>
>>> def worker(mux_id):
...     for x in Multiplexer(mux_id):
...         time.sleep(random.uniform(0.1, 0.2))  # doing a lot of things
...         print(x, 'done in', multiprocessing.current_process().name)

Back in the main process, start a read session and launch five worker processes that share its ID:

>>> mux_id = hyper.create_read_session()
>>> tasks = [multiprocessing.Process(target=worker, args=(mux_id,)) for _ in range(5)]
>>> for t in tasks:
...     t.start()
>>>
2 done in Process-13
0 done in Process-11
1 done in Process-12
4 done in Process-15
3 done in Process-14
6 done in Process-11
7 done in Process-12
8 done in Process-15
5 done in Process-13
9 done in Process-14
12 done in Process-15
13 done in Process-13
11 done in Process-12
10 done in Process-11
14 done in Process-14
15 done in Process-15
18 done in Process-11
16 done in Process-13
17 done in Process-12
19 done in Process-14
>>>
>>> for t in tasks:
...     t.join()
>>> hyper.done(mux_id)
True
>>> hyper.destroy()
>>>
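For comparison, the description of the path parameter of new() notes that, on a single machine, a queue-based solution gives the same one-consumer-per-element behavior. A minimal sketch of that alternative with the standard library's `queue` and `threading` modules follows; the function name `distribute_locally` is hypothetical.

```python
import queue
import threading


def distribute_locally(data, n_workers=5):
    """Each element is taken by exactly one worker thread via a shared queue."""
    q = queue.Queue()
    for x in data:
        q.put(x)  # fill the queue before any worker starts
    results = []
    results_lock = threading.Lock()

    def worker(name):
        while True:
            try:
                x = q.get_nowait()
            except queue.Empty:
                return  # queue drained; nothing left for this worker
            with results_lock:
                results.append((x, name))

    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}",))
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


results = distribute_locally(range(20))
print(sorted(x for x, _ in results) == list(range(20)))  # True
```

Across processes, a multiprocessing queue could play the same role; Multiplexer earns its keep when the workers do not share a machine.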

Multiplexer is a utility for distributing a set of control parameters to multiple workers, which consume the parameters collectively, i.e. each control parameter is consumed by exactly one worker.

class cloudly.util.multiplexer.Multiplexer

Bases: Iterable[Element], Sized

Multiplexer is used to distribute data elements to multiple “workers” so that each element is obtained by exactly one worker.

Typically, each data element is small, but processing it takes the worker significant time. The data elements are “hyper-parameters”.
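As an illustration of what such hyper-parameters might look like, the sketch below builds a small grid of picklable dicts with `itertools.product`. The parameter names `lr` and `depth` and their values are hypothetical; a list like `params` is the kind of iterable one could pass as `data` to `Multiplexer.new()`.

```python
import itertools
import pickle

# Hypothetical search space; the parameter names and values are illustrative only.
learning_rates = [0.1, 0.01, 0.001]
depths = [2, 4, 8, 16]

# One small dict per unit of work: 3 x 4 = 12 control parameters.
params = [
    {"lr": lr, "depth": depth}
    for lr, depth in itertools.product(learning_rates, depths)
]

# Each element must survive a pickle round trip to be stored and shipped to workers.
assert pickle.loads(pickle.dumps(params)) == params
print(len(params))  # 12
```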

The usage consists of two main parts:

1. In “coordinator” code, call create_read_session() to start a new “session”. Different sessions (at the same time or otherwise) are independent consumers of the data.

Typically, this dataset, which is small and easy to create, is consumed only once. In that case, the coordinator calls new() to create a new Multiplexer, calls create_read_session() on it, and then sends the returned ID to the workers by whatever means is convenient.

2. In “worker” code, use the ID that was returned by create_read_session() to instantiate a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements collectively, i.e., each element is obtained by exactly one worker.

As far as this utility is concerned, the coordinator does not need to wait for the workers to finish consuming the data. The coordinator process or machine may quit and let the workers continue with their job. Usually you should take care of job tracking separately.

See create_read_session() for more details.

classmethod new(data: Iterable[Element], path: str | Path | Upath, *, tag: str | None = None)
Parameters:
data

The data elements that need to be distributed. The elements should be pickle-able.

Importantly, data (the dataset) is meant to contain a modest number (say, up to thousands) of “control parameters”, not a massive number of raw data elements, because distributing each element through this utility incurs nontrivial overhead. Each element is meant to trigger a substantial amount of processing in a worker, which makes the overhead of obtaining it worthwhile.

Based on this understanding, the elements in data are simply saved in a single pickle file, and each worker will fetch a copy of this file.

path

A directory where the data and any supporting info will be saved. The directory may or may not already exist. A subdirectory is created under path to store the data and info about this particular Multiplexer. The name of the subdirectory is a datetime string, with tag appended to make it more informative, if so desired.

If path is in the cloud, then the workers can be on multiple machines, and in multiple threads or processes on each machine. If path is on the local disk, then the workers are in threads or processes on the same machine.

However, there is little reason to use this facility on a single machine, because a queue-based solution achieves the same functionality.

Usually this class is used to distribute data to a cluster of machines, hence path points to a location in cloud storage that is supported by upathlib.

Since path is a “root directory” hosting Multiplexers (each in a randomly named subdirectory), a subclass may choose to fix this directory so that new() no longer needs this parameter.
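The subclassing idea can be sketched as follows. To keep the sketch runnable without cloudly, `BaseMux` is a hypothetical stand-in that only mimics the documented shape of `new(data, path, *, tag=None)`; `ProjectMux` and its `ROOT` directory are likewise made up. With the real library one would subclass Multiplexer instead, assuming its new() behaves as documented here.

```python
class BaseMux:
    """Hypothetical stand-in for Multiplexer; only the shape of new() matters here."""

    def __init__(self, data, path, tag=None):
        self.data = list(data)
        self.path = path
        self.tag = tag

    @classmethod
    def new(cls, data, path, *, tag=None):
        return cls(data, path, tag)


class ProjectMux(BaseMux):
    # Hypothetical fixed root; in practice this might be a cloud-storage location.
    ROOT = "/tmp/project/mux"

    @classmethod
    def new(cls, data, *, tag=None):
        # Callers no longer pass a path; the subclass supplies its fixed root.
        return super().new(data, cls.ROOT, tag=tag)


mux = ProjectMux.new(range(5), tag="grid")
print(type(mux).__name__, mux.path, mux.tag)  # ProjectMux /tmp/project/mux grid
```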

__init__(mux_id: str, worker_id: str | None = None, timeout: int | float | None = None)

Create a Multiplexer object and use it to distribute the data elements that have been stored by new().

Parameters:
mux_id

The value that is returned by create_read_session().

worker_id

A string representing the current worker (i.e. this instance). If missing, a default is constructed based on thread name and process name.
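The default worker ID can be pictured as a combination of the current process and thread names. The exact format Multiplexer uses is not documented here, so `default_worker_id` below is a hypothetical sketch of the idea; it shows why workers in different threads (or processes) end up with distinct IDs.

```python
import multiprocessing
import threading


def default_worker_id() -> str:
    """Hypothetical default worker ID built from the process and thread names."""
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    return f"{process_name}-{thread_name}"


# Workers started in different threads get different default IDs.
ids = []
threads = [
    threading.Thread(target=lambda: ids.append(default_worker_id()), name=f"T{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(ids))
```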

property worker_id: str
__len__() -> int

Return the number of data elements stored in this Multiplexer.

create_read_session() -> str

Let’s say there is a “coordinator” and some “workers”; these are programs running in threads, processes, or distributed machines. The coordinator creates a new Multiplexer and calls this method to start a “session” to read (i.e. iterate over) the elements in this Multiplexer:

mux = Multiplexer.new(range(1000), '/tmp/abc/mux/')
mux_id = mux.create_read_session()

The mux_id is then provided to the workers, which will create Multiplexer instances pointing to the same dataset and participating in the reading session that has just been started:

mux = Multiplexer(mux_id)
for x in mux:
    ...

The data that was provided to new() is split between the workers so that each data element will be obtained by exactly one worker.

The returned value (the “mux ID”) encodes info about the location (“path”) of the data storage as well as the newly created read session. All workers that use the same ID participate in the same read session, i.e. the data elements will be split between them. There can be multiple, independent read sessions going on at the same time.
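One way to pack a storage location and a session into a single string is sketched below. The JSON-plus-base64 format and the helper names `make_mux_id` and `parse_mux_id` are assumptions for illustration; the real mux ID format is internal to Multiplexer and may differ.

```python
import base64
import json
import uuid


def make_mux_id(path: str) -> str:
    """Encode a storage location plus a fresh session ID into one opaque string."""
    payload = {"path": path, "session": uuid.uuid4().hex}
    return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()


def parse_mux_id(mux_id: str) -> dict:
    """Worker side: recover the path and session ID from the string."""
    return json.loads(base64.urlsafe_b64decode(mux_id.encode()))


# Hypothetical location of one Multiplexer's subdirectory.
location = "/tmp/abc/mux/20240102-030405"
a = make_mux_id(location)
b = make_mux_id(location)
print(parse_mux_id(a)["path"] == parse_mux_id(b)["path"])  # True
```

Two IDs with the same path but different sessions correspond to two independent read sessions over the same data.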

This call does not make the current Multiplexer object a participant in the read session just created. One has to use the returned value to create a new Multiplexer object to participate in that read session. If the current object is already participating in a read session (an “old” session), making this call on the object does not change its role as a participant in the old session. This call merely creates a new read session but does not modify the current object.

As a rule of thumb, an object created by Multiplexer.new(data, ...) is not a participant in any read session (even after create_read_session() is called on it subsequently). On the other hand, an object created by Multiplexer(mux_id, ...) participates in the read session that is identified by mux_id.

__iter__() -> Iterator[Element]

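Iterate over the data elements contained in the Multiplexer. Within a read session, each element is yielded to exactly one of the participating workers.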

stat(mux_id: str | None = None) -> dict

Return status info of an ongoing read session.

This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return value of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.

done(mux_id: str | None = None) -> bool

Return whether the data iteration is finished.

This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return value of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.
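A coordinator that does want to wait can poll done() periodically. The helper `wait_until_done` below is a hypothetical sketch that accepts any zero-argument callable, for example `lambda: mux.done(mux_id)` with the real class; here a scripted stand-in makes it runnable on its own.

```python
import time


def wait_until_done(is_done, poll_interval=1.0, timeout=None):
    """Poll a done()-style callable until it returns True or the timeout elapses."""
    start = time.monotonic()
    while not is_done():
        if timeout is not None and time.monotonic() - start > timeout:
            return False  # gave up before the session finished
        time.sleep(poll_interval)
    return True


# Stand-in for `lambda: mux.done(mux_id)`: reports True on the third check.
checks = iter([False, False, True])
finished = wait_until_done(lambda: next(checks), poll_interval=0.01)
print(finished)  # True
```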

destroy() -> None

Delete all the data stored by this Multiplexer, hence reclaiming the storage space.