V1Dag
polyaxon._flow.run.dag.V1Dag()
A DAG (Directed Acyclic Graph) is a collection of all the operations you want to run, organized in a way that reflects their relationships and dependencies.
A dag’s main goal is to describe and run several operations necessary for a Machine Learning (ML) workflow.
A dag executes a dependency graph of operations; each operation runs a Kubernetes primitive described in its component.
Dags are defined in Polyaxon as a component runtime, so everything that applies to running other runtimes also applies to them:
- They can be defined in reusable components and can be registered in the Component Hub.
- They get executed using operations.
- They can be parametrized similar to jobs and services.
- Since they are defined as components’ runtimes, and they run a graph of other components, they can be nested natively.
- They can leverage all pipeline helpers.
- They can run in parallel and can be used with mapping or other optimization algorithms.
- They can run on a schedule.
- They can subscribe to events.
- They can take advantage of all scheduling strategies to route operations to nodes, namespaces, and clusters even within the same DAG.
Args:
- kind: str, should be equal to "dag"
- operations: List[V1Operation]
- components: List[V1Component], optional
- environment: V1Environment, optional
- connections: List[str], optional
- volumes: List[Kubernetes Volume], optional
- concurrency: int, optional
- early_stopping: List[EarlyStopping], optional
YAML usage
run:
  kind: dag
  operations:
  components:
  environment:
  connections:
  volumes:
  concurrency:
  earlyStopping:
Python usage
from polyaxon.schemas import V1Dag, V1Component, V1Environment, V1Operation
from polyaxon import k8s
dag = V1Dag(
    operations=[V1Operation(...)],
    components=[V1Component(...), V1Component(...)],
    environment=V1Environment(...),
    connections=["connection-name1"],
    volumes=[k8s.V1Volume(...)],
)
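As a rough illustration of the field list above, the sketch below checks a plain dictionary shaped like the `run:` section. It is not Polyaxon's own validation; `ALLOWED_FIELDS` and `check_dag_run` are hypothetical names introduced only for this example, and the checks cover just the types stated in the Args list.

```python
# Hypothetical helper, not part of Polyaxon: field names are taken from the
# YAML usage section, and only a few documented constraints are checked.
ALLOWED_FIELDS = {
    "kind", "operations", "components", "environment",
    "connections", "volumes", "concurrency", "earlyStopping",
}


def check_dag_run(run):
    """Return a list of problems found in a dict shaped like the `run:` section."""
    problems = []
    if run.get("kind") != "dag":
        problems.append("kind must be 'dag'")
    if not isinstance(run.get("operations"), list):
        problems.append("operations must be a list")
    concurrency = run.get("concurrency")
    if concurrency is not None and not isinstance(concurrency, int):
        problems.append("concurrency must be an int")
    # Report keys that do not appear in the documented field list.
    for key in sorted(set(run) - ALLOWED_FIELDS):
        problems.append(f"unknown field: {key}")
    return problems


ok = check_dag_run({"kind": "dag", "operations": [], "concurrency": 2})
bad = check_dag_run({"kind": "job", "earlystopping": []})
print(ok)
print(bad)
```

A check like this can catch casing mistakes such as `earlystopping` before a file is submitted; the real schema enforces considerably more than this sketch does.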
Fields
kind
The kind signals to the CLI, client, and other tools that this component’s runtime is a dag.
If you are using the python client to create the runtime, this field is not required and is set by default.
run:
  kind: dag
operations
A list of operations to run with their dependency definition. If the operations are defined with no dependencies and no params are passed from one operation to another, they run in parallel, subject to the concurrency and other queue priority definitions.
run:
  kind: dag
  operations:
    - name: job1
      hubRef: component1:latest
      params:
        ...
    - name: job2
      hubRef: component1:2.1
      params:
        ...
    - name: job3
      urlRef: https://some_url.com
      params:
        param1:
          ref: ops.job2
          value: outputs.outputName
Note: For more information about managing the execution graph and creating dependencies between operations, please check the flow dependencies section.
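To make the relationship between param references and execution order concrete, here is a minimal sketch that derives dependencies from `ref: ops.<name>` params and groups operations into waves that could run in parallel. It works on plain dictionaries mirroring the YAML example above and uses the standard-library `graphlib` module (Python 3.9+); `upstream_of` and `execution_waves` are hypothetical helpers, and this is not how Polyaxon's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Plain-dict mirror of the YAML example (params trimmed to what matters here).
operations = [
    {"name": "job1", "params": {}},
    {"name": "job2", "params": {}},
    {
        "name": "job3",
        "params": {"param1": {"ref": "ops.job2", "value": "outputs.outputName"}},
    },
]


def upstream_of(operation):
    """Return names of operations referenced through `ref: ops.<name>` params."""
    deps = set()
    for param in operation.get("params", {}).values():
        ref = param.get("ref", "") if isinstance(param, dict) else ""
        if ref.startswith("ops."):
            deps.add(ref[len("ops."):])
    return deps


def execution_waves(ops):
    """Group operations into waves; members of a wave have no pending upstream."""
    sorter = TopologicalSorter({op["name"]: upstream_of(op) for op in ops})
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(operations)
print(waves)  # [['job1', 'job2'], ['job3']]
```

Here `job1` and `job2` have no upstream and land in the same wave, while `job3` waits for `job2` because it reads one of its outputs. A real scheduler would likely start `job3` as soon as `job2` finishes instead of waiting for a whole wave; the wave grouping is only a simplification for illustration.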
references
A list of operations and their dependency definition. If the operations are defined with no dependencies and no params are passed from one operation to another, they run in parallel, subject to the concurrency and other queue priority definitions.
Operations can reference components using:
- dagRef (reusable component defined inside the dag)
- hubRef
- pathRef
- urlRef
- inline component
run:
  kind: dag
  operations:
    - name: download1
      dagRef: download
      params:
        url: {value: 'gs://ml-pipeline-playground/shakespeare1.txt'}
        result: {value: 'result.txt'}
    - name: download2
      dagRef: download
      params:
        url: {value: 'gs://ml-pipeline-playground/shakespeare2.txt'}
        result: {value: 'result.txt'}
components
A list of reusable components defined inside the DAG that can be used by one or several operations. This field is only useful when you need to define inline components for your operations and more than one operation is using the same component definition.
operations:
  - name: download-url1
    dagRef: download
    ...
  - name: download-url2
    dagRef: download
    ...
components:
  - name: download
    inputs:
      - name: url
        type: url
    outputs:
      - name: result
        type: path
        delayValidation: false
    run:
      kind: job
      container:
        image: 'google/cloud-sdk:272.0.0'
        command: ['sh', '-c']
        args: ['gsutil cat $0 | tee $1', "{{ url }}", "{{ outputs_path }}/{{ result }}"]
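The lookup from a `dagRef` to a component declared in the same DAG can be pictured as a simple name-based resolution. The sketch below uses plain dictionaries that mirror the example above (component bodies trimmed); `resolve_dag_refs` is a hypothetical helper for illustration, not a Polyaxon function.

```python
# Plain-dict mirror of the example: one inline component shared by two operations.
components = [
    {"name": "download", "run": {"kind": "job"}},
]
operations = [
    {"name": "download-url1", "dagRef": "download"},
    {"name": "download-url2", "dagRef": "download"},
]


def resolve_dag_refs(ops, comps):
    """Map each operation name to the inline component its dagRef points at."""
    by_name = {comp["name"]: comp for comp in comps}
    resolved = {}
    for op in ops:
        ref = op.get("dagRef")
        if ref is None:
            continue  # hubRef, pathRef, urlRef, or inline component: not handled here
        if ref not in by_name:
            raise KeyError(
                f"operation {op['name']!r} references unknown component {ref!r}"
            )
        resolved[op["name"]] = by_name[ref]
    return resolved


resolved = resolve_dag_refs(operations, components)
print(sorted(resolved))  # ['download-url1', 'download-url2']
```

Both operations resolve to the same component definition, which is the point of the `components` field: the definition is written once and reused by name.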
environment
An optional environment section that provides a way to inject pod-related information.
The environment definition will be passed to all child operations.
run:
  kind: dag
  environment:
    labels:
      key1: "label1"
      key2: "label2"
    annotations:
      key1: "value1"
      key2: "value2"
    nodeSelector:
      node_label: node_value
    ...
  ...
connections
A list of connection names to resolve for the dag.
If you are referencing a connection, it must be configured. All referenced connections will be checked:
- If they are accessible in the context of the project of this run.
- If the user running the operation has access to those connections.
The connections definition will be passed to all operations. After the checks, the connections will be resolved, injecting any volumes, secrets, configMaps, and environment variables that your main container needs to function correctly.
run:
  kind: dag
  connections: [connection1, connection2]
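The two checks described above (project-level access and user-level access) can be sketched with simple set membership. Everything in this example is an assumption made for illustration: `check_connections` is a hypothetical helper, and the sets of project and user connections stand in for whatever the platform actually consults.

```python
def check_connections(requested, project_connections, user_connections):
    """Return {connection name: [failed checks]}; an empty dict means all passed."""
    problems = {}
    for name in requested:
        failed = []
        if name not in project_connections:
            failed.append("not accessible in the project")
        if name not in user_connections:
            failed.append("not accessible to the user")
        if failed:
            problems[name] = failed
    return problems


# Hypothetical access sets: the user lacks access to connection2.
problems = check_connections(
    requested=["connection1", "connection2"],
    project_connections={"connection1", "connection2"},
    user_connections={"connection1"},
)
print(problems)  # {'connection2': ['not accessible to the user']}
```

Only when both checks pass for every requested name would the connections go on to be resolved and injected into the operations.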
volumes
A list of Kubernetes Volumes to resolve and mount for your jobs.
This is an advanced use-case where configuring a connection is not an option.
The volumes definition will be passed to all operations.
When you add a volume you need to mount it manually to your container(s).
run:
  kind: dag
  volumes:
    - name: volume1
      persistentVolumeClaim:
        claimName: pvc1
  ...
concurrency
An optional value to set the number of concurrent operations.
run:
  kind: dag
  concurrency: 2
For more details about concurrency management, please check the concurrency section.
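The effect of a concurrency limit can be illustrated with a bounded worker pool: no more than the configured number of operations are in flight at once, and the rest wait their turn. This is only an analogy using Python's `concurrent.futures`, not Polyaxon's scheduling code; `run_operation` is a stand-in that sleeps instead of doing real work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

CONCURRENCY = 2  # mirrors `concurrency: 2` in the example

lock = threading.Lock()
running = 0
peak = 0


def run_operation(name):
    """Stand-in for an operation; records how many run at the same time."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # pretend to do some work
    with lock:
        running -= 1
    return name


names = [f"job{i}" for i in range(1, 6)]
# The pool size caps how many operations are in flight simultaneously.
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
    finished = list(pool.map(run_operation, names))

print(finished)
print(peak <= CONCURRENCY)
```

All five stand-in operations complete, but the recorded peak never exceeds the limit, which is the behavior the `concurrency` field is described as providing.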
earlyStopping
A list of early stopping conditions to check for terminating all operations managed by the pipeline. If one of the early stopping conditions is met, a signal will be sent to terminate all running and pending operations.
run:
  kind: dag
  earlyStopping: ...
For more details please check the early stopping section.
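As a simplified picture of the behavior described above, the sketch below runs operations one at a time and stops launching new ones as soon as any condition is met. The condition, the metric values, and `run_with_early_stopping` are all hypothetical; the real feature also terminates operations that are already running, which this sequential sketch does not model.

```python
def run_with_early_stopping(pending, run_one, conditions):
    """Run operations in order until any condition is met.

    Returns (completed results, names of operations that were never started).
    """
    results = {}
    queue = list(pending)
    while queue:
        name = queue.pop(0)
        results[name] = run_one(name)
        if any(condition(results) for condition in conditions):
            break  # stands in for the termination signal
    return results, queue


# Hypothetical metric values and a hypothetical "loss is low enough" condition.
fake_loss = {"job1": 0.9, "job2": 0.08, "job3": 0.5, "job4": 0.4}
results, stopped = run_with_early_stopping(
    pending=["job1", "job2", "job3", "job4"],
    run_one=fake_loss.__getitem__,
    conditions=[lambda done: min(done.values()) <= 0.1],
)
print(results)  # {'job1': 0.9, 'job2': 0.08}
print(stopped)  # ['job3', 'job4']
```

Once `job2` satisfies the condition, `job3` and `job4` are never started, mirroring how pending operations are terminated when an early stopping condition is met.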