
Airflow 2 to 3 auto migration rules

Open Lee-W opened this issue 1 year ago • 52 comments

Description

Why

As we're introducing breaking changes to the main branch, it would be better to start recording the changes for which migration tools could help our users move from Airflow 2 to 3.

The breaking changes can be found at https://github.com/apache/airflow/pulls?q=is%3Apr+label%3Aairflow3.0%3Abreaking and through newsfragments/(\d{5}).significant.rst

What


List of significant news fragments and rules after #44080


The following rules have been reorganized and merged into #44556 and #44555

List of significant news fragments and rules before #44080

List of significant news fragments and rules (by Nov 27)

  • [x] #24842
    • ❓ I guess we don't need to do anything for it, based on the reason why we changed it.
  • [x] #40029
    • Remove config allow_raw_html_descriptions
  • [x] #40931
    • ❌ Model related change
  • [x] #41096
    • config scheduler.processor_poll_interval → scheduler.scheduler_idle_sleep_time
  • [x] #41348
    • module airflow.datasets → airflow.sdk.definitions.asset
      • class DatasetAlias → AssetAlias
      • class DatasetAll → AssetAll
      • class DatasetAny → AssetAny
      • function expand_alias_to_datasets → expand_alias_to_assets
      • class DatasetAliasEvent → AssetAliasEvent
        • attribute dest_dataset_uri → BaseAsset
      • class BaseDataset → BaseAsset
        • method iter_datasets → iter_assets
        • method iter_dataset_aliases → iter_asset_aliases
      • class Dataset → Asset
        • method iter_datasets → iter_assets
        • method iter_dataset_aliases → iter_asset_aliases
      • class _DatasetBooleanCondition → _AssetBooleanCondition
        • method iter_datasets → iter_assets
        • method iter_dataset_aliases → iter_asset_aliases
    • module airflow.datasets.manager → airflow.assets.manager
      • variable dataset_manager → asset_manager
      • function resolve_dataset_manager → resolve_asset_manager
      • class DatasetManager → AssetManager
        • method register_dataset_change → register_asset_change
        • method create_datasets → create_assets
        • method notify_dataset_created → notify_asset_created
        • method notify_dataset_changed → notify_asset_changed
        • method notify_dataset_alias_created → notify_asset_alias_created
    • module airflow.listeners.spec.dataset → airflow.listeners.spec.asset
      • function on_dataset_created → on_asset_created
      • function on_dataset_changed → on_asset_changed
    • module airflow.timetables.datasets → airflow.timetables.assets
      • class DatasetOrTimeSchedule → AssetOrTimeSchedule
    • module airflow.datasets.metadata → airflow.sdk.definitions.asset.metadata
    • module airflow.listeners.spec.dataset → airflow.listeners.spec.asset
      • function on_dataset_created → on_asset_created
      • function on_dataset_changed → on_asset_changed
    • class airflow.timetables.datasets.DatasetOrTimeSchedule → airflow.timetables.assets.AssetOrTimeSchedule
    • decorator airflow.api_connexion.security.requires_access_dataset → airflow.api_connexion.security.requires_access_dataset.requires_access_asset
    • class airflow.auth.managers.models.resource_details.DatasetDetails → airflow.auth.managers.models.resource_details.AssetDetails
    • function airflow.auth.managers.base_auth_manager.is_authorized_dataset → airflow.auth.managers.base_auth_manager.is_authorized_asset
    • class airflow.timetables.simple.DatasetTriggeredTimetable → airflow.timetables.simple.AssetTriggeredTimetable
    • in class airflow.providers_manager.ProvidersManager
      • method initialize_providers_dataset_uri_resources → initialize_providers_asset_uri_resources
      • property dataset_factories → asset_factories
      • property dataset_uri_handlers → asset_uri_handlers
      • property dataset_to_openlineage_converters → asset_to_openlineage_converters
    • constant airflow.security.permissions.RESOURCE_DATASET → airflow.security.permissions.RESOURCE_ASSET
    • function airflow.www.auth.has_access_dataset → airflow.www.auth.has_access_dataset.has_access_asset
    • class airflow.lineage.hook.DatasetLineageInfo → airflow.lineage.hook.AssetLineageInfo
      • attribute dataset → asset
    • In class airflow.lineage.hook.HookLineageCollector
      • method create_dataset → create_asset
      • method add_input_dataset → add_input_asset
      • method add_output_dataset → add_output_asset
      • method collected_datasets → collected_assets
    • context key triggering_dataset_events → triggering_asset_events
    • resource key dataset-uris → asset-uris (for providers amazon, common.io, mysql, fab, postgres, trino)
    • In amazon provider
      • package airflow.providers.amazon.aws.datasets → airflow.providers.amazon.aws.assets
        • in module s3
          • method create_dataset → create_asset
          • method convert_dataset_to_openlineage → convert_asset_to_openlineage
      • attribute airflow.providers.amazon.auth_manager.avp.entities.AvpEntities.DATASET → airflow.providers.amazon.auth_manager.avp.entities.AvpEntities.ASSET
      • airflow.providers.amazon.auth_manager.aws_auth_manager.AwsAuthManager.is_authorized_dataset → airflow.providers.amazon.auth_manager.aws_auth_manager.AwsAuthManager.is_authorized_asset
    • In Common IO Provider
      • package airflow.providers.common.io.datasets → airflow.providers.common.io.assets
        • in module file
          • method create_dataset → create_asset
          • method convert_dataset_to_openlineage → convert_asset_to_openlineage
    • In fab provider
      • function airflow.providers.fab.auth_manager.fab_auth_manager.is_authorized_dataset → airflow.providers.fab.auth_manager.fab_auth_manager.is_authorized_asset
    • In openlineage provider
      • in module airflow.providers.openlineage.utils.utils
        • class DatasetInfo → AssetInfo
        • function translate_airflow_dataset → translate_airflow_asset
    • In postgres provider
      • package airflow.providers.postgres.datasets → airflow.providers.postgres.assets
    • In mysql provider
      • package airflow.providers.mysql.datasets → airflow.providers.mysql.assets
    • In trino provider
      • package airflow.providers.trino.datasets → airflow.providers.trino.assets
    • ❌ ignored
      • airflow.api_connexion.schemas.dataset_schema
      • airflow.api_ui.views.datasets
      • airflow.serialization.pydantic.dataset
      • airflow.serialization.pydantic.taskinstance
      • airflow.serialization.enums.DagAttributeTypes
      • airflow.serialization.serialized_objects
      • airflow.utils.context
      • models
      • DagDependency names
      • private methods
  • [x] #41366
    • #44385
  • [x] #41367
    • airflow.models.ImportError → airflow.models.errors.ParseImportError
  • [x] #41368
    • Remove airflow.executors.*
    • Remove airflow.hooks.*
    • Remove airflow.macros.*
    • Remove airflow.operators.*
    • Remove airflow.sensors.*
  • [x] #41390
    • Remove package airflow.operators.subdag
  • [x] #41391
    • airflow.sensors.external_task.ExternalTaskSensorLink → airflow.sensors.external_task.ExternalDagLink
  • [x] #41393
    • in DayOfWeekSensor
      • parameter use_task_execution_day → use_task_logical_date
  • [x] #41394
    • airflow.models.taskmixin.TaskMixin → airflow.models.taskmixin.DependencyMixin
  • [x] #41395
    • Remove airflow.executors.executor_loader.UNPICKLEABLE_EXECUTORS
    • Remove airflow.utils.dag_cycle_tester.test_cycle
    • Remove airflow.utils.file.TemporaryDirectory
    • Remove airflow.utils.file.mkdirs
    • Remove airflow.utils.state.SHUTDOWN
    • Remove airflow.utils.state.terminating_states
  • [x] #41420
    • ❌ Internal change
  • [x] #41434
    • ❌ REST API change
  • [x] #41440
    • ❌ Model change
  • [x] #41453
    • in DAG
      • Remove argument schedule_interval
      • Remove argument timetable
  • [x] #41496
    • Remove airflow.utils.dates.date_range
    • Remove airflow.utils.dates.days_ago → ❓ do we need to change it to pendulum.today('UTC').add(days=-N, ...)
  • [x] #41520
    • airflow.utils.helpers.chain → airflow.models.baseoperator.chain
    • airflow.utils.helpers.cross_downstream → airflow.models.baseoperator.cross_downstream
  • [x] #41533
    • airflow.secrets.local_filesystem.load_connections → airflow.secrets.local_filesystem.load_connections_dict
    • airflow.secrets.local_filesystem.get_connection → airflow.secrets.local_filesystem.load_connections_dict
  • [x] #41539
    • Remove config smtp.smtp_user
    • Remove config smtp.smtp_password
  • [x] #41550
    • Remove config webserver.session_lifetime_days → use webserver.session_lifetime_minutes
    • Remove config webserver.force_log_out_after → use webserver.session_lifetime_minutes
    • config section policy → task_policy
  • [x] #41552
    • In airflow.utils.log.file_task_handler.FileTaskHandler
      • Remove parameter filename_template
  • [x] #41579
    • Remove function airflow.utils.decorators.apply_defaults
  • [x] #41609
    • Remove config scheduler.dependency_detector
  • [x] #41635
    • ❌ CLI changes
  • [x] #41642
    • airflow.secrets.base_secrets.BaseSecretsBackend.get_conn_uri → airflow.secrets.base_secrets.BaseSecretsBackend.get_conn_value
    • airflow.secrets.base_secrets.BaseSecretsBackend.get_connections → airflow.secrets.base_secrets.BaseSecretsBackend.get_connection
  • [x] #41663
    • airflow.api.auth.backend.basic_auth → airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
  • [x] #41693
    • airflow.api.auth.backend.kerberos_auth → airflow.providers.fab.auth_manager.api.auth.backend.kerberos_auth
    • airflow.auth.managers.fab.api.auth.backend.kerberos_auth → airflow.providers.fab.auth_manager.api.auth.backend.kerberos_auth
  • [x] #41708
    • airflow.auth.managers.fab.fab_auth_manager → airflow.providers.fab.auth_manager.security_manager.override
    • airflow.auth.managers.fab.security_manager.override → airflow.providers.fab.auth_manager.security_manager.override
  • [x] #41733
    • Remove airflow.hooks.base.BaseHook.get_connections (❓ related to 41642)
  • [x] #41735
    • Remove airflow.kubernetes
  • [x] #41736
    • in airflow.operators.datetime.BranchDateTimeOperator
      • parameter use_task_execution_day → use_task_logical_date
    • in airflow.operators.trigger_dagrun.TriggerDagRunOperator
      • remove parameter execution_date
    • in airflow.operators.weekday.BranchDayOfWeekOperator
      • parameter use_task_execution_day → use_task_logical_date
  • [x] #41737
    • Remove airflow.triggers.external_task.TaskStateTrigger
  • [x] #41739
    • ❌ CLI changes
  • [x] #41748
    • airflow.hooks.dbapi → airflow.providers.common.sql.hooks.sql
  • [x] #41758
    • Remove airflow.www.auth.has_access → use airflow.www.auth.has_access_*
    • module airflow.www.security → airflow.providers.fab.auth_manager.security_manager.override.FabAirflowSecurityManagerOverride
    • airflow.www.utils.get_sensitive_variables_fields → airflow.utils.log.secrets_masker.get_sensitive_variables_fields
    • airflow.www.utils.should_hide_value_for_key → airflow.utils.log.secrets_masker.should_hide_value_for_key
  • [x] #41761
    • in BaseOperator
      • parameter task_concurrency → max_active_tis_per_dag https://github.com/astral-sh/ruff/pull/14616
      • remove trigger_rule dummy
      • remove trigger_rule none_failed_or_skipped
    • remove config operators.ALLOW_ILLEGAL_ARGUMENTS
    • airflow.models.baseoperator.BaseOperatorLink → airflow.models.baseoperatorlink.BaseOperatorLink
  • [x] #41762
    • Remove airflow.models.connection.parse_netloc_to_hostname
    • Remove airflow.models.connection.Connection.parse_from_uri
    • Remove airflow.models.connection.Connection.log_info
    • Remove airflow.models.connection.Connection.debug_info
  • [x] #41774
    • ❌ Model change
  • [x] #41776
    • ❌ Model change
  • [x] #41778
    • ❌ Model change
  • [x] #41779
    • ❌ Model change
  • [x] #41780
    • ❌ Model change
  • [x] #41784
    • ❌ Model change
  • [x] #41808
    • ❌ Model change
  • [x] #41814
    • ❌ overwritten by 43915
  • [x] #41857
    • ❌ Package dependency change
  • [x] #41910
    • Remove airflow.api_connexion.security.requires_access → use requires_access_*
  • [x] #41964
    • ❌ CLI changed
    • ❌ Model change
  • [x] #41975
    • Remove config metrics.metrics_use_pattern_match
    • Remove airflow.metrics.validators.AllowListValidator → suggest using airflow.metrics.validators.PatternAllowListValidator (not direct mapping)
    • Remove airflow.metrics.validators.BlockListValidator → suggest using airflow.metrics.validators.PatternBlockListValidator (not direct mapping)
  • [x] #42023
    • ❌ Model change
  • [x] #42042
    • ~~ Remove property airflow.auth.managers.models.base_user.is_active~~ ❌ Users are not likely to use it
  • [x] #42054
    • ❓ not sure what we can do
  • [x] #42060
    • Remove config celery.stalled_task_timeout
    • config kubernetes_executor.worker_pods_pending_timeout → scheduler.task_queued_timeout
  • [x] #42088
    • config metrics.statsd_allow_list → metrics.metrics_allow_list
    • config metrics.statsd_block_list → metrics.metrics_block_list
    • config scheduler.statsd_on → metrics.statsd_on
    • config scheduler.statsd_host → metrics.statsd_host
    • config scheduler.statsd_port → metrics.statsd_port
    • config scheduler.statsd_prefix → metrics.statsd_prefix
    • config scheduler.statsd_allow_list → metrics.statsd_allow_list
    • config scheduler.stat_name_handler → metrics.stat_name_handler
    • config scheduler.statsd_datadog_enabled → metrics.statsd_datadog_enabled
    • config scheduler.statsd_datadog_tags → metrics.statsd_datadog_tags
    • config scheduler.statsd_datadog_metrics_tags → metrics.statsd_datadog_metrics_tags
    • config scheduler.statsd_custom_client_path → metrics.statsd_custom_client_path
  • [x] #42100
    • config core.interleave_timestamp_parser → logging.interleave_timestamp_parser
    • config core.base_log_folder → logging.base_log_folder
    • config core.remote_logging → logging.remote_logging
    • config core.remote_log_conn_id → logging.remote_log_conn_id
    • config core.remote_base_log_folder → logging.remote_base_log_folder
    • config core.encrypt_s3_logs → logging.encrypt_s3_logs
    • config core.logging_level → logging.logging_level
    • config core.fab_logging_level → logging.fab_logging_level
    • config core.logging_config_class → logging.logging_config_class
    • config core.colored_console_log → logging.colored_console_log
    • config core.colored_log_format → logging.colored_log_format
    • config core.colored_formatter_class → logging.colored_formatter_class
    • config core.log_format → logging.log_format
    • config core.simple_log_format → logging.simple_log_format
    • config core.task_log_prefix_template → logging.task_log_prefix_template
    • config core.log_filename_template → logging.log_filename_template
    • config core.log_processor_filename_template → logging.log_processor_filename_template
    • config core.dag_processor_manager_log_location → logging.dag_processor_manager_log_location
    • config core.task_log_reader → logging.task_log_reader
  • [x] #42126
    • config core.sql_alchemy_conn → database.sql_alchemy_conn
    • config core.sql_engine_encoding → database.sql_engine_encoding
    • config core.sql_engine_collation_for_ids → database.sql_engine_collation_for_ids
    • config core.sql_alchemy_pool_enabled → database.sql_alchemy_pool_enabled
    • config core.sql_alchemy_pool_size → database.sql_alchemy_pool_size
    • config core.sql_alchemy_max_overflow → database.sql_alchemy_max_overflow
    • config core.sql_alchemy_pool_recycle → database.sql_alchemy_pool_recycle
    • config core.sql_alchemy_pool_pre_ping → database.sql_alchemy_pool_pre_ping
    • config core.sql_alchemy_schema → database.sql_alchemy_schema
    • config core.sql_alchemy_connect_args → database.sql_alchemy_connect_args
    • config core.load_default_connections → database.load_default_connections
    • config core.max_db_retries → database.max_db_retries
  • [x] #42129
    • config core.worker_precheck → celery.worker_precheck
    • config scheduler.max_threads → scheduler.parsing_processes
    • config celery.default_queue → operators.default_queue
    • config admin.hide_sensitive_variable_fields → core.hide_sensitive_var_conn_fields
    • config admin.sensitive_variable_fields → core.sensitive_var_conn_names
    • config core.non_pooled_task_slot_count → core.default_pool_task_slot_count
    • config core.dag_concurrency → core.max_active_tasks_per_dag
    • config api.access_control_allow_origin → api.access_control_allow_origins
    • config api.auth_backend → api.auth_backends
    • config scheduler.deactivate_stale_dags_interval → scheduler.parsing_cleanup_interval
    • config kubernetes_executor.worker_pods_pending_timeout_check_interval → scheduler.task_queued_timeout_check_interval
    • config webserver.update_fab_perms → fab.update_fab_perms
    • config webserver.auth_rate_limited → fab.auth_rate_limited
    • config webserver.auth_rate_limit → fab.auth_rate_limit
    • config section kubernetes → kubernetes_executor
  • [x] #42137
    • ❌ package dependency change
  • [x] #42280
    • ❌ REST API change
  • [x] #42285
    • Remove config core.check_slas
    • In DAG
      • Remove argument sla_miss_callback
    • In BaseOperator
      • Remove argument sla
  • [x] #42343
    • ❌ Internal change
  • [x] #42436
    • ❓ Should we raise a warning if dag_ignore_file_syntax has changed?
  • #42548
    • ❌ Model change
  • [x] #42579
    • ❌ REST API change
  • [x] #42640
    • ❌ Test change
  • [x] #42647
    • ❌ Build change
  • [x] #42658
    • ❌ REST API change
  • [x] #42660
    • ❌ REST API change
  • [x] #42739
    • ❌ Model change
  • [x] #42776
    • ❌ Model change
  • [x] #42953
    • ❓ Should we warn that DAG.max_active_runs behavior has been changed
  • [x] #43067
    • ❌ Model change
  • [x] #43073
    • ❌ UI change
  • [x] #43096
    • airflow.api.auth.backend.default → airflow.providers.fab.auth_manager.api.auth.backend.session
  • [x] #43102
    • ❌ REST API change
  • [x] #43183
    • Remove config logging.enable_task_context_logger
  • [x] #43289
    • ❌ Remove airflow.executors.*
  • [x] #43291
    • Remove airflow.hooks.*
  • [x] #43368
    • trigger_rule=TriggerRule.ALWAYS is blocked in a dynamic mapped task
  • [x] #43490
    • ❌ Model change
  • [x] #43530
    • function airflow.config.get → airflow.config.conf.get
    • function airflow.config.getboolean → airflow.config.conf.getboolean
    • function airflow.config.getfloat → airflow.config.conf.getfloat
    • function airflow.config.getint → airflow.config.conf.getint
    • function airflow.config.has_option → airflow.config.conf.has_option
    • function airflow.config.remove_option → airflow.config.conf.remove_option
    • function airflow.config.as_dict → airflow.config.conf.as_dict
    • function airflow.config.set → airflow.config.conf.set
    • function airflow.config. → airflow.config.conf.
  • [x] #43533
    • Remove function airflow.utils.dates.parse_execution_date
    • Remove function airflow.utils.dates.round_time
    • Remove function airflow.utils.dates.scale_time_units
    • Remove function airflow.utils.dates.infer_time_unit
  • [x] #43562
    • airflow.PY36 → if sys.version_info >= (3, 6)
    • airflow.PY37 → if sys.version_info >= (3, 7)
  • [x] #43568
    • ❌ don't think we need to do anything
  • [x] #43611
    • ❌ don't think we can do anything
  • [x] #43612
    • ❌ More like a new feature. Change behavior but probably don't need to do something for it
  • [x] #43902
    • https://github.com/apache/airflow/issues/44409
  • [x] #43915
    • Remove config core.strict_dataset_uri_validation
  • [x] #43943
    • Remove config traces.otel_task_log_event
  • [x] #43975
    • Remove config metrics.timer_unit_consistency
  • [x] #44080
    • ❌ DB version change
  • aip-72.significant.rst

Related issues

No response

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

Lee-W avatar Aug 21 '24 11:08 Lee-W

The rules right now are an example of how these changes can be recorded. I will check the existing breaking changes and update the rules. It would be great if folks could help update this list if you know of other breaking changes.

Lee-W avatar Aug 21 '24 11:08 Lee-W

I pinned the issue - this way it will show up at the top of the "Issues" list in the repo

potiuk avatar Aug 21 '24 13:08 potiuk

image

potiuk avatar Aug 21 '24 13:08 potiuk

Thanks!

Lee-W avatar Aug 21 '24 14:08 Lee-W

We can just go over all the significant newsfragments and create a rule for them or keep some reasoning why it doesn't require one

eladkal avatar Aug 24 '24 17:08 eladkal

We should add something for the public API change too. API v1 won't work anymore. Those are being changed as part of AIP-84 to a new FastAPI-based app. GitHub project for it: https://github.com/orgs/apache/projects/414

kaxil avatar Oct 24 '24 15:10 kaxil

Issue here to regroup Rest API breaking changes https://github.com/apache/airflow/issues/43378

pierrejeambrun avatar Oct 25 '24 12:10 pierrejeambrun

I have started prototyping a small package based on LibCST to build a Python 2to3-like tool for Airflow 2 to 3 that does simple, straightforward replacements. My main motivation was that a lot of users in our Airflow instance use schedule_interval, which was deprecated in Airflow 2 and renamed to schedule in Airflow 3. Updating thousands of dags manually would be painful, and some automation could help. This could also help with import statement changes, e.g. for the Task SDK, from airflow import DAG needs to become from airflow.sdk import DAG. Something like this could eventually become part of the Airflow CLI so that users can run airflow migrate /airflow/dags, or it could serve as a starting point for migration. It can update files in place or show a diff. Currently it makes the following changes:

Dags

  • schedule_interval -> schedule
  • timetable -> schedule
  • concurrency -> max_active_tasks
  • Removal of unused full_filepath parameter
  • default_view (tree -> grid)

Operators

  • task_concurrency -> max_active_tis_per_dag
  • trigger_rule (none_failed_or_skipped -> none_failed_min_one_success)

Sample file

import datetime

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.timetables.events import EventsTimetable


with DAG(
    dag_id="my_dag_name",
    default_view="tree",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval="@daily",
    concurrency=2,
):
    op = EmptyOperator(
        task_id="task", task_concurrency=1, trigger_rule="none_failed_or_skipped"
    )


@dag(
    default_view="graph",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=EventsTimetable(event_dates=[datetime.datetime(2022, 4, 5)]),
    max_active_tasks=2,
    full_filepath="/tmp/test_dag.py"
)
def my_decorated_dag():
    op = EmptyOperator(task_id="task")


my_decorated_dag()

Sample usage

python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 tests/test_dag.py
Calculating full-repo metadata...
Executing codemod...
reformatted -

All done! ✨ 🍰 ✨
1 file reformatted.
--- /home/karthikeyan/stuff/python/libcst-tut/tests/test_dag.py
+++ /home/karthikeyan/stuff/python/libcst-tut/tests/test_dag.py
@@ -10,6 +10,6 @@
     dag_id="my_dag_name",
-    default_view="tree",
+    default_view="grid",
     start_date=datetime.datetime(2021, 1, 1),
-    schedule_interval="@daily",
-    concurrency=2,
+    schedule="@daily",
+    max_active_tasks=2,
 ):
@@ -23,5 +23,4 @@
     start_date=datetime.datetime(2021, 1, 1),
-    schedule_interval=EventsTimetable(event_dates=[datetime.datetime(2022, 4, 5)]),
+    schedule=EventsTimetable(event_dates=[datetime.datetime(2022, 4, 5)]),
     max_active_tasks=2,
-    full_filepath="/tmp/test_dag.py"
 )
Finished codemodding 1 files!
 - Transformed 1 files successfully.
 - Skipped 0 files.
 - Failed to codemod 0 files.
 - 0 warnings were generated.

Repo : https://github.com/tirkarthi/Airflow-2to3
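
For anyone curious what one of these rules looks like in LibCST, below is a minimal, hypothetical sketch of a transformer that renames the schedule_interval keyword to schedule on DAG calls. It is a simplified illustration, not the actual code from the repo above.

# Hypothetical sketch: rename schedule_interval= to schedule= on DAG(...) calls.
import libcst as cst

class ScheduleIntervalRenamer(cst.CSTTransformer):
    def leave_Call(self, original_node: cst.Call, updated_node: cst.Call) -> cst.Call:
        func = updated_node.func
        # only touch calls whose callee is literally named DAG
        if not (isinstance(func, cst.Name) and func.value == "DAG"):
            return updated_node
        new_args = []
        for arg in updated_node.args:
            if arg.keyword is not None and arg.keyword.value == "schedule_interval":
                arg = arg.with_changes(keyword=cst.Name("schedule"))
            new_args.append(arg)
        return updated_node.with_changes(args=new_args)

source = 'dag = DAG("demo", schedule_interval="@daily")'
print(cst.parse_module(source).visit(ScheduleIntervalRenamer()).code)
# dag = DAG("demo", schedule="@daily")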

tirkarthi avatar Oct 27 '24 06:10 tirkarthi

NICE! @tirkarthi -> you should start a thread about it at the devlist and propose adding it to the repo. The sooner we start working on it and let people test it, the better it will be. And we can already start adding not only the newsfragments but also rules to the migration tools (cc: @vikramkoka @kaxil ) - we can even think about keeping a database of old-way dags, running such a migration tool on them, and letting the Airflow 3 scheduler process them (and maybe even execute them) as part of our CI. This would tremendously help with maintaining and updating such a tool if we make it a part of our CI pipeline.

potiuk avatar Oct 27 '24 12:10 potiuk

BTW. I like a lot how simple it is with libCST - we previously used a considerably more complex tool from Facebook that allowed refactoring at scale in parallel (https://github.com/facebookincubator/Bowler), but it was rather brittle to develop rules for, and it had some weird problems and missing features. One thing that was very useful is that it had nice "parallelism" features, which allowed refactoring thousands of files in seconds (but also made it difficult to debug).

I think if we get it working with libCST - it will be way more generic and maintainable, also we can easily add parallelism later on when/if we see it is slow.

potiuk avatar Oct 27 '24 12:10 potiuk

One small watchout though - such a tool should have a way to isolate rules, so that they are not in a single big method: some abstraction that will allow us to easily develop and selectively apply (or skip) different rules - see https://github.com/apache/airflow/tree/v1-10-test/airflow/upgrade where we have documentation and information about the upgrade checks we did for the Airflow 1 -> 2 migration.
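
To make that concrete, here is a rough sketch of what such rule isolation could look like (the rule ids and registry are made-up names for illustration, not an existing API):

# Hypothetical sketch: keep each migration rule as its own LibCST transformer and
# apply them selectively, instead of one big method doing every rewrite.
import libcst as cst

RULES: dict[str, cst.CSTTransformer] = {}  # rule id -> transformer instance

def register_rule(rule_id: str, transformer: cst.CSTTransformer) -> None:
    RULES[rule_id] = transformer

def apply_rules(source: str, select=None, skip=()) -> str:
    module = cst.parse_module(source)
    for rule_id, transformer in RULES.items():
        if (select is None or rule_id in select) and rule_id not in skip:
            module = module.visit(transformer)  # each rule runs as an independent pass
    return module.code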

Also we have to discuss whether it should be a separate repo or whether it should be in airflow's monorepo. Both have pros and cons - in 1.10 we chose to keep it in the 1.10 branch of airflow, because it imported some of the airflow code and it was easier, but we could likely create a new repo for it, add CI there and keep it there.

We even have this archived repo https://github.com/apache/airflow-upgrade-check which we never used and archived; we could re-open it. We also have https://pypi.org/project/apache-airflow-upgrade-check/ - a package in PyPI - and we could release new upgrade check versions (2.* ?) with "apache-airflow>=2.11.0" as a dependency.

All that should likely be discussed at devlist :)

potiuk avatar Oct 27 '24 13:10 potiuk

Thanks @potiuk for the details. I will start a discussion on this at the devlist and continue there. Bowler looks interesting. Using libcst.tool from the CLI parallelizes the process; right now this needs python -m libcst.tool to execute it as a codemod. Initially I had designed standalone transformers for each category (dag, operator) where the updated tree from one transformer can be passed to another. The codemod looked like the recommended abstraction for running it, so I changed it that way, only to find later that the CLI accepts only one codemod at a time. I need to check how composable they are.

python -m libcst.tool codemod --help | grep -i -A 1 'jobs JOBS'
  -j JOBS, --jobs JOBS  Number of jobs to use when processing files. Defaults to number of cores

time python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 ~/airflow/dags > /dev/null 2>&1 
python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 ~/airflow/dags >  
6.95s user 0.61s system 410% cpu 1.843 total

# Single core
time python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 -j 1 ~/airflow/dags > /dev/null 2>&1
python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 -j 1  > 
/dev/nul  4.66s user 0.38s system 99% cpu 5.035 total

# 4 core
python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 -j 4 ~/airflow/dags > /dev/null 2>&1
python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 -j 4  > 
/dev/nul  5.45s user 0.54s system 253% cpu 2.358 total

tirkarthi avatar Oct 27 '24 13:10 tirkarthi

Bowler looks interesting.

Don't be deceived by it :).

It was helpful for Provider's migration at some point in time, but it had many rough edges - debugging a problem was a nightmare until we learned how to do it properly, and it had some annoying limitations: you had to learn a completely new set of non-standard abstractions (an SQLAlchemy-like DSL to perform modifications) which did not cover all the refactorings we wanted to do. We had to really dig deep into the code and find workarounds for things we wanted to do that the authors of Bowler had not thought about. And sometimes those were nasty workarounds.

query = (
    Query(<paths to modify>)
    .select_function("old_name")
    .rename("new_name")
    .diff(interactive=True)
)

An example I remember with the query API above is that we could not rename some of the object types easily because it was not "foreseen" (can't remember exactly) - we had a few surprises there.

Also, Bowler seems to have been unmaintained for > 3 years, which means it's unlikely to handle some constructs in Python 3.9+ Airflow code.

What I like about libcst is that it is a really "low-level" interface that you program in Python rather than in an abstract DSL - similar to "ast". You write actual Python code to perform what you want rather than rely on incomplete abstractions, even if you have to copy & paste rename code between different "rules" (for example) (which you can then abstract away as a 'common' util if you need, so no big deal).

potiuk avatar Oct 27 '24 20:10 potiuk

BTW. Codemod .... has also been unmaintained for 5 years. Not that it is a disqualification - but they list python2 as their dependency ... so .....

potiuk avatar Oct 27 '24 20:10 potiuk

I tried to use libcst in airflow as a tiny POC of this issue here https://github.com/apache/airflow/blob/5b7977a149492168688e6f013a7dcd4fe3561a49/scripts/ci/pre_commit/check_deferrable_default.py#L34. It mostly works great except for its speed. I was also thinking about whether to add these migration rules to a ruff Airflow linter, but I have not yet explored much on the rust/ruff side.

Lee-W avatar Oct 28 '24 01:10 Lee-W

:eyes: :eyes: rust project :) ...

Me :heart: it (but I doubt we want to invest in it as it might be difficult to maintain, unless we find quite a few committers who are somewhat ruff-proficient to at least be able to review the code). But it's tempting I must admit.

But to be honest - while I'd love to finally get a serious rust project, I don't think it's worth it. We are talking about a one-time migration; even for 10,000 dags it will take at most a few minutes, and we could maybe turn that into under one minute with rust - so not a big gain for a lot of pain :). Or at least this is what my intuition tells me.

I think parallelism will do the job nicely. My intuition tells me (but this is just intuition and an understanding of the limits and speed of certain operations) that we will get from multiple tens of minutes (when running such a migration sequentially) down to single minutes when we run the migration in parallel using multiple processors and processes - even with Python and libcst. This task is really suitable for such parallelisation because each file is a complete, independent task that can be run in complete isolation from all other tasks - so spawning multiple parallel interpreters, ideally forking them right after all the imports and common code are loaded so that they use shared memory for those, should do the job nicely (at least intuitively).

Using RUST for that might be classic premature optimisation - we likely don't need it :). But it would be worth making some calculations and getting some "numbers" for a big installation - i.e. how many dags of what size are out there, and how long it will take to parse them all with libcst and write them back (even unmodified or with a simple modification). I presume that parsing and writing back will be the bulk of the job - the modifications will add very little overhead as they will mostly be operating on in-memory data structures.
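
A minimal sketch of that kind of per-file parallelism (rewrite_file is an assumed helper that parses, transforms and writes back a single file; only the standard library is used here):

# Hypothetical sketch: fan a per-file rewrite out over a process pool, since each
# DAG file can be migrated independently of all the others.
import multiprocessing
from pathlib import Path

def migrate_dags(dag_folder, rewrite_file, jobs=None):
    files = [str(path) for path in Path(dag_folder).rglob("*.py")]
    # on Linux the workers are forked after imports, so common code is shared
    with multiprocessing.Pool(processes=jobs) as pool:
        pool.map(rewrite_file, files)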

potiuk avatar Oct 28 '24 09:10 potiuk

Me ❤️ it (but I doubt we want to invest in it as it might be difficult to maintain, unless we find quite a few committers who are somewhat ruff profficient to at least be able to review the code) . But it's tempting I must admit.

But to be honest - while I'd love to finally get a serious rust project, it's not worth it I think we are talking of one-time migration for even a 10.000 dags it will take at most single minutes and we can turn it maybe in under one minute with rust - so not a big gain for a lot of pain :) . Or at lest this is what my intuition tells me.

Yep, totally agree. I just want to raise this idea which might be interesting. 👀

I presume that parsing and writing back will be the bulk of the job - and modifications will add very little overhead as they will be mostly operating on in memory data structures.

Yep, I think you're right. My previous default deferrable script took around 10 sec to process ~400 operators. Using ast for checking took around 1 sec

Lee-W avatar Oct 30 '24 01:10 Lee-W

Mostly as a curiosity: one option we might consider is https://github.com/alexpovel/srgn - I've heard about it recently; it's a "grep that understands code" with the capability of running different actions. It is written in rust, and apparently allows adding extensions where you can define your own "scopes" of search and modification.

But I am not too convinced - this is mostly a command line tool, so we would have to have a sequence of "script commands" to run - it seems that plugging in our own rules and AST parsing would be more flexible, even if slower.

potiuk avatar Nov 11 '24 23:11 potiuk

Mostly as curiosity: One option we might consider is https://github.com/alexpovel/srgn - I've heard about it recently, it's a "grep that understands code" with capabilities of running different actions. Written in rust, and allows to add extensions apparently where you can define your own "scopes" of search and modification.

But I am not too convinced - this is mostly a command line tool so we would have to have a sequence of "script commands" to run - seems that plugging in our own rules and AST parsing should also be more flexible, even if slower.

Yep, not that convinced either, but it is always good to have an alternative we could consider 🤔

Lee-W avatar Nov 14 '24 01:11 Lee-W

My best idea right now is to split this into two tools. We don’t really want to invest too much time into building a very rich CLI tool to show users what needs to be changed—we’ll effectively be rebuilding the error reporting interface in ruff (or flake8). Those squiggle lines, colors, error codes, and code context things are not easy to build.

It is probably easiest to tack the linter part onto Ruff—it is Rust, but the code to implement a lint rule isn’t that hard if you know the Python AST and have a general idea about C-like languages. The rewrite part is a lot more difficult, so it’s probably better to implement it as a different tool in Python with libcst. I’m thinking something like

$ ruff check --select AIR
This spits out lint errors with codes like AIR005 AIR123...

$ airflow2to3 --select AIR005 -- path/to/dag/file.py
This fixes the given error(s) in given file(s) in-place with a minimal CLI...

I plan to start experimenting with some rules in Ruff to see how easy the first part actually is. We should be able to save a lot of effort if it is viable.

uranusjr avatar Nov 18 '24 08:11 uranusjr

I tried to change the format a bit and list the rules in the following format.

* [ ] link to the pr with breaking change
    * [ ] things to do

Once the things to do have been listed, we can check the root pr. After implementing the rule, we can mark the things to do as done.

I also updated the format for #41366, #41367, #41368, #41391, #41393

If anyone has anything to add but does not have permission to update the description, please just tag me and I'll take a look.

Lee-W avatar Nov 18 '24 11:11 Lee-W

It is probably easiest to tack the linter part on Ruff—it is Rust, but the code to implement a lint rule isn’t that hard if you know Python AST and just a general idea about C-like languages. The rewrite part is a lot more difficult, so it’s probably better to implement this as a different tool in Python with libcst. I’m thinking something like

Actually I am convinced too - I quite like this one after a bit of thought. This is not something that will be maintained by a lot of people and contributors, and even for them, this is so far from the main "airflow code" - it's really a "one-time" tool - that it might be worth treating it as our first "rust experiment". And I quite agree that the AST code on its own is not really that "pythonic", and if you know what you want and already have existing examples, adding a new rule in RUST should not be difficult even if you do not know it (and AI-driven development here might even be a pretty cool exercise). I'd myself be happy to add a few rules at some point in time and maybe even take part in implementing the rust tooling for our CI environment.

potiuk avatar Nov 19 '24 00:11 potiuk

The things we'll need to migrate for 41348

  • [ ] https://github.com/apache/airflow/pull/41348
    • [ ] module airflow.datasets -> airflow.sdk.definitions.asset
      • [ ] class DatasetAlias -> AssetAlias
      • [ ] class DatasetAll -> AssetAll
      • [ ] class DatasetAny -> AssetAny
      • [ ] function expand_alias_to_datasets -> expand_alias_to_assets
      • [ ] class DatasetAliasEvent -> AssetAliasEvent
        • [ ] attribute dest_dataset_uri -> BaseAsset
      • [ ] class BaseDataset -> BaseAsset
        • [ ] method iter_datasets -> iter_assets
        • [ ] method iter_dataset_aliases -> iter_asset_aliases
      • [ ] class Dataset -> Asset
        • [ ] method iter_datasets -> iter_assets
        • [ ] method iter_dataset_aliases -> iter_asset_aliases
      • [ ] class _DatasetBooleanCondition -> _AssetBooleanCondition
        • [ ] method iter_datasets -> iter_assets
        • [ ] method iter_dataset_aliases -> iter_asset_aliases
    • [ ] module airflow.datasets.manager -> airflow.assets.manager
      • [ ] variable dataset_manager -> asset_manager
      • [ ] function resolve_dataset_manager -> resolve_asset_manager
      • [ ] class DatasetManager -> AssetManager
        • [ ] method register_dataset_change -> register_asset_change
        • [ ] method create_datasets -> create_assets
        • [ ] method notify_dataset_created -> notify_asset_created
        • [ ] method notify_dataset_changed -> notify_asset_changed
        • [ ] method notify_dataset_alias_created -> notify_asset_alias_created
    • [ ] module airflow.listeners.spec.dataset -> airflow.listeners.spec.asset
      • [ ] function on_dataset_created -> on_asset_created
      • [ ] function on_dataset_changed -> on_asset_changed
    • [ ] module airflow.timetables.datasets -> airflow.timetables.assets
      • [ ] class DatasetOrTimeSchedule -> AssetOrTimeSchedule
    • [ ] module airflow.datasets.metadata -> airflow.sdk.definitions.asset.metadata
    • [ ] module airflow.listeners.spec.dataset -> airflow.listeners.spec.asset
      • [ ] function on_dataset_created -> on_asset_created
      • [ ] function on_dataset_changed -> on_asset_changed
    • [ ] class airflow.timetables.datasets.DatasetOrTimeSchedule -> airflow.timetables.assets.AssetOrTimeSchedule
    • [ ] decorator airflow.api_connexion.security.requires_access_dataset -> airflow.api_connexion.security.requires_access_dataset.requires_access_asset
    • [ ] class airflow.auth.managers.models.resource_details.DatasetDetails -> airflow.auth.managers.models.resource_details.AssetDetails
    • [ ] function airflow.auth.managers.base_auth_manager.is_authorized_dataset -> airflow.auth.managers.base_auth_manager.is_authorized_asset
    • [ ] class airflow.timetables.simple.DatasetTriggeredTimetable -> airflow.timetables.simple.AssetTriggeredTimetable
    • in class airflow.providers_manager.ProvidersManager
      • [ ] method initialize_providers_dataset_uri_resources -> initialize_providers_asset_uri_resources
      • [ ] property dataset_factories -> asset_factories
      • [ ] property dataset_uri_handlers -> asset_uri_handlers
      • [ ] property dataset_to_openlineage_converters -> asset_to_openlineage_converters
    • [ ] constant airflow.security.permissions.RESOURCE_DATASET -> airflow.security.permissions.RESOURCE_ASSET
    • [ ] function airflow.www.auth.has_access_dataset -> airflow.www.auth.has_access_dataset.has_access_asset
    • [ ] class airflow.lineage.hook.DatasetLineageInfo -> airflow.lineage.hook.AssetLineageInfo
      • [ ] attribute dataset -> asset
    • In class airflow.lineage.hook.HookLineageCollector
      • [ ] method create_dataset -> create_asset
      • [ ] method add_input_dataset -> add_input_asset
      • [ ] method add_output_dataset -> add_output_asset
      • [ ] method collected_datasets -> collected_assets
    • [ ] context key triggering_dataset_events -> triggering_asset_events
    • In amazon provider
      • package airflow.providers.amazon.aws.datasets -> airflow.providers.amazon.aws.assets
        • in module s3
          • [ ] method create_dataset -> create_asset
          • [ ] method convert_dataset_to_openlineage -> convert_asset_to_openlineage
      • [ ] attribute airflow.providers.amazon.auth_manager.avp.entities.AvpEntities.DATASET -> airflow.providers.amazon.auth_manager.avp.entities.AvpEntities.ASSET
      • [ ] method airflow.providers.amazon.auth_manager.aws_auth_manager.AwsAuthManager.is_authorized_dataset -> airflow.providers.amazon.auth_manager.aws_auth_manager.AwsAuthManager.is_authorized_asset
      • [ ] resource key dataset-uris -> asset-uris
    • In Common IO Provider
      • [ ] package airflow.providers.common.io.datasets -> airflow.providers.common.io.assets
        • in module file
          • [ ] method create_dataset -> create_asset
          • [ ] method convert_dataset_to_openlineage -> convert_asset_to_openlineage
      • [ ] resource key dataset-uris -> asset-uris
    • In fab provider
      • [ ] function airflow.providers.fab.auth_manager.fab_auth_manager.is_authorized_dataset -> airflow.providers.fab.auth_manager.fab_auth_manager.is_authorized_asset
    • In openlineage provider
      • in module airflow.providers.openlineage.utils.utils
        • [ ] class DatasetInfo -> AssetInfo
        • [ ] function translate_airflow_dataset -> translate_airflow_asset
    • In postgres provider
      • [ ] package airflow.providers.postgres.datasets -> airflow.providers.postgres.assets
      • [ ] resource key dataset-uris -> asset-uris
    • In mysql provider
      • [ ] package airflow.providers.mysql.datasets -> airflow.providers.mysql.assets
      • [ ] resource key dataset-uris -> asset-uris
    • In trino provider
      • [ ] package airflow.providers.trino.datasets -> airflow.providers.trino.assets
      • [ ] resource key dataset-uris -> asset-uris
    • ❌ ignored
      • airflow.api_connexion.schemas.dataset_schema
      • airflow.api_ui.views.datasets
      • airflow.serialization.pydantic.dataset
      • airflow.serialization.pydantic.taskinstance
      • airflow.serialization.enums.DagAttributeTypes
      • airflow.serialization.serialized_objects
      • airflow.utils.context
      • models
      • DagDependency names
      • private methods

Lee-W avatar Nov 19 '24 07:11 Lee-W

Hi all, I'm trying to read through the significant news fragments and compile a list of migration rules we should implement. It would be nice if you could take a look and check if I missed anything.

  • @jscheffl
    • [x] https://github.com/apache/airflow/pull/40029
    • [x] https://github.com/apache/airflow/pull/41733
    • [x] https://github.com/apache/airflow/pull/41735
    • [x] https://github.com/apache/airflow/pull/41736
    • [x] https://github.com/apache/airflow/pull/41737
    • [x] https://github.com/apache/airflow/pull/41739
    • [x] https://github.com/apache/airflow/pull/41761
    • [x] https://github.com/apache/airflow/pull/41762
    • [x] https://github.com/apache/airflow/pull/41774
    • [x] https://github.com/apache/airflow/pull/41776
    • [x] https://github.com/apache/airflow/pull/41778
    • [x] https://github.com/apache/airflow/pull/41779
    • [x] https://github.com/apache/airflow/pull/41780
    • [x] https://github.com/apache/airflow/pull/41808
  • @dirrao
    • [ ] https://github.com/apache/airflow/pull/40931
    • [ ] https://github.com/apache/airflow/pull/41096
    • [ ] https://github.com/apache/airflow/pull/41539
    • [ ] https://github.com/apache/airflow/pull/41496
    • [ ] https://github.com/apache/airflow/pull/41550
    • [ ] https://github.com/apache/airflow/pull/41552
    • [ ] https://github.com/apache/airflow/pull/41579
    • [ ] https://github.com/apache/airflow/pull/41609
    • [ ] https://github.com/apache/airflow/pull/41635
    • [ ] https://github.com/apache/airflow/pull/41642
    • [ ] https://github.com/apache/airflow/pull/41663
    • [ ] https://github.com/apache/airflow/pull/41693
    • [ ] https://github.com/apache/airflow/pull/41708
    • [ ] https://github.com/apache/airflow/pull/41748
    • [ ] https://github.com/apache/airflow/pull/41784
    • [ ] https://github.com/apache/airflow/pull/41910
    • [ ] https://github.com/apache/airflow/pull/42060
    • [ ] https://github.com/apache/airflow/pull/42088
    • [ ] https://github.com/apache/airflow/pull/42100
    • [ ] https://github.com/apache/airflow/pull/42126
    • [ ] https://github.com/apache/airflow/pull/42129
  • @kaxil
    • [ ] https://github.com/apache/airflow/pull/41390
    • [ ] https://github.com/apache/airflow/pull/41393
  • @dstandish
    • [ ] https://github.com/apache/airflow/pull/41440
  • @pierrejeambrun
    • [ ] https://github.com/apache/airflow/pull/41857
  • @jedcunningham
    • [ ] https://github.com/apache/airflow/pull/41964
  • @uranusjr
    • [ ] https://github.com/apache/airflow/pull/42054

Lee-W avatar Nov 19 '24 11:11 Lee-W

@kaxil @ashb I would also like to confirm whether we're still allowing users to use models in Airflow 3.0. If not, should we just skip all the changes related to models? Thanks

Lee-W avatar Nov 19 '24 11:11 Lee-W

I tried my hand at implementing a rule in Ruff. This one checks whether a DAG uses the schedule argument explicitly, and errors if there's no such argument (i.e. the user is relying on the implicit default, which changes in 3.0) or a deprecated argument is used.

Does this look reasonable enough for people to build on? I’ll produce a more detailed writeup of what to do if we feel this is the way to go.

https://github.com/uranusjr/ruff/pull/1/files
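
For anyone who doesn't want to read the Rust, the logic the rule implements is roughly the following (a Python ast illustration of the check, not the actual Ruff implementation; names are made up):

# Rough illustration of the check described above, using the stdlib ast module.
import ast

DEPRECATED = {"schedule_interval", "timetable"}

def check_dag_call(call: ast.Call):
    """Return a message if the DAG(...) call needs attention, else None."""
    keywords = {kw.arg for kw in call.keywords if kw.arg}
    if keywords & DEPRECATED:
        return "deprecated scheduling argument used; rename it to 'schedule'"
    if "schedule" not in keywords:
        return "no explicit 'schedule'; the implicit default changes in Airflow 3.0"
    return None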

uranusjr avatar Nov 19 '24 11:11 uranusjr

@kaxil @ashb would also like to confirm whether we're still allowing users to use models in airflow 3.0? If not, should we just skip all the changes related to models. Thanks

Which models? But no, the plan is to not have/"allow" users to import anything from airflow.models at all. Exact details and new names are yet to be determined though.

ashb avatar Nov 19 '24 12:11 ashb

@kaxil https://github.com/apache/airflow/pull/41390 https://github.com/apache/airflow/pull/41393 https://github.com/apache/airflow/pull/41390

Duplicate entries for SubDAGs

kaxil avatar Nov 19 '24 13:11 kaxil

@kaxil #41390 #41393 #41390

Duplicate entries for SubDAGs

oops, just fixed!

Lee-W avatar Nov 19 '24 13:11 Lee-W

@kaxil @ashb would also like to confirm whether we're still allowing users to use models in airflow 3.0? If not, should we just skip all the changes related to models. Thanks

Which models? But no, the plan is to not have/"allow" users to import anything from airflow.models at all. Exact details and new names are to be determined yet though

Pretty much every model 👀 Sounds good. Just want to confirm I'm not misunderstanding anything. I'll just mark them as model changes and not migrate them for now, until we have something decided.

Lee-W avatar Nov 19 '24 13:11 Lee-W