CeleryとRabbitMQを使用してマルチノードAirflowクラスターをセットアップする方法

エアフローとは何ですか?







Apache Airflowは、高度なワークフローマネージャーであり、最新のデータエンジニアの武器に欠かせないツールです。







Airflowを使用すると、タスクの有向非巡回グラフ(DAG)の形式でワークフローを作成できます。さまざまなコマンドラインユーティリティがDAGで複雑な操作を実行します。ユーザーインターフェイスは、実稼働環境で実行されているパイプラインを簡単に視覚化し、進行状況を監視し、必要に応じてトラブルシューティングを行います。







プログラムでワークフローを作成、計画、および制御します。これは、べき等DAG(有向非巡回グラフ)の形式で機能の抽象化を提供します。指定された間隔でタスクを実行するための抽象化サービスとしての機能。







1つのAirflowノードを持つクラスター







単一ノードのAirflowクラスターでは、すべてのコンポーネント(ワーカー、スケジューラー、Webサーバー)が「マスターノード」と呼ばれる単一のノードにインストールされます。単一ノードクラスターをスケーリングするには、Airflow



で構成する必要がありますLocalExecutor



ワーカーはIPC(プロセス間通信)キューからタスクを取得(プル)します。これは、リソースがマスターノードで使用可能である限り非常に適切に拡張されます。複数のノード間でエアフローをスケーリングするには、を有効にする必要がありますCelery Executor















エアフローシングルノードアーキテクチャ







エアフローマルチノードクラスター







Airflow . - , , , . , Airflow CeleryExecutor



.







Celery CeleryExecutor



Airflow. / Celery Redis RabbitMQ. RabbitMQ — . — . IPC, , RabbitMQ — . RabbitMQ / , Celery . .













Airflow







Celery:







Celery — , . , . Airflow . Airflow Airflow, .







Airflow Celery:







. CentOS 7 Linux.







  1. RabbitMQ


yum install epel-release
yum install rabbitmq-server
      
      





  1. RabbitMQ Server


systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
      
      





  1. - RabbitMQ


rabbitmq-plugins enable rabbitmq_management
      
      











rabbitmq — 15672



, - — admin/admin



.













  1. pyamqp



    RabbitMQ PostGreSQL


pip install pyamqp
      
      





amqp://



— , librabbitmq, , py-amqp



, .







pyamqp://



librabbitmq://



, , . pyamqp://



amqp



(http://github.com/celery/py-amqp)







PostGreSQL: psycopg2







Psycopg — PostgreSQL Python.







pip install psycopg2
      
      





  1. Airflow.


pip install 'apache-airflow[all]'
      
      





airflow







airflow version
      
      











Airflow v1.10.0, .









airflow initdb
      
      





, . Airflow .







  1. Celery


Celery .







pip install celery==4.3.0
      
      





Celery







celery --version
4.3.0 (rhubarb)
      
      





  1. airflow.cfg Celery Executor.


executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow 
broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow 
dags_are_paused_at_creation = True
load_examples = False
      
      





airflow.cfg



airflow airflow initdb



, airflow



.







- airflow







# default port is 8080
airflow webserver -p 8000
      
      











# start the scheduler
airflow scheduler
      
      





airflow .







airflow worker
      
      





さまざまなエアフローサービスの開始が完了したら、次のコマンドを使用して素晴らしいエアフローインターフェイスを確認できます。







http://<IP-ADDRESS/HOSTNAME>:8000
      
      





Webサーバーサービスの開始コマンドでポート8000​​を指定したため、それ以外の場合、デフォルトのポート番号は8080です。







はい!Airflowマルチノードアーキテクチャを使用したクラスターの作成が完了しました。:)








All Articles