Custom Resources¶
The Dask Operator provides a few custom resources that can be used to create various Dask components.
A DaskCluster creates a full Dask cluster with a scheduler and workers.
A DaskWorkerGroup creates a homogeneous group of workers; a DaskCluster creates one by default, but you can add more if you want multiple worker types.
A DaskJob creates a Pod that runs a script to completion, along with a DaskCluster for the script to use.
DaskCluster¶
The DaskCluster custom resource creates a Dask cluster by creating a scheduler Pod, a scheduler Service, and a default DaskWorkerGroup, which in turn creates the worker Pod resources.
graph TD
  DaskCluster(DaskCluster)
  SchedulerService(Scheduler Service)
  SchedulerPod(Scheduler Pod)
  DaskWorkerGroup(Default DaskWorkerGroup)
  WorkerPodA(Worker Pod A)
  WorkerPodB(Worker Pod B)
  WorkerPodC(Worker Pod C)
  DaskCluster --> SchedulerService
  DaskCluster --> SchedulerPod
  DaskCluster --> DaskWorkerGroup
  DaskWorkerGroup --> WorkerPodA
  DaskWorkerGroup --> WorkerPodB
  DaskWorkerGroup --> WorkerPodC
  classDef dask stroke:#FDA061,stroke-width:4px
  classDef dashed stroke-dasharray: 5 5
  class DaskCluster dask
  class DaskWorkerGroup dask
  class DaskWorkerGroup dashed
  class SchedulerService dashed
  class SchedulerPod dashed
  class WorkerPodA dashed
  class WorkerPodB dashed
  class WorkerPodC dashed
Let's create an example file called cluster.yaml with the following configuration:
# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: simple
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-worker
            - --name
            - $(DASK_WORKER_NAME)
            - --dashboard
            - --dashboard-address
            - "8788"
          ports:
            - name: http-dashboard
              containerPort: 8788
              protocol: TCP
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
    service:
      type: NodePort
      selector:
        dask.org/cluster-name: simple
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"
Editing this file will change the default configuration of the Dask cluster. See the full DaskCluster spec in the configuration reference below. Now apply cluster.yaml:
$ kubectl apply -f cluster.yaml
daskcluster.kubernetes.dask.org/simple created
We can list our clusters:
$ kubectl get daskclusters
NAME AGE
simple 47s
To connect to this Dask cluster we can use the Service that was created for us:
$ kubectl get svc -l dask.org/cluster-name=simple
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
simple ClusterIP 10.96.85.120 <none> 8786/TCP,8787/TCP 86s
We can see that port 8786 has been exposed for Dask communication and port 8787 for the dashboard.
How you access these service endpoints depends on your Kubernetes cluster configuration. For this quick example we can use kubectl to forward the service's port to your local machine:
$ kubectl port-forward svc/simple 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
Forwarding from [::1]:8786 -> 8786
Then we can connect to it from a Python session:
>>> from dask.distributed import Client
>>> client = Client("localhost:8786")
>>> print(client)
<Client: 'tcp://10.244.0.12:8786' processes=3 threads=12, memory=23.33 GiB>
We can also list all of the Pods the operator created to run our cluster:
$ kubectl get po -l dask.org/cluster-name=simple
NAME READY STATUS RESTARTS AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 104s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 104s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 104s
simple-scheduler 1/1 Running 0 104s
The workers we see here were created by our cluster's default workergroup resource, which was also created by the operator.
You can scale a workergroup just as you would a Deployment or ReplicaSet:
$ kubectl scale --replicas=5 daskworkergroup simple-default
daskworkergroup.kubernetes.dask.org/simple-default
We can verify that the new Pods have been created:
$ kubectl get po -l dask.org/cluster-name=simple
NAME READY STATUS RESTARTS AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 5m26s
simple-default-worker-a52bf313590f432d9dc7395875583b52 1/1 Running 0 27s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 5m26s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 5m26s
simple-default-worker-f4223a45b49d49288195c540c32f0fc0 1/1 Running 0 27s
simple-scheduler 1/1 Running 0 5m26s
Finally, we can delete the cluster either by deleting the manifest we applied earlier, or directly by name:
$ kubectl delete -f cluster.yaml
daskcluster.kubernetes.dask.org "simple" deleted
$ kubectl delete daskcluster simple
daskcluster.kubernetes.dask.org "simple" deleted
DaskWorkerGroup¶
When we create a DaskCluster resource, a default worker group is created for us. But we can add more by creating additional manifests. This allows us to create workers of different shapes and sizes that Dask can use for different tasks.
graph TD
  DaskCluster(DaskCluster)
  DefaultDaskWorkerGroup(Default DaskWorkerGroup)
  DefaultDaskWorkerPodA(Worker Pod A)
  DefaultDaskWorkerPodEllipsis(Worker Pod ...)
  HighMemDaskWorkerGroup(High Memory DaskWorkerGroup)
  HighMemDaskWorkerPodA(High Memory Worker Pod A)
  HighMemDaskWorkerPodEllipsis(High Memory Worker Pod ...)
  DaskCluster --> DefaultDaskWorkerGroup
  DefaultDaskWorkerGroup --> DefaultDaskWorkerPodA
  DefaultDaskWorkerGroup --> DefaultDaskWorkerPodEllipsis
  DaskCluster --> HighMemDaskWorkerGroup
  HighMemDaskWorkerGroup --> HighMemDaskWorkerPodA
  HighMemDaskWorkerGroup --> HighMemDaskWorkerPodEllipsis
  classDef dask stroke:#FDA061,stroke-width:4px
  classDef disabled stroke:#62636C
  classDef dashed stroke-dasharray: 5 5
  class DaskCluster disabled
  class DefaultDaskWorkerGroup disabled
  class DefaultDaskWorkerGroup dashed
  class DefaultDaskWorkerPodA dashed
  class DefaultDaskWorkerPodA disabled
  class DefaultDaskWorkerPodEllipsis dashed
  class DefaultDaskWorkerPodEllipsis disabled
  class HighMemDaskWorkerGroup dask
  class HighMemDaskWorkerPodA dashed
  class HighMemDaskWorkerPodEllipsis dashed
Let's create an example file called highmemworkers.yaml with the following configuration:
# highmemworkers.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: simple-highmem
spec:
  cluster: simple
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          resources:
            requests:
              memory: "32Gi"
            limits:
              memory: "32Gi"
          args:
            - dask-worker
            - --name
            - $(DASK_WORKER_NAME)
            - --resources
            - MEMORY=32e9
            - --dashboard
            - --dashboard-address
            - "8788"
          ports:
            - name: http-dashboard
              containerPort: 8788
              protocol: TCP
The main thing we need to ensure is that the cluster option matches the name of the cluster we created earlier. This is what makes the workers join that cluster.
See the full DaskWorkerGroup spec in the configuration reference below. Now apply highmemworkers.yaml:
$ kubectl apply -f highmemworkers.yaml
daskworkergroup.kubernetes.dask.org/simple-highmem created
We can list our worker groups:
$ kubectl get daskworkergroups
NAME AGE
simple-default 2 hours
simple-highmem 47s
We don't need to delete this worker group separately: because it has joined the existing cluster, Kubernetes will delete it when the DaskCluster resource is deleted.
Scaling works the same way as for the default worker group and can be done with the kubectl scale command.
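Because the only link between a DaskWorkerGroup and its cluster is the spec.cluster field, additional groups can also be generated programmatically before applying them. A minimal sketch of that idea; the make_worker_group helper is hypothetical (not part of dask-kubernetes) and mirrors the highmemworkers.yaml example above:

```python
def make_worker_group(cluster: str, name: str, replicas: int,
                      image: str = "ghcr.io/dask/dask:latest") -> dict:
    """Build a DaskWorkerGroup manifest that joins an existing DaskCluster.

    Hypothetical helper for illustration only. The key detail is that
    spec.cluster must match the DaskCluster's metadata.name exactly,
    otherwise the workers will never join the cluster.
    """
    return {
        "apiVersion": "kubernetes.dask.org/v1",
        "kind": "DaskWorkerGroup",
        "metadata": {"name": f"{cluster}-{name}"},
        "spec": {
            "cluster": cluster,  # must match the DaskCluster name
            "worker": {
                "replicas": replicas,
                "spec": {
                    "containers": [
                        {
                            "name": "worker",
                            "image": image,
                            "args": ["dask-worker", "--name", "$(DASK_WORKER_NAME)"],
                        }
                    ]
                },
            },
        },
    }

wg = make_worker_group("simple", "highmem", replicas=2)
print(wg["metadata"]["name"], wg["spec"]["cluster"])
```

The resulting dict can be serialized to YAML and applied with kubectl, or submitted with any Kubernetes client library.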
DaskJob¶
The DaskJob custom resource behaves much like other Kubernetes batch resources. It creates a Pod that executes a command to completion. The difference is that the DaskJob also creates a DaskCluster alongside it and injects the appropriate configuration into the job Pod so that it automatically connects to, and can make use of, the Dask cluster.
graph TD
  DaskJob(DaskJob)
  DaskCluster(DaskCluster)
  SchedulerService(Scheduler Service)
  SchedulerPod(Scheduler Pod)
  DaskWorkerGroup(Default DaskWorkerGroup)
  WorkerPodA(Worker Pod A)
  WorkerPodB(Worker Pod B)
  WorkerPodC(Worker Pod C)
  JobPod(Job Runner Pod)
  DaskJob --> DaskCluster
  DaskJob --> JobPod
  DaskCluster --> SchedulerService
  SchedulerService --> SchedulerPod
  DaskCluster --> DaskWorkerGroup
  DaskWorkerGroup --> WorkerPodA
  DaskWorkerGroup --> WorkerPodB
  DaskWorkerGroup --> WorkerPodC
  classDef dask stroke:#FDA061,stroke-width:4px
  classDef dashed stroke-dasharray: 5 5
  class DaskJob dask
  class DaskCluster dask
  class DaskCluster dashed
  class DaskWorkerGroup dask
  class DaskWorkerGroup dashed
  class SchedulerService dashed
  class SchedulerPod dashed
  class WorkerPodA dashed
  class WorkerPodB dashed
  class WorkerPodC dashed
  class JobPod dashed
Let's create an example file called job.yaml with the following configuration:
# job.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: simple-job
  namespace: default
spec:
  job:
    spec:
      containers:
        - name: job
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - python
            - -c
            - "from dask.distributed import Client; client = Client(); # Do some work..."
  cluster:
    spec:
      worker:
        replicas: 2
        spec:
          containers:
            - name: worker
              image: "ghcr.io/dask/dask:latest"
              imagePullPolicy: "IfNotPresent"
              args:
                - dask-worker
                - --name
                - $(DASK_WORKER_NAME)
                - --dashboard
                - --dashboard-address
                - "8788"
              ports:
                - name: http-dashboard
                  containerPort: 8788
                  protocol: TCP
              env:
                - name: WORKER_ENV
                  value: hello-world # We don't test the value, just the name
      scheduler:
        spec:
          containers:
            - name: scheduler
              image: "ghcr.io/dask/dask:latest"
              imagePullPolicy: "IfNotPresent"
              args:
                - dask-scheduler
              ports:
                - name: tcp-comm
                  containerPort: 8786
                  protocol: TCP
                - name: http-dashboard
                  containerPort: 8787
                  protocol: TCP
              readinessProbe:
                httpGet:
                  port: http-dashboard
                  path: /health
                initialDelaySeconds: 5
                periodSeconds: 10
              livenessProbe:
                httpGet:
                  port: http-dashboard
                  path: /health
                initialDelaySeconds: 15
                periodSeconds: 20
              env:
                - name: SCHEDULER_ENV
                  value: hello-world
        service:
          type: ClusterIP
          selector:
            dask.org/cluster-name: simple-job
            dask.org/component: scheduler
          ports:
            - name: tcp-comm
              protocol: TCP
              port: 8786
              targetPort: "tcp-comm"
            - name: http-dashboard
              protocol: TCP
              port: 8787
              targetPort: "http-dashboard"
Editing this file will change the default configuration of the Dask job. See the full DaskJob spec in the configuration reference below. Now apply job.yaml:
$ kubectl apply -f job.yaml
daskjob.kubernetes.dask.org/simple-job created
Now if we check our cluster resources we should see our job and cluster Pods being created:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
simple-job-scheduler 1/1 Running 0 8s
simple-job-runner 1/1 Running 0 8s
simple-job-default-worker-1f6c670fba 1/1 Running 0 8s
simple-job-default-worker-791f93d9ec 1/1 Running 0 8s
Our runner Pod will execute whatever task we configured. In our example you can see we are just creating a simple dask.distributed.Client object, like this:
from dask.distributed import Client
client = Client()
# Do some work...
We can do this because the job Pod has some extra environment variables set at runtime which tell the Client how to connect to the cluster, so the user doesn't need to worry about it.
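A stdlib-only sketch of that lookup, assuming the operator injects DASK_SCHEDULER_ADDRESS (the variable dask.distributed consults when Client() is called without an address; the service hostname below is illustrative):

```python
import os

def scheduler_address(default: str = "tcp://localhost:8786") -> str:
    """Resolve the scheduler address the way a bare Client() can:
    from the environment if the operator injected it, else a fallback.
    Shown with os.environ directly rather than importing dask."""
    return os.environ.get("DASK_SCHEDULER_ADDRESS", default)

# Simulate the environment the operator sets up inside the job Pod.
os.environ["DASK_SCHEDULER_ADDRESS"] = (
    "tcp://simple-job-scheduler.default.svc.cluster.local:8786"
)
print(scheduler_address())
```

Inside the real job Pod no simulation is needed: the variable is already present, which is why `Client()` with no arguments finds the cluster.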
The job Pod's default restart policy is OnFailure, which means that if it exits with a code other than 0 it will be restarted until it completes successfully. When it exits with 0 it will go into a Completed state, and the Dask cluster will be cleaned up automatically, freeing up Kubernetes cluster resources.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
simple-job-runner 0/1 Completed 0 14s
simple-job-scheduler 1/1 Terminating 0 14s
simple-job-default-worker-1f6c670fba 1/1 Terminating 0 14s
simple-job-default-worker-791f93d9ec 1/1 Terminating 0 14s
When you delete the DaskJob resource, everything is deleted automatically, whether that's a Completed runner Pod left over from a successful run, or a full Dask cluster and runner that are still going.
$ kubectl delete -f job.yaml
daskjob.kubernetes.dask.org "simple-job" deleted
DaskAutoscaler¶
The DaskAutoscaler resource allows the scheduler to scale the number of workers up and down using Dask's adaptive mode.
When this resource is created, the operator controller periodically polls the scheduler and requests the desired number of workers. The scheduler calculates this number by profiling the tasks it is processing and then extrapolating how many workers it would need to complete the current graph within the target duration. The default target duration is 5 seconds, but it can be adjusted via the distributed.adaptive.target-duration option in the scheduler's configuration.
graph TD
  DaskCluster(DaskCluster)
  DaskAutoscaler(DaskAutoscaler)
  SchedulerPod(Scheduler Pod)
  DaskWorkerGroup(Default DaskWorkerGroup)
  WorkerPod1(Worker Pod 1)
  WorkerPod2(Worker Pod 2)
  WorkerPodDot(...)
  WorkerPod10(Worker Pod 10)
  SchedulerPod -. I need 10 workers .-> DaskAutoscaler
  DaskAutoscaler -. Scale to 10 workers .-> DaskWorkerGroup
  DaskCluster --> SchedulerPod
  DaskCluster --> DaskAutoscaler
  DaskCluster --> DaskWorkerGroup
  DaskWorkerGroup --> WorkerPod1
  DaskWorkerGroup --> WorkerPod2
  DaskWorkerGroup --> WorkerPodDot
  DaskWorkerGroup --> WorkerPod10
  classDef dask stroke:#FDA061,stroke-width:4px
  classDef dashed stroke-dasharray: 5 5
  class DaskCluster dask
  class DaskCluster dashed
  class DaskWorkerGroup dask
  class DaskAutoscaler dask
  class DaskWorkerGroup dashed
  class SchedulerPod dashed
  class WorkerPod1 dashed
  class WorkerPod2 dashed
  class WorkerPodDot dashed
  class WorkerPod10 dashed
The controller clamps this number between the minimum and maximum values configured in the DaskAutoscaler resource, then updates the number of replicas in the default DaskWorkerGroup.
# autoscaler.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
  name: simple
spec:
  cluster: "simple"
  minimum: 1  # we recommend always having a minimum of 1 worker so that an idle cluster can start working on tasks immediately
  maximum: 10 # you can place a hard limit on the number of workers regardless of what the scheduler requests
$ kubectl apply -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple created
You can end the autoscaling at any time by deleting the resource. The number of workers will stay at whatever the autoscaler last set it to.
$ kubectl delete -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple deleted
Note
The autoscaler will only scale the default WorkerGroup. If you have configured additional worker groups, they are not taken into account.
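The sizing and clamping described above can be sketched as a few lines of arithmetic. This is only an illustration of the clamping behaviour under the stated 5-second target duration; the scheduler's real adaptive heuristics are considerably more involved:

```python
import math

def desired_workers(total_task_runtime_s: float,
                    target_duration_s: float = 5.0,
                    minimum: int = 1,
                    maximum: int = 10) -> int:
    """Sketch: enough workers to finish the outstanding work within the
    target duration, clamped to the DaskAutoscaler minimum/maximum."""
    needed = math.ceil(total_task_runtime_s / target_duration_s)
    return max(minimum, min(maximum, needed))

print(desired_workers(42.0))   # 42s of work / 5s target -> 9 workers
print(desired_workers(500.0))  # scheduler wants 100, capped at maximum 10
print(desired_workers(0.0))    # idle cluster stays at the minimum of 1
```

This also shows why a minimum of 1 is recommended: an idle cluster keeps one worker around and can start on new tasks immediately.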
Labels and Annotations¶
Labels and annotations are propagated to child resources, so labels applied to a DaskCluster will also appear on the Pod and Service resources it creates.
- Labels/annotations on a DaskCluster are propagated to the DaskWorkerGroup, the scheduler Pod, and the scheduler Service.
- Labels/annotations on a DaskWorkerGroup are propagated to the worker Pods.
- Labels/annotations on a DaskJob are propagated to the job Pod and the DaskCluster.
Some resources also have subresource metadata options for setting labels and annotations on the resources they create.
- DaskCluster has spec.worker.metadata, which is merged into the labels/annotations of the DaskWorkerGroup.
- DaskCluster has spec.scheduler.metadata, which is merged into the labels/annotations of the scheduler Pod and scheduler Service.
- DaskJob has spec.job.metadata, which is merged into the labels/annotations of the job Pod.
Labels/annotations are applied in the order top_level <= subresource <= base. So if the DaskCluster has the label foo=bar but spec.worker.metadata.labels contains foo=baz, the worker Pods will have foo=baz.
Equally, if a reserved base label such as dask.org/component is set at either the top level or the subresource level, it will be overridden by the controller. So setting dask.org/component=superworker in DaskCluster.spec.worker.metadata.labels will have no effect, and the worker Pods will still have the expected label dask.org/component=worker.
Example¶
The following DaskCluster has top-level annotations, plus subresource annotations for the workers and the scheduler.
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
  annotations:
    hello: world
spec:
  worker:
    replicas: 2
    metadata:
      annotations:
        foo: bar
    spec:
      ...
  scheduler:
    metadata:
      annotations:
        fizz: buzz
    spec:
      ...
The resulting scheduler Pod will have the following metadata annotations:
apiVersion: v1
kind: Pod
metadata:
  name: example-scheduler
  annotations:
    fizz: buzz
    hello: world
  ...
Full Configuration Reference¶
Full DaskCluster spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
spec:
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
  scheduler:
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
    service: ... # ServiceSpec, standard k8s service - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#servicespec-v1-core
  idleTimeout: 5 # Number of seconds to time out scheduler liveness probe if no activity
Full DaskWorkerGroup spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: example
spec:
  cluster: "name of DaskCluster to associate worker group with"
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
Full DaskJob spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: example
spec:
  job:
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
  cluster:
    spec: ... # ClusterSpec, DaskCluster resource spec
Full DaskAutoscaler spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
  name: example
spec:
  cluster: "name of DaskCluster to autoscale"
  minimum: 0 # minimum number of workers to create
  maximum: 10 # maximum number of workers to create
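Mistakes like a minimum larger than the maximum, or a missing cluster name, are easy to catch client-side before applying a manifest. A sketch of such a check; validate_autoscaler is a hypothetical helper, not something the operator provides:

```python
def validate_autoscaler(spec: dict) -> list:
    """Return a list of problems with a DaskAutoscaler spec (empty if OK)."""
    errors = []
    if not spec.get("cluster"):
        errors.append("spec.cluster must name an existing DaskCluster")
    minimum = spec.get("minimum", 0)
    maximum = spec.get("maximum", 0)
    if minimum < 0:
        errors.append("minimum must be >= 0")
    if maximum < minimum:
        errors.append("maximum must be >= minimum")
    return errors

print(validate_autoscaler({"cluster": "example", "minimum": 0, "maximum": 10}))  # []
print(validate_autoscaler({"cluster": "", "minimum": 5, "maximum": 1}))
```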