Custom Resources

The Dask Operator provides a few custom resources that can be used to create various Dask components.

  • DaskCluster creates a full Dask cluster with a scheduler and workers.

  • DaskWorkerGroup creates a homogeneous group of workers. A DaskCluster creates one by default, but you can add more if you want multiple worker types.

  • DaskJob creates a Pod that runs a script to completion, along with a DaskCluster for the script to use.
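
Assuming the operator and its custom resource definitions are already installed, you can confirm these resource types are available by querying the kubernetes.dask.org API group used by all the manifests on this page:

$ kubectl api-resources --api-group=kubernetes.dask.org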

DaskCluster

The DaskCluster custom resource creates a Dask cluster by creating a scheduler Pod, a scheduler Service, and a default DaskWorkerGroup, which in turn creates the worker Pod resources.

graph TD
  DaskCluster(DaskCluster)
  SchedulerService(Scheduler Service)
  SchedulerPod(Scheduler Pod)
  DaskWorkerGroup(Default DaskWorkerGroup)
  WorkerPodA(Worker Pod A)
  WorkerPodB(Worker Pod B)
  WorkerPodC(Worker Pod C)

  DaskCluster --> SchedulerService
  DaskCluster --> SchedulerPod
  DaskCluster --> DaskWorkerGroup
  DaskWorkerGroup --> WorkerPodA
  DaskWorkerGroup --> WorkerPodB
  DaskWorkerGroup --> WorkerPodC

  classDef dask stroke:#FDA061,stroke-width:4px
  classDef dashed stroke-dasharray: 5 5

  class DaskCluster dask
  class DaskWorkerGroup dask
  class DaskWorkerGroup dashed
  class SchedulerService dashed
  class SchedulerPod dashed
  class WorkerPodA dashed
  class WorkerPodB dashed
  class WorkerPodC dashed

Let's create an example file called cluster.yaml with the following configuration:

# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: simple
spec:
  worker:
    replicas: 2
    spec:
      containers:
      - name: worker
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args:
          - dask-worker
          - --name
          - $(DASK_WORKER_NAME)
          - --dashboard
          - --dashboard-address
          - "8788"
        ports:
          - name: http-dashboard
            containerPort: 8788
            protocol: TCP
  scheduler:
    spec:
      containers:
      - name: scheduler
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args:
          - dask-scheduler
        ports:
          - name: tcp-comm
            containerPort: 8786
            protocol: TCP
          - name: http-dashboard
            containerPort: 8787
            protocol: TCP
        readinessProbe:
          httpGet:
            port: http-dashboard
            path: /health
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            port: http-dashboard
            path: /health
          initialDelaySeconds: 15
          periodSeconds: 20
    service:
      type: NodePort
      selector:
        dask.org/cluster-name: simple
        dask.org/component: scheduler
      ports:
      - name: tcp-comm
        protocol: TCP
        port: 8786
        targetPort: "tcp-comm"
      - name: http-dashboard
        protocol: TCP
        port: 8787
        targetPort: "http-dashboard"

Editing this file will change the default configuration of your Dask cluster. See the full configuration reference at the end of this page. Now apply cluster.yaml:

$ kubectl apply -f cluster.yaml
daskcluster.kubernetes.dask.org/simple created

We can list our clusters:

$ kubectl get daskclusters
NAME             AGE
simple           47s

To connect to this Dask cluster we can use the Service that was created for us.

$ kubectl get svc -l dask.org/cluster-name=simple
NAME                     TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
simple                   ClusterIP   10.96.85.120   <none>        8786/TCP,8787/TCP   86s

We can see that port 8786 has been exposed for Dask communication, along with port 8787 for the dashboard.

How you access these service endpoints will vary depending on your Kubernetes cluster configuration. For this quick example we can use kubectl to forward the service's port to your local machine.

$ kubectl port-forward svc/simple 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
Forwarding from [::1]:8786 -> 8786

Then we can connect to it from a Python session.

>>> from dask.distributed import Client
>>> client = Client("localhost:8786")
>>> print(client)
<Client: 'tcp://10.244.0.12:8786' processes=3 threads=12, memory=23.33 GiB>

We can also list all of the Pods the operator created to run our cluster.

$ kubectl get po -l dask.org/cluster-name=simple
NAME                                                                          READY   STATUS    RESTARTS   AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3                        1/1     Running   0          104s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700                        1/1     Running   0          104s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6                        1/1     Running   0          104s
simple-scheduler                                                              1/1     Running   0          104s

The workers we see here were created by our cluster's default workergroup resource, which was also created by the operator.
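
You can inspect that resource directly:

$ kubectl get daskworkergroups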

You can scale the workergroup up and down just like you would a Deployment or ReplicaSet:

$ kubectl scale --replicas=5 daskworkergroup simple-default
daskworkergroup.kubernetes.dask.org/simple-default

We can verify that the new Pods have been created.

$ kubectl get po -l dask.org/cluster-name=simple
NAME                                                                          READY   STATUS    RESTARTS   AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3                        1/1     Running   0          5m26s
simple-default-worker-a52bf313590f432d9dc7395875583b52                        1/1     Running   0          27s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700                        1/1     Running   0          5m26s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6                        1/1     Running   0          5m26s
simple-default-worker-f4223a45b49d49288195c540c32f0fc0                        1/1     Running   0          27s
simple-scheduler                                                              1/1     Running   0          5m26s

Finally, we can delete the cluster either by deleting the manifest we applied earlier, or directly by name:

$ kubectl delete -f cluster.yaml
daskcluster.kubernetes.dask.org "simple" deleted

$ kubectl delete daskcluster simple
daskcluster.kubernetes.dask.org "simple" deleted

DaskWorkerGroup

When we create a DaskCluster resource, a default worker group is created for us. But we can add more by creating additional manifests. This allows us to create workers of different shapes and sizes that Dask can use for different tasks.

graph TD
  DaskCluster(DaskCluster)

  DefaultDaskWorkerGroup(Default DaskWorkerGroup)
  DefaultDaskWorkerPodA(Worker Pod A)
  DefaultDaskWorkerPodEllipsis(Worker Pod ...)

  HighMemDaskWorkerGroup(High Memory DaskWorkerGroup)
  HighMemDaskWorkerPodA(High Memory Worker Pod A)
  HighMemDaskWorkerPodEllipsis(High Memory Worker Pod ...)

  DaskCluster --> DefaultDaskWorkerGroup
  DefaultDaskWorkerGroup --> DefaultDaskWorkerPodA
  DefaultDaskWorkerGroup --> DefaultDaskWorkerPodEllipsis

  DaskCluster --> HighMemDaskWorkerGroup
  HighMemDaskWorkerGroup --> HighMemDaskWorkerPodA
  HighMemDaskWorkerGroup --> HighMemDaskWorkerPodEllipsis

  classDef dask stroke:#FDA061,stroke-width:4px
  classDef disabled stroke:#62636C
  classDef dashed stroke-dasharray: 5 5

  class DaskCluster disabled
  class DefaultDaskWorkerGroup disabled
  class DefaultDaskWorkerGroup dashed
  class DefaultDaskWorkerPodA dashed
  class DefaultDaskWorkerPodA disabled
  class DefaultDaskWorkerPodEllipsis dashed
  class DefaultDaskWorkerPodEllipsis disabled

  class HighMemDaskWorkerGroup dask
  class HighMemDaskWorkerPodA dashed
  class HighMemDaskWorkerPodEllipsis dashed

Let's create an example file called highmemworkers.yaml with the following configuration:

# highmemworkers.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: simple-highmem
spec:
  cluster: simple
  worker:
    replicas: 2
    spec:
      containers:
      - name: worker
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        resources:
          requests:
            memory: "32Gi"
          limits:
            memory: "32Gi"
        args:
          - dask-worker
          - --name
          - $(DASK_WORKER_NAME)
          - --resources
          - MEMORY=32e9
          - --dashboard
          - --dashboard-address
          - "8788"
        ports:
          - name: http-dashboard
            containerPort: 8788
            protocol: TCP

The main thing we need to ensure is that the cluster option matches the name of the cluster we created earlier. This is what causes the new workers to join that cluster.

See the full configuration reference at the end of this page. Now apply highmemworkers.yaml:

$ kubectl apply -f highmemworkers.yaml
daskworkergroup.kubernetes.dask.org/simple-highmem created

We can list our worker groups:

$ kubectl get daskworkergroups
NAME                 AGE
simple-default       2 hours
simple-highmem       47s
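
You can also confirm which cluster the new group has joined; jsonpath works on custom resources just as it does on built-in ones:

$ kubectl get daskworkergroup simple-highmem -o jsonpath='{.spec.cluster}'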

This worker group joins the existing cluster, so we don't need to delete it separately; Kubernetes will remove it when the DaskCluster resource is deleted.

Scaling works the same way as for the default worker group and can be done with the kubectl scale command, as shown below.
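
For example, to scale the high-memory group we just created up to four workers:

$ kubectl scale --replicas=4 daskworkergroup simple-highmem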

DaskJob

The DaskJob custom resource behaves similarly to other Kubernetes batch resources. It creates a Pod that executes a command to completion. The difference is that the DaskJob also creates a DaskCluster alongside it and injects the appropriate configuration into the job Pod so that it automatically connects to and makes use of the Dask cluster.

graph TD
  DaskJob(DaskJob)
  DaskCluster(DaskCluster)
  SchedulerService(Scheduler Service)
  SchedulerPod(Scheduler Pod)
  DaskWorkerGroup(Default DaskWorkerGroup)
  WorkerPodA(Worker Pod A)
  WorkerPodB(Worker Pod B)
  WorkerPodC(Worker Pod C)
  JobPod(Job Runner Pod)

  DaskJob --> DaskCluster
  DaskJob --> JobPod
  DaskCluster --> SchedulerService
  SchedulerService --> SchedulerPod
  DaskCluster --> DaskWorkerGroup
  DaskWorkerGroup --> WorkerPodA
  DaskWorkerGroup --> WorkerPodB
  DaskWorkerGroup --> WorkerPodC

  classDef dask stroke:#FDA061,stroke-width:4px
  classDef dashed stroke-dasharray: 5 5
  class DaskJob dask
  class DaskCluster dask
  class DaskCluster dashed
  class DaskWorkerGroup dask
  class DaskWorkerGroup dashed
  class SchedulerService dashed
  class SchedulerPod dashed
  class WorkerPodA dashed
  class WorkerPodB dashed
  class WorkerPodC dashed
  class JobPod dashed

Let's create an example file called job.yaml with the following configuration:

# job.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: simple-job
  namespace: default
spec:
  job:
    spec:
      containers:
        - name: job
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - python
            - -c
            - "from dask.distributed import Client; client = Client(); # Do some work..."

  cluster:
    spec:
      worker:
        replicas: 2
        spec:
          containers:
            - name: worker
              image: "ghcr.io/dask/dask:latest"
              imagePullPolicy: "IfNotPresent"
              args:
                - dask-worker
                - --name
                - $(DASK_WORKER_NAME)
                - --dashboard
                - --dashboard-address
                - "8788"
              ports:
                - name: http-dashboard
                  containerPort: 8788
                  protocol: TCP
              env:
                - name: WORKER_ENV
                  value: hello-world # We don't test the value, just the name
      scheduler:
        spec:
          containers:
            - name: scheduler
              image: "ghcr.io/dask/dask:latest"
              imagePullPolicy: "IfNotPresent"
              args:
                - dask-scheduler
              ports:
                - name: tcp-comm
                  containerPort: 8786
                  protocol: TCP
                - name: http-dashboard
                  containerPort: 8787
                  protocol: TCP
              readinessProbe:
                httpGet:
                  port: http-dashboard
                  path: /health
                initialDelaySeconds: 5
                periodSeconds: 10
              livenessProbe:
                httpGet:
                  port: http-dashboard
                  path: /health
                initialDelaySeconds: 15
                periodSeconds: 20
              env:
                - name: SCHEDULER_ENV
                  value: hello-world
        service:
          type: ClusterIP
          selector:
            dask.org/cluster-name: simple-job
            dask.org/component: scheduler
          ports:
            - name: tcp-comm
              protocol: TCP
              port: 8786
              targetPort: "tcp-comm"
            - name: http-dashboard
              protocol: TCP
              port: 8787
              targetPort: "http-dashboard"

Editing this file will change the default configuration of your Dask job. See the full configuration reference at the end of this page. Now apply job.yaml:

$ kubectl apply -f job.yaml
daskjob.kubernetes.dask.org/simple-job created

Now if we check our cluster resources we should see the job and cluster Pods being created.

$ kubectl get pods
NAME                                                        READY   STATUS              RESTARTS   AGE
simple-job-scheduler                                        1/1     Running             0          8s
simple-job-runner                                           1/1     Running             0          8s
simple-job-default-worker-1f6c670fba                        1/1     Running             0          8s
simple-job-default-worker-791f93d9ec                        1/1     Running             0          8s

Our runner Pod will carry out whatever task we configured it to run. In our example you can see that we just create a simple dask.distributed.Client object, like this:

from dask.distributed import Client


client = Client()

# Do some work...

It can do this because the job Pod has some extra environment variables set at runtime that tell the Client how to connect to the cluster, so the user doesn't need to worry about it.
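
Concretely, here is a sketch of what that means, assuming the injected variable is DASK_SCHEDULER_ADDRESS (which dask's configuration system picks up as the default scheduler address); the implicit and explicit forms below would then be equivalent:

import os

from dask.distributed import Client

# With no arguments, Client() falls back to the scheduler address found in
# dask's configuration, which can be populated from environment variables.
client = Client()

# Roughly the explicit equivalent, assuming the operator injected the
# DASK_SCHEDULER_ADDRESS variable into the job Pod:
client = Client(os.environ["DASK_SCHEDULER_ADDRESS"])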

The job Pod has a default restart policy of OnFailure, so if it exits with anything other than a 0 return code it will be restarted automatically until it completes successfully. When it does return 0 it moves to the Completed state, and the Dask cluster is cleaned up automatically, freeing up Kubernetes cluster resources.

$ kubectl get pods
NAME                                                        READY   STATUS              RESTARTS   AGE
simple-job-runner                                           0/1     Completed           0          14s
simple-job-scheduler                                        1/1     Terminating         0          14s
simple-job-default-worker-1f6c670fba                        1/1     Terminating         0          14s
simple-job-default-worker-791f93d9ec                        1/1     Terminating         0          14s
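
Because spec.job.spec is a standard Kubernetes PodSpec, you should be able to change the retry behaviour with the usual restartPolicy field; a minimal sketch, assuming the operator passes the field through to the Pod unchanged:

# job.yaml (fragment, hypothetical)
spec:
  job:
    spec:
      restartPolicy: Never  # assumption: stop retrying the runner on failure
      containers:
        - name: job
          ...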

When you delete the DaskJob resource everything is deleted automatically, whether that's a Completed runner Pod left over from a successful run, or a whole Dask cluster and runner that are still running.

$ kubectl delete -f job.yaml
daskjob.kubernetes.dask.org "simple-job" deleted

DaskAutoscaler

The DaskAutoscaler resource allows the scheduler to scale the number of workers up and down using Dask's adaptive mode.

When this resource is created, the operator controller will periodically poll the scheduler and request the desired number of workers. The scheduler calculates this number by profiling the tasks it is processing and extrapolating how many workers it would need to complete the current graph within the target duration. The default target duration is 5 seconds, which can be adjusted via "distributed.adaptive.target-duration" in the scheduler config.
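
One way to change it is dask's environment-variable configuration mechanism on the scheduler container; a sketch, assuming the usual mapping of dask config keys to DASK_* variables:

# scheduler container fragment (hypothetical)
env:
  - name: DASK_DISTRIBUTED__ADAPTIVE__TARGET_DURATION
    value: "30s"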

graph TD
  DaskCluster(DaskCluster)
  DaskAutoscaler(DaskAutoscaler)
  SchedulerPod(Scheduler Pod)
  DaskWorkerGroup(Default DaskWorkerGroup)
  WorkerPod1(Worker Pod 1)
  WorkerPod2(Worker Pod 2)
  WorkerPodDot(...)
  WorkerPod10(Worker Pod 10)

  SchedulerPod -. I need 10 workers .-> DaskAutoscaler
  DaskAutoscaler -. Scale to 10 workers .-> DaskWorkerGroup
  DaskCluster --> SchedulerPod
  DaskCluster --> DaskAutoscaler
  DaskCluster --> DaskWorkerGroup
  DaskWorkerGroup --> WorkerPod1
  DaskWorkerGroup --> WorkerPod2
  DaskWorkerGroup --> WorkerPodDot
  DaskWorkerGroup --> WorkerPod10

  classDef dask stroke:#FDA061,stroke-width:4px
  classDef dashed stroke-dasharray: 5 5

  class DaskCluster dask
  class DaskCluster dashed
  class DaskWorkerGroup dask
  class DaskAutoscaler dask
  class DaskWorkerGroup dashed
  class SchedulerPod dashed
  class WorkerPod1 dashed
  class WorkerPod2 dashed
  class WorkerPodDot dashed
  class WorkerPod10 dashed

The controller will constrain this number between the minimum and maximum values configured in the DaskAutoscaler resource, and then update the replica count on the default DaskWorkerGroup.

# autoscaler.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
  name: simple
spec:
  cluster: "simple"
  minimum: 1  # we recommend always having a minimum of 1 worker so that an idle cluster can start working on tasks immediately
  maximum: 10 # you can place a hard limit on the number of workers regardless of what the scheduler requests

$ kubectl apply -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple created

You can end the autoscaling at any time by deleting the resource. The number of workers will remain at whatever the autoscaler last set it to.

$ kubectl delete -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple deleted

Note

The autoscaler will only scale the default WorkerGroup. Any additional worker groups you configure will not be taken into account.

Labels and Annotations

Labels and annotations are propagated to child resources, so a label applied to a DaskCluster will also appear on the Pod and Service resources it creates.

  • Labels/annotations on a DaskCluster are propagated to the DaskWorkerGroup, the scheduler Pod, and the scheduler Service.

  • Labels/annotations on a DaskWorkerGroup are propagated to the worker Pods.

  • Labels/annotations on a DaskJob are propagated to the job Pod and the DaskCluster.

Some resources also include sub-resource metadata options for setting labels and annotations on the resources they create.

  • DaskCluster has spec.worker.metadata, which is merged into the labels/annotations of the DaskWorkerGroup.

  • DaskCluster has spec.scheduler.metadata, which is merged into the labels/annotations of the scheduler Pod and the scheduler Service.

  • DaskJob has spec.job.metadata, which is merged into the labels/annotations of the job Pod.

Labels/annotations are applied in the order top_level <= subresource <= base. So if the DaskCluster has the label foo=bar but spec.worker.metadata.labels contains foo=baz, the worker Pods will end up with foo=baz.

Equally, if a reserved base label such as dask.org/component is set at the top level or the sub-resource level, it will be overridden by the controller. So setting dask.org/component=superworker in DaskCluster.spec.worker.metadata.labels will have no effect, and the worker Pods will still carry the expected label dask.org/component=worker.
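
To see what actually ended up on the child resources, you can display the labels directly (here for the simple cluster from earlier; --show-labels prints the full label set of each Pod):

$ kubectl get pods -l dask.org/cluster-name=simple --show-labels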

Example

The following DaskCluster has top-level annotations, along with sub-resource annotations for the workers and the scheduler.

apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
  annotations:
    hello: world
spec:
  worker:
    replicas: 2
    metadata:
      annotations:
        foo: bar
    spec:
      ...
  scheduler:
    metadata:
      annotations:
        fizz: buzz
    spec:
      ...

The resulting scheduler Pod's metadata annotations will be:

apiVersion: v1
kind: Pod
metadata:
  name: example-scheduler
  annotations:
    fizz: buzz
    hello: world
...

Full Configuration Reference

The full DaskCluster spec reference.

apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
spec:
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
  scheduler:
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
    service: ... # ServiceSpec, standard k8s service - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#servicespec-v1-core
  idleTimeout: 5  # Number of seconds to time out scheduler liveness probe if no activity

The full DaskWorkerGroup spec reference.

apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: example
spec:
  cluster: "name of DaskCluster to associate worker group with"
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core

The full DaskJob spec reference.

apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: example
spec:
  job:
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
  cluster:
    spec: ... # ClusterSpec, DaskCluster resource spec

The full DaskAutoscaler spec reference.

apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
  name: example
spec:
  cluster: "name of DaskCluster to autoscale"
  minimum: 0  # minimum number of workers to create
  maximum: 10 # maximum number of workers to create