toolrack.aio

Utilities based on the asyncio library.

exception toolrack.aio.AlreadyRunning

The TimedCall is already running.

exception toolrack.aio.NotRunning

The TimedCall is not running.

class toolrack.aio.PeriodicCall(func, *args, **kwargs)

A TimedCall called at fixed time intervals.

start(interval, now=True)

Start calling the function periodically.

Parameters:
  • interval (int | float) – the time interval in seconds between calls.

  • now (bool) – whether to make the first call immediately.
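The effect of interval and now on the call schedule can be sketched with a small generator. This is an illustration only, not toolrack's implementation: periodic_times is a hypothetical helper, and it assumes that now=False delays the first call by one full interval.

```python
import itertools
from typing import Iterator


def periodic_times(start: float, interval: float, now: bool = True) -> Iterator[float]:
    """Yield call times spaced `interval` seconds apart.

    Hypothetical helper: with now=True the first time is `start`,
    otherwise (assumed) the first time is `start + interval`.
    """
    first = start if now else start + interval
    return (first + n * interval for n in itertools.count())


def first_times(start: float, interval: float, now: bool, count: int) -> list:
    """Return the first `count` times of the schedule as a list."""
    return list(itertools.islice(periodic_times(start, interval, now), count))
```

For example, first_times(100.0, 2.5, True, 3) gives times starting at 100.0, while passing now=False shifts the whole schedule by one interval.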

class toolrack.aio.ProcessParserProtocol(out_parser=None, err_parser=None)

Collect process stdout and stderr.

If parser functions are passed for stdout and/or stderr, they are called on each full line of output.

If no parser function is passed, the full output is made available through the done Future when the process terminates. The Future's result is a tuple with the full stdout and stderr. Each tuple element is None if a parser is passed for that stream.

Parameters:
  • out_parser – an optional parser for the process standard output.

  • err_parser – an optional parser for the process standard error.

done: Future

pipe_connection_lost(fd, exc)

Called when a file descriptor associated with the child process is closed.

fd is the int file descriptor that was closed.

pipe_data_received(fd, data)

Called when the subprocess writes data into its stdout or stderr pipe.

fd is the int file descriptor. data is a bytes object.

process_exited()

Called when the subprocess has exited.
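The callbacks above follow asyncio's SubprocessProtocol interface. A minimal sketch of the collect-everything case, written against the standard library only, might look as follows. CollectOutput and run_and_collect are hypothetical names, and the way the result is assembled here is an assumption rather than toolrack's actual code.

```python
import asyncio
import subprocess
import sys


class CollectOutput(asyncio.SubprocessProtocol):
    """Hypothetical protocol: gather stdout/stderr and resolve `done` at the end."""

    def __init__(self):
        self.done = asyncio.get_running_loop().create_future()
        self._chunks = {1: [], 2: []}  # keyed by fd: 1 is stdout, 2 is stderr
        self._open_pipes = {1, 2}
        self._exited = False

    def pipe_data_received(self, fd, data):
        self._chunks[fd].append(data)

    def pipe_connection_lost(self, fd, exc):
        self._open_pipes.discard(fd)
        self._maybe_finish()

    def process_exited(self):
        self._exited = True
        self._maybe_finish()

    def _maybe_finish(self):
        # Wait for both the exit and the closing of both pipes, so that
        # no trailing output is lost.
        if self._exited and not self._open_pipes and not self.done.done():
            out = b"".join(self._chunks[1]).decode()
            err = b"".join(self._chunks[2]).decode()
            self.done.set_result((out, err))


async def run_and_collect(*cmd):
    """Run `cmd` and return its (stdout, stderr) as decoded strings."""
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.subprocess_exec(
        CollectOutput, *cmd, stdin=subprocess.DEVNULL
    )
    try:
        return await protocol.done
    finally:
        transport.close()
```

Awaiting run_and_collect(sys.executable, "-c", "print('hi')") inside a running event loop would return the collected output as a tuple once the child process has finished.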

class toolrack.aio.StreamHelper(callback=None, separator='\n')

Helper to cache data until full lines of text are received.

This is useful to collect data from a stream and process them when full lines are received. For example:

stream = StreamHelper(callback=callback)
stream.receive_data('line one\nline two')
stream.receive_data(' continues here\n')

would call callback twice, once with 'line one' and once with 'line two continues here'.

Parameters:
  • callback (callable) – an optional function which is called with full lines of text from the stream.

  • separator (str) – the line separator.

flush_partial()

Flush and process pending data from a partial line.

get_data()

Return the full content of the stream if no callback is defined.

Return type:

Optional[str]

receive_data(data)

Receive data and process them.

If a callback has been passed to the class, it’s called for each full line of text.
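The buffering behaviour described for receive_data() and flush_partial() can be sketched in a few lines. LineBuffer below is a hypothetical stand-in written for illustration. It is not toolrack's implementation and it omits the no-callback get_data() case.

```python
from typing import Callable


class LineBuffer:
    """Hypothetical stand-in showing the line-buffering idea."""

    def __init__(self, callback: Callable[[str], None], separator: str = "\n"):
        self.callback = callback
        self.separator = separator
        self._partial = ""  # text received after the last separator

    def receive_data(self, data: str) -> None:
        # Everything before the last separator is a complete line;
        # the remainder is kept until more data arrives.
        *lines, self._partial = (self._partial + data).split(self.separator)
        for line in lines:
            self.callback(line)

    def flush_partial(self) -> None:
        # Hand over any pending partial line and clear the buffer.
        if self._partial:
            self.callback(self._partial)
            self._partial = ""
```

Text that arrives without a trailing separator stays in the buffer until either more data completes the line or flush_partial() is called.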

class toolrack.aio.TimedCall(func, *args, **kwargs)

Call a function based on a timer.

The class takes a function with optional arguments. Upon start(), the function is scheduled at specified times until stop() is called (or the time iterator is exhausted).

Parameters:
  • func (Callable) – the function to call periodically.

  • args – arguments to pass to the function.

  • kwargs – keyword arguments to pass to the function.

property running: bool

Whether the TimedCall is currently running.

start(times_iter)

Start calling the function at specified times.

Parameters:
  • times_iter (Iterator[float | int]) – an iterator yielding the times at which to execute the function. If the iterator is exhausted, the TimedCall is stopped. Times must be compatible with loop.time().
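Times "compatible with loop.time()" are absolute values on the event loop's monotonic clock, not wall-clock timestamps. The sketch below shows how such times can be produced and consumed with the standard library only. call_at_times and demo are hypothetical helpers used for illustration and do not reflect how TimedCall is implemented.

```python
import asyncio
from typing import Callable, Iterable, List


async def call_at_times(func: Callable[[], None], times: Iterable[float]) -> None:
    """Call `func` at each absolute loop time in `times`, then return."""
    loop = asyncio.get_running_loop()
    for when in times:
        delay = when - loop.time()  # convert the absolute time to a delay
        if delay > 0:
            await asyncio.sleep(delay)
        func()


async def demo() -> List[float]:
    """Schedule two calls shortly after the current loop time."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    calls: List[float] = []
    await call_at_times(
        lambda: calls.append(loop.time()),
        [start + 0.01, start + 0.02],
    )
    return calls
```

Running asyncio.run(demo()) returns the loop times at which the two calls were made. Values taken from time.time() would not be suitable here, since the loop clock has a different reference point.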

async stop()

Stop calling the function periodically.

Return type:

None