zzlib.utils module

zzlib.utils

zzlib.utils.AsyncTaskPool

A pool for managing asynchronous tasks with optional concurrency control.

This class provides functionality to:

  • Add and manage multiple async tasks.

  • Control maximum concurrent tasks.

  • Wait for task completion.

  • Get results as soon as tasks complete.

__init__(max_workers=None)

Parameters:

Name Type Description Default
max_workers int

Maximum number of concurrent tasks. If None, no limit will be set.

None

add(func, *args, **kw)

Add an async task to the pool.

Parameters:

Name Type Description Default
func Callable[..., Awaitable]

Async function to execute.

required
*args

Positional arguments for the function.

()
**kw

Keyword arguments for the function.

{}

Returns:

Type Description
Task

The created task.

as_completed() async

Yield completed tasks as they finish.

Yields:

Type Description
AsyncGenerator[Task, None]

Completed tasks in order of completion.

cancel()

Cancel all pending tasks and stop the pool.

wait() async

Wait for all tasks to complete and return their results.

Returns:

Type Description
list

Results from all completed tasks.
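
The concurrency limit described for this class can be illustrated with plain asyncio. The sketch below is not zzlib's implementation: `bounded_gather` and `work` are hypothetical names, and it assumes that `max_workers` behaves like a semaphore bound on how many coroutines run at the same time, with None meaning no limit.

```python
import asyncio


async def bounded_gather(calls, max_workers=None):
    """Run (func, args) pairs with at most `max_workers` running at once."""
    # Assumption: max_workers=None means "no limit", as in the parameter table.
    sem = asyncio.Semaphore(max_workers) if max_workers else None

    async def run(func, args):
        if sem is None:
            return await func(*args)
        async with sem:  # blocks while max_workers calls are already running
            return await func(*args)

    tasks = [asyncio.create_task(run(func, args)) for func, args in calls]
    # gather() returns results in submission order, not completion order.
    return await asyncio.gather(*tasks)


running = 0
peak = 0


async def work(n):
    """Hypothetical task that records how many copies run concurrently."""
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)
    running -= 1
    return n * 2


results = asyncio.run(
    bounded_gather([(work, (i,)) for i in range(6)], max_workers=2)
)
```

With `max_workers=2`, `peak` never rises above 2 even though six tasks were submitted.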

zzlib.utils.BinaryEventSource

Bases: EventSource

A simplified event source for binary (True/False) conditions.

__init__(func, whenTrue=None, whenFalse=None, initial=InitialState.NONE)

Parameters:

Name Type Description Default
func Callable

A callable that returns True or False.

required
whenTrue Any

Event to trigger when condition becomes True.

None
whenFalse Any

Event to trigger when condition becomes False.

None
initial InitialState

Initial state handling, one of InitialState enum values.

NONE

zzlib.utils.CallbackQueue

Bases: IterQueue

A queue that processes items through a series of callback functions before storing them.

This queue extends IterQueue with callback processing: every item passed to put() goes through the registered callbacks, in order, before it is stored.

__init__(*args, **kw)

Initialize the CallbackQueue.

Parameters:

Name Type Description Default
*args

Positional arguments passed to parent IterQueue.

()
**kw

Keyword arguments passed to parent IterQueue.

{}

callbacks = [] instance-attribute

List of callback functions that transform queue items.

Each callback should be a function that:

  • Takes a single item as input.

  • Returns either:

    • A single transformed item.

    • An iterable of transformed items.

put(item, **kw)

Put an item into the queue after processing it through callbacks.

The item is passed through all registered callback functions in sequence before being stored in the queue.

Parameters:

Name Type Description Default
item Any

The item to put in the queue.

required
**kw

Additional keyword arguments passed to parent put() method.

{}
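
The callback chain described for put() can be sketched without zzlib. The helper below, `apply_callbacks`, is a hypothetical stand-in, not the library's code. It assumes that each item produced by one callback is fed individually to the next, and that strings and bytes count as single items rather than iterables; the real class may draw that line differently.

```python
from collections.abc import Iterable


def apply_callbacks(item, callbacks):
    """Pass `item` through each callback in turn; return the items to store."""
    items = [item]
    for callback in callbacks:
        next_items = []
        for current in items:
            result = callback(current)
            # Assumption: strings and bytes are single items, not iterables.
            if isinstance(result, Iterable) and not isinstance(result, (str, bytes)):
                next_items.extend(result)  # callback returned several items
            else:
                next_items.append(result)  # callback returned one item
        items = next_items
    return items


def split_words(line):
    """Callback returning an iterable of transformed items."""
    return line.split()


def upper(word):
    """Callback returning a single transformed item."""
    return word.upper()


stored = apply_callbacks("hello queue world", [split_words, upper])
```

Here one input line becomes three stored items, because the first callback fans out and the second transforms each piece.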

zzlib.utils.Checker

Bases: FactoryProxy

A class for checking conditions and generating events based on those conditions.

Example
# Create a checker for a condition
checker = Checker(lambda: x > 10)

# Configure when and how to check
checker.when(lambda: y < 5).on('change')

# Wait for the condition with timeout
checker.wait(timeout=5.0)

# Use with an iterable
for item in checker.during(items):
    process(item)
    # Stops iteration when condition is met

__init__(func, **kw)

Parameters:

Name Type Description Default
func Callable

The function to check/evaluate.

required
**kw

Additional options for event configuration.

{}

__subject__ cached property

Get or create the underlying eventer object.

during(it)

Iterate over an iterable while waiting for the condition to be met.

Parameters:

Name Type Description Default
it Iterable

The iterable to process.

required

Returns:

Type Description
Merged

A merged iterator that stops when condition is met.

event(val)

Set the event to trigger.

Parameters:

Name Type Description Default
val Any

The event object to trigger.

required

on(val)

Set when the checking should occur.

Parameters:

Name Type Description Default
val Union[str, EventTiming]

Timing specification, either as string or EventTiming enum.

required

wait(timeout=None)

Wait for the configured condition to become true.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait in seconds.

None

Returns:

Type Description
bool

True if condition was met, False if timed out.

when(val)

Set the condition for when events should trigger.

Parameters:

Name Type Description Default
val Callable

The condition to check.

required

zzlib.utils.EventCondition = namedtuple('EventCondition', ['event', 'condition', 'timing'], defaults=(True, EventTiming.ON_BEING)) module-attribute

A named tuple defining an event's trigger condition and timing.

Parameters:

Name Type Description Default
event Any

The event object to trigger when condition is met.

required
condition Union[bool, Callable]

The condition to check. Can be a boolean value, or a callable that takes the current value and returns a bool.

True
timing EventTiming

When to trigger the event.

ON_BEING
Example
# Trigger event when temperature exceeds 30°C
temp_alarm = EventCondition(
    event="high_temp_alarm",
    condition=lambda temp: temp > 30,
    timing=EventTiming.ON_BEING
)
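
Because the tuple is declared with `defaults=(True, EventTiming.ON_BEING)`, only `event` has to be supplied. The snippet below recreates both definitions exactly as they appear in this reference so that it runs without zzlib installed; the event names are made up for illustration.

```python
from collections import namedtuple
from enum import Enum

# Recreated from the definitions shown in this reference (not imported from zzlib).
EventTiming = Enum('EventTiming', 'CONTINUOUS ON_BEING ON_NOT_BEING')
EventCondition = namedtuple(
    'EventCondition',
    ['event', 'condition', 'timing'],
    defaults=(True, EventTiming.ON_BEING),
)

# Only the event is given: condition and timing fall back to their defaults.
simple = EventCondition("door_open")

# All three fields given explicitly.
explicit = EventCondition(
    "high_temp_alarm",
    lambda temp: temp > 30,
    EventTiming.CONTINUOUS,
)
```

namedtuple applies defaults to the rightmost fields, so `simple.condition` is True and `simple.timing` is `EventTiming.ON_BEING`.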

zzlib.utils.EventSource

Bases: Delayed

A class that monitors events based on conditions and timing.

__init__(func, conditions=(), initial=InitialState.NONE)

Parameters:

Name Type Description Default
func Callable

The function to call to check event state.

required
conditions Iterable[EventCondition]

List of EventCondition tuples defining event triggers.

()
initial InitialState

Initial state, one of InitialState enum values.

NONE

add(event, condition=True, timing=EventTiming.ON_BEING)

Add a new event condition.

Parameters:

Name Type Description Default
event Any

The event to trigger.

required
condition Union[bool, Callable]

The condition to check. Can be a boolean value or a callable.

True
timing EventTiming

When to trigger, one of EventTiming enum values.

ON_BEING

check(queue)

Start checking events in a daemon thread.

Parameters:

Name Type Description Default
queue Queue

Queue to put triggered events into.

required

value property writable

Get the last checked value.

zzlib.utils.EventTiming = Enum('EventTiming', 'CONTINUOUS ON_BEING ON_NOT_BEING') module-attribute

Enum defining when an event should be triggered.

Values

CONTINUOUS: Trigger continuously while condition is True. Example: Keep triggering alarm while temperature is high.

ON_BEING: Trigger once when condition changes from False to True. Example: Trigger when temperature first exceeds threshold.

ON_NOT_BEING: Trigger once when condition changes from True to False. Example: Trigger when temperature returns to normal range.
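
The three timings amount to level detection versus edge detection on a boolean condition. The sketch below illustrates that reading; `should_trigger` and `triggers` are hypothetical helpers, and the starting state (condition treated as False before the first reading) is an assumption, since the library controls that through InitialState.

```python
from enum import Enum

# Recreated from the definition shown in this reference (not imported from zzlib).
EventTiming = Enum('EventTiming', 'CONTINUOUS ON_BEING ON_NOT_BEING')


def should_trigger(timing, previous, current):
    """Decide whether an event fires, given the previous and current condition."""
    if timing is EventTiming.CONTINUOUS:
        return current                   # fire on every check while True
    if timing is EventTiming.ON_BEING:
        return current and not previous  # rising edge: False -> True
    if timing is EventTiming.ON_NOT_BEING:
        return previous and not current  # falling edge: True -> False
    raise ValueError(f"unknown timing: {timing!r}")


def triggers(timing, readings, threshold=30):
    """Return one bool per reading: did the event fire on that check?"""
    fired = []
    previous = False  # assumption: the condition starts out False
    for temp in readings:
        current = temp > threshold
        fired.append(should_trigger(timing, previous, current))
        previous = current
    return fired


readings = [25, 31, 33, 28, 35]
continuous = triggers(EventTiming.CONTINUOUS, readings)
rising = triggers(EventTiming.ON_BEING, readings)
falling = triggers(EventTiming.ON_NOT_BEING, readings)
```

For these readings, CONTINUOUS fires on checks 2, 3 and 5, ON_BEING on checks 2 and 5, and ON_NOT_BEING only on check 4.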

zzlib.utils.Eventer

Bases: ThreadingMixin

A class for managing event sources and processing events through callbacks.

__init__(*sources, interval=1.0)

Parameters:

Name Type Description Default
*sources EventSource

One or more EventSource instances to monitor.

()
interval float

Time in seconds between event checks. Defaults to 1.0.

1.0

add_callback(callback, events=None)

Add a callback to be triggered for specific events.

Parameters:

Name Type Description Default
callback Callable

Function to call when matching events occur.

required
events Any

Optional event or events to filter for. If None, triggers for all events.

None

add_processor(processor)

Add an event processor to the start of the callback chain.

Parameters:

Name Type Description Default
processor Callable

A callable that processes events. The processor should accept an iterable of events and yield processed events. It can modify, filter, or generate new events.

required

check()

Check all event sources and process any triggered events.

check_loop()

Run continuous event checking loop until stopped.

event_flag(events, flag=None)

Create or use an Event flag that gets set when specified events occur.

Parameters:

Name Type Description Default
events Union[Any, Iterable]

Event or events to watch for.

required
flag Event

Optional Event instance to use. Creates new one if not provided.

None

Returns:

Type Description
Event

The event flag that will be set on matching events.

events(history=True)

Get an iterator over events.

Parameters:

Name Type Description Default
history bool

Whether to include historical events. Defaults to True.

True

Yields:

Type Description
Any

Events from the event queue.

get(*args, **kw)

Get an event from the event queue.

Parameters:

Name Type Description Default
*args

Positional arguments passed to queue's get().

()
**kw

Keyword arguments passed to queue's get().

{}

put(*args, **kw)

Put an event into the event queue.

Parameters:

Name Type Description Default
*args

Positional arguments passed to queue's put().

()
**kw

Keyword arguments passed to queue's put().

{}

wait(events, history=True, timeout=None)

Wait for specified events to occur.

Parameters:

Name Type Description Default
events Union[Any, Iterable]

Event or events to wait for.

required
history bool

Whether to check historical events. Defaults to True.

True
timeout float

Maximum time to wait in seconds. None means wait forever.

None

Returns:

Type Description
bool

True if matching event occurred, False if timed out.

zzlib.utils.InitialState = Enum('InitialState', 'NONE FIRST ERROR') module-attribute

Enum defining initial states for event sources.

Values

NONE: Default initial state where any new value is treated as a change.

FIRST: The first check is not treated as a change; subsequent checks are evaluated normally.

ERROR: Indicates that an error occurred while checking the value.

zzlib.utils.IterQueue

Bases: Queue

A Queue that can be iterated and followed.

This class extends Queue to add iteration capabilities. It allows:

  • Iterating over current queue contents without popping them.

  • Following queue updates in real-time.

  • Optionally including history when following.

  • Stopping follow based on a condition.

__iter__()

Iterate over current queue contents.

follow(history=True, stop_when=None)

Follow queue updates, optionally including history.

Parameters:

Name Type Description Default
history bool

If True, yield existing items before following updates. Defaults to True.

True
stop_when Callable

Optional callable that determines when to stop following. When this callable evaluates to True, following will stop.

None
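
Iterating over a queue without consuming it can be sketched on top of the standard library's `queue.Queue`. `SnapshotQueue` is a hypothetical stand-in, not zzlib's IterQueue: it covers only iteration over the current contents (not follow() or history), and it relies on the `queue` deque and `mutex` lock that CPython's Queue keeps internally.

```python
import queue


class SnapshotQueue(queue.Queue):
    """Hypothetical stand-in: a Queue whose iteration does not pop items."""

    def __iter__(self):
        # queue.Queue stores its items in the `queue` deque, guarded by `mutex`.
        # Copy under the lock so concurrent put()/get() cannot disturb iteration.
        with self.mutex:
            snapshot = list(self.queue)
        return iter(snapshot)


q = SnapshotQueue()
for n in (1, 2, 3):
    q.put(n)

seen = list(q)          # iterate over the current contents
size_after = q.qsize()  # nothing was removed by iterating
first = q.get()         # normal queue behaviour is unchanged
```

Taking a snapshot trades freshness for safety: items added after iteration starts are not seen, which is the gap that a follow-style method would fill.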

zzlib.utils.Logger

Bases: ThreadingMixin

A log-content processor that reads messages from one or more sources and handles them according to patterns.

add(source, pattern)

Add a log source and its associated pattern.

Parameters:

Name Type Description Default
source Iterable[str]

An iterable yielding log messages as strings.

required
pattern LoggerPattern

The LoggerPattern to process messages from this source.

required

start(wait=False)

Start processing all log sources.

Parameters:

Name Type Description Default
wait bool

If True, blocks until all sources are finished. If False, starts processing asynchronously.

False

zzlib.utils.LoggerPattern

A class for managing logging patterns, used by Logger.

add(func, match='.*', format=None, processors=())

Add a pattern to match and process log lines.

Parameters:

Name Type Description Default
func Callable

Callable to handle matched lines.

required
match str

Regex pattern to match lines against.

'.*'
format str

Optional format string to format matched groups.

None
processors Iterable[Callable]

Optional sequence of callables to pre-process lines.

()

ignore(match)

Add a pattern to ignore matching lines.

Parameters:

Name Type Description Default
match str

Regex pattern for lines to ignore.

required

width_limit_processor(width=None) staticmethod

Create a processor that wraps lines to a maximum width.

Parameters:

Name Type Description Default
width int

Maximum line width. If None, uses terminal width minus 70.

None

zzlib.utils.Merged

Bases: ThreadingMixin

A class that merges multiple iterators into a single iterator.

This class takes multiple iterators and combines their outputs into a single iterator stream. Each iterator runs in its own thread and outputs are merged in the order they arrive.

Parameters:

Name Type Description Default
*its

Iterators to merge.

()
endswhen Literal['all', 'first', 'any']

When to end the merged iterator:

  • 'all': End when all source iterators complete (default).

  • 'first'/'any': End when any source iterator completes.

'all'
Example
import time

from zzlib.utils import Merged

# Create iterators that simulate slow data streams
def slow_iter1():
    for i in range(5):
        time.sleep(0.1)  # Simulate slow processing
        yield i

def slow_iter2():
    for i in range(5, 10):
        time.sleep(0.2)  # Different delay
        yield i

merged = Merged(slow_iter1(), slow_iter2())
for item in merged:
    print(item)  # Will print numbers from both iterators as they arrive
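
The thread-per-iterator merging described for this class can be approximated with the standard library alone. The generator below, `merge`, is a hypothetical sketch rather than zzlib's code, and it covers only the 'all' ending mode: it stops once every source iterator is exhausted.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of one source iterator


def merge(*iterables):
    """Yield items from all iterables in arrival order, one thread per source."""
    out = queue.Queue()

    def pump(it):
        try:
            for item in it:
                out.put(item)
        finally:
            out.put(_DONE)  # always signal completion, even after an error

    for it in iterables:
        threading.Thread(target=pump, args=(it,), daemon=True).start()

    remaining = len(iterables)
    while remaining:
        item = out.get()
        if item is _DONE:
            remaining -= 1  # one source finished; stop when all have
        else:
            yield item


merged = list(merge(range(5), range(5, 10)))
```

The order of `merged` depends on thread scheduling, so only its contents are predictable. Items from any single source keep their relative order.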

zzlib.utils.TimedCache

Bases: FactoryProxy

A class that delays function execution and caches the result with timeout.

Similar to FactoryProxy, but invalidates the cached result once the timeout elapses.

Example
import time

from zzlib.utils import TimedCache

@TimedCache(timeout=60)  # Cache for 60 seconds
def expensive_function():
    # Placeholder for an expensive computation
    return sum(range(1_000_000))

# First call computes and caches
result1 = expensive_function()

# Within timeout, returns cached value
result2 = expensive_function()

# After timeout, recomputes and updates cache
time.sleep(61)
result3 = expensive_function()

__init__(func, timeout=None, onupdate=None)

Parameters:

Name Type Description Default
func Callable

The function to execute.

required
timeout float

Time in seconds before cache invalidation. If None, cache never expires.

None
onupdate Callable

Optional callback function to run when cache is updated.

None

freeze()

Stop the timer to prevent cache updates.

noupdate()

Get the cached value without checking for updates.

update()

Force update the cached value and restart timer.

updated(forceupdate=False, noupdate=False)

Get the cached value with update control options.

Parameters:

Name Type Description Default
forceupdate bool

If True, force update cache regardless of timeout.

False
noupdate bool

If True, return cached value without checking timeout.

False

Returns:

Type Description
Any

The cached value.
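
The interplay of timeout, forceupdate and noupdate can be sketched with a small stand-alone class. `ExpiringValue` is hypothetical and is not zzlib's TimedCache: it has no timer thread or onupdate callback, and it assumes that noupdate still computes the value once when nothing has been cached yet.

```python
import time


class ExpiringValue:
    """Hypothetical stand-in for the caching behaviour; not zzlib's class."""

    def __init__(self, func, timeout=None):
        self.func = func
        self.timeout = timeout  # seconds; None means the value never expires
        self._value = None
        self._stamp = None      # time.monotonic() of the last computation

    def _expired(self):
        if self.timeout is None:
            return False
        return time.monotonic() - self._stamp >= self.timeout

    def updated(self, forceupdate=False, noupdate=False):
        """Return the cached value, recomputing it when required."""
        if self._stamp is None:
            recompute = True  # assumption: always compute the first time
        elif forceupdate:
            recompute = True
        elif noupdate:
            recompute = False
        else:
            recompute = self._expired()
        if recompute:
            self._value = self.func()
            self._stamp = time.monotonic()
        return self._value


calls = []


def expensive():
    """Hypothetical expensive function that counts its own invocations."""
    calls.append(1)
    return len(calls)


cache = ExpiringValue(expensive, timeout=0.2)
first = cache.updated()                   # computes and caches
second = cache.updated()                  # within timeout: cached value
time.sleep(0.25)
stale = cache.updated(noupdate=True)      # expired, but not refreshed
third = cache.updated()                   # expired: recomputed
forced = cache.updated(forceupdate=True)  # recomputed regardless of timeout
```

Using `time.monotonic()` rather than wall-clock time keeps the expiry check unaffected by system clock adjustments.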

zzlib.utils.Timeout

Bases: Timer

A Timer to handle stream timeouts.

during(it)

Iterate over an iterator with a timeout.

zzlib.utils.Timer

Bases: StopableMixin

A timer class that can track elapsed time, timeout, and support pause/resume.

__init__(timeout=None, start=True)

Parameters:

Name Type Description Default
timeout Union[timedelta, int, float]

Timeout duration. Can be timedelta or number of seconds.

None
start bool

Whether to start the timer immediately. Defaults to True.

True

elapsed: timedelta property

Get accumulated time excluding paused time.

Returns:

Type Description
timedelta

Total accumulated time.

is_paused()

Check if timer is paused.

is_ticking()

Check if timer is actively running.

is_timeout()

Check if timer has timed out.

pause()

Pause the timer while preserving elapsed time.

start(reset=False)

Start or resume the timer.

Parameters:

Name Type Description Default
reset bool

Whether to reset accumulated time. Defaults to False.

False
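
Tracking elapsed time that excludes paused periods usually means accumulating finished intervals and adding the interval currently in progress. The `Stopwatch` class below is a hypothetical sketch of that bookkeeping only; it does not implement timeouts, stopping, or the thread-based features of zzlib's Timer.

```python
import time
from datetime import timedelta


class Stopwatch:
    """Hypothetical sketch of pause/resume bookkeeping; not zzlib's Timer."""

    def __init__(self):
        self._accumulated = 0.0  # seconds from completed running intervals
        self._started_at = None  # monotonic start of the current interval

    def start(self, reset=False):
        """Start or resume; with reset=True, discard accumulated time."""
        if reset:
            self._accumulated = 0.0
            self._started_at = None
        if self._started_at is None:
            self._started_at = time.monotonic()

    def pause(self):
        """Fold the current interval into the total and stop counting."""
        if self._started_at is not None:
            self._accumulated += time.monotonic() - self._started_at
            self._started_at = None

    @property
    def elapsed(self):
        running = 0.0
        if self._started_at is not None:
            running = time.monotonic() - self._started_at
        return timedelta(seconds=self._accumulated + running)


watch = Stopwatch()
watch.start()
time.sleep(0.05)
watch.pause()
frozen = watch.elapsed        # time stops advancing while paused
time.sleep(0.05)
still_frozen = watch.elapsed
watch.start()                 # resume without resetting
time.sleep(0.05)
resumed = watch.elapsed
```

While paused, `elapsed` returns the same value on every read; after resuming it continues from that value instead of starting over.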

wait()

Wait for the timer to time out or be stopped.

Returns:

Type Description
bool

True if timed out, False if stopped.

Raises:

Type Description
ValueError

If the timer has not been started.

watch(on_timeout=None, on_stop=None)

Watch the timer in a separate thread and trigger callbacks.

Parameters:

Name Type Description Default
on_timeout Callable

Callback when timer times out.

None
on_stop Callable

Callback when timer is stopped.

None

Returns:

Type Description
Thread

The daemon thread watching the timer.

Raises:

Type Description
ValueError

If the timer has not been started or has already been stopped.