zzlib.utils module¶
zzlib.utils
¶
zzlib.utils.AsyncTaskPool
¶
A pool for managing asynchronous tasks with optional concurrency control.
This class provides functionality to:
-
Add and manage multiple async tasks.
-
Control maximum concurrent tasks.
-
Wait for task completion.
-
Get results as soon as tasks complete.
__init__(max_workers=None)
¶
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_workers |
int
|
Maximum number of concurrent tasks. If None, no limit will be set. |
None
|
add(func, *args, **kw)
¶
as_completed()
async
¶
Yield completed tasks as they finish.
Yields:
| Type | Description |
|---|---|
AsyncGenerator[Task, None, None]
|
Completed tasks in order of completion. |
cancel()
¶
Cancel all pending tasks and stop the pool.
zzlib.utils.BinaryEventSource
¶
Bases: EventSource
A simplified event source for binary (True/False) conditions.
__init__(func, whenTrue=None, whenFalse=None, initial=InitialState.NONE)
¶
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func |
Callable
|
A callable that returns True or False. |
required |
whenTrue |
Any
|
Event to trigger when condition becomes True. |
None
|
whenFalse |
Any
|
Event to trigger when condition becomes False. |
None
|
initial |
InitialState
|
Initial state handling, one of InitialState enum values. |
NONE
|
zzlib.utils.CallbackQueue
¶
Bases: IterQueue
A queue that processes items through a series of callback functions before storing them.
This queue extends IterQueue by adding callback processing functionality. When items are put into the queue, they are first passed through a series of callback functions before being stored.
__init__(*args, **kw)
¶
callbacks = []
instance-attribute
¶
List of callback functions that transform queue items.
Each callback should be a function that:
-
Takes a single item as input.
-
Returns either:
-
A single transformed item.
-
An iterable of transformed items.
-
put(item, **kw)
¶
Put an item into the queue after processing it through callbacks.
The item is passed through all registered callback functions in sequence before being stored in the queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item |
Any
|
The item to put in the queue. |
required |
**kw |
Additional keyword arguments passed to parent put() method. |
{}
|
zzlib.utils.Checker
¶
Bases: FactoryProxy
A class for checking conditions and generating events based on those conditions.
Example
# Create a checker for a condition
checker = Checker(lambda: x > 10)
# Configure when and how to check
checker.when(lambda: y < 5).on('change')
# Wait for the condition with timeout
checker.wait(timeout=5.0)
# Use with an iterable
for item in checker.during(items):
process(item)
# Stops iteration when condition is met
__init__(func, **kw)
¶
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func |
Callable
|
The function to check/evaluate. |
required |
**kw |
Additional options for event configuration. |
{}
|
__subject__
cached
property
¶
Get or create the underlying eventer object.
during(it)
¶
event(val)
¶
Set the event to trigger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
val |
Any
|
The event object |
required |
on(val)
¶
Set when the checking should occur.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
val |
Union[str, EventTiming]
|
Timing specification, either as string or EventTiming enum. |
required |
wait(timeout=None)
¶
zzlib.utils.EventCondition = namedtuple('EventCondition', ['event', 'condition', 'timing'], defaults=(True, EventTiming.ON_BEING))
module-attribute
¶
A named tuple defining an event's trigger condition and timing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event |
Any
|
The event object to trigger when condition is met. |
required |
condition |
Union[bool, Callable]
|
The condition to check. Can be a boolean value, or a callable that takes current value and returns bool. Defaults to True. |
required |
timing |
[EventTiming][EventTiming]
|
When to trigger the event. Defaults to ON_BEING. |
required |
zzlib.utils.EventSource
¶
Bases: Delayed
A class that monitors events based on conditions and timing.
__init__(func, conditions=(), initial=InitialState.NONE)
¶
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func |
Callable
|
The function to call to check event state. |
required |
conditions |
Iterable[EventCondition]
|
List of EventCondition tuples defining event triggers. |
()
|
initial |
InitialState
|
Initial state, one of InitialState enum values. |
NONE
|
add(event, condition=True, timing=EventTiming.ON_BEING)
¶
Add a new event condition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event |
Any
|
The event to trigger. |
required |
condition |
bool
|
The condition to check, can be value or callable. |
True
|
timing |
EventTiming
|
When to trigger, one of EventTiming enum values. |
ON_BEING
|
check(queue)
¶
Start checking events in a daemon thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
queue |
Queue
|
Queue to put triggered events into. |
required |
value
property
writable
¶
Get the last checked value.
zzlib.utils.EventTiming = Enum('EventTiming', 'CONTINUOUS ON_BEING ON_NOT_BEING')
module-attribute
¶
Enum defining when an event should be triggered.
Values
CONTINUOUS: Trigger continuously while condition is True. Example: Keep triggering alarm while temperature is high.
ON_BEING: Trigger once when condition changes from False to True. Example: Trigger when temperature first exceeds threshold.
ON_NOT_BEING: Trigger once when condition changes from True to False. Example: Trigger when temperature returns to normal range.
zzlib.utils.Eventer
¶
Bases: ThreadingMixin
A class for managing event sources and processing events through callbacks.
__init__(*sources, interval=1.0)
¶
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*sources |
EventSource
|
One or more EventSource instances to monitor. |
()
|
interval |
float
|
Time in seconds between event checks. Defaults to 1.0. |
1.0
|
add_callback(callback, events=None)
¶
add_processor(processor)
¶
Add an event processor to the start of the callback chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
processor |
Callable
|
A callable that processes events. The processor should accept an iterable of events and yield processed events. It can modify, filter, or generate new events. |
required |
check()
¶
Check all event sources and process any triggered events.
check_loop()
¶
Run continuous event checking loop until stopped.
event_flag(events, flag=None)
¶
Create or use an Event flag that gets set when specified events occur.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
events |
Union[Any, Iterable]
|
Event or events to watch for. |
required |
flag |
Event
|
Optional Event instance to use. Creates new one if not provided. |
None
|
Returns:
| Type | Description |
|---|---|
Event
|
The event flag that will be set on matching events. |
events(history=True)
¶
get(*args, **kw)
¶
Get an event from the event queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*args |
Positional arguments passed to queue's get(). |
()
|
|
**kw |
Keyword arguments passed to queue's get(). |
{}
|
put(*args, **kw)
¶
Put an event into the event queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*args |
Positional arguments passed to queue's put(). |
()
|
|
**kw |
Keyword arguments passed to queue's put(). |
{}
|
wait(events, history=True, timeout=None)
¶
Wait for specified events to occur.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
events |
Union[Any, Iterable]
|
Event or events to wait for. |
required |
history |
bool
|
Whether to check historical events. Defaults to True. |
True
|
timeout |
bool
|
Maximum time to wait in seconds. None means wait forever. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if matching event occurred, False if timed out. |
zzlib.utils.InitialState = Enum('InitialState', 'NONE FIRST ERROR')
module-attribute
¶
Enum defining initial states for event sources.
Values
NONE: Default initial state where any new value is treated as a change.
FIRST: First check is ignored as a change, but subsequent checks are evaluated.
ERROR: Indicates that an error occurred while checking the value.
zzlib.utils.IterQueue
¶
Bases: Queue
A Queue that can be iterated and followed.
This class extends Queue to add iteration capabilities. It allows:
-
Iterating over current queue contents without popping them.
-
Following queue updates in real-time.
-
Optionally including history when following.
-
Stopping follow based on a condition.
__iter__()
¶
Iterate over current queue contents.
follow(history=True, stop_when=None)
¶
Follow queue updates, optionally including history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
history |
bool
|
If True, yield existing items before following updates. Defaults to True. |
True
|
stop_when |
Callable
|
Optional callable that determines when to stop following. When this callable evaluates to True, following will stop. |
None
|
zzlib.utils.Logger
¶
Bases: ThreadingMixin
A logger content processor for dealing with patterns.
add(source, pattern)
¶
Add a log source and its associated pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source |
Iterable[str]
|
An iterable yielding log messages as strings. |
required |
pattern |
LoggerPattern
|
The LoggerPattern to process messages from this source. |
required |
zzlib.utils.LoggerPattern
¶
A class for managing logging patterns, used by Logger.
add(func, match='.*', format=None, processors=())
¶
Add a pattern to match and process log lines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func |
Callable
|
Callable to handle matched lines. |
required |
match |
str
|
Regex pattern to match lines against. |
'.*'
|
format |
str
|
Optional format string to format matched groups. |
None
|
processors |
Iterable[Callable]
|
Optional sequence of callables to pre-process lines. |
()
|
zzlib.utils.Merged
¶
Bases: ThreadingMixin
A class that merges multiple iterators into a single iterator.
This class takes multiple iterators and combines their outputs into a single iterator stream. Each iterator runs in its own thread and outputs are merged in the order they arrive.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*its |
Iterators to merge. |
()
|
|
endswhen |
Literal['all', 'first', 'any']
|
When to end the merged iterator: - 'all': End when all source iterators complete (default)
|
'all'
|
Example
# Create iterators that simulate slow data streams
def slow_iter1():
for i in range(5):
time.sleep(0.1) # Simulate slow processing
yield i
def slow_iter2():
for i in range(5, 10):
time.sleep(0.2) # Different delay
yield i
merged = Merged(slow_iter1(), slow_iter2())
for item in merged:
print(item) # Will print numbers from both iterators as they arrive
zzlib.utils.TimedCache
¶
Bases: FactoryProxy
A class that delays function execution and caches the result with timeout.
Similar to FactoryProxy but invalidates the result when timeout.
Example
@TimedCache(timeout=60) # Cache for 60 seconds
def expensive_function():
# Do expensive computation
return result
# First call computes and caches
result1 = expensive_function()
# Within timeout, returns cached value
result2 = expensive_function()
# After timeout, recomputes and updates cache
time.sleep(61)
result3 = expensive_function()
zzlib.utils.Timeout
¶
zzlib.utils.Timer
¶
Bases: StopableMixin
A timer class that can track elapsed time, timeout, and support pause/resume.
__init__(timeout=None, start=True)
¶
elapsed: timedelta
property
¶
Get accumulated time excluding paused time.
Returns:
| Type | Description |
|---|---|
timedelta
|
Total accumulated time. |
is_paused()
¶
Check if timer is paused.
is_ticking()
¶
Check if timer is actively running.
is_timeout()
¶
Check if timer has timed out.
pause()
¶
Pause the timer while preserving elapsed time.
start(reset=False)
¶
Start or resume the timer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
reset |
bool
|
Whether to reset accumulated time. Defaults to False. |
False
|
wait()
¶
Wait for timer to timeout or stop.
Returns:
| Type | Description |
|---|---|
bool
|
True if timed out, False if stopped. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If timer not started. |
watch(on_timeout=None, on_stop=None)
¶
Watch the timer in a separate thread and trigger callbacks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_timeout |
Callable
|
Callback when timer times out. |
None
|
on_stop |
Callable
|
Callback when timer is stopped. |
None
|
Returns:
| Type | Description |
|---|---|
Thread
|
The daemon thread watching the timer. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If timer not started or already stopped. |