从经典迁移

经典的 KubeCluster 类已替换为一个使用 Kubernetes Operator 模式构建的新版本。

安装操作员

要使用 KubeCluster 的新实现,你需要安装 Dask 操作员自定义资源和控制器

自定义资源允许我们将 Dask 集群组件描述为原生的 Kubernetes 资源,而不是像经典实现那样直接创建 PodService 资源。

不幸的是,这需要在你的 Kubernetes 集群上进行少量首次设置才能开始使用 dask-kubernetes。这是新实现包含破坏性更改的一个关键原因。最快的安装方式是使用 helm

$ helm repo add dask https://helm.dask.org
"dask" has been added to your repositories

$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "dask" chart repository
Update Complete. ⎈Happy Helming!⎈

$ helm install --create-namespace -n dask-operator --generate-name dask/dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.

现在你的集群上已经安装了控制器和 CRD,你可以开始使用新的 dask_kubernetes.operator.KubeCluster

使用新的 KubeCluster

使用 KubeCluster 创建集群的方式发生了变化,所以让我们看一些比较并探讨如何从经典版本迁移到新版本。

简化的 Python API

我们进行的首批重大改变之一是简化了简单的用例。创建最小集群所需的唯一操作是为其指定一个名称。

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name="mycluster")

我们发现用户自定义集群的第一步是修改容器镜像、环境变量、资源等。我们已将所有最常见的选项作为关键字参数提供,以便于进行微小更改。

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name="mycluster",
                      image='ghcr.io/dask/dask:latest',
                      n_workers=3
                      env={"FOO": "bar"},
                      resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})

高级 YAML API

我们努力为新用户简化 API,同时也努力确保新实现为高级用户提供更大的灵活性。

经典 KubeCluster 实现的用户对 worker pod 的外观有很大的控制权,因为你需要提供完整的 YAML Pod 规范。新实现不是直接创建分散的 Pod 资源集合,而是将所有内容分组到一个 DaskCluster 自定义资源中。此资源包含一些集群配置选项以及用于 worker pod 和 scheduler pod/service 的嵌套规范。这样可以无限地进行配置,只是要小心不要弄巧成拙。

经典入门页面包含以下 pod 规范示例

# worker-spec.yml
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: ghcr.io/dask/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
    name: dask-worker
    env:
      - name: EXTRA_PIP_PACKAGES
        value: git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G

在新实现中,具有相同选项的集群规范将如下所示

# cluster-spec.yml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
  labels:
    foo: bar
spec:
  worker:
    replicas: 2
    spec:
      restartPolicy: Never
      containers:
      - name: worker
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60', '--name', $(DASK_WORKER_NAME)]
        env:
          - name: EXTRA_PIP_PACKAGES
            value: git+https://github.com/dask/distributed
        resources:
          limits:
            cpu: "2"
            memory: 6G
          requests:
            cpu: "2"
            memory: 6G
  scheduler:
    spec:
      containers:
      - name: scheduler
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args:
          - dask-scheduler
        ports:
          - name: tcp-comm
            containerPort: 8786
            protocol: TCP
          - name: http-dashboard
            containerPort: 8787
            protocol: TCP
        readinessProbe:
          httpGet:
            port: http-dashboard
            path: /health
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            port: http-dashboard
            path: /health
          initialDelaySeconds: 15
          periodSeconds: 20
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: example
        dask.org/component: scheduler
      ports:
      - name: tcp-comm
        protocol: TCP
        port: 8786
        targetPort: "tcp-comm"
      - name: http-dashboard
        protocol: TCP
        port: 8787
        targetPort: "http-dashboard"

注意,新集群规范的 spec.worker.spec 部分与旧 pod 规范的 spec 相匹配。但正如你所见,此示例中提供了更多配置,包括对 scheduler pod 和 service 的一流控制。

使用我们自己的自定义资源的一个强大区别在于,集群的*所有*信息都包含在 DaskCluster 规范中,并且所有集群生命周期逻辑都由我们在 Kubernetes 中的自定义控制器处理。这意味着我们可以同样通过 Python 或通过 kubectl CLI 创建集群。如果你有其他想要原生集成的 Kubernetes 工具,甚至不需要安装 dask-kubernetes 来管理你的集群。

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(custom_cluster_spec="cluster-spec.yml")

与此相同

$ kubectl apply -f cluster-spec.yml

你仍然可以通过名称在 Python 中连接到通过 kubectl 创建的集群,并拥有使用集群管理器对象的所有便利。

from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster.from_name("example")
cluster.scale(5)
client = Client(cluster)

中间方案

对于喜欢停留在 Python 并希望大部分规范为其生成,但仍希望能够进行复杂自定义的用户,也有一个中间方案。

使用关键字参数创建新的 KubeCluster 时,这些参数会传递给 dask_kubernetes.operator.make_cluster_spec 的调用,这类似于你过去可能使用过的 dask_kubernetes.make_pod_spec。此函数会生成 DaskCluster 规范的字典表示,你可以修改并自己传递给 KubeCluster

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

cluster = KubeCluster(name="foo", n_workers= 2, env={"FOO": "bar"})

# is equivalent to

spec = make_cluster_spec(name="foo", n_workers= 2, env={"FOO": "bar"})
cluster = KubeCluster(custom_cluster_spec=spec)

如果你希望使用关键字参数来方便地设置常用选项,但仍能进行高级调整,例如在 worker pod 上设置 nodeSelector 选项,这会很有用。

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}

cluster = KubeCluster(custom_cluster_spec=spec)

如果你正在使用 make_pod_spec,这也能帮助你从现有工具平滑迁移,因为经典 pod 规范是新集群规范的一个子集。

from dask_kubernetes.operator import KubeCluster, make_cluster_spec
from dask_kubernetes.classic import make_pod_spec

# generate your existing classic pod spec
pod_spec = make_pod_spec(**your_custom_options)
pod_spec[...] = ... # Your existing tweaks to the pod spec

# generate a new cluster spec and merge in the existing pod spec
cluster_spec = make_cluster_spec(name="merge-example")
cluster_spec["spec"]["worker"]["spec"] = pod_spec["spec"]

cluster = KubeCluster(custom_cluster_spec=cluster_spec)

故障排除

从经典实现迁移到基于操作员的新实现将需要你付出一些努力。对此我们深感抱歉。

希望本指南已为你提供了足够的信息,让你有动力和能力进行此更改。但是,如果你遇到困难或希望获得 Dask 维护者的意见,请随时通过 Dask 论坛与我们联系。