Creating a Simple DAG File

DAG File Examples

The home page lists many example DAG files; open a DAG's Code page to see its source.

Below is a simple DAG file, test_dag.py:

# [START import_module]
from datetime import datetime, timedelta
from textwrap import dedent
import os
import time

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator  # used for Python-callable tasks

from airflow.models import Variable

from airflow.providers.postgres.operators.postgres import PostgresOperator
import psycopg2

from airflow.operators.email import EmailOperator

from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

import json

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,             # send an email when a task fails
    'email_on_retry': True,               # send an email when a task is retried
    'retries': 2,                         # number of retries
    'retry_delay': timedelta(minutes=5),  # delay between retries
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]

def sum100():
    total = 0
    for i in range(1, 101):  # 1 + 2 + ... + 100 = 5050
        total += i
    print(total)
    return total  # the return value is pushed to XCom for cross-task communication

# Pull the value produced by sum_1_100, to test XCom cross-task communication
def get_sum100(**context):
    sum_value = context['ti'].xcom_pull(dag_id='test_dag', task_ids='sum_1_100', key="return_value")
    # sum_value = context['ti'].xcom_pull(task_ids='sum_1_100', key="return_value")
    print(sum_value)
    print('1+2+...+101=', sum_value + 101)  # 5050 + 101 = 5151


def use_variable():
    # Variables must be configured in the web UI first (and the import is needed)
    variable_1 = Variable.get("key1")
    variable_2 = Variable.get("key2_json", deserialize_json=True)
    print(variable_1)
    print(variable_2)

def run_with_config(**context):
    config_value = context["dag_run"].conf.get("key")
    value = 'not found config_value'
    if config_value:
        value = config_value
    print(value)


def run_failed():
    print('this task will fail')
    print(1 / 0)  # deliberately raise ZeroDivisionError


def use_pg_insert():
    # Read the saved connection details through a hook
    from airflow.hooks.base_hook import BaseHook
    conn = BaseHook.get_connection('airflow_pg')
    print(conn)

    pgdb = psycopg2.connect(
        host=conn.host,
        user=conn.login,
        password=conn.password,
        database=conn.schema,
        port=conn.port
    )
    pgcursor = pgdb.cursor()
    now_1 = datetime.now()
    str_now_1 = now_1.strftime('%Y%m%d-%H%M%S')
    now_2 = datetime.now()
    str_now_2 = now_2.strftime('%Y%m%d-%H%M%S')
    now_times_list = [str_now_1, str_now_2]
    for now_time_str in now_times_list:
        sql = f"""
        INSERT INTO test(str_time) VALUES ('{now_time_str}')
        """
        print(sql)
        pgcursor.execute(sql)
        pgdb.commit()

    print(pgcursor.rowcount, "record inserted.")
    pgdb.close()

def check(response):
    if response == 200:
        print("Returning True")
        return True
    else:
        print("Returning False")
        return False


def sleep_60s():
    print('sleep 60s')
    time.sleep(60)
    print('sleep 60s finished')


def sleep_30s():
    print('sleep 30s')
    time.sleep(30)
    print('sleep 30s finished')


# After this task finishes, another DAG is triggered
def print_time_to_txt():
    print('hello world')
    os.system("echo $(date)-from_task_dag_print_time > /opt/airflow/logs/date.txt")

# [START instantiate_dag]
with DAG(
    'test_dag',                          # DAG id
    default_args=default_args,           # the default arguments defined above
    description='Simple DAG example',    # DAG description
    schedule_interval='30 20 * * *',     # run once a day at 20:30
    start_date=datetime(2021, 11, 15),   # start date
    catchup=False,                       # do not backfill missed runs
    max_active_runs=1,                   # at most one active DAG run at a time
    tags=['example'],                    # tags
) as dag:
    # [END instantiate_dag]

    # [START documentation]
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # [END documentation]

    # PythonOperator tasks
    sum100_task = PythonOperator(
        task_id="sum_1_100",
        python_callable=sum100,
    )

    get_sum_100 = PythonOperator(
        task_id="get_sum_100",
        python_callable=get_sum100,
    )

    print_time_to_text_task = PythonOperator(
        task_id="print_time_to_txt",
        python_callable=print_time_to_txt,
    )

    # note: named *_task so the callables above are not shadowed
    use_variable_task = PythonOperator(
        task_id="use_variable",
        python_callable=use_variable,
    )

    run_with_config_task = PythonOperator(
        task_id="run_with_config",
        python_callable=run_with_config,
    )

    # tests the retry count and retry interval
    run_failed_task = PythonOperator(
        task_id="run_failed",
        python_callable=run_failed,
    )

    use_pg_insert_task = PythonOperator(
        task_id='use_pg_insert',
        python_callable=use_pg_insert,
    )
    # Requires PostgresOperator and a connection configured under Admin/Connections in the web UI
    pgsql_file_task = PostgresOperator(
        task_id='pgsql_file_task',
        postgres_conn_id='airflow_pg',
        sql='test_update_id1.sql',  # located in the ./dag folder
        dag=dag,
    )

    # Requires EmailOperator and a correctly configured SMTP setup
    email_task = EmailOperator(
        task_id="send_email",
        to='[email protected]',
        subject='DAG TEST - {{ ds }}',  # tests Jinja templating
        html_content="""
        <h3>Email Test</h3> {{ ds_nodash }}<br/>{{ dag }}<br/>{{ conf }}<br/>{{ next_ds }}<br/>{{ yesterday_ds }}<br/>{{ tomorrow_ds }}<br/>{{ execution_date }}<br/>
        """,
        dag=dag,
    )

    # Use HttpSensor and SimpleHttpOperator to test sensors and providers
    task_http_sensor_check = HttpSensor(
        task_id='checkuser_from_httpsensor',
        http_conn_id='http_default',
        method='GET',
        endpoint='api/v1/users/httpsensor',
        response_check=lambda response: check(response.status_code),
        poke_interval=5,
        dag=dag,
    )

    task_sleep_60s = PythonOperator(
        task_id='task_sleep_60s',
        python_callable=sleep_60s,
    )

    task_sleep_30s = PythonOperator(
        task_id='task_sleep_30s',
        python_callable=sleep_30s,
    )

    task_adduser_from_api = SimpleHttpOperator(
        task_id='adduser_from_api',
        method='POST',
        endpoint='api/v1/users',
        data=json.dumps(
            {
                "email": "[email protected]",
                "first_name": "httpsensor",
                "last_name": "httpsensor",
                "roles": [
                    {
                        "name": "Public"
                    }
                ],
                "username": "httpsensor",
                "password": "httpsensor"
            }
        ),
        headers={"Content-Type": "application/json"},
        dag=dag,
    )

    task_deluser_from_api = SimpleHttpOperator(
        task_id='deluser_from_api',
        method='DELETE',
        endpoint='api/v1/users/httpsensor',
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        dag=dag,
    )


    # Notify another DAG to run
    trigger_next_dag = TriggerDagRunOperator(
        trigger_dag_id="after_print_time_to_txt",
        task_id="get_time_txt_from_test_dag",
        wait_for_completion=False,  # only trigger the run, do not wait for its result
    )

    sum100_task >> get_sum_100 >> email_task  # get_sum_100 runs after sum100_task finishes

    use_pg_insert_task >> pgsql_file_task >> email_task
    task_sleep_60s >> task_adduser_from_api >> task_sleep_30s >> task_deluser_from_api

    print_time_to_text_task >> trigger_next_dag
# [END tutorial]

Details

Normally a DAG should run only one group of related tasks; this one mixes many features purely for testing.

Imports and default arguments

Imports are usually best placed next to the tasks that need them; here all of them are shown up front.

# [START import_module]
from datetime import datetime, timedelta
from textwrap import dedent

import os
import time

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator  # used for Python-callable tasks

from airflow.models import Variable

from airflow.hooks.base_hook import BaseHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
import psycopg2

from airflow.operators.email import EmailOperator

from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

import json

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,             # send an email when a task fails
    'email_on_retry': True,               # send an email when a task is retried
    'retries': 2,                         # number of retries
    'retry_delay': timedelta(minutes=5),  # delay between retries
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]
Testing XCom cross-task communication

A value produced by one task can be retrieved by another.

def sum100():
    total = 0
    for i in range(1, 101):  # 1 + 2 + ... + 100 = 5050
        total += i
    print(total)
    return total  # the return value is pushed to XCom for cross-task communication

# Pull the value produced by sum_1_100, to test XCom cross-task communication
def get_sum100(**context):
    sum_value = context['ti'].xcom_pull(dag_id='test_dag', task_ids='sum_1_100', key="return_value")
    # sum_value = context['ti'].xcom_pull(task_ids='sum_1_100', key="return_value")
    print(sum_value)
    print('1+2+...+101=', sum_value + 101)  # 5050 + 101 = 5151
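Besides implicit return values, a task can push values explicitly with xcom_push. A minimal sketch, not part of the original DAG; the key name my_key and the task id push_explicitly_task are made up for illustration:

def push_explicitly(**context):
    # push under a custom key instead of the implicit "return_value"
    context['ti'].xcom_push(key='my_key', value=5050)

def pull_explicitly(**context):
    # pull by task id and key; returns None if nothing was pushed yet
    value = context['ti'].xcom_pull(task_ids='push_explicitly_task', key='my_key')
    print(value)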

The task log shows the value was pulled:

(screenshot: XCom log)

The value also shows up in the UI:

(screenshot: XComs page)

Testing global Variables

def use_variable():
    # Variables must be configured in the web UI first, and the import is needed
    from airflow.models import Variable
    variable_1 = Variable.get("key1")
    variable_2 = Variable.get("key2_json", deserialize_json=True)
    print(variable_1)
    print(variable_2)
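Besides the Admin -> Variables page, Variables can also be created in code or from the CLI (airflow variables set key1 value1). A small sketch with placeholder values for the two keys used above:

from airflow.models import Variable

# equivalent to adding the entries on the Variables page (values are made up)
Variable.set("key1", "value1")
Variable.set("key2_json", {"name": "airflow", "port": 5432}, serialize_json=True)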

The Variables page with the defined entries:

(screenshot: Variables page)

The task log shows the values were retrieved:

(screenshot: Variables log)

Testing passing parameters when triggering a run
def run_with_config(**context):
    config_value = context["dag_run"].conf.get("key")
    value = 'not found config_value'
    if config_value:
        value = config_value
    print(value)
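The conf dictionary comes from the UI's "Trigger DAG w/ config" option, or from the CLI; for example (the value here is arbitrary):

airflow dags trigger test_dag --conf '{"key": "hello_from_cli"}'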

When no parameter is passed in:

(screenshot: no parameter)

The parameter being passed in:

(screenshot: passing a parameter)

The passed-in value is picked up:

(screenshot: value retrieved)

Testing a failing task

This mainly exercises the retry settings.

def run_failed():
    print('this task will fail')
    print(1 / 0)  # deliberately raise ZeroDivisionError
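default_args sets retries to 2 globally, but any single operator can override it at construction time; a hedged sketch (the task id is hypothetical):

# hypothetical variant: this task fails immediately, with no retries
run_failed_no_retry = PythonOperator(
    task_id="run_failed_no_retry",
    python_callable=run_failed,
    retries=0,                # overrides default_args['retries'] = 2
    email_on_failure=False,   # other default_args entries can be overridden the same way
)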

As configured, the task was retried twice:

(screenshot: retry count)

Testing reading Connection details
def use_pg_insert():
    # Read the saved connection details through a hook
    from airflow.hooks.base_hook import BaseHook
    conn = BaseHook.get_connection('airflow_pg')
    print(conn)

    pgdb = psycopg2.connect(
        host=conn.host,
        user=conn.login,
        password=conn.password,
        database=conn.schema,
        port=conn.port
    )
    pgcursor = pgdb.cursor()
    now_1 = datetime.now()
    str_now_1 = now_1.strftime('%Y%m%d-%H%M%S')
    now_2 = datetime.now()
    str_now_2 = now_2.strftime('%Y%m%d-%H%M%S')
    now_times_list = [str_now_1, str_now_2]
    for now_time_str in now_times_list:
        sql = f"""
        INSERT INTO test(str_time) VALUES ('{now_time_str}')
        """
        print(sql)
        pgcursor.execute(sql)
        pgdb.commit()

    print(pgcursor.rowcount, "record inserted.")
    pgdb.close()
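The airflow_pg connection can be created on the Admin -> Connections page or from the CLI; a sketch with placeholder host and credentials:

airflow connections add 'airflow_pg' \
    --conn-type 'postgres' \
    --conn-host 'postgres' \
    --conn-login 'airflow' \
    --conn-password 'airflow' \
    --conn-schema 'airflow' \
    --conn-port 5432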

The stored connection details:

(screenshot: Connections page)

The inserted rows:

(screenshot: inserted rows)

You can also see that the row with id = 1 was modified.

Testing PostgresOperator
# Requires PostgresOperator and a connection configured under Admin/Connections in the web UI
from airflow.providers.postgres.operators.postgres import PostgresOperator
pgsql_file_task = PostgresOperator(
    task_id='pgsql_file_task',
    postgres_conn_id='airflow_pg',
    sql='test_update_id1.sql',  # located in the ./dag folder
    dag=dag,
)

test_update_id1.sql lives in the ./dag folder:

update test set str_time = now() where id = 1
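The INSERT and UPDATE statements assume a table roughly like the following; its exact definition is not shown in the original, so this is only a guess:

-- hypothetical definition of the test table
CREATE TABLE IF NOT EXISTS test (
    id       serial PRIMARY KEY,
    str_time varchar(32)
);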

The row with id = 1 has been updated:

(screenshot: updated row)

Testing email sending and templating with Jinja
# Requires EmailOperator and a correctly configured SMTP setup
from airflow.operators.email import EmailOperator
email_task = EmailOperator(
    task_id="send_email",
    to='[email protected]',
    subject='DAG TEST - {{ ds }}',  # tests Jinja templating
    html_content="""
    <h3>Email Test</h3> {{ ds_nodash }}<br/>{{ dag }}<br/>{{ conf }}<br/>{{ next_ds }}<br/>{{ yesterday_ds }}<br/>{{ tomorrow_ds }}<br/>{{ execution_date }}<br/>
    """,
    dag=dag
)

Jinja templating is used here; see https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#templating-with-jinja for details.

(screenshot: rendered email)

As configured, an email is also sent on failure:

(screenshot: failure email)
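To inspect how the templated fields render for a given execution date without actually sending an email, the CLI can render them, e.g.:

airflow tasks render test_dag send_email 2021-11-15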

Testing sensors and providers

HttpSensor and SimpleHttpOperator are used to test the sensor mechanism and provider packages.

task_http_sensor_check: polls every 5 s to check whether the user httpsensor exists.

After 60 s the user httpsensor is created via the API; after another 30 s it is deleted.

from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

import json


def check(response):
    if response == 200:
        print("Returning True")
        return True
    else:
        print("Returning False")
        return False

def sleep_60s():
    print('sleep 60s')
    time.sleep(60)
    print('sleep 60s finished')

def sleep_30s():
    print('sleep 30s')
    time.sleep(30)
    print('sleep 30s finished')

......................

# Use HttpSensor and SimpleHttpOperator to test sensors and providers
task_http_sensor_check = HttpSensor(
    task_id='checkuser_from_httpsensor',
    http_conn_id='http_default',
    method='GET',
    endpoint='api/v1/users/httpsensor',
    response_check=lambda response: check(response.status_code),
    poke_interval=5,
    dag=dag,
)
task_sleep_60s = PythonOperator(
    task_id='task_sleep_60s',
    python_callable=sleep_60s,
)
task_sleep_30s = PythonOperator(
    task_id='task_sleep_30s',
    python_callable=sleep_30s,
)
task_adduser_from_api = SimpleHttpOperator(
    task_id='adduser_from_api',
    method='POST',
    endpoint='api/v1/users',
    data=json.dumps(
        {
            "email": "[email protected]",
            "first_name": "httpsensor",
            "last_name": "httpsensor",
            "roles": [
                {
                    "name": "Public"
                }
            ],
            "username": "httpsensor",
            "password": "httpsensor"
        }
    ),
    headers={"Content-Type": "application/json"},
    dag=dag,
)

task_deluser_from_api = SimpleHttpOperator(
    task_id='deluser_from_api',
    method='DELETE',
    endpoint='api/v1/users/httpsensor',
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag,
)

# After 60 s, create the user; 30 s later, delete it
task_sleep_60s >> task_adduser_from_api >> task_sleep_30s >> task_deluser_from_api

You can see the sensor pokes every 5 seconds until it succeeds:

(screenshot: poking every 5 seconds)

At exactly that point the user httpsensor is created:

(screenshot: user created via API)

30 s later the user is deleted:

(screenshot: user deleted via API)
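The sensor is simply polling the stable REST API. Assuming the basic-auth API backend is enabled and the default airflow/airflow credentials (both assumptions about this setup), the same check can be reproduced by hand:

curl -u airflow:airflow http://localhost:8080/api/v1/users/httpsensor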

Testing triggering another DAG after a task finishes

After this task completes, another DAG is notified to run.

def print_time_to_txt():
    print('hello world')
    os.system("echo $(date)-from_task_dag_print_time > /opt/airflow/logs/date.txt")


'..................'


from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_next_dag = TriggerDagRunOperator(
    trigger_dag_id="after_print_time_to_txt",
    task_id="get_time_txt_from_test_dag",
    wait_for_completion=False,  # only trigger the run, do not wait for its result
)
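TriggerDagRunOperator can also hand a conf dict to the triggered DAG, which the child then reads via dag_run.conf; a hedged sketch (the conf key is made up, not part of the original):

trigger_next_dag = TriggerDagRunOperator(
    task_id="get_time_txt_from_test_dag",
    trigger_dag_id="after_print_time_to_txt",
    conf={"source": "test_dag"},  # readable in the child as context["dag_run"].conf
    wait_for_completion=False,
)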

The after_print_time_to_txt.py DAG file is as follows:

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
with DAG(
    'after_print_time_to_txt',
    default_args=default_args,
    description='after_print_time_to_txt',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # [END instantiate_dag]

    # [START basic_task]
    t1 = BashOperator(
        task_id='print_date',
        bash_command='cat /opt/airflow/logs/date.txt',
    )
    # [END basic_task]

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # [END documentation]
    t1
# [END tutorial]

The trigger time passed to the next DAG:

(screenshot: next DAG trigger time)

The next DAG run:

(screenshot: next DAG run)

Testing ExternalTaskSensor: watch_print_time_to_txt.py runs only after it observes that test_dag has finished.

The content of watch_print_time_to_txt.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 11, 19),
}
with DAG(
    dag_id="watch_print_time_to_txt",
    default_args=default_args,
    schedule_interval='35 20 * * *',
    # concurrency=1,
    # max_active_runs=1,
    tags=['example1'],
) as child_dag:
    # [START howto_operator_external_task_sensor]
    child_task1 = ExternalTaskSensor(
        task_id="watch_print_time_to_txt",
        external_dag_id="test_dag",
        external_task_id="print_time_to_txt",  # must be the task_id, not the task variable name
        # timeout=600,
        # allowed_states=['success'],
        # failed_states=['failed', 'skipped'],
        execution_delta=timedelta(minutes=5),  # watch the run scheduled 5 minutes earlier
        # mode="reschedule",
    )
    # [END howto_operator_external_task_sensor]

    t1 = BashOperator(
        task_id='print_date',
        bash_command='cat /opt/airflow/logs/date.txt',
        dag=child_dag,
    )

    child_task1 >> t1
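execution_delta=timedelta(minutes=5) works here because the two schedules are exactly 5 minutes apart (test_dag at 20:30, this DAG at 20:35). If the offset were not fixed, execution_date_fn could compute the target execution date instead; a sketch of that alternative, not used in the original:

child_task1 = ExternalTaskSensor(
    task_id="watch_print_time_to_txt",
    external_dag_id="test_dag",
    external_task_id="print_time_to_txt",
    # equivalent to execution_delta=timedelta(minutes=5) for this fixed offset
    execution_date_fn=lambda dt: dt - timedelta(minutes=5),
)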

The sensor keeps poking the test_dag run from 5 minutes before this DAG's own schedule:

(screenshot: sensor poking test_dag)

Testing dynamic DAGs

One Python file generates multiple DAGs.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta


def print_hello():
    print('hello')


def print_world():
    print('world')


default_args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=30),
}


def generate_dag(investment_type):
    with DAG(
        dag_id=f'print_{investment_type}_value',
        default_args=default_args,
        description=f'This DAG prints {investment_type} value',
        schedule_interval='5 5 * * *',
        start_date=days_ago(2),
        tags=['example'],
        catchup=False,
        max_active_runs=1,
    ) as dag:
        print_hello_task = PythonOperator(
            task_id=f'print_{investment_type}_hello',
            python_callable=print_hello,
        )
        print_world_task = PythonOperator(
            task_id=f'print_{investment_type}_world',
            python_callable=print_world,
        )
        print_hello_task >> print_world_task
    return dag


investment_types = ['DAG_A', 'DAG_B', 'DAG_C', 'DAG_D']
for _type in investment_types:
    # expose each generated DAG at module level so the scheduler can discover it
    globals()["Dynamic_DAG" + _type] = generate_dag(_type)
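The key point is that each generated DAG object must end up in globals() so the scheduler can discover it. The list itself could just as well come from an Airflow Variable; a hedged variation (assumes a hypothetical Variable named investment_types holding a JSON array):

from airflow.models import Variable

investment_types = Variable.get(
    "investment_types",  # hypothetical Variable, e.g. ["DAG_A", "DAG_B"]
    default_var=['DAG_A', 'DAG_B', 'DAG_C', 'DAG_D'],
    deserialize_json=True,
)
for _type in investment_types:
    globals()["Dynamic_DAG" + _type] = generate_dag(_type)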

You can see that multiple DAGs were generated:

(screenshot: generated DAGs)

Once saved under the ./dag folder, the file is picked up by the scheduler's periodic scan, and its source can be inspected on the Code page.
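Before relying on the scan, a quick sanity check catches syntax and import errors early; for example (paths assume the setup above):

# import the file the same way the scheduler would
python ./dag/test_dag.py

# confirm the DAG was registered
airflow dags list | grep test_dag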