Airflow2.0の新しいKubernetesExecutor2.0

KubernetesExecutor2.0の新機能を紹介します。スポイラー警告 !!!このプロセスは、より速く、より柔軟で、理解しやすくなっています。







Airflow 2.0とともに、完全に再設計されたKubernetesExecutorをご紹介します。この新しいアーキテクチャは、KubernetesExecutor 1.10よりも高速で、柔軟性があり、理解しやすいものです。最初のステップとして、KubernetesExecutor2.0の新機能を紹介します。







KubernetesExecutorとは何ですか?



2018年には、自動スケーリングと柔軟性のアイデアに基づいKubernetesExecutorを導入しました。 Airflowには、Celery Workerの自動スケーリングに関する明確な概念がまだありませんでした(この点に関するKEDAとの最近の作業は非常に成功していますが)。そのため、ユーザーのニーズを満たすことができるシステムを作成したいと考えました。この調査の結果、KubernetesAPIを使用してポッドのエアフローごとのタスクを実行するシステムが作成されました。このKubernetesAPIベースのシステムの有益な副作用は、ユーザーが各タスクに固有のアドオンと制約を追加できるようになったことです。







Airflowユーザーは、Kubernetes APIとKubernetesExecutorを使用して、特定のタスクが特定のシークレットにアクセスできること、またはタスクが欧州連合に存在するノードでのみ実行できることを確認できます(データ管理に役立ちます)。ユーザーは、タスクが使用しているリソースの数を指定することもできます。これは、タスクの実行内容によって大きく異なります(たとえば、TensorFlowスクリプトを実行するにはGPUへのアクセスが必要です)。このAPIを使用すると、KubernetesExecutorを使用すると、データエンジニアは、既存のCeleryキューを使用する場合よりも、Airflowがタスクを実行する方法をより細かく制御できます。







, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !







KubernetesExecutor



podtemplate



Airflow 1.10.12 pod_template_file



. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .







pod_template_files



Airflow. pod_template_file



, , , CeleryExecutor .







pod pod_template_files



, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.







Execitor_config



Airflow 2.0 executor_config



, . , Python , API Kubernetes. executor_config



podOverride



. , .







, executeor_config



- Airflow 2.0, . , .







podmutationhook



1.10.12, pod_mutation_hook



Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.









KubernetesExecutor. , pod_template_file



pod, Kubernetes pod_override



pod_mutation_hook



pod. , .







, KubernetesExecutor.













, , , . Pod , . .







.













. pod, . V1pod, .









Airflow DevOps, .







, DAG, , executor_config



podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config



, Kubernetes API podOverride , , , , . . , .







, , , , Python pod, . executeor_config



podOverride , PythonOperator API TaskFlow. DAG :







from airflow.decorators import dag, task
from datetime import datetime

import os
import json
import requests
from kubernetes.client import models as k8s

new_config ={ "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}),
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            env=[
                                k8s.V1EnvVar(name="STATE", value="wa")
                                ],
                            )
                        ]
                    )
                )
            }

default_args = {
    'start_date': datetime(2021, 1, 1)
}

@dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task(executor_config=new_config)
    def get_testing_increase():
        """
        Gets totalTestResultsIncrease field from Covid API for given state and returns value
        """
        url = 'https://covidtracking.com/api/v1/states/'
        res = requests.get(url+'{0}/current.json'.format(os.environ['STATE']))
        return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']}

    get_testing_increase()

dag = taskflow()
      
      





new_config



, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.







KubernetesExecutor



KubernetesExecutor, . , — .







YAML. DAG, DAG git DAG Kubernetes Volume.







, airflow.cfg YAML . YAML .







これらの3つの新機能の最も優れている点は、すべてAirflow1.10.13で利用できることです。移行プロセスをすぐに開始して、このシンプルな設計のメリットと加速を享受できます。ご意見、ご感想をお待ちしております。ご不明な点、機能のリクエスト、ドキュメントについては、お気軽にお問い合わせください。








All Articles