KubernetesExecutor2.0の新機能を紹介します。スポイラー警告 !!!このプロセスは、より速く、より柔軟で、理解しやすくなっています。
Airflow 2.0とともに、完全に再設計されたKubernetesExecutorをご紹介します。この新しいアーキテクチャは、KubernetesExecutor 1.10よりも高速で、柔軟性があり、理解しやすいものです。最初のステップとして、KubernetesExecutor2.0の新機能を紹介します。
KubernetesExecutorとは何ですか?
2018年には、自動スケーリングと柔軟性のアイデアに基づいてKubernetesExecutorを導入しました。 Airflowには、Celery Workerの自動スケーリングに関する明確な概念がまだありませんでした(この点に関するKEDAとの最近の作業は非常に成功していますが)。そのため、ユーザーのニーズを満たすことができるシステムを作成したいと考えました。この調査の結果、KubernetesAPIを使用してポッドのエアフローごとのタスクを実行するシステムが作成されました。このKubernetesAPIベースのシステムの有益な副作用は、ユーザーが各タスクに固有のアドオンと制約を追加できるようになったことです。
Airflowユーザーは、Kubernetes APIとKubernetesExecutorを使用して、特定のタスクが特定のシークレットにアクセスできること、またはタスクが欧州連合に存在するノードでのみ実行できることを確認できます(データ管理に役立ちます)。ユーザーは、タスクが使用しているリソースの数を指定することもできます。これは、タスクの実行内容によって大きく異なります(たとえば、TensorFlowスクリプトを実行するにはGPUへのアクセスが必要です)。このAPIを使用すると、KubernetesExecutorを使用すると、データエンジニアは、既存のCeleryキューを使用する場合よりも、Airflowがタスクを実行する方法をより細かく制御できます。
, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !
KubernetesExecutor
podtemplate
Airflow 1.10.12 pod_template_file
. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .
pod_template_files
Airflow. pod_template_file
, , , CeleryExecutor .
pod pod_template_files
, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.
Execitor_config
Airflow 2.0 executor_config
, . , Python , API Kubernetes. executor_config
podOverride
. , .
, executeor_config
- Airflow 2.0, . , .
podmutationhook
1.10.12, pod_mutation_hook
Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.
KubernetesExecutor. , pod_template_file
pod, Kubernetes pod_override
pod_mutation_hook
pod. , .
, KubernetesExecutor.

, , , . Pod , . .
.

. pod, . V1pod, .
Airflow DevOps, .
, DAG, , executor_config
podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config
, Kubernetes API podOverride , , , , . . , .
, , , , Python pod, . executeor_config
podOverride , PythonOperator API TaskFlow. DAG :
from airflow.decorators import dag, task from datetime import datetime import os import json import requests from kubernetes.client import models as k8s new_config ={ "pod_override": k8s.V1Pod( metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", env=[ k8s.V1EnvVar(name="STATE", value="wa") ], ) ] ) ) } default_args = { 'start_date': datetime(2021, 1, 1) } @dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False) def taskflow(): @task(executor_config=new_config) def get_testing_increase(): """ Gets totalTestResultsIncrease field from Covid API for given state and returns value """ url = 'https://covidtracking.com/api/v1/states/' res = requests.get(url+'{0}/current.json'.format(os.environ['STATE'])) return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']} get_testing_increase() dag = taskflow()
new_config
, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.
KubernetesExecutor
KubernetesExecutor, . , — .
YAML. DAG, DAG git DAG Kubernetes Volume.
, airflow.cfg YAML . YAML .
これらの3つの新機能の最も優れている点は、すべてAirflow1.10.13で利用できることです。移行プロセスをすぐに開始して、このシンプルな設計のメリットと加速を享受できます。ご意見、ご感想をお待ちしております。ご不明な点、機能のリクエスト、ドキュメントについては、お気軽にお問い合わせください。