multiplexer
Multiplexer is a utility for distributing data elements to multiple concurrent or distributed workers.
Its implementation relies on the “locking” capability of Upath.
Suppose we perform some brute-force search on a cluster of machines;
there are 1000 grids, and the algorithm works on one grid at a time.
Here the grid is a “hyper-parameter” or “control parameter” that takes 1000 possible values.
We want to distribute these 1000 values to the workers.
This is the kind of use case that Multiplexer targets.
Let’s show its usage using local data and multiprocessing.
(For real work, we would use cloud storage and a cluster of machines.)
First, create a Multiplexer to hold the values to be distributed:
>>> from cloudly.upathlib import LocalUpath
>>> from cloudly.util.multiplexer import Multiplexer
>>> p = LocalUpath('/tmp/abc/mux')
>>> p.rmrf()
0
>>> hyper = Multiplexer.new(range(20), p)
>>> len(hyper)
20
Next, design an interesting worker function:
>>> import multiprocessing, random, time
>>>
>>> def worker(mux_id):
...     for x in Multiplexer(mux_id):
...         time.sleep(random.uniform(0.1, 0.2))  # doing a lot of things
...         print(x, 'done in', multiprocessing.current_process().name)
Back in the main process,
>>> mux_id = hyper.create_read_session()
>>> tasks = [multiprocessing.Process(target=worker, args=(mux_id,)) for _ in range(5)]
>>> for t in tasks:
...     t.start()
>>>
2 done in Process-13
0 done in Process-11
1 done in Process-12
4 done in Process-15
3 done in Process-14
6 done in Process-11
7 done in Process-12
8 done in Process-15
5 done in Process-13
9 done in Process-14
12 done in Process-15
13 done in Process-13
11 done in Process-12
10 done in Process-11
14 done in Process-14
15 done in Process-15
18 done in Process-11
16 done in Process-13
17 done in Process-12
19 done in Process-14
>>>
>>> for t in tasks:
...     t.join()
>>> hyper.done(mux_id)
True
>>> hyper.destroy()
>>>
Multiplexer is a utility for distributing a set of control parameters to multiple workers, which consume the parameters collectively,
i.e. each control parameter is consumed by exactly one worker.
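The “exactly one worker” guarantee can be pictured with a small standard-library sketch. This is not the cloudly implementation: it assumes a `threading.Lock` standing in for the storage-level lock, an in-memory pickle blob standing in for the stored data, and a hypothetical helper name `run_workers`.

```python
import pickle
import threading


def run_workers(data, n_workers=4):
    """Toy model: each element of `data` is claimed by exactly one worker thread."""
    blob = pickle.dumps(list(data))  # stands in for the stored dataset
    state = {"next": 0}              # shared cursor: index of the next unclaimed element
    lock = threading.Lock()          # stands in for the lock on shared storage
    claimed = {}                     # worker name -> list of elements it obtained

    def worker(name):
        elements = pickle.loads(blob)  # every worker loads its own full copy
        mine = claimed.setdefault(name, [])
        while True:
            with lock:  # read and advance the cursor atomically
                idx = state["next"]
                if idx >= len(elements):
                    return
                state["next"] = idx + 1
            mine.append(elements[idx])  # "process" the claimed element outside the lock

    threads = [
        threading.Thread(target=worker, args=(f"w{i}",)) for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return claimed


claimed = run_workers(range(20))
all_items = sorted(x for items in claimed.values() for x in items)
print(all_items == list(range(20)))
```

Which worker gets which element varies from run to run, but the union of what the workers obtained is always the full dataset with no repeats.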
- class cloudly.util.multiplexer.Multiplexer
Bases: Iterable[Element], Sized
Multiplexer is used to distribute data elements to multiple “workers” so that each element is obtained by exactly one worker.
Typically, each data element is small in size but requires significant time for the worker to process. The data elements are “hyper parameters”.
The usage consists of two main parts:
1. In “coordinator” code, call create_read_session() to start a new “session”. Different sessions (at the same time or otherwise) are independent consumers of the data.
Typically, this dataset, which is small and easy to create, is consumed only once. In this case, the coordinator code typically calls new() to create a new Multiplexer, then calls create_read_session() on it, and then manages to send the ID to workers.
2. In “worker” code, use the ID that was returned by create_read_session() to instantiate a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements collectively, i.e., each element is obtained by exactly one worker.
As far as this utility is concerned, the coordinator does not need to wait for the workers to finish consuming the data. The coordinator process or machine may quit and let the workers continue with their job. Usually you should take care of job tracking separately.
See create_read_session() for more details.
- classmethod new(data: Iterable[Element], path: str | Path | Upath, *, tag: str | None = None)
- Parameters:
- data
The data elements that need to be distributed. The elements should be pickle-able.
Importantly, data (the dataset) is meant to contain a modest number (say, up to thousands) of “control parameters”, not a massive number of raw data elements. This is because distributing each data element by this utility incurs nontrivial overhead. Each data element is meant to trigger a substantial amount of processing in a worker, making the overhead of obtaining the data element worthwhile.
Based on this understanding, the elements in data are simply saved in a single pickle file, and each worker will fetch a copy of this file.
- path
A directory where the data and any supporting info will be saved. The directory may or may not exist yet. A sub-directory will be created under path to store data and info about this particular multiplexer. The name of the sub-directory is a datetime string. tag is appended to the sub-directory name to be more informative, if so desired.
If path is in the cloud, then the workers can be on multiple machines, and in multiple threads or processes on each machine. If path is on the local disk, then the workers are in threads or processes on the same machine.
However, there are no strong reasons to use this facility on a local machine, because the same functionality can be achieved by a queue-based solution.
Usually this class is used to distribute data to a cluster of machines, hence this path points to a location in a cloud storage that is supported by upathlib.
Since path is a “root directory” hosting Multiplexers (each in a randomly named sub-directory), a subclass may choose to fix this directory so that new() does away with this parameter.
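The queue-based alternative mentioned above for a single machine might look like the following. It is a minimal sketch using only the standard library (`queue.Queue` and threads); the function name `consume_with_queue` is hypothetical and nothing here is part of cloudly.

```python
import queue
import threading


def consume_with_queue(values, n_workers=5):
    """Distribute `values` to local worker threads through a shared queue."""
    q = queue.Queue()
    for v in values:
        q.put(v)  # all values are enqueued before any worker starts

    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            try:
                x = q.get_nowait()  # each value is handed to exactly one worker
            except queue.Empty:
                return  # queue drained: nothing left to do
            with results_lock:
                results.append((x, threading.current_thread().name))

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


results = consume_with_queue(range(20))
print(sorted(x for x, _ in results) == list(range(20)))
```

A queue like this lives in one machine's memory, which is why a storage-backed approach is needed once the workers are spread over a cluster.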
- __init__(mux_id: str, worker_id: str | None = None, timeout: int | float | None = None)
Create a Multiplexer object and use it to distribute the data elements that have been stored by new().
- Parameters:
- mux_id
The value that is returned by create_read_session().
- worker_id
A string representing the current worker (i.e. this instance). If missing, a default is constructed based on thread name and process name.
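A default worker ID built from the process and thread names could be composed along these lines. The exact format cloudly uses is not documented here, so the helper name `default_worker_id` and the separator are assumptions made purely for illustration.

```python
import multiprocessing
import threading


def default_worker_id():
    """Build an identifier from the current process name and thread name.

    Hypothetical format: the library's actual default may differ.
    """
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    return f"{process_name} {thread_name}"


print(default_worker_id())
```

Combining both names keeps IDs distinct across threads within one process and across processes on one machine; workers on different machines would need something extra (such as a host name) to stay distinguishable.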
- property worker_id: str
- create_read_session() → str
Let’s say there is a “coordinator” and some “workers”; these are programs running in threads, processes, or distributed machines. The coordinator creates a new Multiplexer and calls this method to start a “session” to read (i.e. iterate over) the elements in this Multiplexer:
    mux = Multiplexer.new(range(1000), '/tmp/abc/mux/')
    mux_id = mux.create_read_session()
The mux_id is then provided to the workers, which will create Multiplexer instances pointing to the same dataset and participating in the reading session that has just been started:
    mux = Multiplexer(mux_id)
    for x in mux:
        ...
The data that was provided to new() is split between the workers so that each data element will be obtained by exactly one worker.
The returned value (the “mux ID”) encodes info about the location (“path”) of the data storage as well as the newly created read session. All workers that use the same ID participate in the same read session, i.e. the data elements will be split between them. There can be multiple, independent read sessions going on at the same time.
This call does not make the current Multiplexer object a participant in the read session just created. One has to use the returned value to create a new Multiplexer object to participate in the said read session. If the current object is already participating in a read session (an “old” session), making this call on the object does not change its role as a participant in the old session. This call merely creates a new read session but does not modify the current object.
As a rule of thumb, an object created by Multiplexer.new(data, ...) is not a participant of any read session (even after create_read_session() is called on it subsequently). On the other hand, an object created by Multiplexer(mux_id, ...) is participating in the read session that is identified by mux_id.
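The session semantics described above (readers in one session split the data, while separate sessions each see all of it) can be mimicked with an in-memory toy. The class `ToySessions`, its `read` method, and the `session-N` ID format are all hypothetical; this is a sketch of the behavior, not of how cloudly stores or encodes anything.

```python
import itertools
import threading


class ToySessions:
    """In-memory toy model of independent read sessions over one dataset."""

    def __init__(self, data):
        self._data = list(data)
        self._cursors = {}             # session ID -> index of next unclaimed element
        self._lock = threading.Lock()
        self._ids = itertools.count()

    def create_read_session(self):
        # Creating a session only registers a new cursor; it reads nothing.
        with self._lock:
            session_id = f"session-{next(self._ids)}"
            self._cursors[session_id] = 0
        return session_id

    def read(self, session_id):
        # Generator that claims elements one at a time from the session's cursor.
        while True:
            with self._lock:
                idx = self._cursors[session_id]
                if idx >= len(self._data):
                    return
                self._cursors[session_id] = idx + 1
            yield self._data[idx]

    def done(self, session_id):
        # In this toy, "done" means every element has been claimed.
        with self._lock:
            return self._cursors[session_id] >= len(self._data)


toy = ToySessions(range(6))
s1 = toy.create_read_session()
s2 = toy.create_read_session()

a = toy.read(s1)                      # two readers share session s1
b = toy.read(s1)
first = [next(a), next(b), next(a)]   # they take turns claiming elements
rest = list(b)                        # b drains what is left in s1
other = list(toy.read(s2))            # s2 is independent and sees everything
print(first, rest, other)
```

Readers `a` and `b` never receive the same element because they share one cursor, whereas the reader in `s2` starts from the beginning regardless of how far `s1` has progressed.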
- stat(mux_id: str | None = None) → dict
Return status info of an ongoing read session.
This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return value of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.
- done(mux_id: str | None = None) → bool
Return whether the data iteration is finished.
This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return value of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.