iripau.threading module

Generic and stand-alone threading utilities.

class iripau.threading.AsyncResult(function, *args, **kwargs)[source]

Bases: Thread

Implementation of the multiprocessing.pool.AsyncResult interface that spawns a new thread to execute the target function instead of using a process or thread pool.

Parameters:
  • function (Callable[..., Any]) – Callable to run in a thread.

  • *args – Will be passed to function.

  • **kwargs – Will be passed to function.

Example

Start multiplying two matrices, do other things, and then print the multiplication result:

from time import sleep
from random import randint
from operator import mul

from iripau.threading import AsyncResult


def slow_matmul(a, b):
    return [[sum(map(mul, a_row, b_col)) for b_col in zip(*b)] for a_row in a]


a = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]
b = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]
result = AsyncResult(slow_matmul, a, b)

print("Processing...")
while not result.ready():
    sleep(5)
    print("Still processing...")

print(*result.get(timeout=900), sep="\n")

Example

Perform several matrix multiplications at once and print the results:

from random import randint

from iripau.threading import AsyncResult


def random_mat_mul():
    # Uses slow_matmul as defined in the previous example
    a = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]
    b = [[randint(0, 100) for _ in range(1000)] for _ in range(1000)]
    return slow_matmul(a, b)

results = [AsyncResult(random_mat_mul) for _ in range(10)]
for i, result in enumerate(results):
    print(f"Result {i}:")
    print(*result.get(timeout=900), sep="\n")

run()[source]

Run the target function and store its return value or, if it raised, its exception information.

get(timeout=None)[source]

Implementation of multiprocessing.pool.AsyncResult.get().

Parameters:

timeout (float) – Maximum time in seconds to wait for the result to arrive.

Return type:

Any

Returns:

The target function return value when it arrives.

Raises:
  • TimeoutError – If the result does not arrive within timeout.

  • Exception – Whatever Exception the target function raised during its execution.

wait(timeout=None)[source]

Implementation of multiprocessing.pool.AsyncResult.wait().

Parameters:

timeout (float) – Maximum time in seconds to wait for the result to be available.

ready()[source]

Implementation of multiprocessing.pool.AsyncResult.ready().

Return type:

bool

Returns:

Whether the target function has completed.

successful()[source]

Implementation of multiprocessing.pool.AsyncResult.successful().

Return type:

bool

Returns:

Whether the target function completed without raising an exception.

Raises:

ValueError – If the result is not ready.
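As a stand-alone illustration of the semantics documented above, the following stdlib-only sketch (a hypothetical class, not the iripau implementation) reproduces the get(), ready() and successful() behavior, including exception propagation:

```python
import threading


class MiniAsyncResult(threading.Thread):
    """Sketch: run a callable in its own thread; get() returns its value
    or re-raises its exception, mirroring the documented interface."""

    def __init__(self, function, *args, **kwargs):
        super().__init__()
        self._call = lambda: function(*args, **kwargs)
        self._value = None
        self._error = None
        self.start()

    def run(self):
        try:
            self._value = self._call()
        except Exception as error:
            self._error = error

    def ready(self):
        # The result is ready once the thread has finished
        return not self.is_alive()

    def successful(self):
        if not self.ready():
            raise ValueError("Result is not ready")
        return self._error is None

    def get(self, timeout=None):
        self.join(timeout)
        if self.is_alive():
            raise TimeoutError
        if self._error is not None:
            raise self._error  # propagate the target function's exception
        return self._value
```

The exception raised inside the worker thread is stored and re-raised in the caller's thread, which is the behavior documented for get().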

class iripau.threading.MultiDequeuer(consumer, time_to_consume=None, count_hint=0, collection_type=<class 'list'>)[source]

Bases: object

Implementation of the Producer-Consumer Design Pattern in which one of the producer workers acts as the consumer, processing all of the elements that have been produced so far. The workers also block until the elements they produced are consumed.

This is helpful to keep the logic of processing one single element per worker while consuming several elements more efficiently at once, without needing a separate thread (or threads) for consumption.

If consuming multiple elements at once offers no advantage over consuming them one by one, prefer consuming the elements directly in the workers, which preserves the worker-blocking behavior.

If blocking the workers is not desired, the regular Producer-Consumer Pattern will suffice.

Attention

This class relies on the Python GIL.

Parameters:
  • consumer (Callable[[Collection[Any]], None]) – The elements queue will be consumed by this callable, which will be called by one of the workers passing all of the elements in the queue so far. When the worker has started consuming the queue, a new queue will be created to handle future elements.

  • time_to_consume (int) – If this is greater than 0, the queue will be consumed when this time (in seconds) has passed after the first element was added to the queue.

  • count_hint (int) – If this is greater than 0, the queue will be consumed when at least this amount of elements have been added to the queue, ignoring time_to_consume.

  • collection_type (Type) – The elements queue will be cast into this type before it is passed to the consumer callable.

put(product)[source]

Add an element to the queue. This method is meant to be called by the workers. It blocks until the element is consumed.

Parameters:

product (Any) – Element to add to the queue.
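The pattern this class implements can be sketched with stdlib primitives only. In this hypothetical, simplified version (not the iripau implementation) a batch is consumed as soon as count_hint elements have been added, the consumer runs under the lock, and there is no time_to_consume fallback:

```python
import threading


class BatchQueue:
    """Sketch: workers add elements; the worker that completes a batch
    consumes it, while the other workers block until their batch is done."""

    def __init__(self, consumer, count_hint):
        self.consumer = consumer
        self.count_hint = count_hint
        self.lock = threading.Lock()
        self.elements = []
        self.consumed = threading.Event()  # set when the current batch is done

    def put(self, element):
        with self.lock:
            self.elements.append(element)
            consumed = self.consumed  # event for the batch we just joined
            if len(self.elements) >= self.count_hint:
                batch, self.elements = self.elements, []
                self.consumed = threading.Event()  # start a new batch
                self.consumer(batch)  # this worker acts as the consumer
                consumed.set()  # release the workers that produced this batch
                return
        consumed.wait()  # block until some worker consumes our batch
```

Each worker keeps the one-element-per-call logic of put(), yet the consumer callable always receives the whole batch at once.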

class iripau.threading.FunctionCacher(function, synchronized=False, enabled=True)[source]

Bases: object

Cache the return value of a callable depending on its call arguments. The first call will generate the cache and subsequent calls will return the cached value without executing the original callable.

The instance of this class is meant to be called as if it were the original callable.

The main difference from the functools.cache decorator is that the cache for a particular set of arguments will be generated by only one execution of the original callable; if more than one thread calls this object before the cache is generated, only one of them will actually execute the original callable and the rest will wait for it to finish.

https://docs.python.org/3/library/functools.html#functools.cache
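The single-execution guarantee described above can be sketched with stdlib primitives. This hypothetical decorator (not the iripau implementation) uses one lock per distinct arguments tuple, so only one thread computes a given value while later callers wait and then read the cache:

```python
import threading
import functools


def single_flight_cache(function):
    """Sketch: at most one execution of `function` per arguments tuple;
    concurrent callers with the same arguments block until it finishes."""
    cache = {}
    locks = {}
    registry_lock = threading.Lock()

    @functools.wraps(function)
    def wrapper(*args):
        with registry_lock:  # one lock per distinct arguments tuple
            lock = locks.setdefault(args, threading.Lock())
        with lock:  # first caller computes; later callers find the cache
            if args not in cache:
                cache[args] = function(*args)
        return cache[args]

    return wrapper
```

Unlike functools.cache, a second thread arriving before the value exists blocks on the per-arguments lock instead of executing the callable again.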

enable_cache(synchronized=False)[source]

Enable caching if not enabled already. The synchronized argument is used as in the constructor. This method is not re-entrant nor thread-safe.

disable_cache()[source]

Disable and delete caching if not disabled already. This method is not re-entrant nor thread-safe.

clear_cache()[source]

Delete cache if caching is not disabled. This method is not re-entrant nor thread-safe.

iripau.threading.synchronized(lock=None)[source]

Decorator to wrap a function with a lock or semaphore.
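Assuming the documented semantics (every call to the decorated function is serialized through the given lock, or through a new one if none is given), a minimal stdlib sketch of such a decorator might look like:

```python
import threading
import functools


def synchronized(lock=None):
    """Sketch of a lock-wrapping decorator; hypothetical, not the
    iripau implementation."""
    lock = lock if lock is not None else threading.Lock()

    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            with lock:  # only one thread runs the function at a time
                return function(*args, **kwargs)
        return wrapper

    return decorator
```

Passing a threading.Semaphore instead of a Lock would allow a bounded number of concurrent callers, since both support the context-manager protocol.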

iripau.threading.cached(synchronized=False, enabled=True)[source]

Decorator using the FunctionCacher class.