iripau.threading module
Generic and stand-alone threading utilities.
class iripau.threading.AsyncResult(function, *args, **kwargs)

Bases: Thread

Implementation of the multiprocessing.pool.AsyncResult class, but spawning a new thread to execute the target function instead of using a Process or Thread pool.

Parameters:
- function (Callable[..., Any]) – Callable to run in a thread.
- *args – Will be passed to function.
- **kwargs – Will be passed to function.
Example
Start multiplying two matrices, do other things, and then print the multiplication result:
```python
from time import sleep
from random import randint
from operator import mul

def slow_matmul(a, b):
    return [
        [sum(map(mul, a_row, b_col)) for b_col in zip(*b)]
        for a_row in a
    ]

a = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]
b = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]

result = AsyncResult(slow_matmul, a, b)
print("Processing...")
while not result.ready():
    sleep(5)
    print("Still processing...")
print(*result.get(timeout=900), sep="\n")
```
Example
Perform several matrix multiplications at once and print the results:
```python
from random import randint

def random_mat_mul():
    a = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]
    b = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]
    return slow_matmul(a, b)

results = [AsyncResult(random_mat_mul) for _ in range(10)]
for i, result in enumerate(results):
    print(f"Result {i}:")
    print(*result.get(timeout=900), sep="\n")
```
get(timeout=None)

Implementation of multiprocessing.pool.AsyncResult.get().

Parameters:
- timeout (float) – Maximum time in seconds to wait for the result to arrive.

Return type: Any

Returns: The target function return value when it arrives.

Raises:
- TimeoutError – If the result does not arrive within timeout.
- Exception – Whatever Exception the target function raised during its execution.
wait(timeout=None)

Implementation of multiprocessing.pool.AsyncResult.wait().

Parameters:
- timeout (float) – Maximum time in seconds to wait for the result to be available.
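The mechanism behind these methods can be illustrated with a minimal sketch built directly on threading.Thread: the target runs in its own thread, and an Event signals completion. This is a hypothetical illustration of the described behavior, not the library's actual implementation:

```python
import threading

class MiniAsyncResult(threading.Thread):
    """Hypothetical sketch: run a callable in a dedicated thread and
    expose ready()/wait()/get() like multiprocessing.pool.AsyncResult."""

    def __init__(self, function, *args, **kwargs):
        super().__init__()
        self._function = function
        self._args = args
        self._kwargs = kwargs
        self._done = threading.Event()
        self._value = None
        self._error = None
        self.start()  # begin executing immediately

    def run(self):
        try:
            self._value = self._function(*self._args, **self._kwargs)
        except Exception as error:
            self._error = error  # re-raised in the caller by get()
        finally:
            self._done.set()

    def ready(self):
        return self._done.is_set()

    def wait(self, timeout=None):
        self._done.wait(timeout)

    def get(self, timeout=None):
        if not self._done.wait(timeout):
            raise TimeoutError
        if self._error is not None:
            raise self._error
        return self._value

result = MiniAsyncResult(pow, 2, 10)
print(result.get(timeout=5))  # 1024
```

Subclassing Thread (as the real class's Bases entry suggests) keeps the sketch small; the Event lets get() honor a timeout without polling.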
class iripau.threading.MultiDequeuer(consumer, time_to_consume=None, count_hint=0, collection_type=<class 'list'>)

Bases: object

Implementation of the Producer-Consumer design pattern in which one of the producer workers acts as the consumer, processing all of the elements produced so far. The workers block until the elements they produced have been consumed.

This is helpful to keep the logic of producing one single element per worker while consuming several elements at once more efficiently, without needing a separate thread or threads for consumption.

If there is no advantage in consuming multiple elements at once compared to consuming them one by one, prefer consuming the elements in the workers themselves, keeping the logic of blocking the workers.

If blocking the workers is not desired, the regular Producer-Consumer pattern will suffice.

Attention: This class relies on the Python GIL.

Parameters:
- consumer (Callable[[Collection[Any]], None]) – The element queue will be consumed by this callable, which will be called by one of the workers, passing all of the elements in the queue so far. Once a worker has started consuming the queue, a new queue is created to hold future elements.
- time_to_consume (int) – If greater than 0, the queue will be consumed when this time (in seconds) has passed since the first element was added to the queue.
- count_hint (int) – If greater than 0, the queue will be consumed when at least this number of elements has been added to the queue, ignoring time_to_consume.
- collection_type (Type) – The element queue will be cast to this type before being passed to the consumer callable.
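The pattern described above (one producer consumes the whole batch while the others block) can be sketched roughly as follows. This is a simplified illustration, not the class's actual implementation: the put method name and internals are assumptions, and the time_to_consume flush path is omitted for brevity:

```python
import threading

class MiniDequeuer:
    """Hypothetical sketch: workers add elements; once the batch is
    full, the worker that filled it consumes the whole batch while the
    other producers of that batch stay blocked until it is consumed."""

    def __init__(self, consumer, count_hint=1, collection_type=list):
        self._consumer = consumer
        self._count_hint = count_hint
        self._collection_type = collection_type
        self._lock = threading.Lock()
        self._queue = []
        self._consumed = threading.Event()

    def put(self, element):  # called by each worker; name is assumed
        with self._lock:
            self._queue.append(element)
            batch, event = self._queue, self._consumed
            full = len(batch) >= self._count_hint
            if full:
                # Start a fresh queue and event for future elements,
                # then consume the current batch in this worker.
                self._queue = []
                self._consumed = threading.Event()
        if full:
            self._consumer(self._collection_type(batch))
            event.set()   # unblock the other producers of this batch
        else:
            event.wait()  # block until some worker consumes the batch
```

Note how the consuming worker swaps in a new queue before calling the consumer, so later producers are not delayed by the batch being processed, matching the behavior described for the consumer parameter.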
class iripau.threading.FunctionCacher(function, synchronized=False, enabled=True)

Bases: object

Cache the return value of a callable depending on its call arguments. The first call will generate the cache and subsequent calls will return the cached value without executing the original callable.

An instance of this class is meant to be called as if it were the original callable.

The main difference from the functools.cache decorator is that the cache for a particular set of arguments is generated by exactly one execution of the original callable; if more than one thread calls this object before the cache is generated, only one of them will actually execute the original callable and the rest will wait for it to finish.

https://docs.python.org/3/library/functools.html#functools.cache
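The single-execution guarantee can be illustrated with a minimal sketch using a per-key lock: concurrent callers with the same arguments serialize on that lock, so only the first one runs the wrapped callable. This is an illustration of the described behavior, not the class's actual code:

```python
import threading

class MiniCacher:
    """Hypothetical sketch: cache per argument set, ensuring the
    wrapped callable runs at most once per key under concurrent calls."""

    def __init__(self, function):
        self._function = function
        self._cache = {}
        self._locks = {}
        self._lock = threading.Lock()  # guards both dictionaries

    def __call__(self, *args, **kwargs):
        key = (args, tuple(sorted(kwargs.items())))
        with self._lock:
            if key in self._cache:
                return self._cache[key]
            key_lock = self._locks.setdefault(key, threading.Lock())
        with key_lock:  # only one thread computes; the rest wait here
            if key not in self._cache:
                self._cache[key] = self._function(*args, **kwargs)
        return self._cache[key]
```

Unlike functools.cache, which lets several threads race into the wrapped function before the first result is stored, the per-key lock here makes the late arrivals wait and then read the cached value.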
enable_cache(synchronized=False)

Enable caching if it is not enabled already. The synchronized argument is used as in the constructor. This method is neither re-entrant nor thread-safe.