Install Airflow
pip install apache-airflow
Configuration file
Installation creates an airflow directory under ~ (the location can be changed with the AIRFLOW_HOME environment variable). airflow.cfg inside it is the configuration file; the metadata database defaults to SQLite and can be switched to MySQL.
Check the current database configuration:
airflow config get-value core sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db  # current value
To switch to MySQL, first create the database and user for Airflow, then replace the connection string:
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
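Depending on the MySQL version, Airflow's database-backend setup also expects explicit_defaults_for_timestamp = 1 in the MySQL server configuration (my.cnf); worth checking if the db init step below fails.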
Then point airflow.cfg at MySQL (the sql_alchemy_conn key under [core] that the get-value command above reads):
sql_alchemy_conn = mysql+pymysql://<user>:<password>@<host>[:<port>]/<dbname>
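As a concrete example (host and port here are assumptions for a local MySQL instance), with the database and user created above the line would read:
sql_alchemy_conn = mysql+pymysql://airflow_user:airflow_pass@localhost:3306/airflow_db
The mysql+pymysql dialect also needs the client driver installed, e.g. pip install pymysql.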
Initialize the Airflow metadata database
airflow db init
Create an admin user
airflow users create \
--role Admin \
--username admin \
--firstname FIRST_NAME \
--lastname LAST_NAME \
--email EMAIL@example.org
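If --password is not supplied, the command prompts for one interactively; a --password option can also be passed directly.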
Install the in-browser code editor plugin
pip install airflow-code-editor
Start the web server
airflow webserver --port 8080
Start the scheduler (the component that triggers scheduled tasks)
airflow scheduler
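Both commands run in the foreground; to run them in the background, each also accepts a -D/--daemon flag (airflow webserver -D, airflow scheduler -D).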
DAG file location used by the DAGs Code Editor plugin
~/airflow/dags
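(This is the dags_folder path from airflow.cfg; DAG files placed or edited there are picked up on the scheduler's next parse.)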
Example DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# make the project code importable from the DAG file
import sys
sys.path.append('/usr/local/project1')
from project1 import task1  # the function this DAG runs
sys.path.append('/usr/local/yibai-price-strategy')
from success import success_tasks, fail_tasks  # notification callbacks

default_args = {
    'owner': 'SNOW',
    'start_date': datetime(2021, 3, 1, 10, 0),
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': fail_tasks,     # called when a task fails
    'on_success_callback': success_tasks,  # called when a task succeeds
}

dag = DAG(
    'task1',
    default_args=default_args,
    catchup=False,
    schedule_interval='15 10 * * *',  # every day at 10:15
    tags=['task1任务'],
)

run_task1 = PythonOperator(  # named differently from the imported task1 function to avoid shadowing it
    task_id='task1_id',
    python_callable=task1,
    dag=dag,
)
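success_tasks and fail_tasks come from the project's own success module, so their bodies are not shown here. As a rough sketch only (hypothetical name and message, not the project's actual code), an Airflow success/failure callback is simply a function that receives the task context dict:

def example_failure_callback(context):
    # context carries the task instance, execution date, the raised exception, etc.
    ti = context['task_instance']
    print(f"task {ti.task_id} failed on {context['execution_date']}: {context.get('exception')}")

A single task can also be exercised from the CLI without waiting for the schedule, e.g. airflow tasks test task1 task1_id 2021-03-01 (dag_id, task_id, execution date); tasks test runs the task without recording state in the metadata database.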