From 04335268970408aa6c81d8dae1c5279498c100e0 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 4 Nov 2025 04:20:20 +0800 Subject: [PATCH] [`airflow`] extend deprecated argument `concurrency` in `airflow..DAG` (`AIR301`) (#21220) ## Summary * extend AIR301 to include deprecated argument `concurrency` in `airflow....DAG` ## Test Plan update the existing test fixture in the first commit and then reorganize in the second one --- .../test/fixtures/airflow/AIR301_args.py | 1 + .../src/rules/airflow/rules/removal_in_3.rs | 1 + ...airflow__tests__AIR301_AIR301_args.py.snap | 316 ++++++++++-------- 3 files changed, 172 insertions(+), 146 deletions(-) diff --git a/crates/ruff_linter/resources/test/fixtures/airflow/AIR301_args.py b/crates/ruff_linter/resources/test/fixtures/airflow/AIR301_args.py index ce35d79338..e275a54bcd 100644 --- a/crates/ruff_linter/resources/test/fixtures/airflow/AIR301_args.py +++ b/crates/ruff_linter/resources/test/fixtures/airflow/AIR301_args.py @@ -22,6 +22,7 @@ DAG(dag_id="class_schedule_interval", schedule_interval="@hourly") DAG(dag_id="class_timetable", timetable=NullTimetable()) +DAG(dag_id="class_concurrency", concurrency=12) DAG(dag_id="class_fail_stop", fail_stop=True) diff --git a/crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs b/crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs index 78c89da0b4..562f37230d 100644 --- a/crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs +++ b/crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs @@ -196,6 +196,7 @@ fn check_call_arguments(checker: &Checker, qualified_name: &QualifiedName, argum match qualified_name.segments() { ["airflow", .., "DAG" | "dag"] => { // with replacement + diagnostic_for_argument(checker, arguments, "concurrency", Some("max_active_tasks")); diagnostic_for_argument(checker, arguments, "fail_stop", Some("fail_fast")); diagnostic_for_argument(checker, arguments, "schedule_interval", Some("schedule")); diagnostic_for_argument(checker, arguments, "timetable", Some("schedule")); diff --git a/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR301_AIR301_args.py.snap b/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR301_AIR301_args.py.snap index 6f783edc9f..e0daf99000 100644 --- a/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR301_AIR301_args.py.snap +++ b/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR301_AIR301_args.py.snap @@ -28,6 +28,8 @@ AIR301 [*] `timetable` is removed in Airflow 3.0 22 | 23 | DAG(dag_id="class_timetable", timetable=NullTimetable()) | ^^^^^^^^^ +24 | +25 | DAG(dag_id="class_concurrency", concurrency=12) | help: Use `schedule` instead 20 | @@ -36,249 +38,271 @@ help: Use `schedule` instead - DAG(dag_id="class_timetable", timetable=NullTimetable()) 23 + DAG(dag_id="class_timetable", schedule=NullTimetable()) 24 | -25 | -26 | DAG(dag_id="class_fail_stop", fail_stop=True) +25 | DAG(dag_id="class_concurrency", concurrency=12) +26 | -AIR301 [*] `fail_stop` is removed in Airflow 3.0 - --> AIR301_args.py:26:31 +AIR301 [*] `concurrency` is removed in Airflow 3.0 + --> AIR301_args.py:25:33 | -26 | DAG(dag_id="class_fail_stop", fail_stop=True) - | ^^^^^^^^^ -27 | -28 | DAG(dag_id="class_default_view", default_view="dag_default_view") +23 | DAG(dag_id="class_timetable", timetable=NullTimetable()) +24 | +25 | DAG(dag_id="class_concurrency", concurrency=12) + | ^^^^^^^^^^^ +26 | +27 | DAG(dag_id="class_fail_stop", fail_stop=True) | -help: Use `fail_fast` instead +help: Use `max_active_tasks` instead +22 | 23 | DAG(dag_id="class_timetable", timetable=NullTimetable()) 24 | -25 | + - DAG(dag_id="class_concurrency", concurrency=12) +25 + DAG(dag_id="class_concurrency", max_active_tasks=12) +26 | +27 | DAG(dag_id="class_fail_stop", fail_stop=True) +28 | + +AIR301 [*] `fail_stop` is removed in Airflow 3.0 + --> AIR301_args.py:27:31 + | +25 | DAG(dag_id="class_concurrency", concurrency=12) +26 | +27 | DAG(dag_id="class_fail_stop", fail_stop=True) + | ^^^^^^^^^ +28 | +29 | DAG(dag_id="class_default_view", default_view="dag_default_view") + | +help: Use `fail_fast` instead +24 | +25 | DAG(dag_id="class_concurrency", concurrency=12) +26 | - DAG(dag_id="class_fail_stop", fail_stop=True) -26 + DAG(dag_id="class_fail_stop", fail_fast=True) -27 | -28 | DAG(dag_id="class_default_view", default_view="dag_default_view") -29 | +27 + DAG(dag_id="class_fail_stop", fail_fast=True) +28 | +29 | DAG(dag_id="class_default_view", default_view="dag_default_view") +30 | AIR301 `default_view` is removed in Airflow 3.0 - --> AIR301_args.py:28:34 + --> AIR301_args.py:29:34 | -26 | DAG(dag_id="class_fail_stop", fail_stop=True) -27 | -28 | DAG(dag_id="class_default_view", default_view="dag_default_view") +27 | DAG(dag_id="class_fail_stop", fail_stop=True) +28 | +29 | DAG(dag_id="class_default_view", default_view="dag_default_view") | ^^^^^^^^^^^^ -29 | -30 | DAG(dag_id="class_orientation", orientation="BT") +30 | +31 | DAG(dag_id="class_orientation", orientation="BT") | AIR301 `orientation` is removed in Airflow 3.0 - --> AIR301_args.py:30:33 + --> AIR301_args.py:31:33 | -28 | DAG(dag_id="class_default_view", default_view="dag_default_view") -29 | -30 | DAG(dag_id="class_orientation", orientation="BT") +29 | DAG(dag_id="class_default_view", default_view="dag_default_view") +30 | +31 | DAG(dag_id="class_orientation", orientation="BT") | ^^^^^^^^^^^ -31 | -32 | allow_future_exec_dates_dag = DAG(dag_id="class_allow_future_exec_dates") +32 | +33 | allow_future_exec_dates_dag = DAG(dag_id="class_allow_future_exec_dates") | AIR301 [*] `schedule_interval` is removed in Airflow 3.0 - --> AIR301_args.py:41:6 + --> AIR301_args.py:42:6 | -41 | @dag(schedule_interval="0 * * * *") +42 | @dag(schedule_interval="0 * * * *") | ^^^^^^^^^^^^^^^^^ -42 | def decorator_schedule_interval(): -43 | pass +43 | def decorator_schedule_interval(): +44 | pass | help: Use `schedule` instead -38 | pass -39 | +39 | pass 40 | +41 | - @dag(schedule_interval="0 * * * *") -41 + @dag(schedule="0 * * * *") -42 | def decorator_schedule_interval(): -43 | pass -44 | +42 + @dag(schedule="0 * * * *") +43 | def decorator_schedule_interval(): +44 | pass +45 | AIR301 [*] `timetable` is removed in Airflow 3.0 - --> AIR301_args.py:46:6 + --> AIR301_args.py:47:6 | -46 | @dag(timetable=NullTimetable()) +47 | @dag(timetable=NullTimetable()) | ^^^^^^^^^ -47 | def decorator_timetable(): -48 | pass +48 | def decorator_timetable(): +49 | pass | help: Use `schedule` instead -43 | pass -44 | +44 | pass 45 | +46 | - @dag(timetable=NullTimetable()) -46 + @dag(schedule=NullTimetable()) -47 | def decorator_timetable(): -48 | pass -49 | +47 + @dag(schedule=NullTimetable()) +48 | def decorator_timetable(): +49 | pass +50 | AIR301 [*] `execution_date` is removed in Airflow 3.0 - --> AIR301_args.py:54:62 + --> AIR301_args.py:55:62 | -52 | def decorator_deprecated_operator_args(): -53 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator( -54 | task_id="trigger_dagrun_op1", trigger_dag_id="test", execution_date="2024-12-04" +53 | def decorator_deprecated_operator_args(): +54 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator( +55 | task_id="trigger_dagrun_op1", trigger_dag_id="test", execution_date="2024-12-04" | ^^^^^^^^^^^^^^ -55 | ) -56 | trigger_dagrun_op2 = TriggerDagRunOperator( +56 | ) +57 | trigger_dagrun_op2 = TriggerDagRunOperator( | help: Use `logical_date` instead -51 | @dag() -52 | def decorator_deprecated_operator_args(): -53 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator( +52 | @dag() +53 | def decorator_deprecated_operator_args(): +54 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator( - task_id="trigger_dagrun_op1", trigger_dag_id="test", execution_date="2024-12-04" -54 + task_id="trigger_dagrun_op1", trigger_dag_id="test", logical_date="2024-12-04" -55 | ) -56 | trigger_dagrun_op2 = TriggerDagRunOperator( -57 | task_id="trigger_dagrun_op2", trigger_dag_id="test", execution_date="2024-12-04" +55 + task_id="trigger_dagrun_op1", trigger_dag_id="test", logical_date="2024-12-04" +56 | ) +57 | trigger_dagrun_op2 = TriggerDagRunOperator( +58 | task_id="trigger_dagrun_op2", trigger_dag_id="test", execution_date="2024-12-04" AIR301 [*] `execution_date` is removed in Airflow 3.0 - --> AIR301_args.py:57:62 + --> AIR301_args.py:58:62 | -55 | ) -56 | trigger_dagrun_op2 = TriggerDagRunOperator( -57 | task_id="trigger_dagrun_op2", trigger_dag_id="test", execution_date="2024-12-04" +56 | ) +57 | trigger_dagrun_op2 = TriggerDagRunOperator( +58 | task_id="trigger_dagrun_op2", trigger_dag_id="test", execution_date="2024-12-04" | ^^^^^^^^^^^^^^ -58 | ) +59 | ) | help: Use `logical_date` instead -54 | task_id="trigger_dagrun_op1", trigger_dag_id="test", execution_date="2024-12-04" -55 | ) -56 | trigger_dagrun_op2 = TriggerDagRunOperator( +55 | task_id="trigger_dagrun_op1", trigger_dag_id="test", execution_date="2024-12-04" +56 | ) +57 | trigger_dagrun_op2 = TriggerDagRunOperator( - task_id="trigger_dagrun_op2", trigger_dag_id="test", execution_date="2024-12-04" -57 + task_id="trigger_dagrun_op2", trigger_dag_id="test", logical_date="2024-12-04" -58 | ) -59 | -60 | branch_dt_op = datetime.BranchDateTimeOperator( +58 + task_id="trigger_dagrun_op2", trigger_dag_id="test", logical_date="2024-12-04" +59 | ) +60 | +61 | branch_dt_op = datetime.BranchDateTimeOperator( AIR301 [*] `use_task_execution_day` is removed in Airflow 3.0 - --> AIR301_args.py:61:33 + --> AIR301_args.py:62:33 | -60 | branch_dt_op = datetime.BranchDateTimeOperator( -61 | task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5 +61 | branch_dt_op = datetime.BranchDateTimeOperator( +62 | task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5 | ^^^^^^^^^^^^^^^^^^^^^^ -62 | ) -63 | branch_dt_op2 = BranchDateTimeOperator( +63 | ) +64 | branch_dt_op2 = BranchDateTimeOperator( | help: Use `use_task_logical_date` instead -58 | ) -59 | -60 | branch_dt_op = datetime.BranchDateTimeOperator( +59 | ) +60 | +61 | branch_dt_op = datetime.BranchDateTimeOperator( - task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5 -61 + task_id="branch_dt_op", use_task_logical_date=True, task_concurrency=5 -62 | ) -63 | branch_dt_op2 = BranchDateTimeOperator( -64 | task_id="branch_dt_op2", +62 + task_id="branch_dt_op", use_task_logical_date=True, task_concurrency=5 +63 | ) +64 | branch_dt_op2 = BranchDateTimeOperator( +65 | task_id="branch_dt_op2", AIR301 [*] `task_concurrency` is removed in Airflow 3.0 - --> AIR301_args.py:61:62 + --> AIR301_args.py:62:62 | -60 | branch_dt_op = datetime.BranchDateTimeOperator( -61 | task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5 +61 | branch_dt_op = datetime.BranchDateTimeOperator( +62 | task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5 | ^^^^^^^^^^^^^^^^ -62 | ) -63 | branch_dt_op2 = BranchDateTimeOperator( +63 | ) +64 | branch_dt_op2 = BranchDateTimeOperator( | help: Use `max_active_tis_per_dag` instead -58 | ) -59 | -60 | branch_dt_op = datetime.BranchDateTimeOperator( +59 | ) +60 | +61 | branch_dt_op = datetime.BranchDateTimeOperator( - task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5 -61 + task_id="branch_dt_op", use_task_execution_day=True, max_active_tis_per_dag=5 -62 | ) -63 | branch_dt_op2 = BranchDateTimeOperator( -64 | task_id="branch_dt_op2", +62 + task_id="branch_dt_op", use_task_execution_day=True, max_active_tis_per_dag=5 +63 | ) +64 | branch_dt_op2 = BranchDateTimeOperator( +65 | task_id="branch_dt_op2", AIR301 [*] `use_task_execution_day` is removed in Airflow 3.0 - --> AIR301_args.py:65:9 + --> AIR301_args.py:66:9 | -63 | branch_dt_op2 = BranchDateTimeOperator( -64 | task_id="branch_dt_op2", -65 | use_task_execution_day=True, +64 | branch_dt_op2 = BranchDateTimeOperator( +65 | task_id="branch_dt_op2", +66 | use_task_execution_day=True, | ^^^^^^^^^^^^^^^^^^^^^^ -66 | sla=timedelta(seconds=10), -67 | ) +67 | sla=timedelta(seconds=10), +68 | ) | help: Use `use_task_logical_date` instead -62 | ) -63 | branch_dt_op2 = BranchDateTimeOperator( -64 | task_id="branch_dt_op2", +63 | ) +64 | branch_dt_op2 = BranchDateTimeOperator( +65 | task_id="branch_dt_op2", - use_task_execution_day=True, -65 + use_task_logical_date=True, -66 | sla=timedelta(seconds=10), -67 | ) -68 | +66 + use_task_logical_date=True, +67 | sla=timedelta(seconds=10), +68 | ) +69 | AIR301 [*] `use_task_execution_day` is removed in Airflow 3.0 - --> AIR301_args.py:92:9 + --> AIR301_args.py:93:9 | -90 | follow_task_ids_if_true=None, -91 | week_day=1, -92 | use_task_execution_day=True, +91 | follow_task_ids_if_true=None, +92 | week_day=1, +93 | use_task_execution_day=True, | ^^^^^^^^^^^^^^^^^^^^^^ -93 | ) +94 | ) | help: Use `use_task_logical_date` instead -89 | follow_task_ids_if_false=None, -90 | follow_task_ids_if_true=None, -91 | week_day=1, +90 | follow_task_ids_if_false=None, +91 | follow_task_ids_if_true=None, +92 | week_day=1, - use_task_execution_day=True, -92 + use_task_logical_date=True, -93 | ) -94 | -95 | trigger_dagrun_op >> trigger_dagrun_op2 +93 + use_task_logical_date=True, +94 | ) +95 | +96 | trigger_dagrun_op >> trigger_dagrun_op2 AIR301 `filename_template` is removed in Airflow 3.0 - --> AIR301_args.py:102:15 + --> AIR301_args.py:103:15 | -101 | # deprecated filename_template argument in FileTaskHandler -102 | S3TaskHandler(filename_template="/tmp/test") +102 | # deprecated filename_template argument in FileTaskHandler +103 | S3TaskHandler(filename_template="/tmp/test") | ^^^^^^^^^^^^^^^^^ -103 | HdfsTaskHandler(filename_template="/tmp/test") -104 | ElasticsearchTaskHandler(filename_template="/tmp/test") +104 | HdfsTaskHandler(filename_template="/tmp/test") +105 | ElasticsearchTaskHandler(filename_template="/tmp/test") | AIR301 `filename_template` is removed in Airflow 3.0 - --> AIR301_args.py:103:17 + --> AIR301_args.py:104:17 | -101 | # deprecated filename_template argument in FileTaskHandler -102 | S3TaskHandler(filename_template="/tmp/test") -103 | HdfsTaskHandler(filename_template="/tmp/test") +102 | # deprecated filename_template argument in FileTaskHandler +103 | S3TaskHandler(filename_template="/tmp/test") +104 | HdfsTaskHandler(filename_template="/tmp/test") | ^^^^^^^^^^^^^^^^^ -104 | ElasticsearchTaskHandler(filename_template="/tmp/test") -105 | GCSTaskHandler(filename_template="/tmp/test") +105 | ElasticsearchTaskHandler(filename_template="/tmp/test") +106 | GCSTaskHandler(filename_template="/tmp/test") | AIR301 `filename_template` is removed in Airflow 3.0 - --> AIR301_args.py:104:26 + --> AIR301_args.py:105:26 | -102 | S3TaskHandler(filename_template="/tmp/test") -103 | HdfsTaskHandler(filename_template="/tmp/test") -104 | ElasticsearchTaskHandler(filename_template="/tmp/test") +103 | S3TaskHandler(filename_template="/tmp/test") +104 | HdfsTaskHandler(filename_template="/tmp/test") +105 | ElasticsearchTaskHandler(filename_template="/tmp/test") | ^^^^^^^^^^^^^^^^^ -105 | GCSTaskHandler(filename_template="/tmp/test") +106 | GCSTaskHandler(filename_template="/tmp/test") | AIR301 `filename_template` is removed in Airflow 3.0 - --> AIR301_args.py:105:16 + --> AIR301_args.py:106:16 | -103 | HdfsTaskHandler(filename_template="/tmp/test") -104 | ElasticsearchTaskHandler(filename_template="/tmp/test") -105 | GCSTaskHandler(filename_template="/tmp/test") +104 | HdfsTaskHandler(filename_template="/tmp/test") +105 | ElasticsearchTaskHandler(filename_template="/tmp/test") +106 | GCSTaskHandler(filename_template="/tmp/test") | ^^^^^^^^^^^^^^^^^ -106 | -107 | FabAuthManager(None) +107 | +108 | FabAuthManager(None) | AIR301 `appbuilder` is removed in Airflow 3.0 - --> AIR301_args.py:107:15 + --> AIR301_args.py:108:15 | -105 | GCSTaskHandler(filename_template="/tmp/test") -106 | -107 | FabAuthManager(None) +106 | GCSTaskHandler(filename_template="/tmp/test") +107 | +108 | FabAuthManager(None) | ^^^^^^ | help: The constructor takes no parameter now