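Airflow is an ideal choice for data pipelines, i.e. for orchestrating and scheduling ETL. It is widely adopted and popular for building future-proof data pipelines. Through functional abstraction, it provides backfilling, versioning and lineage.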
Functional programming is the future.
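Airflow's core abstraction is the DAG (directed acyclic graph). Apache Spark and Apache Drill also plan their work as a DAG! Airflow applies the same idea to workflows.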
Languages such as Haskell, Scala, Erlang and Kotlin build on these functional ideas!
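If you are building ETL / Data Lake / Streaming Infrastructure as part of a data engineering platform, you most likely already have a Hadoop / Spark cluster from a distribution such as Hortonworks, MapR or Cloudera. This post shows how to reuse that infrastructure: take the existing Apache Hadoop / Apache Spark Cluster and build an Airflow Cluster on top of it.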
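When you have many ETL jobs to orchestrate and schedule, there are a few choices, such as Oozie, Luigi and Airflow. Oozie is XML-based, and it is 2019! :) Luigi has largely been displaced since Airflow was born at Airbnb.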
Why Airflow and not Luigi?
Airflow has its own scheduler, whereas Luigi relies on cron to trigger its tasks.
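Luigi's UI is hard to navigate.
Creating tasks in Luigi is cumbersome.
Luigi does not scale because it is tightly coupled to cron jobs.
Re-running pipelines is not possible in Luigi.
Luigi does not support distributed execution, so it does not scale out well.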
Before Airflow, I used Luigi to maintain workflows for machine learning models built with Scikit-learn, Numpy, Pandas, Theano, etc.
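So, let's see how to set up a multi-node Airflow cluster with Celery and RabbitMQ using Ambari.
Installing Airflow on the nodes of your existing Hadoop / Spark Cluster lets Airflow run Spark / Hive / Hadoop MapReduce jobs right where those services already live.
Let's get started!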
I used airflow-ambari-mpack (an Apache Airflow management pack for Apache Ambari), an open-source implementation by a FOSS contributor: https://github.com/miho120/ambari-airflow-mpack. Thanks for the contribution.
Steps:
Steps 1 to 4 assume RabbitMQ is already installed.
- Install the Apache MPack for Airflow
a. git clone https://github.com/miho120/ambari-mpack.git
b. Stop the Ambari server.
c. Install the Apache mpack for Airflow on the Ambari server.
d. Start the Ambari server.
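Steps b to d can be scripted so that they always run in the same order. The sketch below is a minimal illustration. It assumes the `ambari-server` CLI is on the PATH of the Ambari host. The tarball path `MPACK_PATH` is hypothetical, so check the cloned repository for the actual archive name. By default the script only prints the commands (dry run).

```python
import subprocess
from typing import List

# Hypothetical path: point this at the mpack tarball inside your clone.
MPACK_PATH = "ambari-mpack/airflow-service-mpack.tar.gz"


def mpack_install_commands(mpack_path: str) -> List[List[str]]:
    """Return the commands for steps b to d, in order."""
    return [
        ["ambari-server", "stop"],
        ["ambari-server", "install-mpack", f"--mpack={mpack_path}", "--verbose"],
        ["ambari-server", "start"],
    ]


def run_all(commands: List[List[str]], dry_run: bool = True) -> None:
    """Print each command; execute it only when dry_run is False."""
    for cmd in commands:
        print(" ".join(cmd))
        if not dry_run:
            # check=True stops at the first failing step
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    run_all(mpack_install_commands(MPACK_PATH))
```

Running it with `dry_run=False` on the Ambari host would actually stop Ambari, install the mpack and start Ambari again.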
- Add the Airflow Service in Ambari
Log in to Ambari:
http://<HOST_NAME>:8080
In the Ambari UI, click Actions -> Add Service.
HDP Ambari Dashboard
If step 1 succeeded, Airflow appears in Ambari's list of services.
Airflow service in Ambari
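Next, choose where each component runs. Install the Airflow web server and scheduler on a master node, and use Install Worker on the data nodes.
Ambari: assigning Airflow to the master / name node
As you can see, the Airflow web server and Airflow scheduler are installed on the name node of the Hadoop / Spark cluster.
Likewise, the Airflow Workers are installed on the data nodes.
In my case, I installed 3 workers on 3 data nodes.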
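Airflow workers in Ambari
Ambari UI: 3 Airflow workers
Workers can be added or removed later in the same way, so the cluster can scale up or down as needed.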
Configuring Airflow in Ambari:
Open the Airflow Service and go to its Config tab in Ambari.
Airflow configuration in Ambari
- Executor
executor = CeleryExecutor
In Advanced airflow-core-site, set Executor to CeleryExecutor.
- SQL Alchemy Connection
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
SQL Alchemy Connection
Point the SQL Alchemy connection to PostgreSQL, as shown above.
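The connection string follows SQLAlchemy's `dialect+driver://user:password@host:port/database` URL format. The helper `build_sql_alchemy_conn` below is hypothetical and is only a sketch for assembling the string. It URL-encodes the credentials with `quote_plus`, because characters such as `@` or `/` in a password would otherwise break the URL. The host, port and credentials are placeholders.

```python
from urllib.parse import quote_plus


def build_sql_alchemy_conn(user, password, host, database, port=5432, driver="psycopg2"):
    """Hypothetical helper: build a PostgreSQL SQLAlchemy URL for sql_alchemy_conn."""
    # quote_plus escapes characters like '@' and '/' that would break the URL
    creds = f"{quote_plus(user)}:{quote_plus(password)}"
    return f"postgresql+{driver}://{creds}@{host}:{port}/{database}"


if __name__ == "__main__":
    print(build_sql_alchemy_conn("airflow", "airflow", "db-host", "airflow"))
```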
- Broker URL
broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
Broker URL and Celery result backend for Airflow
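A wrong scheme or port in these two settings only shows up later, as worker errors, so a quick sanity check before saving can help. The sketch below uses only `urllib.parse.urlsplit`. The accepted schemes and the expected port 5672 (RabbitMQ's default AMQP port) reflect this particular setup and are assumptions, not Celery's full list. Celery's database result backend does expect a `db+` prefix in front of a SQLAlchemy URL.

```python
from urllib.parse import urlsplit

# Assumption: RabbitMQ over AMQP is the only broker used in this setup
AMQP_SCHEMES = {"amqp", "pyamqp", "librabbitmq"}


def check_celery_urls(broker_url, result_backend, expected_port=5672):
    """Return a list of problems in the broker / result-backend settings (empty if OK)."""
    problems = []
    broker = urlsplit(broker_url)
    if broker.scheme not in AMQP_SCHEMES:
        problems.append(f"broker_url: not an AMQP scheme: {broker.scheme!r}")
    if broker.port != expected_port:
        problems.append(f"broker_url: unexpected port {broker.port}")
    # The database result backend is 'db+' followed by a SQLAlchemy URL
    if not result_backend.startswith("db+"):
        problems.append("celery_result_backend: missing 'db+' prefix")
    return problems
```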
dags_are_paused_at_creation = True
load_examples = False
Set these in airflow-core-site as well.
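In a plain airflow.cfg these settings live in INI-style sections, and Python's `configparser` can read them back for a quick review. The sketch assumes the layout of a stock Airflow 1.10 airflow.cfg. In that layout `executor`, `sql_alchemy_conn`, `dags_are_paused_at_creation` and `load_examples` sit under `[core]`, and `broker_url` sits under `[celery]`. It is an assumption that Ambari renders the file the same way, and the sample values are placeholders.

```python
import configparser

# Placeholder airflow.cfg content, assuming the stock Airflow 1.10 section layout
SAMPLE_CFG = """
[core]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@db-host/airflow
dags_are_paused_at_creation = True
load_examples = False

[celery]
broker_url = pyamqp://guest:guest@rabbit-host:5672/
"""


def summarize(cfg_text):
    """Parse airflow.cfg-style text and return the settings discussed above."""
    # interpolation=None: values may contain '%' (e.g. URL-encoded passwords)
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    core = parser["core"]
    return {
        "executor": core.get("executor"),
        "sql_alchemy_conn": core.get("sql_alchemy_conn"),
        "dags_paused": core.getboolean("dags_are_paused_at_creation"),
        "load_examples": core.getboolean("load_examples"),
        "broker_url": parser.get("celery", "broker_url"),
    }


if __name__ == "__main__":
    print(summarize(SAMPLE_CFG))
```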
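After saving the Airflow configuration in Ambari, initialize the metadata database from Ambari via Service Actions -> InitDB.
Airflow InitDB from Ambari
This initializes the airflow database. Then restart Airflow.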
- Airflow web UI:
- RabbitMQ:
- Celery Flower
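Celery Flower is a web-based tool for monitoring Celery. Its default port is 5555.
As you can see, all 3 workers show as "online" in Celery Flower.
To start Celery Flower, the web UI for Celery, run:
airflow flower
Or, to run the Flower web server in the background: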
nohup airflow flower >> /var/local/airflow/logs/flower.logs &
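Before opening the Flower UI, a quick way to confirm that a node can reach Flower and RabbitMQ is a plain TCP connection check. Flower uses port 5555 (mentioned above) and RabbitMQ uses its default AMQP port 5672. This is a minimal standard-library sketch, and the host name in the loop is a placeholder.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, unknown host or timed out
        return False


if __name__ == "__main__":
    # Placeholder host: replace with the node running Flower / RabbitMQ
    for name, port in [("Flower", 5555), ("RabbitMQ", 5672)]:
        print(name, is_port_open("localhost", port))
```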
That's it: Airflow is now running through Ambari on the HDP Hadoop / Spark Cluster.
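In my previous post on Apache Airflow, "Configuring a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines", I walked through the setup.
Below are the issues I hit while running that Multi-Node Airflow Cluster, along with their fixes.
Problem 1: after changing the executor from LocalExecutor to CeleryExecutor, DAGs stopped running.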
The Scheduler could not sync task states with the Celery Workers.
Error:
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:112} ERROR - Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:113} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
It took me a while to find the cause: the installed Celery version.
Solution:
Celery 3.3.5 does not work with Airflow 1.10, so upgrade it:
pip install --upgrade celery
(this upgraded Celery from 3.3.5 to 4.3)
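To confirm that the upgrade took effect on every worker node, you can compare the installed Celery version against 4.3. The sketch below relies on `importlib.metadata`, which requires Python 3.8+. That is an assumption for nodes still on the Python 2.7 seen in the logs, where running `celery --version` is the simpler check. The minimum `(4, 3)` mirrors the version used here and is not an official Airflow requirement.

```python
import re
from importlib import metadata


def version_tuple(version):
    """Turn '4.3.0' or '3.3.5' into a tuple of ints, ignoring suffixes such as 'rc1'."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if not match:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def celery_is_recent(minimum=(4, 3)):
    """Return True if the installed Celery is at least `minimum`; False if older or missing."""
    try:
        installed = metadata.version("celery")
    except metadata.PackageNotFoundError:
        return False
    return version_tuple(installed) >= minimum
```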
Problem 2: with CeleryExecutor, DAG runs failed on the workers with the following error:
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
Apr 11 14:13:13 charlie-prod airflow_control.sh: [2019-04-11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError("Required argument 'object' (pos 1) not found",)
Apr 11 14:13:13 charlie-prod airflow_control.sh: Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 289, in __call__
Apr 11 14:13:13 charlie-prod airflow_control.sh: sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 347, in workloop
Apr 11 14:13:13 charlie-prod airflow_control.sh: req = wait_for_job()
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 447, in receive
Apr 11 14:13:13 charlie-prod airflow_control.sh: ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 419, in _recv
Apr 11 14:13:13 charlie-prod airflow_control.sh: return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/common.py", line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
Cause:
The same Airflow error is described here: https://blog.csdn.net/u013492463/article/details/80881260
My broker URL was:
broker_url= amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
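Note that the scheme is amqp, not pyamqp, and the two behave differently:
amqp:// uses librabbitmq if it is installed, otherwise it falls back to py-amqp.
pyamqp:// always uses the pure-Python py-amqp library.
librabbitmq:// always uses librabbitmq.
To use the pure-Python implementation explicitly, use pyamqp:// instead of amqp:// (py-amqp: http://github.com/celery/py-amqp).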
So the broker URL should be:
broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
That is, replace amqp with pyamqp.
Also make sure the py-amqp library is installed (its PyPI package name is amqp):
pip install amqp
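The scheme rules above fit in a few lines. The function below is a hypothetical illustration of the selection logic as described in this post. It is not Kombu's actual code.

```python
from urllib.parse import urlsplit


def amqp_client_library(broker_url, librabbitmq_installed):
    """Hypothetical: which AMQP client library a broker URL scheme selects."""
    scheme = urlsplit(broker_url).scheme
    if scheme == "pyamqp":
        return "py-amqp"  # always the pure-Python library
    if scheme == "librabbitmq":
        return "librabbitmq"  # always the C library
    if scheme == "amqp":
        # 'amqp' prefers librabbitmq when available and falls back to py-amqp
        return "librabbitmq" if librabbitmq_installed else "py-amqp"
    raise ValueError(f"not an AMQP broker URL: {broker_url!r}")
```

On a node where librabbitmq is installed, `amqp://` switches libraries silently, while `pyamqp://` keeps py-amqp regardless. That is presumably why switching the scheme fixed the error above.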
Problem 3: SQL Alchemy connection
My original SQL Alchemy connection:
sql_alchemy_conn = postgresql://airflow:airflow@{HOST_NAME}:5432/airflow
Solution: specify the psycopg2 driver explicitly:
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
For this, install psycopg2 with pip, using its wheel.
PostgreSQL: psycopg2
Psycopg is the most popular PostgreSQL database adapter for the Python programming language.
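Problem 4: on HDP 2.6.2 with Ambari, the Worker Installation failed.
This was a blocker: without a Celery worker on that node, DAGs could not run there.
It kept failing with this error: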
by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Could not fetch URL https://pypi.org/simple/apache-airflow/: There was a problem confirming the ssl certificate: HTTPSConnectionPool(host='pypi.org', port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)) - skipping
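Solution:
The root cause was that pip could not download the wheels from PyPI while Ambari was installing the worker on that node.
The failing step was Ambari's pip install of the package from PyPI: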
pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
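Tweaking pip did not help.
So I used a hack, the simplest one I could find: in the Ambari mpack's worker install step, I located the lines that install celery and wheel via pip.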
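In the cluster, I temporarily commented out these lines by hand (and restored them once the worker had installed successfully), then added the worker in Ambari, and it worked like a charm :) This hack made my day.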
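When the pip step is skipped this way, the packages still have to reach the worker somehow. One option (an assumption, not what was done above) is to download the wheels on a machine that can reach PyPI. You then install them on the worker with pip's `--no-index` and `--find-links` options, so no network access is needed. The sketch below only builds the two commands. `WHEEL_DIR` is a hypothetical directory, and the package pin is the one from the Ambari error above. The download machine should match the workers' platform and Python version (2.7 here), or pip may fetch incompatible wheels. Airflow 1.10.0 also needs `SLUGIFY_USES_TEXT_UNIDECODE=yes` in the environment at install time, as in the Ambari command.

```python
import sys
from typing import List

PACKAGE = "apache-airflow[celery]==1.10.0"  # pin taken from the Ambari error above
WHEEL_DIR = "/tmp/airflow-wheels"           # hypothetical local directory


def download_cmd(package: str, wheel_dir: str) -> List[str]:
    """Run on a machine with PyPI access: fetch the package and its dependencies."""
    return [sys.executable, "-m", "pip", "download", "-d", wheel_dir, package]


def offline_install_cmd(package: str, wheel_dir: str) -> List[str]:
    """Run on the worker node: install only from the local directory, never from PyPI."""
    return [sys.executable, "-m", "pip", "install", "--no-index",
            f"--find-links={wheel_dir}", package]


if __name__ == "__main__":
    # Remember: export SLUGIFY_USES_TEXT_UNIDECODE=yes before installing Airflow 1.10.0
    print(" ".join(download_cmd(PACKAGE, WHEEL_DIR)))
    print(" ".join(offline_install_cmd(PACKAGE, WHEEL_DIR)))
```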
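After installing a worker on another node, you may need to restart the Airflow service from Ambari. You can find more details in my previous blog post: Configuring a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines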