[airflow] Second positional argument to Asset/Dataset should not be a dictionary (AIR303) (#22453)

<!--
Thank you for contributing to Ruff/ty! To help us out with reviewing,
please consider the following:

- Does this pull request include a summary of the change? (See below.)
- Does this pull request include a descriptive title? (Please prefix
with `[ty]` for ty pull
  requests.)
- Does this pull request include references to any relevant issues?
-->

## Summary

This PR is related to https://github.com/apache/airflow/issues/48389.

In Airflow 3, the function signature for `airflow.Dataset`,
`airflow.datasets.Dataset`, and `airflow.sdk.Asset` has changed. Below
is the function signature for `airflow.sdk.Asset`. The second positional
argument is `uri` which is of type `str`. In the older version, the
second positional argument can be a dictionary which is the `extra:
'dict | None' = None` argument. Therefore, we need to flag this in
Airflow 3.

```python
Asset(name: 'str | None' = None, uri: 'str | None' = None, *, group: 'str | None' = None, extra: 'dict | None' = None, watchers: 'list[AssetWatcher | SerializedAssetWatcher] | None' = None) -> 'None'
```

As this is a check on constructor call, we need to create a new method
`check_constructor_arguments`, instead of on method call. The new rule
check whether the index 1 (the second argument) is a dictionary (either
a literal or a `dict()` call).

## Test Plan

The `AIR303.py` test file have been updated with new test cases. The
snapshot has been updated and all other test cases passed.

@Lee-W , could you please review it when you have a chance, and let me
know if you have further feedback. Thanks!
This commit is contained in:
Kevin Yang
2026-01-17 15:16:12 -05:00
committed by GitHub
parent 3608c620ac
commit 704d30f473
3 changed files with 290 additions and 17 deletions

View File

@@ -29,3 +29,46 @@ HookLineageCollector().create_asset(*args, extra="value")
kwargs = {"uri": "value", "name": "test"}
hlc.create_asset(**kwargs)
HookLineageCollector().create_asset(**kwargs)
# airflow.Dataset
from airflow import Dataset
extra_params = {"extra": "metadata"}
# second argument is of type dict, should raise diagnostic
Dataset("ds1", {"extra": "metadata"})
Dataset("ds1", {})
Dataset("ds1", dict())
Dataset("ds1", extra_params)
Dataset("ds1", {k: v for k, v in zip(["extra"], ["metadata"])})
# second argument is not of type dict, should not raise diagnostic
Dataset("ds1")
Dataset("ds1", extra={"key": "value"})
Dataset(uri="ds1", extra={"key": "value"})
# airflow.datasets.Dataset
from airflow.datasets import Dataset
Dataset("ds2", {"extra": "metadata"})
Dataset("ds2", {})
Dataset("ds2", dict())
Dataset("ds2", extra_params)
Dataset("ds2", {k: v for k, v in zip(["extra"], ["metadata"])})
Dataset("ds2")
Dataset("ds2", extra={"key": "value"})
Dataset(uri="ds2", extra={"key": "value"})
# airflow.sdk.Asset
from airflow.sdk import Asset
Asset("asset1", {"extra": "metadata"})
Asset("asset1", {})
Asset("asset1", dict())
Asset("asset1", extra_params)
Asset("asset1", {k: v for k, v in zip(["extra"], ["metadata"])})
Asset("asset1")
Asset("asset1", extra={"key": "value"})
Asset(uri="asset1", extra={"key": "value"})

View File

@@ -51,7 +51,8 @@ impl Violation for Airflow3IncompatibleFunctionSignature {
change_type,
} = self;
match change_type {
FunctionSignatureChangeType::KeywordOnly { .. } => {
FunctionSignatureChangeType::KeywordOnly { .. }
| FunctionSignatureChangeType::PositionalArgumentChange { .. } => {
format!("`{function_name}` signature is changed in Airflow 3.0")
}
}
@@ -60,7 +61,10 @@ impl Violation for Airflow3IncompatibleFunctionSignature {
fn fix_title(&self) -> Option<String> {
let Airflow3IncompatibleFunctionSignature { change_type, .. } = self;
match change_type {
FunctionSignatureChangeType::KeywordOnly { message } => Some(message.to_string()),
FunctionSignatureChangeType::KeywordOnly { message }
| FunctionSignatureChangeType::PositionalArgumentChange { message } => {
Some(message.to_string())
}
}
}
}
@@ -78,26 +82,31 @@ pub(crate) fn airflow_3_incompatible_function_signature(checker: &Checker, expr:
return;
};
let Expr::Attribute(ExprAttribute { attr, value, .. }) = func.as_ref() else {
// Handle method calls on instances
if let Expr::Attribute(ExprAttribute { attr, value, .. }) = func.as_ref() {
// Resolve the qualified name: try variable assignments first, then fall back to direct
// constructor calls.
let qualified_name = typing::resolve_assignment(value, checker.semantic()).or_else(|| {
value
.as_call_expr()
.and_then(|call| checker.semantic().resolve_qualified_name(&call.func))
});
if let Some(qualified_name) = qualified_name {
check_method_arguments(checker, &qualified_name, attr, arguments);
}
return;
}
// Handle direct constructor calls
let Some(qualified_name) = checker.semantic().resolve_qualified_name(func) else {
return;
};
// Resolve the qualified name: try variable assignments first, then fall back to direct
// constructor calls.
let qualified_name = typing::resolve_assignment(value, checker.semantic()).or_else(|| {
value
.as_call_expr()
.and_then(|call| checker.semantic().resolve_qualified_name(&call.func))
});
let Some(qualified_name) = qualified_name else {
return;
};
check_keyword_only_method(checker, &qualified_name, attr, arguments);
check_constructor_arguments(checker, &qualified_name, arguments, func);
}
fn check_keyword_only_method(
fn check_method_arguments(
checker: &Checker,
qualified_name: &QualifiedName,
attr: &Identifier,
@@ -121,8 +130,55 @@ fn check_keyword_only_method(
}
}
fn check_constructor_arguments(
checker: &Checker,
qualified_name: &QualifiedName,
arguments: &Arguments,
func: &Expr,
) {
if let ["airflow", "Dataset"]
| ["airflow", "datasets", "Dataset"]
| ["airflow", "sdk", "Asset"] = qualified_name.segments()
{
if let Some(second_arg) = arguments.find_positional(1) {
if is_dict_expression(checker, second_arg) {
let function_name = qualified_name.segments().last().unwrap_or(&"").to_string();
checker.report_diagnostic(
Airflow3IncompatibleFunctionSignature {
function_name,
change_type: FunctionSignatureChangeType::PositionalArgumentChange {
message: "Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)",
},
},
func.range(),
);
}
}
}
}
/// Check if an expression is a dictionary.
fn is_dict_expression(checker: &Checker, expr: &Expr) -> bool {
match expr {
Expr::Dict(_) => true,
Expr::DictComp(_) => true,
Expr::Call(call) => checker
.semantic()
.resolve_builtin_symbol(&call.func)
.is_some_and(|name| name == "dict"),
Expr::Name(name) => checker
.semantic()
.resolve_name(name)
.map(|id| checker.semantic().binding(id))
.is_some_and(|binding| typing::is_dict(binding, checker.semantic())),
_ => false,
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum FunctionSignatureChangeType {
/// Function signature changed to only accept keyword arguments.
KeywordOnly { message: &'static str },
/// Function signature changed to not accept certain positional arguments.
PositionalArgumentChange { message: &'static str },
}

View File

@@ -112,3 +112,177 @@ AIR303 `create_asset` signature is changed in Airflow 3.0
28 | # Double-starred keyword arguments
|
help: Pass positional arguments as keyword arguments (e.g., `create_asset(uri=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:39:1
|
38 | # second argument is of type dict, should raise diagnostic
39 | Dataset("ds1", {"extra": "metadata"})
| ^^^^^^^
40 | Dataset("ds1", {})
41 | Dataset("ds1", dict())
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:40:1
|
38 | # second argument is of type dict, should raise diagnostic
39 | Dataset("ds1", {"extra": "metadata"})
40 | Dataset("ds1", {})
| ^^^^^^^
41 | Dataset("ds1", dict())
42 | Dataset("ds1", extra_params)
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:41:1
|
39 | Dataset("ds1", {"extra": "metadata"})
40 | Dataset("ds1", {})
41 | Dataset("ds1", dict())
| ^^^^^^^
42 | Dataset("ds1", extra_params)
43 | Dataset("ds1", {k: v for k, v in zip(["extra"], ["metadata"])})
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:42:1
|
40 | Dataset("ds1", {})
41 | Dataset("ds1", dict())
42 | Dataset("ds1", extra_params)
| ^^^^^^^
43 | Dataset("ds1", {k: v for k, v in zip(["extra"], ["metadata"])})
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:43:1
|
41 | Dataset("ds1", dict())
42 | Dataset("ds1", extra_params)
43 | Dataset("ds1", {k: v for k, v in zip(["extra"], ["metadata"])})
| ^^^^^^^
44 |
45 | # second argument is not of type dict, should not raise diagnostic
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:53:1
|
51 | from airflow.datasets import Dataset
52 |
53 | Dataset("ds2", {"extra": "metadata"})
| ^^^^^^^
54 | Dataset("ds2", {})
55 | Dataset("ds2", dict())
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:54:1
|
53 | Dataset("ds2", {"extra": "metadata"})
54 | Dataset("ds2", {})
| ^^^^^^^
55 | Dataset("ds2", dict())
56 | Dataset("ds2", extra_params)
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:55:1
|
53 | Dataset("ds2", {"extra": "metadata"})
54 | Dataset("ds2", {})
55 | Dataset("ds2", dict())
| ^^^^^^^
56 | Dataset("ds2", extra_params)
57 | Dataset("ds2", {k: v for k, v in zip(["extra"], ["metadata"])})
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:56:1
|
54 | Dataset("ds2", {})
55 | Dataset("ds2", dict())
56 | Dataset("ds2", extra_params)
| ^^^^^^^
57 | Dataset("ds2", {k: v for k, v in zip(["extra"], ["metadata"])})
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Dataset` signature is changed in Airflow 3.0
--> AIR303.py:57:1
|
55 | Dataset("ds2", dict())
56 | Dataset("ds2", extra_params)
57 | Dataset("ds2", {k: v for k, v in zip(["extra"], ["metadata"])})
| ^^^^^^^
58 |
59 | Dataset("ds2")
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Asset` signature is changed in Airflow 3.0
--> AIR303.py:66:1
|
64 | from airflow.sdk import Asset
65 |
66 | Asset("asset1", {"extra": "metadata"})
| ^^^^^
67 | Asset("asset1", {})
68 | Asset("asset1", dict())
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Asset` signature is changed in Airflow 3.0
--> AIR303.py:67:1
|
66 | Asset("asset1", {"extra": "metadata"})
67 | Asset("asset1", {})
| ^^^^^
68 | Asset("asset1", dict())
69 | Asset("asset1", extra_params)
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Asset` signature is changed in Airflow 3.0
--> AIR303.py:68:1
|
66 | Asset("asset1", {"extra": "metadata"})
67 | Asset("asset1", {})
68 | Asset("asset1", dict())
| ^^^^^
69 | Asset("asset1", extra_params)
70 | Asset("asset1", {k: v for k, v in zip(["extra"], ["metadata"])})
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Asset` signature is changed in Airflow 3.0
--> AIR303.py:69:1
|
67 | Asset("asset1", {})
68 | Asset("asset1", dict())
69 | Asset("asset1", extra_params)
| ^^^^^
70 | Asset("asset1", {k: v for k, v in zip(["extra"], ["metadata"])})
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)
AIR303 `Asset` signature is changed in Airflow 3.0
--> AIR303.py:70:1
|
68 | Asset("asset1", dict())
69 | Asset("asset1", extra_params)
70 | Asset("asset1", {k: v for k, v in zip(["extra"], ["metadata"])})
| ^^^^^
71 |
72 | Asset("asset1")
|
help: Use keyword argument `extra` instead of passing a dict as the second positional argument (e.g., `Asset(name=..., uri=..., extra=...)`)