This is part of our commercial offering.

V1Dask

polyaxon.polyflow.run.dask.V1Dask(kind='dask', threads=None, scale=None, adapt_min=None, adapt_max=None, adapt_interval=None, environment=None, connections=None, volumes=None, init=None, sidecars=None, container=None)

Dask jobs are used to run distributed jobs using a Dask cluster.

Dask Kubernetes deploys Dask workers on Kubernetes clusters using native Kubernetes APIs. It is designed to dynamically launch short-lived deployments of workers during the lifetime of a Python process.

The Dask job spawns a temporary adaptive Dask cluster with a Dask scheduler and workers to run your container.

  • Args:
    • kind: str, should be equal to dask
    • threads: int, optional
    • scale: int, optional
    • adapt_min: int, optional
    • adapt_max: int, optional
    • adapt_interval: int, optional
    • environment: V1Environment, optional
    • connections: List[str], optional
    • volumes: List[Kubernetes Volume], optional
    • init: List[V1Init], optional
    • sidecars: List[Kubernetes Container], optional
    • container: Kubernetes Container

YAML usage

run:
  kind: dask
  threads:
  scale:
  adaptMin:
  adaptMax:
  adaptInterval:
  environment:
  connections:
  volumes:
  init:
  sidecars:
  container:

Python usage

from polyaxon.polyflow import V1Environment, V1Init, V1Dask
from polyaxon.k8s import k8s_schemas
dask_job = V1Dask(
    threads=2,
    scale=None,
    adapt_min=1,
    adapt_max=100,
    adapt_interval=1000,
    environment=V1Environment(...),
    connections=["connection-name1"],
    volumes=[k8s_schemas.V1Volume(...)],
    init=[V1Init(...)],
    sidecars=[k8s_schemas.V1Container(...)],
    container=k8s_schemas.V1Container(...),
)
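Note that the Python client uses snake_case field names (adapt_min, adapt_interval) while the YAML spec uses camelCase (adaptMin, adaptInterval). A minimal sketch of that mapping; the to_camel helper below is illustrative only, not part of the Polyaxon API:

```python
# Hypothetical helper showing the snake_case (Python client) to
# camelCase (YAML spec) field-name correspondence of the Dask fields.
def to_camel(name: str) -> str:
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)

python_fields = ["threads", "scale", "adapt_min", "adapt_max", "adapt_interval"]
yaml_fields = [to_camel(f) for f in python_fields]
print(yaml_fields)  # ['threads', 'scale', 'adaptMin', 'adaptMax', 'adaptInterval']
```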

Fields

kind

The kind signals to the CLI, client, and other tools that this component’s runtime is a dask job.

If you are using the Python client to create the runtime, this field is not required; it is set by default.

run:
  kind: dask

threads

The number of threads to pass to each Dask worker; default is 1.

run:
  kind: dask
  threads: 2
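As a back-of-envelope illustration (not a Polyaxon API), the total thread parallelism of the cluster is the number of workers multiplied by the threads per worker:

```python
# Illustrative only: with scale=20 workers and threads=2 per worker,
# the cluster exposes 40 worker threads in total.
def total_threads(workers: int, threads_per_worker: int = 1) -> int:
    return workers * threads_per_worker

print(total_threads(20, 2))  # 40
print(total_threads(5))      # 5 (default of 1 thread per worker)
```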

scale

To specify the number of workers explicitly.

run:
  kind: dask
  scale: 20

adaptMin

The minimum number of workers in adaptive mode, which scales the cluster dynamically based on the current workload.

run:
  kind: dask
  adaptMin: 2

adaptMax

The maximum number of workers in adaptive mode, which scales the cluster dynamically based on the current workload.

run:
  kind: dask
  adaptMax: 20

adaptInterval

The interval between adaptive mode scaling checks, in milliseconds; default is 1000 ms.

run:
  kind: dask
  adaptInterval: 20000
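Together, adaptMin and adaptMax bound the worker count that adaptive mode may choose, re-evaluated every adaptInterval milliseconds. A minimal sketch of that clamping behavior; this is an illustration, not the actual Dask adaptive implementation:

```python
def clamp_workers(requested: int, adapt_min: int, adapt_max: int) -> int:
    # Adaptive mode never scales below adapt_min or above adapt_max,
    # regardless of what the current workload suggests.
    return max(adapt_min, min(requested, adapt_max))

print(clamp_workers(0, 2, 20))   # 2  -> floored at adaptMin
print(clamp_workers(50, 2, 20))  # 20 -> capped at adaptMax
print(clamp_workers(7, 2, 20))   # 7  -> within bounds
```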

environment

The optional environment section provides a way to inject pod-related information.

run:
  kind: dask
  environment:
    labels:
      key1: "label1"
      key2: "label2"
    annotations:
      key1: "value1"
      key2: "value2"
    nodeSelector:
      node_label: node_value
    ...
  ...

connections

A list of connection names to resolve for the job.

If you are referencing a connection it must be configured. All referenced connections will be checked:
  • If they are accessible in the context of the project of this run

  • If the user running the operation can have access to those connections

After these checks, the connections will be resolved and will inject any volumes, secrets, config maps, and environment variables required for your main container to function correctly.

run:
  kind: dask
  connections: [connection1, connection2]

volumes

A list of Kubernetes Volumes to resolve and mount for your jobs.

This is an advanced use-case where configuring a connection is not an option.

When you add a volume you need to mount it manually to your container(s).

run:
  kind: dask
  volumes:
    - name: volume1
      persistentVolumeClaim:
        claimName: pvc1
  ...
  container:
    name: myapp-container
    image: busybox:1.28
    command: ['sh', '-c', 'echo custom init container']
    volumeMounts:
    - name: volume1
      mountPath: /mnt/vol/path
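Because Polyaxon does not auto-mount these volumes, every volume name must be matched by a volumeMount in the container. A small sketch of that pairing check, using plain dicts that mirror the YAML above; unmounted_volumes is a hypothetical helper, not part of the Polyaxon API:

```python
# Plain-dict mirror of the YAML volume/container example above.
volumes = [
    {"name": "volume1", "persistentVolumeClaim": {"claimName": "pvc1"}},
]
container = {
    "name": "myapp-container",
    "image": "busybox:1.28",
    "volumeMounts": [{"name": "volume1", "mountPath": "/mnt/vol/path"}],
}

def unmounted_volumes(volumes, container):
    # Report volumes declared on the pod that no volumeMount references.
    mounted = {m["name"] for m in container.get("volumeMounts", [])}
    return [v["name"] for v in volumes if v["name"] not in mounted]

print(unmounted_volumes(volumes, container))  # [] -> every volume is mounted
```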

init

A list of init handlers and containers to resolve for the job.

If you are referencing a connection it must be configured. All referenced connections will be checked:
  • If they are accessible in the context of the project of this run

  • If the user running the operation can have access to those connections

run:
  kind: dask
  init:
    - artifacts:
        dirs: ["path/on/the/default/artifacts/store"]
    - connection: gcs-large-datasets
      artifacts:
        dirs: ["data"]
      container:
        resources:
          requests:
            memory: "256Mi"
            cpu: "500m"
    - container:
        name: myapp-container
        image: busybox:1.28
        command: ['sh', '-c', 'echo custom init container']

sidecars

A list of containers to run as sidecars alongside the main container.

run:
  kind: dask
  sidecars:
    - name: sidecar2
      image: busybox:1.28
      command: ['sh', '-c', 'echo sidecar2']
    - name: sidecar1
      image: busybox:1.28
      command: ['sh', '-c', 'echo sidecar1']
      resources:
        requests:
          memory: "128Mi"
          cpu: "500m"

container

The main Kubernetes Container that will run your experiment training or data processing logic.

run:
  kind: dask
  init:
    - connection: my-tf-code-repo
  container:
    image: tensorflow:2.1
    command: ["python", "/plx-context/artifacts/my-tf-code-repo/model.py"]