KubeCluster

集群管理器

Operator 有一个名为 dask_kubernetes.operator.KubeCluster 的集群管理器,您可以使用它方便地在 Python 中创建和管理 Dask 集群。然后直接将 Dask distributed.Client 对象连接到它并执行您的工作。

集群管理器的目标是抽象化 Kubernetes 资源的复杂性,并提供一个简洁易用的 Python API 来管理集群,同时仍享受 Operator 的所有优势。

在底层,Python 集群管理器将与 Kubernetes API 交互,为我们创建 自定义资源

要在默认命名空间中创建集群,请运行以下命令

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name='foo')

您可以通过向集群的 python 类传递附加参数(namespacen_workers 等)来更改集群的默认配置。请参阅 API 参考 API

您可以扩缩集群

# Scale up the cluster
cluster.scale(5)

# Scale down the cluster
cluster.scale(1)

您可以自动扩缩集群

# Allow cluster to autoscale between 1 and 10 workers
cluster.adapt(minimum=1, maximum=10)

# Disable autoscaling by explicitly scaling to your desired number of workers
cluster.scale(1)

您可以连接到客户端

from dask.distributed import Client

# Connect Dask to the cluster
client = Client(cluster)

最后通过运行以下命令删除集群

cluster.close()

附加工作节点组

附加工作节点组也可以通过 Python 中的集群管理器创建。

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name='foo')

cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})

我们还可以从集群对象按名称扩缩工作节点组。

cluster.scale(5, worker_group="highmem")

附加工作节点组也可以在 Python 中删除。

cluster.delete_worker_group(name="highmem")

您创建的任何附加工作节点组将在集群删除时被删除。

自定义集群

KubeCluster 类可以接受一系列关键字参数,以便快速轻松地入门,但底层 DaskCluster 资源可能复杂得多,并可通过多种方式配置。您可以传递有效的 DaskCluster 资源规范来创建集群,而不是通过关键字参数暴露所有可能性。您还可以使用 make_cluster_spec() 生成一个规范,KubeCluster 内部使用它,然后您可以根据自定义选项修改它。

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

config = {
   "name": "foo",
   "n_workers": 2,
   "resources":{"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}
}

cluster = KubeCluster(**config)
# is equivalent to
cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config))

您还可以在将规范传递给 KubeCluster 之前修改它,例如,如果您想在工作节点 Pod 上设置 nodeSelector,您可以这样做

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}

cluster = KubeCluster(custom_cluster_spec=spec)

您还可以让调度器运行一个 Jupyter 服务器。通过此配置,您可以通过 Dask 仪表盘访问 Jupyter 服务器。

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="jupyter-example", n_workers=2, env={"EXTRA_PIP_PACKAGES": "jupyterlab"})
spec["spec"]["scheduler"]["spec"]["containers"][0]["args"].append("--jupyter")

cluster = KubeCluster(custom_cluster_spec=spec)

cluster.add_worker_group() 方法还支持传递 custom_spec 关键字参数,该参数可以使用 make_worker_spec() 生成。

from dask_kubernetes.operator import KubeCluster, make_worker_spec

cluster = KubeCluster(name="example")

worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2, resources={"limits": {"nvidia.com/gpu": 1}})
worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"}

cluster.add_worker_group(custom_spec=worker_spec)

私有容器镜像仓库

一个常见的 make_cluster_spec 派上用场的使用场景是从私有镜像仓库拉取容器镜像。Kubernetes 文档建议创建一个包含您的镜像仓库凭据的 Secret,然后在 Pod 规范中设置 imagePullSecrets 选项。KubeCluster 类没有暴露设置 imagePullSecrets 的任何方式,因此我们需要在创建集群之前生成并更新一个规范。幸运的是,make_pod_spec 使这变得快速且轻松。

$ kubectl create secret docker-registry regcred \
      --docker-server=<your-registry-server> \
      --docker-username=<your-name> \
      --docker-password=<your-pword> \
      --docker-email=<your-email>
from dask_kubernetes.operator import KubeCluster, make_cluster_spec

# Generate the spec
spec = make_cluster_spec(name="custom", image="foo.com/jacobtomlinson/dask:latest")

# Set the imagePullSecrets for the scheduler and worker pods
spec["spec"]["worker"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
spec["spec"]["scheduler"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]

# Create the cluster
cluster = KubeCluster(custom_cluster_spec=spec)

基于角色的访问控制 (RBAC)

为了从集群上运行的 Pod 中生成 Dask 集群,创建该 Pod 的服务帐户需要一组 RBAC 权限。创建一个将用于 Dask 的服务帐户,然后通过 ClusterRoleBinding 将以下 ClusterRole 附加到该 ServiceAccount

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: dask-cluster-role
rules:
  # Application: watching & handling for the custom resource we declare.
  - apiGroups: [kubernetes.dask.org]
    resources: [daskclusters, daskworkergroups, daskworkergroups/scale, daskjobs, daskautoscalers]
    verbs: [get, list, watch, patch, create, delete]

  # Application: other resources it needs to watch and get information from.
  - apiGroups:
    - ""  # indicates the core API group
    resources: [pods, pods/status]
    verbs:
    - "get"
    - "list"
    - "watch"

  - apiGroups:
    - ""  # indicates the core API group
    resources: [services]
    verbs:
    - "get"
    - "list"
    - "watch"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: dask-cluster-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: dask-cluster-role
subjects:
  - kind: ServiceAccount
    name: dask-sa  # adjust name based on the service account you created

API

KubeCluster(*[, name, namespace, image, ...])

使用 Operator 在 Kubernetes 上启动 Dask 集群

KubeCluster.scale(n[, worker_group])

将集群扩缩到 n 个工作节点

KubeCluster.adapt([minimum, maximum])

开启自适应

KubeCluster.get_logs()

获取 Dask 调度器和工作节点的日志。

KubeCluster.add_worker_group(name[, ...])

按名称创建 Dask 工作节点组

KubeCluster.delete_worker_group(name)

按名称删除 Dask 工作节点组

KubeCluster.close([timeout])

删除 Dask 集群

class dask_kubernetes.operator.KubeCluster(*, name: Optional[str] = None, namespace: Optional[str] = None, image: Optional[str] = None, n_workers: Optional[int] = None, resources: Optional[Dict[str, str]] = None, env: Optional[Union[List[dict], Dict[str, str]]] = None, worker_command: Optional[List[str]] = None, port_forward_cluster_ip: Optional[bool] = None, create_mode: Optional[dask_kubernetes.operator.kubecluster.kubecluster.CreateMode] = None, shutdown_on_close: Optional[bool] = None, idle_timeout: Optional[int] = None, resource_timeout: Optional[int] = None, scheduler_service_type: Optional[str] = None, custom_cluster_spec: Optional[Union[str, dict]] = None, scheduler_forward_port: Optional[int] = None, jupyter: bool = False, loop: Optional[tornado.ioloop.IOLoop] = None, em class="sig-param">asynchronous: bool = False, em class="sig-param">quiet: bool = False, **kwargs)[source]

使用 Operator 在 Kubernetes 上启动 Dask 集群

此集群管理器通过部署 Dask Operator 创建 Pod 所需的 Kubernetes 资源来创建 Dask 集群。它还可以通过提供集群名称来连接到现有集群。

参数
name: str

赋予 Dask 集群的名称。必需,除非传递了 custom_cluster_spec,此时将忽略此参数,优先使用 custom_cluster_spec[“metadata”][“name”]。

namespace: str(可选)

启动工作节点的命名空间。默认为当前命名空间(如果可用)或“default”

image: str(可选)

在调度器和工作节点 Pod 中运行的镜像。

n_workers: int

初始启动时的工作节点数量。将来使用 scale 方法更改此数量

resources: Dict[str, str]
env: List[dict] | Dict[str, str]

要在调度器和工作节点上设置的环境变量列表。可以是使用与 k8s envs 相同结构的 dict 列表,也可以是键值对的单个字典

worker_command: List[str] | str

启动工作节点时使用的命令。如果命令包含多个单词,应作为字符串列表传递。默认为“dask-worker”。

port_forward_cluster_ip: bool(可选)

如果 chart 使用 ClusterIP 类型的服务,请在本地转发端口。如果您在本地运行,它应该是您转发到的 端口。

create_mode: CreateMode(可选)

如果集群资源已存在,如何处理集群创建。默认行为是,如果同名集群不存在则创建新集群,如果存在则连接到现有集群。您也可以设置 CreateMode.CREATE_ONLY,如果同名集群已存在则抛出异常。或者设置 CreateMode.CONNECT_ONLY,如果同名集群不存在则抛出异常。

shutdown_on_close: bool(可选)

关闭此对象时是否删除集群资源。创建集群时默认为 True,连接到现有集群时默认为 False。

idle_timeout: int(可选)

如果设置了此参数,如果调度器空闲时间超过此超时时长(秒),Kubernetes 将自动删除集群。

resource_timeout: int(可选)

等待 Kubernetes 资源进入预期状态的超时时长(秒)。示例:如果创建的 DaskCluster 资源未被控制器推入已知的 status.phase 状态,则可能表明控制器未运行或出现故障,我们将超时并提供有用的错误进行清理。示例 2:如果调度器 Pod 进入 CrashBackoffLoop 状态的时间超过此超时时长,我们将放弃并提供有用的错误。默认为 60 秒。

scheduler_service_type: str(可选)

用于调度器的 Kubernetes 服务类型。默认为 ClusterIP

jupyter: bool(可选)

在调度器节点上启动 Jupyter。

custom_cluster_spec: str | dict(可选)

YAML 清单的路径或 DaskCluster 资源对象的字典表示,将使用它来创建集群,而不是从其他关键字参数生成。

scheduler_forward_port: int(可选)

转发调度器仪表盘时使用的端口。默认将使用随机端口

quiet: bool

如果启用,抑制所有打印输出。默认为 False。

**kwargs: dict

要传递给 LocalCluster 的附加关键字参数

另请参阅

KubeCluster.from_name

示例

>>> from dask_kubernetes.operator import KubeCluster
>>> cluster = KubeCluster(name="foo")

您可以添加另一组工作节点(默认为 3 个工作节点)>>> cluster.add_worker_group(‘additional’, n=4)

然后您可以使用 scale 方法调整集群大小 >>> cluster.scale(10)

并且可以选择扩缩特定的工作节点组 >>> cluster.scale(10, worker_group=’additional’)

您还可以自适应地调整集群大小并指定工作节点范围 >>> cluster.adapt(20, 50)

您可以将此集群直接传递给 Dask 客户端 >>> from dask.distributed import Client >>> client = Client(cluster)

您还可以访问集群日志 >>> cluster.get_logs()

您还可以连接到现有集群 >>> existing_cluster = KubeCluster.from_name(name=”ialreadyexist”)

属性
asynchronous

我们是否在事件循环中运行?

called_from_running_loop
dashboard_link
jupyter_link
loop
name
observed
plan
requested
scheduler_address

方法

adapt([minimum, maximum])

开启自适应

add_worker_group(name[, n_workers, image, ...])

按名称创建 Dask 工作节点组

close([timeout])

删除 Dask 集群

delete_worker_group(name)

按名称删除 Dask 工作节点组

from_name(name, **kwargs)

创建此类的实例以按名称表示现有集群。

get_client()

返回集群的客户端

get_logs()

获取 Dask 调度器和工作节点的日志。

scale(n[, worker_group])

将集群扩缩到 n 个工作节点

sync(func, *args[, asynchronous, ...])

根据调用上下文同步或异步调用 func 和 args

wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个工作节点后继续

generate_rich_output

logs

adapt(minimum=None, maximum=None)[source]

开启自适应

参数
minimumint

最小工作节点数量

minimumint

最大工作节点数量

示例

>>> cluster.adapt()  # Allow scheduler to add/remove workers within k8s cluster resource limits
>>> cluster.adapt(minimum=1, maximum=10) # Allow scheduler to add/remove workers within 1-10 range
add_worker_group(name, n_workers=3, image=None, resources=None, worker_command=None, env=None, custom_spec=None)[source]

按名称创建 Dask 工作节点组

参数
name: str

工作节点组的名称

n_workers: int

初始启动时的工作节点数量。将来使用 .scale(n_workers, worker_group=name) 更改此数量。

image: str(可选)

在调度器和工作节点 Pod 中运行的镜像。如果省略,将使用集群默认设置。

resources: Dict[str, str]

要传递给底层 Pod 的资源。如果省略,将使用集群默认设置。

env: List[dict]

要传递给工作节点 Pod 的环境变量列表。如果省略,将使用集群默认设置。

custom_spec: dict(可选)

工作节点规范的字典表示,将使用它来创建 DaskWorkerGroup,而不是从其他关键字参数生成。

示例

>>> cluster.add_worker_group("high-mem-workers", n_workers=5)
close(timeout=3600)[source]

删除 Dask 集群

delete_worker_group(name)[source]

按名称删除 Dask 工作节点组

参数
name: str

工作节点组的名称

示例

>>> cluster.delete_worker_group("high-mem-workers")
classmethod from_name(name, **kwargs)[source]

创建此类的实例以按名称表示现有集群。

如果同名集群不存在,将会失败。

参数
name: str

要连接的集群名称

示例

>>> cluster = KubeCluster.from_name(name="simple-cluster")
get_logs()[source]

获取 Dask 调度器和工作节点的日志。

示例

>>> cluster.get_logs()
{'foo': ...,
'foo-default-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ...,
'foo-default-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ...,
'foo-default-worker-d65bee23bdae423b8d40c5da7a1065b6': ...}
Each log will be a string of all logs for that container. To view
it is recommeded that you print each log.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.244.0.222:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
...
scale(n, worker_group='default')[source]

将集群扩缩到 n 个工作节点

参数
nint

目标工作节点数量

worker_groupstr

要扩缩的工作节点组

示例

>>> cluster.scale(10)  # scale cluster to ten workers
>>> cluster.scale(7, worker_group="high-mem-workers") # scale worker group high-mem-workers to seven workers
dask_kubernetes.operator.make_cluster_spec(name, image='ghcr.io/dask/dask:latest', n_workers=None, resources=None, env=None, worker_command='dask-worker', scheduler_service_type='ClusterIP', idle_timeout=0, jupyter=False)[source]

生成一个 DaskCluster Kubernetes 资源。

用一些常见选项填充模板以生成 DaskCluster Kubernetes 资源。

参数
name: str

集群名称

image: str(可选)

用于调度器和工作节点的容器镜像

n_workers: int(可选)

默认工作节点组中的工作节点数量

resources: dict(可选)

在调度器和工作节点上设置的资源限制

env: dict(可选)

在调度器和工作节点上设置的环境变量

worker_command: str(可选)

启动工作节点时使用的命令

idle_timeout: int(可选)

清理空闲集群的超时时长

jupyter: bool(可选)

在 Dask 调度器上启动 Jupyter

dask_kubernetes.operator.make_worker_spec(image='ghcr.io/dask/dask:latest', n_workers=3, resources=None, env=None, worker_command='dask-worker')[source]