扩展 (高级)

您可以通过编写插件来扩展 Dask Operator 控制器的功能。如果您希望操作符创建其他资源,例如 Istio 的 VirtualSerivceGatewayCertificate 资源,您可能希望这样做。像这样的额外资源可能最终成为一个常见需求,但考虑到 k8s 集群设置的无限可能性,很难将其配置化。

为了帮助集群管理员确保 Dask Operator 完全按照他们的需求工作,我们支持通过插件扩展控制器。

控制器设计概览

Dask Operator 的控制器使用 kopf 构建,这允许您用 Python 为任何 Kubernetes 事件编写自定义处理函数。Dask Operator 有一系列 自定义资源,控制器处理这些资源的创建/更新/删除事件。例如,每当创建 DaskCluster 资源时,控制器会将 status.phase 属性设置为 Created

@kopf.on.create("daskcluster.kubernetes.dask.org")
async def daskcluster_create(name, namespace, logger, patch, **kwargs):
   """When DaskCluster resource is created set the status.phase.

   This allows us to track that the operator is running.
   """
   logger.info(f"DaskCluster {name} created in {namespace}.")
   patch.status["phase"] = "Created"

然后还有另一个处理程序,它监听已进入 Created 阶段的 DaskCluster 资源。此处理程序会创建集群的 PodServiceDaskWorkerGroup 子资源,然后将其置于 Running 阶段。

@kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Created")
async def daskcluster_create_components(spec, name, namespace, logger, patch, **kwargs):
   """When the DaskCluster status.phase goes into Pending create the cluster components."""
   async with kubernetes.client.api_client.ApiClient() as api_client:
      api = kubernetes.client.CoreV1Api(api_client)
      custom_api = kubernetes.client.CustomObjectsApi(api_client)

      # Create scheduler Pod
      data = build_scheduler_pod_spec(...)
      kopf.adopt(data)
      await api.create_namespaced_pod(namespace=namespace, body=data)

      # Create scheduler Service
      data = build_scheduler_service_spec(...)
      kopf.adopt(data)
      await api.create_namespaced_service(namespace=namespace, body=data)

      # Create DaskWorkerGroup
      data = build_worker_group_spec(...)
      kopf.adopt(data)
      await custom_api.create_namespaced_custom_object(group="kubernetes.dask.org", version="v1", plural="daskworkergroups", namespace=namespace, body=data)

   # Set DaskCluster to Running phase
   patch.status["phase"] = "Running"

然后,当 DaskWorkerGroup 资源被创建时,会触发工作节点创建事件处理程序,该处理程序会创建更多 Pod 资源。反过来,PodService 资源的创建会触发 Kubernetes 中的内部事件处理程序,这些处理程序将创建容器、设置 iptable 规则等。

这种编写小型处理程序(由 Kubernetes 中的事件触发)的模型允许您使用简单的构建块创建强大的工具。

编写您自己的处理程序

为了避免用户必须编写自己的控制器,Dask Operator 控制器支持通过 entry_points 从其他包加载额外的处理程序。

自定义处理程序必须 打包为 Python 模块 并且可导入。

例如,假设您有一个结构如下的最小 Python 包

my_controller_plugin/
├── pyproject.toml
└── my_controller_plugin/
    ├── __init__.py
    └── plugin.py

如果您想编写一个自定义处理程序,它将在调度器 Service 创建时触发,那么 plugin.py 将会是这样

import kopf

@kopf.on.create("service", labels={"dask.org/component": "scheduler"})
async def handle_scheduler_service_create(meta, new, namespace, logger, **kwargs):
   # Do something here
   # See https://kopf.readthedocs.io/en/stable/handlers for documentation on what is possible here

然后您需要确保您的 pyproject.toml 将该插件注册为 dask_operator_plugin

...

[option.entry_points]
dask_operator_plugin =
   my_controller_plugin = my_controller_plugin.plugin

然后您可以将其打包并推送到您首选的 Python 包仓库。

安装您的插件

当 Dask Operator 控制器启动时,它会检查是否通过 dask_operator_plugin 入口点注册了任何插件,并也会加载这些插件。这意味着安装您的插件非常简单,只需确保您的插件包安装在控制器容器镜像中即可。

控制器默认使用 ghcr.io/dask/dask-kubernetes-operator:latest 容器镜像,因此您的自定义容器 Dockerfile 将会是这样

FROM ghcr.io/dask/dask-kubernetes-operator:latest

RUN pip install my-controller-plugin

然后,当您 安装控制器部署 时,无论是通过 manifest 文件还是使用 helm,您都应指定您的自定义容器镜像。

helm install --set image.name=my_controller_image myrelease dask/dask-kubernetes-operator