
Presented by Ash Berlin-Taylor, Director of Airflow Engineering at Astronomer
Airflow 2.2.0 is finally almost here! The new release combines two big new features with a whole lot of small quality-of-life improvements to make Airflow even more powerful, and also fixes some long-standing complaints.
Airflow 2.2.0 new features
AIP stands for Airflow Improvement Proposal. Any big architectural change, or a fundamental change to the way Airflow operates, goes through the Airflow Improvement Proposal process and a vote. This ensures that big, fundamental changes are put in front of the community and get its buy-in.
1. AIP-39: Custom Timetables
- Schedule where you couldn’t go before!
- Cron expressions only got us as far as regular time intervals
- For example, daily on Monday-Friday (but not weekends) wasn't possible.
- Full backwards compatibility is maintained; schedule_interval is not going away
- Timetables also introduce an explicit "data interval" - super useful when looking at the data for a specific period of time
- Now possible to process Friday's data on Saturday, or any other funky interval.
- No more "why didn't my DAG run yet?"
- The concept of "execution_date" was confusing to every new user, so it is now deprecated! In its place there are (shown as template variables in the sketch after the example timetable below):
  - logical_date (aka execution_date)
  - data_interval_start (same value as execution_date for the built-in timetables)
  - data_interval_end (same value as next_execution_date, at least for the built-in timetables)
- Pluggable timetables! Airflow 2.2 ships with a few built-in timetables that mirror the behavior of schedule_interval. You can also add your own timetable! Example timetable:
from __future__ import annotations
import datetime
from croniter import croniter
from pendulum import DateTime, instance
from pendulum.tz.timezone import Timezone
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.utils.dates import cron_presets

class RunAtTimetable(Timetable):
    def __init__(self, cron: str, timezone: Timezone) -> None:
        self._expression = cron_presets.get(cron, cron)
        self._timezone = timezone

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manually triggered run covers the exact moment it was triggered.
        return DataInterval.exact(run_after)

    def next_dagrun_info(self, *, last_automated_data_interval: DataInterval | None,
                         restriction: TimeRestriction) -> DagRunInfo | None:
        # TODO: handle restriction.latest and restriction.catchup
        when = (last_automated_data_interval.end if last_automated_data_interval
                else restriction.earliest)
        if when is None:  # No previous run and no start_date to anchor to.
            return None
        cron = croniter(self._expression, start_time=when)
        scheduled = instance(cron.get_next(datetime.datetime)).in_timezone(self._timezone)
        return DagRunInfo.exact(scheduled)
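To actually run DAGs on a custom timetable, it has to be registered through an Airflow plugin so the scheduler and webserver can deserialize it. A minimal sketch, assuming the RunAtTimetable class above (the plugin name, DAG id, and echo command are illustrative); it also shows the new data-interval template variables mentioned earlier:

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.plugins_manager import AirflowPlugin

class RunAtTimetablePlugin(AirflowPlugin):
    # Registering the timetable class makes it available for DAG (de)serialization.
    name = "run_at_timetable_plugin"
    timetables = [RunAtTimetable]

with DAG(
    dag_id="custom_timetable_demo",  # illustrative
    start_date=pendulum.datetime(2021, 10, 1, tz="UTC"),
    timetable=RunAtTimetable("0 9 * * 1-5", pendulum.timezone("UTC")),  # weekdays at 09:00
):
    # logical_date, data_interval_start and data_interval_end are all
    # available as template variables.
    fetch = BashOperator(
        task_id="fetch",
        bash_command="echo {{ data_interval_start }} to {{ data_interval_end }}",
    )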
- Limitations
  - Must return the same result every time it's called (no HTTP requests please - event-based triggering is coming in the future)
  - Timetables are "evaluated" inside the scheduler when creating DagRuns, so keep them fast and error-free
- NYSE trading timetable (Astronomer customers only!)

from astronomer.timetables.trading_hours import USTradingHoursTimetable

with DAG("nyse_trades", timetable=USTradingHoursTimetable()):  # dag_id is illustrative
    @task.python
    def fetch_daily_trades():
        ...
2. AIP-40: Deferrable Tasks
Allows tasks or sensors to free up worker resources when waiting for external systems/events.
- Ideal use case: submit-then-poll operators
- Airbnb introduced smart sensors, a first attempt at tackling this issue
- Deferrable tasks are a great fit for anything that submits a job to an external system and then polls for status (not just sensors)
- A deferred task does not consume a worker slot - instead, hundreds of deferred tasks run at once in a single async process
- Uses fewer resources, and is more reliable
- Unlike smart sensors, doesn't need a dedicated DAG running
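Under the hood, an operator defers itself by handing a "trigger" to the new triggerer process and naming the method to resume in. A minimal sketch using Airflow 2.2's built-in DateTimeTrigger (the operator itself is illustrative):

from datetime import timedelta

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

class WaitOneHourOperator(BaseOperator):
    """Illustrative operator that frees its worker slot for an hour."""

    def execute(self, context):
        # Hand off to the triggerer; the worker slot is released immediately.
        self.defer(
            trigger=DateTimeTrigger(moment=timezone.utcnow() + timedelta(hours=1)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Resumed in a fresh worker slot once the trigger fires.
        return None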
- Advantages of async
- New component: the triggerer, which runs all the deferred tasks' triggers in one async event loop
- Async operators shipping in Airflow 2.2:
  - DateTimeSensorAsync
  - TimeDeltaSensorAsync
- Astronomer customers only:
  - DatabricksRunNowOperatorAsync
  - DatabricksSubmitRunOperatorAsync
  - HttpSensorAsync
  - ExternalTaskSensorAsync
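For example, a minimal sketch using DateTimeSensorAsync (the DAG id and schedule are illustrative):

import pendulum
from airflow import DAG
from airflow.sensors.date_time import DateTimeSensorAsync

with DAG(
    dag_id="async_sensor_demo",  # illustrative
    start_date=pendulum.datetime(2021, 10, 1, tz="UTC"),
    schedule_interval="@daily",
):
    # While waiting, this task occupies no worker slot; the triggerer
    # resumes it once the target time is reached.
    wait = DateTimeSensorAsync(
        task_id="wait_until_noon",
        target_time="{{ data_interval_end.replace(hour=12) }}",
    )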
3. @task.docker decorator
Run any TaskFlow Python task inside a Docker container by decorating it with @task.docker:
from airflow.decorators import task

@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
    total_order_value = 0
    for value in order_data_dict.values():
        total_order_value += value
    return {"total_order_value": total_order_value}
4. Other features
- Validation of DAG params - the building blocks for true parameterized DAGs:
from airflow import DAG
from airflow.models.param import Param

with DAG(
    'my_dag',
    params={
        # an int param with a default value
        'int_param': Param(10, type='integer', minimum=0, maximum=20),
        # a mandatory str param
        'str_param': Param(type='string', minLength=2, maxLength=4),
        # a param which can be None as well
        'dummy_param': Param(type=['null', 'number', 'string']),
        # the old way of passing params, i.e. no data or type validation
        'old_param': 'old_way_of_passing',
        # equivalent to old_param, i.e. no data or type validation
        'simple_param': Param('im_just_like_old_param'),
        # an email, validated against the JSON Schema idn-email format
        'email_param': Param(
            default='example@example.com',
            type='string',
            format='idn-email',
            minLength=5,
            maxLength=255,
        ),
    },
):
    ...
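Validated params are then available to tasks as before, e.g. through templates (a minimal sketch; the task itself is illustrative):

from airflow.operators.bash import BashOperator

# Inside the `with DAG(...)` block above:
notify = BashOperator(
    task_id='notify',
    bash_command='echo sending the report to {{ params.email_param }}',
)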
- airflow standalone - runs all the Airflow components (migrations, scheduler, webserver, triggerer, etc.) directly, without Docker.
Thank you for your attention and see you on the day of the release!

Get Apache Airflow Certified
If you want to learn more about how to get started with Airflow, you can join the thousands of other data engineers who have received the Astronomer Certification for Apache Airflow Fundamentals. This exam assesses an understanding of the basics of the Airflow architecture and the ability to create simple data pipelines for scheduling and monitoring tasks.
Learn More About Certification