API Reference
uberjob.Plan
-
class uberjob.Plan
Represents a symbolic call graph.
-
graph
The underlying networkx.MultiDiGraph.
-
call(fn, *args, **kwargs)
Add a function call to this Plan. Non-symbolic arguments are automatically converted to symbolic arguments using gather().
-
lit(value)
Add a literal value to this Plan.
- Parameters
value – The literal value.
- Return type
Literal
- Returns
The symbolic literal value.
-
add_dependency(source, target)
Add a dependency indicating that the source Node must run before the target Node.
-
gather(value)
Gather a structured value that may contain instances of Node into a single Node representing the entire structured value. If the value is already a Node, it is returned unchanged. When navigating the structured value, gather only recognizes Python’s general-purpose built-in containers: dict, list, set, and tuple.
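The traversal rule can be illustrated with a small stdlib-only sketch. `ToyNode` and `find_nodes` are hypothetical names, not uberjob API. Whether the real gather also looks inside dict keys is an assumption here. The point is only that containers outside the four listed types (a frozenset, a collections.deque, a custom class) are treated as opaque leaves.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class ToyNode:
    """Hypothetical stand-in for a symbolic Node (not uberjob's class)."""

    name: str


def find_nodes(value: Any) -> List[ToyNode]:
    """Collect ToyNode instances reachable through dict, list, set, and tuple only."""
    if isinstance(value, ToyNode):
        return [value]
    found: List[ToyNode] = []
    if isinstance(value, dict):
        # Assumption: dict keys are searched as well as values.
        for key, item in value.items():
            found.extend(find_nodes(key))
            found.extend(find_nodes(item))
    elif isinstance(value, (list, set, tuple)):
        for item in value:
            found.extend(find_nodes(item))
    # Any other type (frozenset, collections.deque, custom classes) is an opaque leaf.
    return found
```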
-
unpack(iterable, length)
Unpack a symbolic iterable into a tuple of symbolic values.
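The idea of a symbolic call graph can be made concrete without depending on uberjob, using a stdlib-only toy (Python 3.9+ for graphlib). `ToyPlan` and `Ref` are hypothetical. The real Plan stores nodes in a networkx.MultiDiGraph and uberjob.run executes them on multiple threads; this sketch attempts neither. It shows only that `call` and `lit` record work, and that execution happens later in dependency order.

```python
import graphlib
from dataclasses import dataclass
from typing import Any, Callable, Dict, Set, Tuple


@dataclass(frozen=True)
class Ref:
    """Hypothetical handle to a node in ToyPlan."""

    index: int


class ToyPlan:
    """A toy symbolic call graph loosely shaped like Plan.call/lit/add_dependency."""

    def __init__(self) -> None:
        self._nodes: Dict[int, Tuple[Callable, tuple]] = {}  # node index -> (fn, args)
        self._deps: Dict[int, Set[int]] = {}  # node index -> indices that must run first

    def _add(self, fn: Callable, args: tuple) -> Ref:
        ref = Ref(len(self._nodes))
        self._nodes[ref.index] = (fn, args)
        # Each symbolic argument becomes an edge into the new node.
        self._deps[ref.index] = {a.index for a in args if isinstance(a, Ref)}
        return ref

    def lit(self, value: Any) -> Ref:
        return self._add(lambda: value, ())

    def call(self, fn: Callable, *args: Any) -> Ref:
        # Nothing runs here; the call is only recorded.
        return self._add(fn, args)

    def add_dependency(self, source: Ref, target: Ref) -> None:
        # Ordering-only edge: source must run before target.
        self._deps[target.index].add(source.index)

    def run(self, output: Ref) -> Any:
        # Unlike a real scheduler, this evaluates every node serially.
        results: Dict[int, Any] = {}
        for i in graphlib.TopologicalSorter(self._deps).static_order():
            fn, args = self._nodes[i]
            resolved = [results[a.index] if isinstance(a, Ref) else a for a in args]
            results[i] = fn(*resolved)
        return results[output.index]
```

With uberjob itself, the analogous steps would use only the methods documented above: `plan = uberjob.Plan()`, `total = plan.call(operator.add, plan.lit(2), 3)`, then `uberjob.run(plan, output=total)`.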
-
uberjob.run
-
uberjob.run(plan, *, output=None, registry=None, dry_run=False, max_workers=None, max_errors=0, retry=None, fresh_time=None, progress=True, scheduler=None, transform_physical=None, stale_check_max_workers=None)
Run a Plan and return any requested outputs.
Ensures that every ValueStore in the Registry is up to date. When an error occurs, run waits until no calls are in flight and either no calls remain or the error limit has been exceeded. Once this condition has been met, the first error is raised.
- Parameters
output – An optional symbolic output specification. It will be converted via gather().
registry (Optional[Registry]) – An optional Registry that may specify a ValueStore for each Node.
dry_run (bool) – If True, returns the physical plan and output node without actually running it.
max_workers (Optional[int]) – The maximum number of threads that can be used to run the Plan. The default behavior is based on the core count.
max_errors (Optional[int]) – The maximum number of errors that can occur before new calls stop being executed. With the default of 0, new calls stop after the first error. Specifying None will run as much of the graph as possible.
retry (Union[None, int, Callable[[Callable], Callable]]) – Optionally specify how many times to attempt each call, or pass a retry decorator. The default behavior is to attempt each call only once.
fresh_time (Optional[datetime]) – Stored values older than this will not be used.
progress (Union[None, bool, Progress, Iterable[Progress]]) – Optionally specify how to observe progress.
scheduler (Optional[str]) – Optionally specify the scheduler, which chooses the node to process next. The available schedulers are 'default', which attempts to finish parts of the plan before starting others, and 'random', which chooses a random node.
transform_physical (Optional[Callable[[Plan, Node], Tuple[Plan, Node]]]) – An optional transformation to apply to the physical plan. It takes (plan, output_node) as positional arguments and returns (transformed_plan, redirected_output_node).
stale_check_max_workers (Optional[int]) – Optionally specify the maximum number of threads used for the stale check. The default behavior is to use max_workers.
- Returns
The non-symbolic output corresponding to the symbolic output argument.
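The `retry` argument accepts either an attempt count or a decorator of type Callable[[Callable], Callable]. Below is a minimal sketch of such a decorator. It assumes that retries should re-raise the last exception once attempts run out. `retry_decorator` and its `delay_seconds` parameter are hypothetical, not part of uberjob.

```python
import functools
import time
from typing import Callable


def retry_decorator(attempts: int, delay_seconds: float = 0.0) -> Callable[[Callable], Callable]:
    """Build a decorator that calls the wrapped function up to `attempts` times."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(delay_seconds)

        return wrapper

    return decorator
```

It could be passed as `uberjob.run(plan, output=result, retry=retry_decorator(3, delay_seconds=1.0))`. For plain repetition without a delay, the documented integer form `retry=3` should suffice.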
uberjob.render
-
uberjob.render(plan, *, registry=None, predicate=None, level=None, format=None)
Use nxv to render a plan’s symbolic call graph.
- Parameters
plan (Union[Plan, MultiDiGraph, Tuple[Plan, Optional[Node]]]) – A plan’s symbolic call graph.
registry (Optional[Registry]) – A registry to include in the visualization.
predicate (Optional[Callable[[Node, dict], bool]]) – An optional node predicate f(u, d) that determines whether a node u with attribute dict d will be included in the render.
level (Optional[int]) – An optional maximum number of scope levels to view. Nodes are grouped by scope[:level].
format (Optional[str]) – The nxv/GraphViz output format to produce.
- Return type
- Returns
The rendered graph.
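The `level` parameter collapses nodes that share a scope prefix. The slicing rule itself is easy to illustrate with plain tuples. The node names and scopes below are made up, and how render draws the resulting groups is not modeled.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Scope = Tuple[str, ...]


def group_by_scope(scopes: Dict[str, Scope], level: Optional[int]) -> Dict[Scope, List[str]]:
    """Group node names by scope[:level]; level=None keeps full scopes."""
    groups: Dict[Scope, List[str]] = defaultdict(list)
    for name, scope in scopes.items():
        # Slicing with None is a no-op, so level=None never merges distinct scopes.
        groups[scope[:level]].append(name)
    return dict(groups)
```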
uberjob.Registry
-
class uberjob.Registry
A mapping from Node to ValueStore.
-
__contains__(node)
Check whether the Node has a ValueStore.
- Parameters
node (Node) – The plan node.
- Return type
- Returns
True if the node has a value store.
-
__getitem__(node)
Get the ValueStore for a Node.
- Parameters
node (Node) – The plan node.
- Return type
- Returns
The value store for the node.
-
__len__()
Get the number of registered (node, value_store) pairs.
- Return type
- Returns
The number of (node, value_store) pairs.
-
add(node, value_store)
Assign a Node to a ValueStore.
- Parameters
node (Node) – The plan node.
value_store (ValueStore) – The value store for the node.
- Return type
None
-
source(plan, value_store)
Create a Node in the Plan that reads from the given ValueStore.
- Parameters
plan (Plan) – The plan to add a source node to.
value_store (ValueStore) – The value store to read from.
- Return type
Node
- Returns
The newly added plan node.
-
get(node)
Get the ValueStore for a Node if it has one, or None.
- Parameters
node (Node) – The plan node.
- Return type
- Returns
The value store for the node, or None.
-
values()
Get all registered ValueStore instances.
- Return type
- Returns
A list of ValueStore instances.
-
items()
Get all registered (node, value_store) pairs.
- Return type
List[Tuple[Node, ValueStore]]
- Returns
A list of (node, value_store) pairs.
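The difference between `__getitem__` and `get` mirrors a plain dict. A dict-backed model of the documented interface follows, for illustration only. `ToyRegistry` is hypothetical. Two behaviors are assumptions: that `__getitem__` raises KeyError for an unregistered node, and that `add` overwrites when a node is registered twice. `source` is omitted because it needs a real Plan.

```python
from typing import Any, Dict, Hashable, List, Optional, Tuple


class ToyRegistry:
    """Dict-backed model of the documented Registry interface (hypothetical)."""

    def __init__(self) -> None:
        self._stores: Dict[Hashable, Any] = {}

    def add(self, node: Hashable, value_store: Any) -> None:
        # Assumption: registering the same node again overwrites the old store.
        self._stores[node] = value_store

    def __contains__(self, node: Hashable) -> bool:
        return node in self._stores

    def __getitem__(self, node: Hashable) -> Any:
        # Assumption: an unregistered node raises KeyError.
        return self._stores[node]

    def get(self, node: Hashable) -> Optional[Any]:
        # Documented behavior: None instead of an exception.
        return self._stores.get(node)

    def __len__(self) -> int:
        return len(self._stores)

    def values(self) -> List[Any]:
        return list(self._stores.values())

    def items(self) -> List[Tuple[Hashable, Any]]:
        return list(self._stores.items())
```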
-
uberjob.ValueStore
uberjob.stores
-
class uberjob.stores.BinaryFileStore(path)
A ValueStore for storing a bytes value in a file.
-
class uberjob.stores.FileStore(path)
The abstract base class for storing a value in a file.
-
abstract read()
Read the value from the file.
-
abstract write(value)
Write a value to the file.
- Parameters
value – The value.
- Return type
None
-
abstract
-
uberjob.stores.get_modified_time(path)
Get the modified time of the path, or None if it does not exist or is inaccessible.
-
uberjob.stores.staged_write(path, mode='w', **kwargs)
Context manager for writing a file atomically.
It yields a staging file object. The staging file is atomically renamed to the given path if no exception is raised. If an exception is raised, the staging file is deleted if it exists.
-
uberjob.stores.staged_write_path(path)
Context manager for writing a file atomically.
It yields a staging path, which is atomically renamed to the given path if no exception is raised. If an exception is raised, the staging path is deleted if it exists.
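Both context managers describe the usual write-then-rename pattern. Below is a sketch of that pattern using the standard library; it is not uberjob's own code. The `_sketch` names are hypothetical. The staging file is placed in the target's directory so that `os.replace` does not cross filesystems. Under that condition, `os.replace` is an atomic rename on POSIX.

```python
import contextlib
import os
import tempfile
from typing import IO, Iterator


@contextlib.contextmanager
def staged_write_path_sketch(path: str) -> Iterator[str]:
    """Yield a staging path that replaces `path` only if the block succeeds."""
    directory = os.path.dirname(os.path.abspath(path))
    # Stage beside the target so os.replace never crosses filesystems.
    fd, staging_path = tempfile.mkstemp(dir=directory, prefix=".staging-")
    os.close(fd)
    try:
        yield staging_path
        os.replace(staging_path, path)  # atomic rename on POSIX
    except BaseException:
        # On any failure, clean up the staging file and re-raise.
        with contextlib.suppress(FileNotFoundError):
            os.remove(staging_path)
        raise


@contextlib.contextmanager
def staged_write_sketch(path: str, mode: str = "w", **kwargs) -> Iterator[IO]:
    """Yield a file object for the staging path; it is closed before the rename."""
    with staged_write_path_sketch(path) as staging_path:
        with open(staging_path, mode, **kwargs) as f:
            yield f
```

A failure inside the `with` block leaves any previous file at `path` untouched, which is the property that makes partially written outputs impossible to observe.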
-
class uberjob.stores.JsonFileStore(path, *, encoding=None)
A ValueStore for storing a JSON-serializable value in a file.
- Parameters
-
read()
Read the JSON value from the file.
-
write(value)
Write a JSON-serializable value to the file.
- Parameters
value – The value.
- Return type
None
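The ValueStore contract implied by these entries is a `read`/`write` pair plus `get_modified_time`. Below is a stdlib-only illustration shaped like JsonFileStore. `ToyJsonFileStore` is hypothetical and does not subclass uberjob's ValueStore. Returning a timezone-aware UTC datetime, or None when the file is missing, is an assumption rather than documented behavior.

```python
import datetime
import json
import os
from typing import Any, Optional


class ToyJsonFileStore:
    """Hypothetical stand-in mirroring JsonFileStore's read/write shape."""

    def __init__(self, path: str, *, encoding: Optional[str] = None) -> None:
        self.path = path
        self.encoding = encoding

    def read(self) -> Any:
        with open(self.path, encoding=self.encoding) as f:
            return json.load(f)

    def write(self, value: Any) -> None:
        # Serialize first so a non-JSON value raises TypeError before the file is touched.
        text = json.dumps(value)
        with open(self.path, "w", encoding=self.encoding) as f:
            f.write(text)

    def get_modified_time(self) -> Optional[datetime.datetime]:
        # Assumption: UTC-aware datetime, or None if the file is missing.
        try:
            mtime = os.path.getmtime(self.path)
        except OSError:
            return None
        return datetime.datetime.fromtimestamp(mtime, tz=datetime.timezone.utc)
```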
-
class uberjob.stores.LiteralSource(value, modified_time)
A ValueStore that takes a value and a modified time in its constructor and simply returns them from read and get_modified_time.
-
read()
Get the value.
-
write(value)
Not implemented.
-
-
class
uberjob.stores.ModifiedTimeSource(modified_time)¶ A
ValueStorethat takes a modified time in its constructor and returns it fromreadandget_modified_time. Useful for ensuring that something gets updated every day regardless of whether any of its inputs were updated.-
write(value)¶ Not implemented.
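One way to get the "update every day" behavior is to pass a modified time truncated to midnight, so that it advances once per day. The sketch below assumes a particular staleness rule: a stored value is stale if it is missing or older than any input's modified time. That rule and both helper names are assumptions for illustration, not taken from uberjob.

```python
import datetime
from typing import Iterable, Optional


def start_of_day(now: datetime.datetime) -> datetime.datetime:
    """Truncate a timestamp to midnight; the result changes once per day."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0)


def needs_update(stored: Optional[datetime.datetime], inputs: Iterable[datetime.datetime]) -> bool:
    """Assumed staleness rule: missing, or older than any input's modified time."""
    return stored is None or any(t > stored for t in inputs)
```

Under that assumption, `ModifiedTimeSource(start_of_day(datetime.datetime.now()))` would make its dependents look stale on the first run of each day.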
-
-
class
uberjob.stores.MountedStore(create_store)¶ An abstract
ValueStorethat behaves like a mounted storage device.- Parameters
create_store (
Callable[[str],ValueStore]) – A callable that creates aValueStorefor a path.
-
create_store
Creates an instance of the underlying ValueStore for the given path.
-
abstract copy_to_local(local_path)
Copy the value in the store to the local_path.
- Parameters
local_path (str) – The local path.
-
abstract copy_from_local(local_path)
Copy the value at the local_path into the store.
- Parameters
local_path (str) – The local path.
-
read()
Read the value from the store.
-
write(value)
Write a value to the store.
- Parameters
value – The value.
- Return type
None
-
class uberjob.stores.PathSource(path, *, required=True)
A ValueStore that returns the path itself from read rather than actually reading any data.
- Parameters
-
read()
Get the path.
When required is true, this raises an exception if the file does not exist.
-
write(value)
Not implemented.
-
class uberjob.stores.PickleFileStore(path)
A ValueStore for storing a picklable value in a file.
-
read()
Read the pickled value from the file.
-
write(value)
Write a picklable value to the file.
- Parameters
value – The value.
- Return type
None
-
-
class uberjob.stores.TextFileStore(path, *, encoding=None)
A ValueStore for storing a str value in a file.
- Parameters
-
class uberjob.stores.TouchFileStore(path)
A ValueStore for a touch file. It can be thought of as storing None in a file. It is useful for integrating side effects.
-
read()
Return None after ensuring that the touch file exists and is empty.
-
write(value)
Write the touch file after ensuring that the given value is None.
- Parameters
value (None) – The value, which must be None.
- Return type
None
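The touch-file contract can be modeled in a few lines. `ToyTouchFileStore` is hypothetical. Raising ValueError for a non-None value, or for a non-empty file, is an assumption; the entries above only say that the value must be None and that the file must exist and be empty.

```python
import os


class ToyTouchFileStore:
    """Hypothetical model of the documented TouchFileStore contract."""

    def __init__(self, path: str) -> None:
        self.path = path

    def write(self, value: None) -> None:
        if value is not None:
            # Assumption: the real store's exception type may differ.
            raise ValueError("a touch file can only store None")
        # Create or truncate an empty file; its mtime records when the side effect ran.
        with open(self.path, "wb"):
            pass

    def read(self) -> None:
        # os.path.getsize raises FileNotFoundError if the touch file is missing.
        if os.path.getsize(self.path) != 0:
            raise ValueError(f"touch file {self.path!r} is not empty")
        return None
```

The file's modification time is what records when the side effect last ran. That is presumably why this store suits side-effecting calls that return None.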
-
uberjob.graph
Provides the underlying graph, node, and edge classes used by the Plan.
-
uberjob.graph.Graph
-
class uberjob.graph.Call(fn, *, scope=(), stack_frame=None)
A symbolic function call.
- Parameters
fn (Callable) – The callable function.
stack_frame – The stack frame of the call site.
-
class uberjob.graph.Literal(value, *, scope=())
A symbolic literal value.
- Parameters
value – The value of the literal.
-
class uberjob.graph.Dependency
A dependency in a call Graph that links a Node to a Node that must run after it.
-
class uberjob.graph.PositionalArg(index)
A dependency in a call Graph that links a symbolic positional argument Node to a symbolic function Call.
- Parameters
index (int) – The index of the argument.
-
class uberjob.graph.KeywordArg(name, index)
A dependency in a call Graph that links a symbolic keyword argument Node to a symbolic function Call.
-
uberjob.graph.get_argument_nodes(graph, call)
Return the symbolic args and kwargs of the given Call.
- Parameters
graph (MultiDiGraph) – The graph.
call (Call) – The call.
- Return type
-
uberjob.graph.get_scope(graph, node)
Return the scope of the given Node.
This function is deprecated. Use node.scope instead.
- Parameters
graph (MultiDiGraph) – The graph.
node (Node) – The node.
- Return type
uberjob.progress
The state of a running Plan can be observed using Progress.
-
class uberjob.progress.Progress(create_observer)
A way to observe progress.
- Parameters
create_observer (Callable[[], ProgressObserver]) – A callable that creates a single-use progress observer.
-
observer()
Create a single-use progress observer.
- Return type
ProgressObserver
-
uberjob.progress.default_progress = <uberjob.progress._progress.Progress object>
The default method for observing progress.
-
uberjob.progress.console_progress = <uberjob.progress._progress.Progress object>
Display observed progress using the console.
-
uberjob.progress.ipython_progress = <uberjob.progress._progress.Progress object>
Display observed progress using IPython widgets.
-
uberjob.progress.null_progress = <uberjob.progress._progress.Progress object>
Ignore observed progress.
-
uberjob.progress.html_progress(output)
Write observed progress to an HTML file.
-
uberjob.progress.composite_progress(*members)
Create a progress observer that forwards observations to all of its members.
- Parameters
members (Progress) – The members.
- Return type
Progress
-
class uberjob.progress.ProgressObserver
The abstract base class for all progress observers.
-
abstract __enter__()
Start observing progress.
-
abstract __exit__(exc_type, exc_val, exc_tb)
Stop observing progress.
-
abstract increment_total(*, section, scope, amount)
Increment the number of entries in this section and scope by the specified amount.
-
abstract increment_running(*, section, scope)
Increment the number of running entries in this section and scope. This method must be thread-safe.
-
abstract increment_completed(*, section, scope)
Increment the number of completed entries in this section and scope. This method must be thread-safe.
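Because `increment_running` and `increment_completed` must be thread-safe, an observer that keeps counters needs a lock. Below is a sketch that uses the documented method names. `CountingObserver` is hypothetical and does not subclass uberjob.progress.ProgressObserver. It also omits any abstract member not listed here, which a real subclass would need.

```python
import threading
from collections import Counter
from typing import Tuple


class CountingObserver:
    """Hypothetical observer keeping per-(section, scope) counters."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.total: Counter = Counter()
        self.running: Counter = Counter()
        self.completed: Counter = Counter()

    def __enter__(self) -> "CountingObserver":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        return None  # do not suppress exceptions

    def increment_total(self, *, section: str, scope: Tuple, amount: int) -> None:
        with self._lock:
            self.total[(section, scope)] += amount

    def increment_running(self, *, section: str, scope: Tuple) -> None:
        # Called from worker threads, so the read-modify-write is guarded by a lock.
        with self._lock:
            self.running[(section, scope)] += 1

    def increment_completed(self, *, section: str, scope: Tuple) -> None:
        with self._lock:
            self.completed[(section, scope)] += 1
```

The class is itself a callable that creates a single-use observer. A real subclass could therefore be wrapped as `Progress(CountingObserver)` and passed as run's `progress` argument.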
-
abstract
Exceptions
-
class uberjob.CallError(call)
An exception was raised in a symbolic call.
- Parameters
call (Call) – The call.
-
class uberjob.NotTransformedError
An expected transformation was not applied.