API Reference

uberjob.Plan

class uberjob.Plan

Represents a symbolic call graph.

graph

The underlying networkx.MultiDiGraph.

call(fn, *args, **kwargs)

Add a function call to this Plan.

Non-symbolic arguments are automatically converted to symbolic arguments using gather().

Parameters
  • fn (Callable) – The function to be called.

  • args – The symbolic positional arguments.

  • kwargs – The symbolic keyword arguments.

Return type

Call

Returns

The symbolic result of the function call.

Raises

TypeError – If arguments fail to bind to parameters.
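
The TypeError case can be illustrated with the standard library alone. The sketch below assumes the binding check behaves like inspect.signature(fn).bind, which raises TypeError when arguments do not fit the parameters. Whether uberjob uses this exact mechanism is an assumption, and binds is a hypothetical helper, not part of uberjob.

```python
import inspect


def binds(fn, *args, **kwargs):
    """Return True if the arguments bind to fn's parameters (hypothetical helper)."""
    try:
        inspect.signature(fn).bind(*args, **kwargs)
    except TypeError:
        return False
    return True


def add(x, y):
    return x + y


ok = binds(add, 1, y=2)               # both parameters are supplied
too_many = binds(add, 1, 2, 3)        # one positional argument too many
unknown_keyword = binds(add, 1, z=2)  # "z" is not a parameter of add
```

Checking the binding when the call is added, rather than when it runs, surfaces a mismatched argument list while the Plan is being built.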

lit(value)

Add a literal value to this Plan.

Parameters

value – The literal value.

Return type

Literal

Returns

The symbolic literal value.

add_dependency(source, target)

Add a dependency indicating that the source Node must run before the target Node.

Parameters
  • source (Node) – The Node that is depended on.

  • target (Node) – The dependent Node.

Return type

None

gather(value)

Gather a structured value that may contain instances of Node into a single Node representing the entire structured value.

If the value is already a Node, it will be returned unchanged.

When navigating the structured value, gather will only recognize Python’s general purpose built-in containers: dict, list, set, and tuple.

Parameters

value – A structured value that may contain instances of Node.

Return type

Node

Returns

A single Node representing the gathered input value.
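
The container rule above can be sketched without uberjob. In this sketch, Node is a stand-in class, find_nodes and Box are hypothetical names, and the traversal looks only at dict values; none of this is the library's implementation. It shows that a Node nested in a dict, list, set, or tuple is found, while a Node held by a custom container is not.

```python
class Node:
    """Stand-in for a symbolic node (hypothetical; not uberjob.graph.Node)."""


class Box:
    """A custom container that the traversal does not look inside."""

    def __init__(self, item):
        self.item = item


def find_nodes(value):
    """Collect Node instances reachable through dict, list, set, and tuple only."""
    if isinstance(value, Node):
        return [value]
    if isinstance(value, dict):
        children = value.values()  # assumption: only dict values are walked
    elif isinstance(value, (list, set, tuple)):
        children = value
    else:
        return []  # any other type is treated as an opaque value
    found = []
    for child in children:
        found.extend(find_nodes(child))
    return found


a, b, c = Node(), Node(), Node()
structured = {"pair": (a, [b, 1]), "boxed": Box(c)}
found = find_nodes(structured)  # c is hidden inside Box and is not found
```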

unpack(iterable, length)

Unpack a symbolic iterable into a tuple of symbolic values.

Parameters
  • iterable – The symbolic iterable.

  • length (int) – The number of values in the iterable.

Return type

Tuple[Node, …]

Returns

A tuple of Node.

scope(*args)

A context manager for organizing a Plan.

Parameters

args – Values to append to the end of the current scope; they must be hashable and equatable.

Return type

AbstractContextManager[None]

copy()

Make a copy of this Plan.

The new copy starts with an empty scope.

Return type

Plan

Returns

The new copy.

uberjob.run

uberjob.run(plan, *, output=None, registry=None, dry_run=False, max_workers=None, max_errors=0, retry=None, fresh_time=None, progress=True, scheduler=None, transform_physical=None, stale_check_max_workers=None)

Run a Plan.

  • Returns any requested outputs.

  • Ensures that every ValueStore in the Registry is up to date.

  • When an error occurs, waits until there are no calls in flight and either no calls remain or the error limit has been exceeded. Once this condition has been met, the first error is raised.

Parameters
  • plan (Plan) – The Plan to run.

  • output – An optional symbolic output specification. It will be converted via gather().

  • registry (Optional[Registry]) – An optional Registry that may specify a ValueStore for each Node.

  • dry_run (bool) – If True, returns the physical plan and output node without actually running it.

  • max_workers (Optional[int]) – The maximum number of threads that can be used to run the Plan. The default behavior is based on the core count.

  • max_errors (Optional[int]) – The maximum number of errors that can occur before new calls stop being executed. Specifying None will run as much of the graph as possible.

  • retry (Union[None, int, Callable[[Callable], Callable]]) – Optionally specify how many times to attempt each call, or pass a retry decorator. The default behavior is to attempt each call only once.

  • fresh_time (Optional[datetime]) – Stored values older than this will not be used.

  • progress (Union[None, bool, Progress, Iterable[Progress]]) – Optionally specify how to observe progress.

  • scheduler (Optional[str]) – Optionally specify the scheduler, which is used to choose which node to process next. The available schedulers are 'default' and 'random': 'default' attempts to finish parts of the plan before starting others, and 'random' chooses a random node.

  • transform_physical (Optional[Callable[[Plan, Node], Tuple[Plan, Node]]]) – Optional transformation to be applied to the physical plan. It takes (plan, output_node) as positional arguments and returns (transformed_plan, redirected_output_node).

  • stale_check_max_workers (Optional[int]) – Optionally specify the maximum number of threads used for the stale check. The default behavior is to use max_workers.

Returns

The non-symbolic output corresponding to the symbolic output argument.
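
The retry parameter accepts a decorator of type Callable[[Callable], Callable]. A minimal sketch of a decorator with that shape follows; retry_with_delay, its parameters, and flaky are hypothetical names used for illustration, and attempts is assumed to be at least 1.

```python
import functools
import time


def retry_with_delay(attempts, delay_seconds=0.0):
    """Build a decorator that calls the wrapped function up to `attempts` times."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: propagate the last error
                    time.sleep(delay_seconds)

        return wrapper

    return decorator


calls = []


def flaky():
    """Fail twice, then succeed, to exercise the retry loop."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = retry_with_delay(attempts=3)(flaky)()
```

Given the signature above, such a decorator would be passed as retry=retry_with_delay(3), while retry=3 requests three plain attempts.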

uberjob.render

uberjob.render(plan, *, registry=None, predicate=None, level=None, format=None)

Use nxv to render a plan’s symbolic call graph.

Parameters
  • plan (Union[Plan, MultiDiGraph, Tuple[Plan, Optional[Node]]]) – A plan’s symbolic call graph.

  • registry (Optional[Registry]) – A registry to include in the visualization.

  • predicate (Optional[Callable[[Node, dict], bool]]) – An optional node predicate f(u, d) that determines whether a node u with attribute dict d will be included in the render.

  • level (Optional[int]) – Optional maximum number of scope levels to view. Nodes are grouped by scope[:level].

  • format (Optional[str]) – The nxv/GraphViz output format to produce.

Return type

Optional[bytes]

Returns

The rendered graph.
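
The grouping expressed by scope[:level] is ordinary tuple slicing. The sketch below uses made-up node names and scopes and a hypothetical group_by_scope helper to show how a smaller level merges nodes into coarser groups; it does not call nxv or uberjob.

```python
from collections import defaultdict

# Hypothetical node names mapped to their scope tuples.
scopes = {
    "load_a": ("etl", "load"),
    "load_b": ("etl", "load"),
    "clean": ("etl", "transform"),
    "report": ("reporting",),
}


def group_by_scope(scopes, level):
    """Group names by the first `level` entries of their scope."""
    groups = defaultdict(list)
    for name, scope in scopes.items():
        groups[scope[:level]].append(name)
    return dict(groups)


level_1 = group_by_scope(scopes, 1)  # everything under "etl" collapses together
level_2 = group_by_scope(scopes, 2)  # "load" and "transform" stay separate
```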

uberjob.Registry

class uberjob.Registry

A mapping from Node to ValueStore.

__contains__(node)

Check if the Node has a ValueStore.

Parameters

node (Node) – The plan node.

Return type

bool

Returns

True if the node has a value store.

__getitem__(node)

Get the ValueStore for a Node.

Parameters

node (Node) – The plan node.

Return type

ValueStore

Returns

The value store for the node.

__iter__()

Get all registered Node instances.

Return type

Iterable[Node]

Returns

An iterable of Node.

__len__()

Get the number of registered (node, value_store) pairs.

Return type

int

Returns

The number of (node, value_store) pairs.

add(node, value_store)

Assign a Node to a ValueStore.

Parameters
  • node (Node) – The plan node.

  • value_store (ValueStore) – The value store for the node.

Return type

None

source(plan, value_store)

Create a Node in the Plan that reads from the given ValueStore.

Parameters
  • plan (Plan) – The plan to add a source node to.

  • value_store (ValueStore) – The value store to read from.

Return type

Node

Returns

The newly added plan node.

get(node)

Get the ValueStore for a Node if it has one, or None.

Parameters

node (Node) – The plan node.

Return type

Optional[ValueStore]

Returns

The value store for the node, or None.

keys()

Get all registered Node instances.

Return type

KeysView[Node]

Returns

A keys view of Node.

values()

Get all registered ValueStore instances.

Return type

List[ValueStore]

Returns

A list of ValueStore.

items()

Get all registered (node, value_store) pairs.

Return type

List[Tuple[Node, ValueStore]]

Returns

A list of (node, value_store) pairs.

copy()

Make a copy of this Registry.

Returns

The new copy.

uberjob.ValueStore

class uberjob.ValueStore

The abstract base class for all value stores.

abstract read()

Read the value from the store.

abstract write(value)

Write a value to the store.

Parameters

value – The value.

Return type

None

abstract get_modified_time()

Get the modified time of the stored value, or None if there is no stored value.

Return type

Optional[datetime]
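
The three abstract methods form the whole contract. The sketch below is a duck-typed in-memory store that runs without uberjob installed; in real use it would subclass uberjob.ValueStore. InMemoryStore is a hypothetical name, and the use of naive local datetimes is an assumption, since this reference does not state whether naive or timezone-aware values are expected.

```python
import datetime as dt


class InMemoryStore:
    """Holds one value in memory (sketch; in real use, subclass uberjob.ValueStore)."""

    _EMPTY = object()  # sentinel so that None can be a stored value

    def __init__(self):
        self._value = self._EMPTY
        self._modified_time = None

    def read(self):
        if self._value is self._EMPTY:
            raise LookupError("nothing has been written yet")
        return self._value

    def write(self, value):
        self._value = value
        # Assumption: naive local time, chosen only for this sketch.
        self._modified_time = dt.datetime.now()

    def get_modified_time(self):
        # None signals that there is no stored value.
        return self._modified_time


store = InMemoryStore()
before = store.get_modified_time()
store.write(42)
value = store.read()
after = store.get_modified_time()
```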

uberjob.stores

class uberjob.stores.BinaryFileStore(path)

A ValueStore for storing a bytes value in a file.

Parameters

path (Union[str, Path]) – The path.

read()

Read the binary value from the file.

Return type

bytes

write(value)

Write a binary value to the file.

Parameters

value (bytes) – The value.

Return type

None

get_modified_time()

Get the modified time of the file, or None if it does not exist or is inaccessible.

Return type

Optional[datetime]

class uberjob.stores.FileStore(path)

The abstract base class for storing a value in a file.

Parameters

path (Union[str, Path]) – The path.

abstract read()

Read the value from the file.

abstract write(value)

Write a value to the file.

Parameters

value – The value.

Return type

None

get_modified_time()

Get the modified time of the file, or None if it does not exist or is inaccessible.

Return type

Optional[datetime]

uberjob.stores.get_modified_time(path)

Get the modified time of the path, or None if it does not exist or is inaccessible.

Parameters

path (Union[str, Path]) – The path.

Return type

Optional[datetime]

uberjob.stores.staged_write(path, mode='w', **kwargs)

Context manager for writing a file atomically.

It yields a staging file object which will be atomically renamed to the given path if an exception is not raised. If an exception is raised, the staging file will be deleted if it exists.

Parameters
  • path (Union[str, Path]) – The path.

  • mode – The file open mode.

  • kwargs – Extra arguments to pass to open().

Return type

AbstractContextManager[IO[AnyStr]]
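
The staging behavior described above can be sketched with the standard library. This is not uberjob's implementation: staged_write_sketch is a hypothetical name, the staging file is assumed to live in the target's directory, and the rename uses os.replace.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def staged_write_sketch(path, mode="w", **kwargs):
    """Write to a staging file, then rename it over `path` on success."""
    directory = os.path.dirname(os.path.abspath(path))
    # Stage in the same directory so the rename stays on one filesystem.
    fd, staging_path = tempfile.mkstemp(dir=directory, suffix=".staging")
    os.close(fd)
    try:
        with open(staging_path, mode, **kwargs) as f:
            yield f
        os.replace(staging_path, path)
    except BaseException:
        # On any failure, delete the staging file if it still exists.
        with contextlib.suppress(FileNotFoundError):
            os.remove(staging_path)
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "out.txt")

    with staged_write_sketch(target) as f:
        f.write("hello")
    with open(target) as f:
        written = f.read()

    # A failure mid-write leaves the previous contents untouched.
    try:
        with staged_write_sketch(target) as f:
            f.write("partial")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    with open(target) as f:
        after_failure = f.read()
    leftovers = os.listdir(tmp)
```

Readers of the target path see either the old contents or the complete new contents, never a partially written file.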

uberjob.stores.staged_write_path(path)

Context manager for writing a file atomically.

It yields a staging path which will be atomically renamed to the given path if an exception is not raised. If an exception is raised, the staging path will be deleted if it exists.

Parameters

path (Union[str, Path]) – The path.

Return type

AbstractContextManager[Union[str, Path]]

class uberjob.stores.JsonFileStore(path, *, encoding=None)

A ValueStore for storing a JSON-serializable value in a file.

Parameters
  • path (Union[str, Path]) – The path of the file.

  • encoding (Optional[str]) – The name of the encoding used to decode or encode the file.

read()

Read the JSON value from the file.

write(value)

Write a JSON-serializable value to the file.

Parameters

value – The value.

Return type

None

get_modified_time()

Get the modified time of the file, or None if it does not exist or is inaccessible.

Return type

Optional[datetime]

class uberjob.stores.LiteralSource(value, modified_time)

A ValueStore that takes value and modified time in its constructor and simply returns them from read and get_modified_time.

Parameters
  • value – The value.

  • modified_time (Optional[datetime]) – The modified time.

read()

Get the value.

write(value)

Not implemented.

get_modified_time()

Get the modified time.

Return type

Optional[datetime]

class uberjob.stores.ModifiedTimeSource(modified_time)

A ValueStore that takes a modified time in its constructor and returns it from read and get_modified_time. Useful for ensuring that something gets updated every day regardless of whether any of its inputs were updated.

Parameters

modified_time (Optional[datetime]) – The modified time.

read()

Get the modified time.

Return type

Optional[datetime]

write(value)

Not implemented.

get_modified_time()

Get the modified time.

Return type

Optional[datetime]

class uberjob.stores.MountedStore(create_store)

An abstract ValueStore that behaves like a mounted storage device.

Parameters

create_store (Callable[[str], ValueStore]) – A callable that creates a ValueStore for a path.

create_store

Creates an instance of the underlying ValueStore for the given path.

abstract copy_to_local(local_path)

Copy the value in the store to the local_path.

Parameters

local_path (str) – The local path.

abstract copy_from_local(local_path)

Copy the value at the local_path into the store.

Parameters

local_path (str) – The local path.

read()

Read the value from the store.

write(value)

Write a value to the store.

Parameters

value – The value.

Return type

None

abstract get_modified_time()

Get the modified time of the stored value, or None if there is no stored value.

Return type

Optional[datetime]

class uberjob.stores.PathSource(path, *, required=True)

A ValueStore that returns the path itself from read rather than actually reading any data.

Parameters
  • path (Union[str, Path]) – The input path.

  • required (bool) – When true, get_modified_time will raise an exception when the path is missing rather than return None.

read()

Get the path.

When required is false, this will raise an exception if the file does not exist.

Return type

Union[str, Path]

write(value)

Not implemented.

get_modified_time()

Get the modified time of the file.

If it does not exist or is inaccessible, None will be returned if required is false and an exception will be raised otherwise.

Return type

Optional[datetime]

class uberjob.stores.PickleFileStore(path)

A ValueStore for storing a picklable value in a file.

Parameters

path (Union[str, Path]) – The path.

read()

Read the pickled value from the file.

write(value)

Write a picklable value to the file.

Parameters

value – The value.

Return type

None

get_modified_time()

Get the modified time of the file, or None if it does not exist or is inaccessible.

Return type

Optional[datetime]

class uberjob.stores.TextFileStore(path, *, encoding=None)

A ValueStore for storing a str value in a file.

Parameters
  • path (Union[str, Path]) – The path of the file.

  • encoding (Optional[str]) – The name of the encoding used to decode or encode the file.

read()

Read the string value from the file.

Return type

str

write(value)

Write a string value to the file.

Parameters

value (str) – The value.

Return type

None

get_modified_time()

Get the modified time of the file, or None if it does not exist or is inaccessible.

Return type

Optional[datetime]

class uberjob.stores.TouchFileStore(path)

A ValueStore for a touch file. It can be thought of as storing None in a file.

It is useful for integrating side effects.

Parameters

path (Union[str, Path]) – The path.

read()

Return None after ensuring that the touch file exists and is empty.

write(value)

Write the touch file after ensuring that the given value is None.

Parameters

value (None) – The value, which must be None.

Return type

None

get_modified_time()

Get the modified time of the file, or None if it does not exist or is inaccessible.

Return type

Optional[datetime]

uberjob.graph

Provides the underlying graph, node, and edge classes used by the Plan.

uberjob.graph.Graph

alias of networkx.classes.multidigraph.MultiDiGraph

class uberjob.graph.Node(*, scope=())

A symbolic value in a call Graph.

class uberjob.graph.Call(fn, *, scope=(), stack_frame=None)

A symbolic function call.

Parameters
  • fn (Callable) – The callable function.

  • stack_frame – The stack frame of the call site.

class uberjob.graph.Literal(value, *, scope=())

A symbolic literal value.

Parameters

value – The value of the literal.

class uberjob.graph.Dependency

A dependency in a call Graph that links a Node to a Node that must run after it.

class uberjob.graph.PositionalArg(index)

A dependency in a call Graph that links a symbolic positional argument Node to a symbolic function Call.

Parameters

index (int) – The index of the argument.

class uberjob.graph.KeywordArg(name, index)

A dependency in a call Graph that links a symbolic keyword argument Node to a symbolic function Call.

Parameters
  • name (str) – The parameter name.

  • index (int) – The index of the argument; required because keyword arguments are ordered in Python 3.6+.

uberjob.graph.get_argument_nodes(graph, call)

Return the symbolic args and kwargs of the given Call.

Parameters
  • graph (MultiDiGraph) – The graph.

  • call (Call) – The call.

Return type

Tuple[List[Node], Dict[str, Node]]

uberjob.graph.get_scope(graph, node)

Return the scope of the given Node.

This function is deprecated. Use node.scope instead.

Parameters
  • graph (MultiDiGraph) – The graph.

  • node (Node) – The node.

Return type

Tuple

uberjob.progress

The state of a running Plan can be observed through the use of Progress.

class uberjob.progress.Progress(create_observer)

A way to observe progress.

Parameters

create_observer (Callable[[], ProgressObserver]) – A callable that creates a single-use progress observer.

observer()

Create a single-use progress observer.

Return type

ProgressObserver

uberjob.progress.default_progress = <uberjob.progress._progress.Progress object>

The default method for observing progress.

uberjob.progress.console_progress = <uberjob.progress._progress.Progress object>

Display observed progress using the console.

uberjob.progress.ipython_progress = <uberjob.progress._progress.Progress object>

Display observed progress using IPython widgets.

uberjob.progress.null_progress = <uberjob.progress._progress.Progress object>

Ignore observed progress.

uberjob.progress.html_progress(output)

Write observed progress to an HTML file.

Parameters

output (Union[str, Path, Callable[[bytes], None]]) – The output path or a function that will be called with the output bytes.

Return type

Progress

uberjob.progress.composite_progress(*members)

Create a progress observer that forwards observations to all of its members.

Parameters

members (Progress) – The members.

Return type

Progress

class uberjob.progress.ProgressObserver

The abstract base class for all progress observers.

abstract __enter__()

Start observing progress.

abstract __exit__(exc_type, exc_val, exc_tb)

Stop observing progress.

abstract increment_total(*, section, scope, amount)

Increment the number of entries in this section and scope by the specified amount.

Parameters
  • section (str) – The section.

  • scope (Tuple) – The scope.

  • amount (int) – The amount.

abstract increment_running(*, section, scope)

Increment the number of running entries in this section and scope. This method must be thread-safe.

Parameters
  • section (str) – The section.

  • scope (Tuple) – The scope.

abstract increment_completed(*, section, scope)

Increment the number of completed entries in this section and scope. This method must be thread-safe.

Parameters
  • section (str) – The section.

  • scope (Tuple) – The scope.

abstract increment_failed(*, section, scope, exception)

Increment the number of failed entries in this section and scope. This method must be thread-safe.

Parameters
  • section (str) – The section.

  • scope (Tuple) – The scope.

  • exception (Exception) – The exception.
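
A minimal sketch of an observer with this interface follows. It is duck-typed so that it runs without uberjob installed; in real use it would subclass uberjob.progress.ProgressObserver. CountingObserver, the "run" section name, and the choice to keep four independent counters are assumptions made for illustration. A lock guards every update because the increment methods must be thread-safe.

```python
import threading
from collections import Counter


class CountingObserver:
    """Counts observations per (section, scope); sketch, not a uberjob class."""

    def __init__(self):
        self._lock = threading.Lock()
        self.total = Counter()
        self.running = Counter()
        self.completed = Counter()
        self.failed = Counter()
        self.exceptions = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return None  # never suppress exceptions

    def increment_total(self, *, section, scope, amount):
        with self._lock:
            self.total[(section, scope)] += amount

    def increment_running(self, *, section, scope):
        with self._lock:
            self.running[(section, scope)] += 1

    def increment_completed(self, *, section, scope):
        with self._lock:
            self.completed[(section, scope)] += 1

    def increment_failed(self, *, section, scope, exception):
        with self._lock:
            self.failed[(section, scope)] += 1
            self.exceptions.append(exception)


key = {"section": "run", "scope": ("etl",)}  # hypothetical section and scope
observer = CountingObserver()
with observer:
    observer.increment_total(amount=8, **key)
    threads = [
        threading.Thread(target=observer.increment_completed, kwargs=key)
        for _ in range(8)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    observer.increment_failed(exception=ValueError("example"), **key)
```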

Exceptions

class uberjob.CallError(call)

An exception was raised in a symbolic call.

Parameters

call (Call) – The call.

class uberjob.NotTransformedError

An expected transformation was not applied.