扩展 (高级)
目录
扩展 (高级)¶
您可以通过编写插件来扩展 Dask Operator 控制器的功能。如果您希望操作符创建其他资源,例如 Istio 的 VirtualSerivce
、Gateway
和 Certificate
资源,您可能希望这样做。像这样的额外资源可能最终成为一个常见需求,但考虑到 k8s 集群设置的无限可能性,很难将其配置化。
为了帮助集群管理员确保 Dask Operator 完全按照他们的需求工作,我们支持通过插件扩展控制器。
控制器设计概览¶
Dask Operator 的控制器使用 kopf 构建,这允许您用 Python 为任何 Kubernetes 事件编写自定义处理函数。Dask Operator 有一系列 自定义资源,控制器处理这些资源的创建/更新/删除事件。例如,每当创建 DaskCluster
资源时,控制器会将 status.phase
属性设置为 Created
。
@kopf.on.create("daskcluster.kubernetes.dask.org")
async def daskcluster_create(name, namespace, logger, patch, **kwargs):
"""When DaskCluster resource is created set the status.phase.
This allows us to track that the operator is running.
"""
logger.info(f"DaskCluster {name} created in {namespace}.")
patch.status["phase"] = "Created"
然后还有另一个处理程序,它监听已进入 Created
阶段的 DaskCluster
资源。此处理程序会创建集群的 Pod
、Service
和 DaskWorkerGroup
子资源,然后将其置于 Running
阶段。
@kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Created")
async def daskcluster_create_components(spec, name, namespace, logger, patch, **kwargs):
"""When the DaskCluster status.phase goes into Pending create the cluster components."""
async with kubernetes.client.api_client.ApiClient() as api_client:
api = kubernetes.client.CoreV1Api(api_client)
custom_api = kubernetes.client.CustomObjectsApi(api_client)
# Create scheduler Pod
data = build_scheduler_pod_spec(...)
kopf.adopt(data)
await api.create_namespaced_pod(namespace=namespace, body=data)
# Create scheduler Service
data = build_scheduler_service_spec(...)
kopf.adopt(data)
await api.create_namespaced_service(namespace=namespace, body=data)
# Create DaskWorkerGroup
data = build_worker_group_spec(...)
kopf.adopt(data)
await custom_api.create_namespaced_custom_object(group="kubernetes.dask.org", version="v1", plural="daskworkergroups", namespace=namespace, body=data)
# Set DaskCluster to Running phase
patch.status["phase"] = "Running"
然后,当 DaskWorkerGroup
资源被创建时,会触发工作节点创建事件处理程序,该处理程序会创建更多 Pod
资源。反过来,Pod
和 Service
资源的创建会触发 Kubernetes 中的内部事件处理程序,这些处理程序将创建容器、设置 iptable 规则等。
这种编写小型处理程序(由 Kubernetes 中的事件触发)的模型允许您使用简单的构建块创建强大的工具。
编写您自己的处理程序¶
为了避免用户必须编写自己的控制器,Dask Operator 控制器支持通过 entry_points
从其他包加载额外的处理程序。
自定义处理程序必须 打包为 Python 模块 并且可导入。
例如,假设您有一个结构如下的最小 Python 包
my_controller_plugin/
├── pyproject.toml
└── my_controller_plugin/
├── __init__.py
└── plugin.py
如果您想编写一个自定义处理程序,它将在调度器 Service
创建时触发,那么 plugin.py
将会是这样
import kopf
@kopf.on.create("service", labels={"dask.org/component": "scheduler"})
async def handle_scheduler_service_create(meta, new, namespace, logger, **kwargs):
# Do something here
# See https://kopf.readthedocs.io/en/stable/handlers for documentation on what is possible here
然后您需要确保您的 pyproject.toml
将该插件注册为 dask_operator_plugin
。
...
[option.entry_points]
dask_operator_plugin =
my_controller_plugin = my_controller_plugin.plugin
然后您可以将其打包并推送到您首选的 Python 包仓库。
安装您的插件¶
当 Dask Operator 控制器启动时,它会检查是否通过 dask_operator_plugin
入口点注册了任何插件,并也会加载这些插件。这意味着安装您的插件非常简单,只需确保您的插件包安装在控制器容器镜像中即可。
控制器默认使用 ghcr.io/dask/dask-kubernetes-operator:latest
容器镜像,因此您的自定义容器 Dockerfile
将会是这样
FROM ghcr.io/dask/dask-kubernetes-operator:latest
RUN pip install my-controller-plugin
然后,当您 安装控制器部署 时,无论是通过 manifest 文件还是使用 helm,您都应指定您的自定义容器镜像。
helm install --set image.name=my_controller_image myrelease dask/dask-kubernetes-operator