扩展 (高级)
目录
扩展 (高级)¶
您可以通过编写插件来扩展 Dask Operator 控制器的功能。如果您希望操作符创建其他资源,例如 Istio 的 VirtualSerivce、Gateway 和 Certificate 资源,您可能希望这样做。像这样的额外资源可能最终成为一个常见需求,但考虑到 k8s 集群设置的无限可能性,很难将其配置化。
为了帮助集群管理员确保 Dask Operator 完全按照他们的需求工作,我们支持通过插件扩展控制器。
控制器设计概览¶
Dask Operator 的控制器使用 kopf 构建,这允许您用 Python 为任何 Kubernetes 事件编写自定义处理函数。Dask Operator 有一系列 自定义资源,控制器处理这些资源的创建/更新/删除事件。例如,每当创建 DaskCluster 资源时,控制器会将 status.phase 属性设置为 Created。
@kopf.on.create("daskcluster.kubernetes.dask.org")
async def daskcluster_create(name, namespace, logger, patch, **kwargs):
"""When DaskCluster resource is created set the status.phase.
This allows us to track that the operator is running.
"""
logger.info(f"DaskCluster {name} created in {namespace}.")
patch.status["phase"] = "Created"
然后还有另一个处理程序,它监听已进入 Created 阶段的 DaskCluster 资源。此处理程序会创建集群的 Pod、Service 和 DaskWorkerGroup 子资源,然后将其置于 Running 阶段。
@kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Created")
async def daskcluster_create_components(spec, name, namespace, logger, patch, **kwargs):
"""When the DaskCluster status.phase goes into Pending create the cluster components."""
async with kubernetes.client.api_client.ApiClient() as api_client:
api = kubernetes.client.CoreV1Api(api_client)
custom_api = kubernetes.client.CustomObjectsApi(api_client)
# Create scheduler Pod
data = build_scheduler_pod_spec(...)
kopf.adopt(data)
await api.create_namespaced_pod(namespace=namespace, body=data)
# Create scheduler Service
data = build_scheduler_service_spec(...)
kopf.adopt(data)
await api.create_namespaced_service(namespace=namespace, body=data)
# Create DaskWorkerGroup
data = build_worker_group_spec(...)
kopf.adopt(data)
await custom_api.create_namespaced_custom_object(group="kubernetes.dask.org", version="v1", plural="daskworkergroups", namespace=namespace, body=data)
# Set DaskCluster to Running phase
patch.status["phase"] = "Running"
然后,当 DaskWorkerGroup 资源被创建时,会触发工作节点创建事件处理程序,该处理程序会创建更多 Pod 资源。反过来,Pod 和 Service 资源的创建会触发 Kubernetes 中的内部事件处理程序,这些处理程序将创建容器、设置 iptable 规则等。
这种编写小型处理程序(由 Kubernetes 中的事件触发)的模型允许您使用简单的构建块创建强大的工具。
编写您自己的处理程序¶
为了避免用户必须编写自己的控制器,Dask Operator 控制器支持通过 entry_points 从其他包加载额外的处理程序。
自定义处理程序必须 打包为 Python 模块 并且可导入。
例如,假设您有一个结构如下的最小 Python 包
my_controller_plugin/
├── pyproject.toml
└── my_controller_plugin/
├── __init__.py
└── plugin.py
如果您想编写一个自定义处理程序,它将在调度器 Service 创建时触发,那么 plugin.py 将会是这样
import kopf
@kopf.on.create("service", labels={"dask.org/component": "scheduler"})
async def handle_scheduler_service_create(meta, new, namespace, logger, **kwargs):
# Do something here
# See https://kopf.readthedocs.io/en/stable/handlers for documentation on what is possible here
然后您需要确保您的 pyproject.toml 将该插件注册为 dask_operator_plugin。
...
[option.entry_points]
dask_operator_plugin =
my_controller_plugin = my_controller_plugin.plugin
然后您可以将其打包并推送到您首选的 Python 包仓库。
安装您的插件¶
当 Dask Operator 控制器启动时,它会检查是否通过 dask_operator_plugin 入口点注册了任何插件,并也会加载这些插件。这意味着安装您的插件非常简单,只需确保您的插件包安装在控制器容器镜像中即可。
控制器默认使用 ghcr.io/dask/dask-kubernetes-operator:latest 容器镜像,因此您的自定义容器 Dockerfile 将会是这样
FROM ghcr.io/dask/dask-kubernetes-operator:latest
RUN pip install my-controller-plugin
然后,当您 安装控制器部署 时,无论是通过 manifest 文件还是使用 helm,您都应指定您的自定义容器镜像。
helm install --set image.name=my_controller_image myrelease dask/dask-kubernetes-operator