要在 Apache Airflow 中编写第一个 DAG(有向无环图),您可以按照以下步骤进行:
-
创建 DAG 文件:在 Airflow 的
dags
目录下创建一个 Python 文件,例如hello_world_dag.py
。如果dags
目录尚不存在,请先创建该目录。 -
导入必要的模块:在 DAG 文件中,导入
DAG
类和所需的操作符(Operator),例如PythonOperator
或BashOperator
,以及日期时间模块。
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
- 定义默认参数:设置 DAG 的默认参数,例如所有任务的拥有者、开始日期、重试次数等。
python
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 4, 4),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
- 实例化 DAG 对象:创建一个 DAG 实例,设置 DAG 的唯一标识符(
dag_id
)、描述、默认参数和调度间隔。
python
with DAG(
'hello_world_dag',
default_args=default_args,
description='我的第一个 Airflow DAG',
schedule_interval='@daily', # 每天执行一次
) as dag:
# 任务定义将在此处添加
- 定义任务:在 DAG 的上下文中,定义任务并指定其操作。例如,使用
PythonOperator
来执行一个简单的 Python 函数。
```python def hello_world(): print('Hello, World!')
t1 = PythonOperator( task_id='hello_task', python_callable=hello_world, dag=dag, ) ```
- 设置任务依赖:如果有多个任务,您可以设置任务之间的依赖关系,确定执行顺序。例如:
python
t1 # 如果只有一个任务,直接定义即可
-
部署 DAG 文件:将编写好的 DAG 文件保存到 Airflow 的
dags
目录中。Airflow 会自动检测该目录中的新文件,并加载其中的 DAG。 -
启动 Airflow 服务:确保 Airflow 的 Web 服务器和调度器正在运行。您可以使用以下命令启动它们:
bash
airflow webserver -p 8080 # 启动 Web 服务器,监听 8080 端口
airflow scheduler # 启动调度器
- 访问 Web 界面:在浏览器中访问
http://localhost:8080
,您将看到 Airflow 的 Web 界面。在“DAGs”列表中,您应该能够看到刚刚创建的hello_world_dag
,并可以手动触发或等待其按计划执行。
通过上述步骤,您就成功编写并部署了第一个 Airflow DAG。