Dask Kubernetes Operator

PyPI Conda Forge Python Support Kubernetes Support

欢迎阅读 Dask Kubernetes Operator 的文档。

注意

如果您正在寻找关于在 Kubernetes 上部署 Dask 的高级文档,新用户应该前往 Dask 关于 Kubernetes 的文档页面

dask-kubernetes 提供了用于 Kubernetes 的 Dask operator。dask-kubernetes 是部署 Dask 集群的众多选项之一,请参阅 Dask 文档中的 部署 Dask,以了解其他选项的概述。

快速入门

KubeCluster 使用自定义 Kubernetes 资源在 Kubernetes 集群上部署 Dask 集群。它旨在动态启动即席部署。

$ # Install operator CRDs and controller, needs to be done once on your Kubernetes cluster
$ helm install --repo https://helm.dask.org --create-namespace -n dask-operator --generate-name dask-kubernetes-operator
$ # Install dask-kubernetes
$ pip install dask-kubernetes
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name="my-dask-cluster", image='ghcr.io/dask/dask:latest')
cluster.scale(10)

什么是 Operator?

Dask Operator 是一组自定义资源和一个控制器,它运行在您的 Kubernetes 集群上,允许您将 Dask 集群作为 Kubernetes 资源创建和管理。创建集群可以通过 使用 kubectl 的 Kubernetes API使用 KubeCluster 的 Python API 完成。

安装 Operator,您需要应用一些自定义资源定义,这些定义允许我们描述 Dask 资源以及 Operator 本身。Operator 是一个小的 Python 应用程序,它监视 Kubernetes API 中与我们的自定义资源相关的事件,并相应地创建其他资源,例如 PodsServices

Operator 管理哪些资源?

Operator 管理一系列分层资源,包括表示 Dask 原语(如集群和 Worker 组)的一些自定义资源,以及运行集群进程和促进通信的原生 Kubernetes 资源(如 Pods 和 Services)。

        graph TD
  DaskJob(DaskJob)
  DaskCluster(DaskCluster)
  DaskAutoscaler(DaskAutoscaler)
  SchedulerService(Scheduler Service)
  SchedulerPod(Scheduler Pod)
  DaskWorkerGroup(DaskWorkerGroup)
  WorkerPodA(Worker Pod A)
  WorkerPodB(Worker Pod B)
  WorkerPodC(Worker Pod C)
  JobPod(Job Runner Pod)

  DaskJob --> DaskCluster
  DaskJob --> JobPod
  DaskCluster --> SchedulerService
  DaskCluster --> DaskAutoscaler
  SchedulerService --> SchedulerPod
  DaskCluster --> DaskWorkerGroup
  DaskWorkerGroup --> WorkerPodA
  DaskWorkerGroup --> WorkerPodB
  DaskWorkerGroup --> WorkerPodC

  classDef dask stroke:#FDA061,stroke-width:4px
  classDef dashed stroke-dasharray: 5 5
  class DaskJob dask
  class DaskCluster dask
  class DaskWorkerGroup dask
  class DaskAutoscaler dask
  class DaskAutoscaler dashed
  class SchedulerService dashed
  class SchedulerPod dashed
  class WorkerPodA dashed
  class WorkerPodB dashed
  class WorkerPodC dashed
  class JobPod dashed
    

Worker 组

一个 DaskWorkerGroup 代表一组同质的、可伸缩的 Worker。该资源类似于原生的 Kubernetes Deployment,因为它管理着一组对 Pod 生命周期具有一定智能的 Worker。Worker 组必须附加到 Dask Cluster 资源才能工作。

Kubernetes annotationsDaskWorkerGroup 资源上的所有注解都将传递给 Worker Pod 资源。由 kopfkubectl 创建的注解(即以“kopf.zalando.org”或“kubectl.kubernetes.io”开头的注解)不会传递。

集群

DaskCluster 自定义资源通过创建 scheduler Pod、scheduler Service 和默认的 DaskWorkerGroup 来创建一个 Dask 集群,后者又会创建 worker Pod 资源。

Worker 通过 scheduler Service 连接到 scheduler,该服务也可以暴露给用户,以便连接客户端并执行工作。

Operator 还支持创建额外的 Worker 组。这些是具有不同配置设置的额外 Worker 组,可以单独伸缩。然后,您可以使用 资源注解 将不同的任务调度到不同的组。

Kubernetes annotations <https://kubernetes.ac.cn/docs/concepts/overview/working-with-objects/annotations/>DaskCluster 资源上的所有注解将传递给 scheduler PodService 以及 DaskWorkerGroup 资源。由 kopfkubectl 创建的注解(即以“kopf.zalando.org”或“kubectl.kubernetes.io”开头的注解)不会传递。

例如,您可能希望有一小组 Worker,它们具有更多内存用于内存密集型任务,或者具有 GPU 用于计算密集型任务。

作业

一个 DaskJob 是一种批处理风格的资源,它创建一个 Pod 来从头到尾执行某些特定任务,同时创建一个可以利用来执行工作的 DaskCluster

Kubernetes annotations <https://kubernetes.ac.cn/docs/concepts/overview/working-with-objects/annotations/>DaskJob 资源上的所有注解将传递给 job-runner Pod 资源。如果还想在集群相关资源(scheduler 和 worker Pods)上设置 Kubernetes 注解,可以在 DaskJob 资源中将其设置为 spec.cluster.metadata。由 kopfkubectl 创建的注解(即以“kopf.zalando.org”或“kubectl.kubernetes.io”开头的注解)不会传递。

一旦 job Pod 运行完成,集群将自动移除以节省资源。这对于使用 Dask 训练分布式机器学习模型等工作流非常有用。

自动伸缩器

一个 DaskAutoscaler 资源将定期与 scheduler 通信,并将默认的 DaskWorkerGroup 自动伸缩到所需的 Worker 数量。

from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name="my-dask-cluster", image='ghcr.io/dask/dask:latest')
cluster.adapt(minimum=1, maximum=10)