V1Dag

polyaxon._flow.run.dag.V1Dag()

A DAG (directed acyclic graph) is a collection of all the operations you want to run, organized in a way that reflects their relationships and dependencies.

A dag’s main goal is to describe and run several operations necessary for a Machine Learning (ML) workflow.

A dag executes a dependency graph of operations; each operation runs a Kubernetes primitive described in its component.
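As an illustration of what executing a dependency graph implies, the following minimal sketch computes a valid run order with Python's standard `graphlib` module. It does not use the Polyaxon scheduler; the operation names and the `execution_order` helper are assumptions made for the example.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical graph: each key is an operation name, each value is the set of
# operations that must finish before that operation can start.
dependencies = {
    "job1": set(),
    "job2": set(),
    "job3": {"job2"},
}


def execution_order(deps):
    """Return one valid run order in which upstream operations come first."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Operations with no edge between them (here `job1` and `job2`) have no required relative order, which is why they are candidates for running in parallel.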

Dags are defined in Polyaxon as a component runtime, which means that everything you know about running other runtimes applies to them:

  • They can be defined in reusable components and can be registered in the Component Hub.
  • They get executed using operations.
  • They can be parametrized similar to jobs and services.
  • Since they are defined as components’ runtimes, and they run a graph of other components, they can be nested natively.
  • They can leverage all pipeline helpers.
  • They can run in parallel and can be used with mapping or other optimization algorithms.
  • They can run on a schedule.
  • They can subscribe to events.
  • They can take advantage of all scheduling strategies to route operations to nodes, namespaces, and clusters even within the same DAG.

YAML usage

run:
  kind: dag
  operations:
  components:
  environment:
  connections:
  volumes:
  concurrency:
  earlyStopping:

Python usage

from polyaxon.schemas import V1Dag, V1Component, V1Environment, V1Operation
from polyaxon import k8s

dag = V1Dag(
    operations=[V1Operation(...)],
    components=[V1Component(...), V1Component(...)],
    environment=V1Environment(...),
    connections=["connection-name1"],
    volumes=[k8s.V1Volume(...)],
)

Fields

kind

The kind signals to the CLI, client, and other tools that this component’s runtime is a dag.

If you are using the Python client to create the runtime, this field is not required and is set by default.

run:
  kind: dag

operations

A list of operations to run with their dependency definition. If the operations are defined with no dependencies and no params are passed from one operation to another, the operations will run in parallel, subject to the concurrency and other queue priority definitions.

run:
  kind: dag
  operations:
    - name: job1
      hubRef: component1:latest
      params:
        ...
    - name: job2
      hubRef: component1:2.1
      params:
        ...
    - name: job3
      urlRef: https://some_url.com
      params:
        param1:
          ref: ops.job2
          value: outputs.outputName

Note: For more information about managing the execution graph and creating dependencies between operations, please check the flow dependencies section.
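The way a param reference creates a dependency can be sketched in plain Python. This is only an illustration that assumes upstream operations are identified by `ref: ops.<name>` entries in `params`, as in the example above; the `upstream_of` helper is hypothetical and is not part of the Polyaxon API.

```python
def upstream_of(operation):
    """Collect the operation names referenced through `ref: ops.<name>` params."""
    upstream = set()
    for param in (operation.get("params") or {}).values():
        ref = param.get("ref", "") if isinstance(param, dict) else ""
        if ref.startswith("ops."):
            upstream.add(ref[len("ops."):])
    return upstream


# Plain-dict version of the operations shown above (elided params omitted).
operations = [
    {"name": "job1", "hubRef": "component1:latest"},
    {"name": "job2", "hubRef": "component1:2.1"},
    {
        "name": "job3",
        "urlRef": "https://some_url.com",
        "params": {"param1": {"ref": "ops.job2", "value": "outputs.outputName"}},
    },
]

graph = {op["name"]: upstream_of(op) for op in operations}
print(graph)
```

In this sketch `job3` depends on `job2` because it consumes one of its outputs, while `job1` and `job2` have no upstream operations.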

references

A list of operations and their dependency definition. If the operations are defined with no dependencies and no params are passed from one operation to another, the operations will run in parallel, subject to the concurrency and other queue priority definitions.

Operations can reference components defined inside the DAG using dagRef:

run:
  kind: dag
  operations:
    - name: download1
      dagRef: download
      params:
        url: {value: 'gs://ml-pipeline-playground/shakespeare1.txt'}
        result: {value: 'result.txt'}
    - name: download2
      dagRef: download
      params:
        url: {value: 'gs://ml-pipeline-playground/shakespeare2.txt'}
        result: {value: 'result.txt'}

components

A list of reusable components defined inside the DAG that can be used by one or several operations. This field is only useful when you need to define inline components for your operations and more than one operation is using the same component definition.

operations:
  - name: download-url1
    dagRef: download
    ...
  - name: download-url2
    dagRef: download
    ...
components:
  - name: download
    inputs:
      - name: url
        type: url
    outputs:
      - name: result
        type: path
        delayValidation: false
    run:
      kind: job
      container:
        image: 'google/cloud-sdk:272.0.0'
        command: ['sh', '-c']
        args: ['gsutil cat $0 | tee $1', "{{ url }}", "{{ outputs_path }}/{{ result }}"]
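The relationship between `dagRef` and the inline `components` list can be sketched as a lookup by name. The `resolve_dag_refs` helper below is hypothetical and only illustrates the idea on plain dictionaries; it is not how Polyaxon resolves references internally.

```python
def resolve_dag_refs(operations, components):
    """Map each operation name to the inline component its dagRef points to."""
    by_name = {component["name"]: component for component in components}
    resolved = {}
    for op in operations:
        ref = op.get("dagRef")
        if ref is None:
            continue  # operation uses another reference type, e.g. hubRef
        if ref not in by_name:
            raise KeyError(f"operation {op['name']!r} references unknown component {ref!r}")
        resolved[op["name"]] = by_name[ref]
    return resolved


components = [{"name": "download", "run": {"kind": "job"}}]
operations = [
    {"name": "download-url1", "dagRef": "download"},
    {"name": "download-url2", "dagRef": "download"},
]

resolved = resolve_dag_refs(operations, components)
print(sorted(resolved))
```

Both operations end up pointing at the same component definition, which is the case where defining the component inline once is useful.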

environment

An optional environment section that provides a way to inject pod-related information.

The environment definition will be passed to all child operations.

run:
  kind: dag
  environment:
    labels:
      key1: "label1"
      key2: "label2"
    annotations:
      key1: "value1"
      key2: "value2"
    nodeSelector:
      node_label: node_value
    ...
  ...

connections

A list of connection names to resolve for the dag.

If you are referencing a connection, it must be configured. All referenced connections will be checked:
  • If they are accessible in the context of the project of this run.
  • If the user running the operation has access to those connections.

The connections definition will be passed to all operations. After the checks, the connections will be resolved, and any volumes, secrets, configMaps, and environment variables that your main container needs to function correctly will be injected.

run:
  kind: dag
  connections: [connection1, connection2]

volumes

A list of Kubernetes Volumes to resolve and mount for your jobs.

This is an advanced use-case where configuring a connection is not an option.

The volumes definition will be passed to all operations.

When you add a volume you need to mount it manually to your container(s).

run:
  kind: dag
  volumes:
    - name: volume1
      persistentVolumeClaim:
        claimName: pvc1
  ...

concurrency

An optional value to set the number of concurrent operations.

run:
  kind: dag
  concurrency: 2

For more details about concurrency management, please check the concurrency section.
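The effect of a concurrency limit can be illustrated with a small thread pool from the Python standard library. This is a sketch of the general idea only, not Polyaxon's scheduler; the `run_with_concurrency` helper, the operation names, and the sleep that stands in for real work are all assumptions.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_concurrency(names, concurrency):
    """Run stand-in operations with at most `concurrency` active at once."""
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def run(name):
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(0.01)  # stand-in for the real work of an operation
        with lock:
            state["active"] -= 1
        return name

    # max_workers caps how many operations are in flight at the same time.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        finished = list(pool.map(run, names))
    return finished, state["peak"]


finished, peak = run_with_concurrency(["job1", "job2", "job3", "job4"], concurrency=2)
print(finished, peak)
```

With a limit of 2, the remaining operations wait until a slot is free, so the observed peak never exceeds the configured value.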

earlyStopping

A list of early stopping conditions to check for terminating all operations managed by the pipeline. If one of the early stopping conditions is met, a signal will be sent to terminate all running and pending operations.

run:
  kind: dag
  earlyStopping: ...

For more details please check the early stopping section.
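The rule described above (one satisfied condition terminates every running and pending operation) can be sketched as follows. The status names, the `apply_early_stopping` helper, and the metric-based condition are hypothetical and only illustrate the logic; they do not reflect the actual early stopping schema.

```python
ACTIVE = {"running", "pending"}


def apply_early_stopping(statuses, conditions, metrics):
    """Return new statuses; active operations become 'stopped' if any condition is met."""
    if not any(condition(metrics) for condition in conditions):
        return dict(statuses)
    return {
        name: ("stopped" if status in ACTIVE else status)
        for name, status in statuses.items()
    }


statuses = {"job1": "succeeded", "job2": "running", "job3": "pending"}
# Hypothetical condition: stop once the reported loss drops below 0.1.
conditions = [lambda m: m.get("loss", float("inf")) < 0.1]

after = apply_early_stopping(statuses, conditions, {"loss": 0.05})
print(after)
```

Operations that have already finished keep their status; only running and pending ones are affected.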