Airflow 中编写第一个 DAG-自研课程-0101001


要在 Apache Airflow 中编写第一个 DAG(有向无环图),您可以按照以下步骤进行:

  1. 创建 DAG 文件:在 Airflow 的 dags 目录下创建一个 Python 文件,例如 hello_world_dag.py。如果 dags 目录尚不存在,请先创建该目录。

  2. 导入必要的模块:在 DAG 文件中,导入 DAG 类和所需的操作符(Operator),例如 PythonOperatorBashOperator,以及日期时间模块。

python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime

  1. 定义默认参数:设置 DAG 的默认参数,例如所有任务的拥有者、开始日期、重试次数等。

python default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 4, 4), 'retries': 1, 'retry_delay': timedelta(minutes=5), }

  1. 实例化 DAG 对象:创建一个 DAG 实例,设置 DAG 的唯一标识符(dag_id)、描述、默认参数和调度间隔。

python with DAG( 'hello_world_dag', default_args=default_args, description='我的第一个 Airflow DAG', schedule_interval='@daily', # 每天执行一次 ) as dag: # 任务定义将在此处添加

  1. 定义任务:在 DAG 的上下文中,定义任务并指定其操作。例如,使用 PythonOperator 来执行一个简单的 Python 函数。

```python def hello_world(): print('Hello, World!')

t1 = PythonOperator( task_id='hello_task', python_callable=hello_world, dag=dag, ) ``` 

  1. 设置任务依赖:如果有多个任务,您可以设置任务之间的依赖关系,确定执行顺序。例如:

python t1 # 如果只有一个任务,直接定义即可

  1. 部署 DAG 文件:将编写好的 DAG 文件保存到 Airflow 的 dags 目录中。Airflow 会自动检测该目录中的新文件,并加载其中的 DAG。

  2. 启动 Airflow 服务:确保 Airflow 的 Web 服务器和调度器正在运行。您可以使用以下命令启动它们:

bash airflow webserver -p 8080 # 启动 Web 服务器,监听 8080 端口 airflow scheduler # 启动调度器

  1. 访问 Web 界面:在浏览器中访问 http://localhost:8080,您将看到 Airflow 的 Web 界面。在“DAGs”列表中,您应该能够看到刚刚创建的 hello_world_dag,并可以手动触发或等待其按计划执行。

通过上述步骤,您就成功编写并部署了第一个 Airflow DAG。