KubeCluster
目录
KubeCluster¶
集群管理器¶
Operator 有一个名为 dask_kubernetes.operator.KubeCluster
的集群管理器,您可以使用它方便地在 Python 中创建和管理 Dask 集群。然后直接将 Dask distributed.Client
对象连接到它并执行您的工作。
集群管理器的目标是抽象化 Kubernetes 资源的复杂性,并提供一个简洁易用的 Python API 来管理集群,同时仍享受 Operator 的所有优势。
在底层,Python 集群管理器将与 Kubernetes API 交互,为我们创建 自定义资源。
要在默认命名空间中创建集群,请运行以下命令
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='foo')
您可以通过向集群的 python 类传递附加参数(namespace
、n_workers
等)来更改集群的默认配置。请参阅 API 参考 API
您可以扩缩集群
# Scale up the cluster
cluster.scale(5)
# Scale down the cluster
cluster.scale(1)
您可以自动扩缩集群
# Allow cluster to autoscale between 1 and 10 workers
cluster.adapt(minimum=1, maximum=10)
# Disable autoscaling by explicitly scaling to your desired number of workers
cluster.scale(1)
您可以连接到客户端
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
最后通过运行以下命令删除集群
cluster.close()
附加工作节点组¶
附加工作节点组也可以通过 Python 中的集群管理器创建。
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='foo')
cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})
我们还可以从集群对象按名称扩缩工作节点组。
cluster.scale(5, worker_group="highmem")
附加工作节点组也可以在 Python 中删除。
cluster.delete_worker_group(name="highmem")
您创建的任何附加工作节点组将在集群删除时被删除。
自定义集群¶
KubeCluster
类可以接受一系列关键字参数,以便快速轻松地入门,但底层 DaskCluster 资源可能复杂得多,并可通过多种方式配置。您可以传递有效的 DaskCluster
资源规范来创建集群,而不是通过关键字参数暴露所有可能性。您还可以使用 make_cluster_spec()
生成一个规范,KubeCluster
内部使用它,然后您可以根据自定义选项修改它。
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
config = {
"name": "foo",
"n_workers": 2,
"resources":{"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}
}
cluster = KubeCluster(**config)
# is equivalent to
cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config))
您还可以在将规范传递给 KubeCluster
之前修改它,例如,如果您想在工作节点 Pod 上设置 nodeSelector
,您可以这样做
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}
cluster = KubeCluster(custom_cluster_spec=spec)
您还可以让调度器运行一个 Jupyter 服务器。通过此配置,您可以通过 Dask 仪表盘访问 Jupyter 服务器。
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
spec = make_cluster_spec(name="jupyter-example", n_workers=2, env={"EXTRA_PIP_PACKAGES": "jupyterlab"})
spec["spec"]["scheduler"]["spec"]["containers"][0]["args"].append("--jupyter")
cluster = KubeCluster(custom_cluster_spec=spec)
cluster.add_worker_group()
方法还支持传递 custom_spec
关键字参数,该参数可以使用 make_worker_spec()
生成。
from dask_kubernetes.operator import KubeCluster, make_worker_spec
cluster = KubeCluster(name="example")
worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2, resources={"limits": {"nvidia.com/gpu": 1}})
worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"}
cluster.add_worker_group(custom_spec=worker_spec)
私有容器镜像仓库¶
一个常见的 make_cluster_spec
派上用场的使用场景是从私有镜像仓库拉取容器镜像。Kubernetes 文档建议创建一个包含您的镜像仓库凭据的 Secret
,然后在 Pod 规范中设置 imagePullSecrets
选项。KubeCluster
类没有暴露设置 imagePullSecrets
的任何方式,因此我们需要在创建集群之前生成并更新一个规范。幸运的是,make_pod_spec
使这变得快速且轻松。
$ kubectl create secret docker-registry regcred \
--docker-server=<your-registry-server> \
--docker-username=<your-name> \
--docker-password=<your-pword> \
--docker-email=<your-email>
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
# Generate the spec
spec = make_cluster_spec(name="custom", image="foo.com/jacobtomlinson/dask:latest")
# Set the imagePullSecrets for the scheduler and worker pods
spec["spec"]["worker"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
spec["spec"]["scheduler"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
# Create the cluster
cluster = KubeCluster(custom_cluster_spec=spec)
基于角色的访问控制 (RBAC)¶
为了从集群上运行的 Pod 中生成 Dask 集群,创建该 Pod 的服务帐户需要一组 RBAC 权限。创建一个将用于 Dask 的服务帐户,然后通过 ClusterRoleBinding 将以下 ClusterRole 附加到该 ServiceAccount
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: dask-cluster-role
rules:
# Application: watching & handling for the custom resource we declare.
- apiGroups: [kubernetes.dask.org]
resources: [daskclusters, daskworkergroups, daskworkergroups/scale, daskjobs, daskautoscalers]
verbs: [get, list, watch, patch, create, delete]
# Application: other resources it needs to watch and get information from.
- apiGroups:
- "" # indicates the core API group
resources: [pods, pods/status]
verbs:
- "get"
- "list"
- "watch"
- apiGroups:
- "" # indicates the core API group
resources: [services]
verbs:
- "get"
- "list"
- "watch"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: dask-cluster-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: dask-cluster-role
subjects:
- kind: ServiceAccount
name: dask-sa # adjust name based on the service account you created
API¶
|
使用 Operator 在 Kubernetes 上启动 Dask 集群 |
|
将集群扩缩到 n 个工作节点 |
|
开启自适应 |
获取 Dask 调度器和工作节点的日志。 |
|
|
按名称创建 Dask 工作节点组 |
按名称删除 Dask 工作节点组 |
|
|
删除 Dask 集群 |
- class dask_kubernetes.operator.KubeCluster(*, name: Optional[str] = None, namespace: Optional[str] = None, image: Optional[str] = None, n_workers: Optional[int] = None, resources: Optional[Dict[str, str]] = None, env: Optional[Union[List[dict], Dict[str, str]]] = None, worker_command: Optional[List[str]] = None, port_forward_cluster_ip: Optional[bool] = None, create_mode: Optional[dask_kubernetes.operator.kubecluster.kubecluster.CreateMode] = None, shutdown_on_close: Optional[bool] = None, idle_timeout: Optional[int] = None, resource_timeout: Optional[int] = None, scheduler_service_type: Optional[str] = None, custom_cluster_spec: Optional[Union[str, dict]] = None, scheduler_forward_port: Optional[int] = None, jupyter: bool = False, loop: Optional[tornado.ioloop.IOLoop] = None, em class="sig-param">asynchronous: bool = False, em class="sig-param">quiet: bool = False, **kwargs)[source]¶
使用 Operator 在 Kubernetes 上启动 Dask 集群
此集群管理器通过部署 Dask Operator 创建 Pod 所需的 Kubernetes 资源来创建 Dask 集群。它还可以通过提供集群名称来连接到现有集群。
- 参数
- name: str
赋予 Dask 集群的名称。必需,除非传递了 custom_cluster_spec,此时将忽略此参数,优先使用 custom_cluster_spec[“metadata”][“name”]。
- namespace: str(可选)
启动工作节点的命名空间。默认为当前命名空间(如果可用)或“default”
- image: str(可选)
在调度器和工作节点 Pod 中运行的镜像。
- n_workers: int
初始启动时的工作节点数量。将来使用
scale
方法更改此数量- resources: Dict[str, str]
- env: List[dict] | Dict[str, str]
要在调度器和工作节点上设置的环境变量列表。可以是使用与 k8s envs 相同结构的 dict 列表,也可以是键值对的单个字典
- worker_command: List[str] | str
启动工作节点时使用的命令。如果命令包含多个单词,应作为字符串列表传递。默认为“dask-worker”。
- port_forward_cluster_ip: bool(可选)
如果 chart 使用 ClusterIP 类型的服务,请在本地转发端口。如果您在本地运行,它应该是您转发到的
端口。 - create_mode: CreateMode(可选)
如果集群资源已存在,如何处理集群创建。默认行为是,如果同名集群不存在则创建新集群,如果存在则连接到现有集群。您也可以设置
CreateMode.CREATE_ONLY
,如果同名集群已存在则抛出异常。或者设置CreateMode.CONNECT_ONLY
,如果同名集群不存在则抛出异常。- shutdown_on_close: bool(可选)
关闭此对象时是否删除集群资源。创建集群时默认为 True,连接到现有集群时默认为 False。
- idle_timeout: int(可选)
如果设置了此参数,如果调度器空闲时间超过此超时时长(秒),Kubernetes 将自动删除集群。
- resource_timeout: int(可选)
等待 Kubernetes 资源进入预期状态的超时时长(秒)。示例:如果创建的
DaskCluster
资源未被控制器推入已知的status.phase
状态,则可能表明控制器未运行或出现故障,我们将超时并提供有用的错误进行清理。示例 2:如果调度器 Pod 进入CrashBackoffLoop
状态的时间超过此超时时长,我们将放弃并提供有用的错误。默认为 60 秒。- scheduler_service_type: str(可选)
用于调度器的 Kubernetes 服务类型。默认为
ClusterIP
。- jupyter: bool(可选)
在调度器节点上启动 Jupyter。
- custom_cluster_spec: str | dict(可选)
YAML 清单的路径或
DaskCluster
资源对象的字典表示,将使用它来创建集群,而不是从其他关键字参数生成。- scheduler_forward_port: int(可选)
转发调度器仪表盘时使用的端口。默认将使用随机端口
- quiet: bool
如果启用,抑制所有打印输出。默认为 False。
- **kwargs: dict
要传递给 LocalCluster 的附加关键字参数
示例
>>> from dask_kubernetes.operator import KubeCluster >>> cluster = KubeCluster(name="foo")
您可以添加另一组工作节点(默认为 3 个工作节点)>>> cluster.add_worker_group(‘additional’, n=4)
然后您可以使用 scale 方法调整集群大小 >>> cluster.scale(10)
并且可以选择扩缩特定的工作节点组 >>> cluster.scale(10, worker_group=’additional’)
您还可以自适应地调整集群大小并指定工作节点范围 >>> cluster.adapt(20, 50)
您可以将此集群直接传递给 Dask 客户端 >>> from dask.distributed import Client >>> client = Client(cluster)
您还可以访问集群日志 >>> cluster.get_logs()
您还可以连接到现有集群 >>> existing_cluster = KubeCluster.from_name(name=”ialreadyexist”)
- 属性
asynchronous
我们是否在事件循环中运行?
- called_from_running_loop
- dashboard_link
- jupyter_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
方法
adapt
([minimum, maximum])开启自适应
add_worker_group
(name[, n_workers, image, ...])按名称创建 Dask 工作节点组
close
([timeout])删除 Dask 集群
delete_worker_group
(name)按名称删除 Dask 工作节点组
from_name
(name, **kwargs)创建此类的实例以按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
()获取 Dask 调度器和工作节点的日志。
scale
(n[, worker_group])将集群扩缩到 n 个工作节点
sync
(func, *args[, asynchronous, ...])根据调用上下文同步或异步调用 func 和 args
wait_for_workers
(n_workers[, timeout])阻塞调用,等待 n 个工作节点后继续
generate_rich_output
logs
- adapt(minimum=None, maximum=None)[source]¶
开启自适应
- 参数
- minimumint
最小工作节点数量
- minimumint
最大工作节点数量
示例
>>> cluster.adapt() # Allow scheduler to add/remove workers within k8s cluster resource limits >>> cluster.adapt(minimum=1, maximum=10) # Allow scheduler to add/remove workers within 1-10 range
- add_worker_group(name, n_workers=3, image=None, resources=None, worker_command=None, env=None, custom_spec=None)[source]¶
按名称创建 Dask 工作节点组
- 参数
- name: str
工作节点组的名称
- n_workers: int
初始启动时的工作节点数量。将来使用
.scale(n_workers, worker_group=name)
更改此数量。- image: str(可选)
在调度器和工作节点 Pod 中运行的镜像。如果省略,将使用集群默认设置。
- resources: Dict[str, str]
要传递给底层 Pod 的资源。如果省略,将使用集群默认设置。
- env: List[dict]
要传递给工作节点 Pod 的环境变量列表。如果省略,将使用集群默认设置。
- custom_spec: dict(可选)
工作节点规范的字典表示,将使用它来创建 DaskWorkerGroup,而不是从其他关键字参数生成。
示例
>>> cluster.add_worker_group("high-mem-workers", n_workers=5)
- delete_worker_group(name)[source]¶
按名称删除 Dask 工作节点组
- 参数
- name: str
工作节点组的名称
示例
>>> cluster.delete_worker_group("high-mem-workers")
- classmethod from_name(name, **kwargs)[source]¶
创建此类的实例以按名称表示现有集群。
如果同名集群不存在,将会失败。
- 参数
- name: str
要连接的集群名称
示例
>>> cluster = KubeCluster.from_name(name="simple-cluster")
- get_logs()[source]¶
获取 Dask 调度器和工作节点的日志。
示例
>>> cluster.get_logs() {'foo': ..., 'foo-default-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ..., 'foo-default-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ..., 'foo-default-worker-d65bee23bdae423b8d40c5da7a1065b6': ...} Each log will be a string of all logs for that container. To view it is recommeded that you print each log. >>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"]) ... distributed.scheduler - INFO - ----------------------------------------------- distributed.scheduler - INFO - Clear task state distributed.scheduler - INFO - Scheduler at: tcp://10.244.0.222:8786 distributed.scheduler - INFO - dashboard at: :8787 ...
- dask_kubernetes.operator.make_cluster_spec(name, image='ghcr.io/dask/dask:latest', n_workers=None, resources=None, env=None, worker_command='dask-worker', scheduler_service_type='ClusterIP', idle_timeout=0, jupyter=False)[source]¶
生成一个
DaskCluster
Kubernetes 资源。用一些常见选项填充模板以生成
DaskCluster
Kubernetes 资源。- 参数
- name: str
集群名称
- image: str(可选)
用于调度器和工作节点的容器镜像
- n_workers: int(可选)
默认工作节点组中的工作节点数量
- resources: dict(可选)
在调度器和工作节点上设置的资源限制
- env: dict(可选)
在调度器和工作节点上设置的环境变量
- worker_command: str(可选)
启动工作节点时使用的命令
- idle_timeout: int(可选)
清理空闲集群的超时时长
- jupyter: bool(可选)
在 Dask 调度器上启动 Jupyter