guopengfa
发布于 2021-06-20 / 824 阅读 / 0 评论 / 0 点赞

Python任务调度工具Apache-Airflow的安装使用

安装airflow

pip install apache-airflow

配置文件

会在~目录下生成一个airflow目录,airflow.cfg为配置表,可配置mysql,默认为sqlite

查看当前数据库配置情况

>>>airflow config get-value core sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db # 为当前配置

若配置mysql需要先生成airflow的库和表和user 再进行替换配置

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

再在airflow.cfg中配置mysql:
mysql+pymysql://<user>:<password>@<host>[:<port>]/<dbname>

初始化airflow

airflow db init

创建用户

airflow users create \
    --role Admin \
    --username admin \
    --firstname FIRST_NAME \
    --lastname LAST_NAME \
    --email EMAIL@example.org

安装在线编辑代码插件

pip install airflow-code-editor

启动网页服务

airflow webserver --port 8080

启动定时任务模块

airflow scheduler

airflow插件Dags Code Editor的文件位置

~/airflow/dags

dag任务示例

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/usr/local/project1')
from project1 import task1

sys.path.append('/usr/local/yibai-price-strategy')
from success import success_tasks, fail_tasks

default_args = {
    'owner': 'SNOW',
    'start_date': datetime(2021, 3, 1, 10, 0),
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': fail_tasks,  # 失败了触发这个
    'on_success_callback': success_tasks,  # 成功了触发这个
}

Dag = DAG('task1', default_args=default_args, catchup=False, schedule_interval='15 10 * * *',
          tags=['task1任务'], )


task1 = PythonOperator(
    task_id="task1_id",
    python_callable= task1,
    dag=Dag
)


task1

完成,愉快使用,官方文档


评论