Setting Up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines

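Airflow is an ideal choice for data pipelines, i.e. ETL orchestration and scheduling. It is widely adopted and popular for building future-proof data pipelines. Through functional abstraction, it provides backfilling, versioning, and lineage.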













Functional programming is the future.







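In Airflow, a workflow is a DAG (directed acyclic graph) of tasks. If you have worked with Apache Spark or Apache Drill, you already know what a DAG is! The same concept is at the heart of Airflow.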
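Airflow builds its DAGs from Python operator objects, so the sketch below is not Airflow code. It only illustrates, using the standard-library `graphlib` module (Python 3.9+), what a DAG gives a scheduler: a valid run order, plus batches of tasks that can run in parallel. The task names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical ETL pipeline: each task maps to the set of tasks it depends on.
pipeline = {
    "extract_orders": set(),
    "extract_customers": set(),
    "transform": {"extract_orders", "extract_customers"},
    "load_warehouse": {"transform"},
    "refresh_report": {"load_warehouse"},
}


def execution_order(dag):
    """Return one valid run order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(dag).static_order())


def ready_batches(dag):
    """Group tasks into batches whose members have no pending dependencies."""
    sorter = TopologicalSorter(dag)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all tasks runnable right now
        batches.append(ready)
        sorter.done(*ready)                 # mark them finished
    return batches


if __name__ == "__main__":
    print(execution_order(pipeline))
    for number, batch in enumerate(ready_batches(pipeline), 1):
        print(number, batch)
```

The "cycle" check is what the "acyclic" in DAG buys you: a pipeline whose tasks wait on each other in a loop is rejected up front instead of hanging at run time.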







, :







  1. .







  2. .







  3. .







  4. .







  5. , .









— .







Haskell, Scala, Erlang Kotlin, , , , , ! . — .







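ETL / Data Lake / Streaming infrastructure is typically built on Hadoop / Spark distributions from vendors such as Hortonworks, MapR, or Cloudera. If you already run an Apache Hadoop / Apache Spark cluster, you can set up the Airflow cluster on top of it.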







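Popular ETL workflow schedulers include Oozie, Luigi, and Airflow. Oozie workflows are written in XML, and it's 2019! :) Luigi was open-sourced by Spotify, and Airflow by Airbnb.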







Why Airflow over Luigi?







  1. Airflow has its own scheduler, whereas Luigi relies on cron.







  2. Luigi .







  3. Luigi .







  4. Luigi - Cron.







  5. Luigi .







  6. Luigi , .









Before switching to Airflow, I used Luigi to run workflows built with Scikit-learn, NumPy, Pandas, Theano, and similar libraries.







So, let's set up a multi-node Airflow cluster with Celery and RabbitMQ using Ambari.
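Conceptually, with the CeleryExecutor the scheduler publishes task messages to a broker (RabbitMQ here), and Celery workers on the data nodes consume and execute them. The toy model below only illustrates that flow and is not Celery or Airflow code: `queue.Queue` stands in for the broker, threads stand in for worker nodes, and the node names are hypothetical.

```python
import queue
import threading


def run_on_workers(tasks, worker_names):
    """Toy model of CeleryExecutor: one publisher, several consumers.

    Returns a dict mapping each task to the worker (node) that ran it.
    """
    broker = queue.Queue()  # stand-in for the RabbitMQ queue
    results = {}
    lock = threading.Lock()

    def worker(name):
        while True:
            task = broker.get()
            if task is None:  # sentinel: no more work for this worker
                return
            # A real Celery worker would execute the Airflow task here.
            with lock:
                results[task] = name

    threads = [threading.Thread(target=worker, args=(n,)) for n in worker_names]
    for t in threads:
        t.start()
    for task in tasks:        # the "scheduler" publishes task messages
        broker.put(task)
    for _ in threads:         # one stop signal per worker, queued after all tasks
        broker.put(None)
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    done = run_on_workers([f"task_{i}" for i in range(6)],
                          ["datanode1", "datanode2", "datanode3"])
    for task, node in sorted(done.items()):
        print(task, "->", node)
```

Which worker picks up which task is not deterministic, which is exactly the point: adding a worker node adds consumers without any change on the scheduler side.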







, .







Airflow can be installed on an existing Hadoop / Spark cluster, where it can trigger Spark, Hive, and Hadoop MapReduce jobs.







!







The installation uses airflow-ambari-mpack (a management pack that adds Apache Airflow to Apache Ambari), to which I am a FOSS contributor: https://github.com/miho120/ambari-airflow-mpack.







:







1 4, RabbitMQ .







Airflow Celery RabbitMQ







  1. Install the Apache MPack for Airflow


a. git clone https://github.com/miho120/ambari-mpack.git
b. ambari-server stop
c. ambari-server install-mpack --mpack=<path-to-airflow-mpack.tar.gz> --verbose
d. ambari-server start
      
      





  1. Add the Airflow service in Ambari


Open the Ambari web UI:







http://<HOST_NAME>:8080
      
      





In the Ambari web UI, go to Actions -> Add Service.













HDP Ambari Dashboard







If step 1 completed successfully, Airflow appears in the list of services you can add in Ambari.













Airflow Ambari







Next, choose where the Airflow web server and scheduler will run: place them on the master node, then select Install Worker for the data nodes.













Ambari: choosing the master / NameNode for Airflow







As shown above, the Airflow web server and the Airflow scheduler are placed on the NameNode of the Hadoop / Spark cluster.













Next, install Airflow workers on the data nodes.







Here, 3 workers are installed on the data nodes.













Airflow Ambari













Ambari UI: 3 Airflow workers







, , / , . + .







Configuring Airflow in Ambari:







Open the Airflow service and go to its Configs tab in Ambari.













Airflow Ambari







  1. Executor


executor = CeleryExecutor
      
      











In Advanced airflow-core-site, set the executor to CeleryExecutor.







  1. SQL Alchemy Connection


sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
      
      











SQL Alchemy Connection







Set the SQL Alchemy connection to PostgreSQL, as shown above.







  1. Broker URL and Celery result backend


broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
      
      











Broker URL and Celery result backend for Airflow









dags_are_paused_at_creation = True
load_examples = False
      
      











Airflow-core-site.
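For reference, here is a sketch of how these Ambari fields could look in a plain `airflow.cfg`, checked with Python's `configparser`. It assumes a stock Airflow 1.10 layout in which `executor`, `sql_alchemy_conn`, `dags_are_paused_at_creation`, and `load_examples` live in `[core]` and the broker URL and result backend live in `[celery]`; compare it against the file Ambari actually generates on your nodes. The host names are placeholders, and `check_celery_setup` is a hypothetical helper.

```python
import configparser

# Hypothetical airflow.cfg fragment mirroring the values entered in Ambari.
# Host names are placeholders, not real servers.
SAMPLE_CFG = """
[core]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@master.example.com/airflow
dags_are_paused_at_creation = True
load_examples = False

[celery]
broker_url = pyamqp://guest:guest@rabbitmq.example.com:5672/
celery_result_backend = db+postgresql://airflow:airflow@master.example.com/airflow
"""


def check_celery_setup(text):
    """Return a list of problems found in a multi-node (Celery) configuration."""
    cfg = configparser.ConfigParser()
    cfg.read_string(text)
    problems = []
    if cfg.get("core", "executor", fallback="") != "CeleryExecutor":
        problems.append("core.executor should be CeleryExecutor")
    if cfg.get("core", "sql_alchemy_conn", fallback="").startswith("sqlite"):
        problems.append("a SQLite metadata DB cannot serve several nodes; use PostgreSQL")
    if not cfg.get("celery", "broker_url", fallback=""):
        problems.append("celery.broker_url is not set")
    # Airflow 1.10 renamed celery_result_backend to result_backend; accept either.
    backend = cfg.get("celery", "result_backend",
                      fallback=cfg.get("celery", "celery_result_backend", fallback=""))
    if not backend:
        problems.append("celery result backend is not set")
    return problems


if __name__ == "__main__":
    print(check_celery_setup(SAMPLE_CFG) or "configuration looks consistent")
```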







After saving the configuration, initialize the Airflow metadata database from Ambari: Service Actions -> InitDB.













Airflow InitDB in Ambari







airflow. Airflow.







- - Airflow:







  1. RabbitMQ :








  1. RabbitMQ :








  1. RabbitMQ :








  1. Celery Flower


Celery Flower is a web-based tool for monitoring Celery. Its default port is 5555.
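To check from a script that Flower is reachable before opening it in a browser, a plain TCP connection test is enough. This minimal sketch uses only the `socket` module; it assumes Flower listens on its default port 5555, `is_port_open` is a hypothetical helper, and `localhost` is a placeholder for the node where Flower runs.

```python
import socket


def is_port_open(host, port=5555, timeout=2.0):
    """Return True if something accepts TCP connections on host:port.

    5555 is Flower's default port; pass another port if you changed it.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, DNS failure, or timeout
        return False


if __name__ == "__main__":
    print("Flower reachable:", is_port_open("localhost"))
```

An open port only shows that something is listening; Flower's own page still has to confirm that the workers are registered.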













As you can see, all 3 workers are connected to Celery.







Celery Flower







To start Celery Flower, the web UI for monitoring Celery, run: airflow flower

To keep Flower running in the background, use:







nohup airflow flower >> /var/local/airflow/logs/flower.logs &
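If you prefer to launch Flower from Python rather than the shell, `subprocess` can do roughly what `nohup ... &` does: start the process in its own session (so a terminal hangup does not reach it) and append its output to a log file. This is a sketch under that assumption; `start_in_background` is a hypothetical helper, and `start_new_session` only has an effect on POSIX systems.

```python
import subprocess


def start_in_background(cmd, log_path):
    """Start `cmd` detached from the current session, appending output to log_path.

    Roughly equivalent to: nohup <cmd> >> <log_path> 2>&1 &
    """
    with open(log_path, "ab") as log:
        # The child gets its own copy of the log descriptor, so closing ours is safe.
        return subprocess.Popen(
            cmd,
            stdout=log,
            stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL,
            start_new_session=True,  # setsid() in the child (POSIX only)
        )
```

For the command above, the call would be `start_in_background(["airflow", "flower"], "/var/local/airflow/logs/flower.logs")`, assuming the `airflow` executable is on the `PATH` of the user running the script.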
      
      





And that's it: Airflow is now running through Ambari on the HDP Hadoop / Spark cluster.







, .







Multi-Node Airflow Cluster







In my previous post on Apache Airflow, «Setting up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines», I described the setup itself.







Below are the issues I ran into while setting up the Multi-Node Airflow Cluster.







1. Switching from LocalExecutor to CeleryExecutor.







Worker Scheduler Celery.







:







AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:112} ERROR - Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:113} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
      
      





Airflow , , , . . Celery .







:







Upgrade Celery from 3.3.5, which does not work with Airflow 1.10:







pip install --upgrade celery
# Celery 3.3.5 => 4.3
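Before and after upgrading, it helps to check which Celery version each node actually has. This sketch compares version strings with a small hand-written parser and reads the installed version via `importlib.metadata` (Python 3.8+). The minimum version `"4.0"` is an assumption for illustration only; take the real bound from the `celery` requirement declared by your Airflow release.

```python
from importlib import metadata


def parse_version(text):
    """Turn '4.3.0' into (4, 3, 0); trailing non-digits such as 'rc1' are dropped."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts)


def needs_upgrade(installed, minimum):
    """Return True if `installed` is older than `minimum`."""
    a, b = parse_version(installed), parse_version(minimum)
    width = max(len(a), len(b))
    # Pad with zeros so that '4.3' and '4.3.0' compare as equal.
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a < b


def installed_version(dist="celery"):
    """Version of an installed distribution, or None if it is not installed."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    MINIMUM = "4.0"  # assumed bound, for illustration only
    current = installed_version("celery")
    if current is None:
        print("celery is not installed")
    elif needs_upgrade(current, MINIMUM):
        print(f"celery {current} is older than {MINIMUM}: pip install --upgrade celery")
    else:
        print(f"celery {current} is at least {MINIMUM}")
```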
      
      





2: When a DAG was run with CeleryExecutor, its tasks failed with the following error.







Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
Apr 11 14:13:13 charlie-prod airflow_control.sh: [2019-04-11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError("Required argument 'object' (pos 1) not found",)
Apr 11 14:13:13 charlie-prod airflow_control.sh: Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 289, in __call__
Apr 11 14:13:13 charlie-prod airflow_control.sh: sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 347, in workloop
Apr 11 14:13:13 charlie-prod airflow_control.sh: req = wait_for_job()
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 447, in receive
Apr 11 14:13:13 charlie-prod airflow_control.sh: ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 419, in _recv
Apr 11 14:13:13 charlie-prod airflow_control.sh: return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/common.py", line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
      
      





:







.







Related airflow discussion: https://blog.csdn.net/u013492463/article/details/80881260







, , , , .







:







broker_url= amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
      
      





, pyamqp , , .







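amqp://

An alias that uses librabbitmq if it is installed, and falls back to py-amqp otherwise.

pyamqp://

librabbitmq://

These schemes always use one specific client library: pyamqp:// always uses py-amqp (http://github.com/celery/py-amqp), and librabbitmq:// always uses librabbitmq.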







:







broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
      
      





That is, replace amqp with pyamqp in the broker URL.
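The transport choice can be written down as a small function. Celery's AMQP transports treat `amqp://` as an alias that prefers librabbitmq and falls back to py-amqp, while `pyamqp://` and `librabbitmq://` pin one library. The sketch below approximates that rule for illustration and is not kombu's actual code: it only inspects the URL scheme and whether a `librabbitmq` module is importable. `resolve_amqp_client` is a hypothetical helper, and the host names are placeholders.

```python
import importlib.util
from urllib.parse import urlsplit


def resolve_amqp_client(broker_url):
    """Return which AMQP client library a broker URL would end up using.

    Approximation of the documented rule:
      amqp://        -> librabbitmq if importable, otherwise py-amqp
      pyamqp://      -> always py-amqp (imported as the `amqp` module)
      librabbitmq:// -> always librabbitmq
    """
    scheme = urlsplit(broker_url).scheme
    if scheme == "pyamqp":
        return "py-amqp"
    if scheme == "librabbitmq":
        return "librabbitmq"
    if scheme == "amqp":
        has_librabbitmq = importlib.util.find_spec("librabbitmq") is not None
        return "librabbitmq" if has_librabbitmq else "py-amqp"
    raise ValueError(f"not an AMQP broker URL: {broker_url!r}")


if __name__ == "__main__":
    for url in ("amqp://guest:guest@rabbit.example.com:5672/",
                "pyamqp://guest:guest@rabbit.example.com:5672/"):
        print(url, "->", resolve_amqp_client(url))
```

This makes the failure mode visible: the same `amqp://` URL can select a different client library on different nodes, depending on whether librabbitmq happens to be installed there, whereas `pyamqp://` behaves the same everywhere.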







:







pip install amqp    # py-amqp is published on PyPI under the name "amqp"
      
      





3: SQL Alchemy







:







SQL alchemy connection







sql_alchemy_conn = postgresql://airflow:airflow@{HOST_NAME}:5432/airflow
      
      





:







:







sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
      
      





Install psycopg2 with pip (it is available as a prebuilt wheel).







PostgreSQL: psycopg2









Psycopg is the most popular PostgreSQL database adapter for the Python programming language.
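The difference between the two connection strings is in the URL scheme: `postgresql+psycopg2://` names the dialect and the driver explicitly, while `postgresql://` leaves the driver to SQLAlchemy's default for PostgreSQL (psycopg2). This stdlib-only sketch splits such a URL into its parts and checks whether the driver module is importable. `describe_db_url` and `driver_installed` are hypothetical helpers, the host names are placeholders, and the `db+` prefix handling is for Celery's database result backend URLs.

```python
import importlib.util
from urllib.parse import urlsplit


def describe_db_url(url):
    """Split an SQLAlchemy-style URL into dialect, driver, host, port and database."""
    # Celery's database result backend prefixes the SQLAlchemy URL with "db+".
    if url.startswith("db+"):
        url = url[len("db+"):]
    parts = urlsplit(url)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,  # None means "SQLAlchemy's default driver"
        "host": parts.hostname,
        "port": parts.port,        # None when the URL omits it (PostgreSQL uses 5432)
        "database": parts.path.lstrip("/") or None,
    }


def driver_installed(module="psycopg2"):
    """True if the given DB driver module can be imported on this node."""
    return importlib.util.find_spec(module) is not None


if __name__ == "__main__":
    for url in ("postgresql://airflow:airflow@master.example.com:5432/airflow",
                "postgresql+psycopg2://airflow:airflow@master.example.com/airflow",
                "db+postgresql://airflow:airflow@master.example.com/airflow"):
        print(describe_db_url(url))
    print("psycopg2 installed:", driver_installed())
```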







4: On HDP 2.6.2 with Ambari, the Worker Installation step failed.







- , , Celery worker , DAG .







:) .







by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Could not fetch URL https://pypi.org/simple/apache-airflow/: There was a problem confirming the ssl certificate: HTTPSConnectionPool(host='pypi.org', port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)) - skipping
      
      





:







pip could not download the wheels it needed while Ambari was installing the worker.







, pypi wheel.







pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade  --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
      
      





, , . , .







resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
 Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
      
      





pip, .







, Hack , , — , celery wheel pip, .







. , . https://github.com/miho120/ambari-airflow-mpack/blob/e1c9ca004adaa3320e35ab7baa7fdb9b9695b635/airflow-service-mpack/common-services/AIRFLOW/1.10.0/package/scripts/airflow_worker_control.py







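On the cluster, I temporarily commented out these lines by hand (and restored them once the worker was installed successfully), then added the worker from Ambari, and it worked like a charm :) This hack made my day.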







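After installing a worker on another node, you may need to restart the Airflow service from Ambari. You can find more details in my previous blog post: Setting up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines.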







