Implement enough interface for MappedOperator to be baggable #20945

Merged
merged 11 commits into from
Jan 24, 2022

uranusjr
Member

@uranusjr uranusjr commented Jan 19, 2022

The tests now actually bag mapped tasks, so we can verify that they are compatible.

@@ -34,9 +41,115 @@
ID_LEN = 250


# used for typing
class Operator:
Member

Anyone else find it odd that Operator is "lower level" than BaseOperator 😁

@uranusjr uranusjr force-pushed the mapped-operator-bag branch 3 times, most recently from c1fd8bc to 20c98a3 Compare January 19, 2022 10:06
template_ext: Sequence[str] = ()
# Template field renderers indicating type of the field, for example sql, json, bash
# Implementing Operator.
template_fields: Collection[str] = ()
Member

Why change from a collection to a sequence? (This might cause knock-on impacts for typing of all other operators)

Member Author

It’s the other way around, from Sequence to a Collection; Collection is looser than Sequence and should work in subclasses (I think; we can always change this back).
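
To illustrate the looseness argument: every Sequence is a Collection, but not the other way around, so a `Collection[str]` annotation accepts anything a `Sequence[str]` annotation did, plus unordered containers such as sets. A minimal runtime check with the standard `collections.abc` classes; the variable names and values are made up for this sketch and are not taken from the PR:

```python
from collections.abc import Collection, Sequence

# Hypothetical template_fields values an operator subclass might declare.
as_tuple = ("sql", "params")
as_list = ["sql", "params"]
as_set = {"sql", "params"}

# Every Sequence is also a Collection (sized, iterable, supports `in`).
sequence_is_collection = issubclass(Sequence, Collection)

# Tuples and lists satisfy both ABCs.
tuple_ok = isinstance(as_tuple, Sequence) and isinstance(as_tuple, Collection)
list_ok = isinstance(as_list, Sequence) and isinstance(as_list, Collection)

# A set is a Collection but not a Sequence: it has no ordering or indexing.
set_is_collection = isinstance(as_set, Collection)
set_is_sequence = isinstance(as_set, Sequence)
```

Under this reading, subclasses that already declare a tuple or list keep type-checking, which is presumably why the looser annotation "should work in subclasses".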

@uranusjr
Member Author

uranusjr commented Jan 20, 2022

Hmm, #20931 still does not correctly establish task dependencies; we’re missing something.

Oh that’s a simple yet difficult-to-spot oversight. Everything should be working now!

@uranusjr uranusjr marked this pull request as ready for review January 20, 2022 07:28
@uranusjr uranusjr requested a review from ashb January 20, 2022 07:28
@uranusjr
Member Author

As an unrelated discovery, we can more or less trace what needs to be checked or changed to account for MappedOperator by adding type annotations to DAG.add_task() and DAG.add_tasks(). Doing so cascades into a nice chain of errors that flags every place that originally could only contain BaseOperator but may now also contain MappedOperator, and so may need to be modified.
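
A rough sketch of that annotation trick, under stated assumptions: the classes below are heavily simplified stand-ins rather than Airflow's real ones, and the shared `Operator` base only mirrors the "used for typing" class visible in the diff. Once `add_task()` and the container it fills are annotated with the wider type, a type checker should complain wherever downstream code still assumes `BaseOperator`.

```python
from typing import Dict, Iterable


class Operator:
    """Hypothetical common base, used only for typing in this sketch."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class BaseOperator(Operator):
    """Simplified stand-in for a regular operator."""


class MappedOperator(Operator):
    """Simplified stand-in; note it is NOT a BaseOperator subclass."""


class DAG:
    def __init__(self) -> None:
        # Annotating the container with the wider type is what lets a type
        # checker flag consumers that rely on BaseOperator-only attributes.
        self.task_dict: Dict[str, Operator] = {}

    def add_task(self, task: Operator) -> None:
        self.task_dict[task.task_id] = task

    def add_tasks(self, tasks: Iterable[Operator]) -> None:
        for task in tasks:
            self.add_task(task)


dag = DAG()
dag.add_tasks([BaseOperator("plain"), MappedOperator("mapped")])
```

At runtime both kinds of task are accepted; the value of the annotations is purely in the static errors they surface.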

# partial_kwargs-populated values e.g. 'queue' below, but we
# must match BaseOperator's implementation and declare them
# as writable attributes instead.
Member

We could choose to make them writable properties that affect the slot in partial_kwargs, couldn't we?

    @weight_rule.setter
    def weight_rule(self, value):
        self.partial_kwargs['weight_rule'] = value
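
For context, a self-contained sketch of that suggestion. The class is a simplified, hypothetical stand-in for MappedOperator, and the `"downstream"` fallback is an assumption made for this example. Note that the setter function has to reuse the property's name; otherwise the writable property ends up bound to a different attribute name.

```python
from typing import Any, Dict


class MappedOperator:
    """Hypothetical, simplified stand-in; not Airflow's real class."""

    def __init__(self, **partial_kwargs: Any) -> None:
        self.partial_kwargs: Dict[str, Any] = dict(partial_kwargs)

    @property
    def weight_rule(self) -> str:
        # The "downstream" fallback is an assumption for this sketch.
        return self.partial_kwargs.get("weight_rule", "downstream")

    @weight_rule.setter
    def weight_rule(self, value: str) -> None:
        # Writing the attribute updates the slot in partial_kwargs.
        self.partial_kwargs["weight_rule"] = value


op = MappedOperator(queue="default")
default_rule = op.weight_rule
op.weight_rule = "upstream"
```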

Member Author

Hmm yeah, theoretically (and maybe also a deleter for Mypy). I’ll investigate.

Member Author

@uranusjr uranusjr Jan 21, 2022

Uh, Mypy is not smart enough, unfortunately; see python/mypy#4125.

We can’t use the Protocol workaround mentioned in the issue either because BaseOperator has a custom metaclass, while Protocol is also implemented with metaclass, and the two would conflict.
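
The metaclass clash can be reproduced in isolation. This is a minimal sketch with hypothetical names: `CustomMeta` stands in for BaseOperator's metaclass (assumed here to derive from `abc.ABCMeta`), and `HasQueue` is an invented protocol. Because neither metaclass is a subclass of the other, Python refuses to create the combined class.

```python
import abc
from typing import Protocol


class CustomMeta(abc.ABCMeta):
    """Hypothetical custom metaclass, standing in for BaseOperator's."""


class Base(metaclass=CustomMeta):
    pass


class HasQueue(Protocol):
    """Hypothetical protocol declaring a writable attribute."""

    queue: str


try:
    # Neither CustomMeta nor Protocol's own metaclass derives from the
    # other, so no single metaclass can serve the combined class.
    class Combined(Base, HasQueue):
        pass
except TypeError as exc:
    conflict_message = str(exc)
else:
    conflict_message = ""
```

A metaclass inheriting from both would resolve the conflict in principle, but that relies on a private `typing` implementation detail and is not something this thread proposes.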

Member

So a type ignore is the other option?

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jan 21, 2022
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@uranusjr
Member Author

(Documenting for myself on Monday) This is currently failing with

   tests/models/test_taskinstance.py:2288: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  airflow/utils/session.py:71: in wrapper
      return func(*args, session=session, **kwargs)
  airflow/models/taskinstance.py:1621: in run
      session=session,
  airflow/utils/session.py:68: in wrapper
      return func(*args, **kwargs)
  airflow/models/taskinstance.py:1194: in check_and_change_state_before_execution
      dep_context=non_requeueable_dep_context, session=session, verbose=True
  airflow/utils/session.py:68: in wrapper
      return func(*args, **kwargs)
  airflow/models/taskinstance.py:1031: in are_dependencies_met
      for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
  airflow/models/taskinstance.py:1052: in get_failed_dep_statuses
      for dep_status in dep.get_dep_statuses(self, session, dep_context):
  airflow/ti_deps/deps/base_ti_dep.py:95: in get_dep_statuses
      yield from self._get_dep_statuses(ti, session, dep_context)
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  self = <TIDep(Previous Dagrun State)>
  ti = <[DetachedInstanceError('Instance <TaskInstance at 0x7fefa43f9850> is not bound to a Session; attribute refresh operation cannot proceed') raised in repr()] TaskInstance object at 0x7fefa43f9850>
  session = <sqlalchemy.orm.session.Session object at 0x7fefd5d5f050>
  dep_context = <airflow.ti_deps.dep_context.DepContext object at 0x7fefa4430050>
  
      @provide_session
      def _get_dep_statuses(self, ti, session, dep_context):
          if dep_context.ignore_depends_on_past:
              reason = "The context specified that the state of past DAGs could be ignored."
              yield self._passing_status(reason=reason)
              return
      
  >       if not ti.task.depends_on_past:
  E       AttributeError: 'MappedOperator' object has no attribute 'depends_on_past'
  
  airflow/ti_deps/deps/prev_dagrun_dep.py:41: AttributeError

@ashb
Member

ashb commented Jan 22, 2022

We also need wait_for_downstream in a similar vein for the scheduler, but that can be added in a separate PR.

@uranusjr
Member Author

depends_on_past depends on wait_for_downstream, so I implemented them both.
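
A rough sketch of how the two attributes might be exposed so that the `ti.task.depends_on_past` lookup from the traceback above no longer raises AttributeError. This is not the code merged in the PR: the class is a simplified, hypothetical stand-in, and reading the values from `partial_kwargs` with `False` defaults is an assumption. The coupling shown (waiting for downstream implies depending on the past) is how the dependency between the two flags is read here.

```python
from typing import Any, Dict


class MappedOperator:
    """Hypothetical, simplified stand-in; not Airflow's real class."""

    def __init__(self, **partial_kwargs: Any) -> None:
        self.partial_kwargs: Dict[str, Any] = dict(partial_kwargs)

    @property
    def wait_for_downstream(self) -> bool:
        return bool(self.partial_kwargs.get("wait_for_downstream", False))

    @property
    def depends_on_past(self) -> bool:
        # Assumption: wait_for_downstream forces depends_on_past on, which
        # is why the two have to be implemented together.
        explicit = bool(self.partial_kwargs.get("depends_on_past", False))
        return explicit or self.wait_for_downstream


plain = MappedOperator()
waiting = MappedOperator(wait_for_downstream=True)
explicit_past = MappedOperator(depends_on_past=True)
```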

@ashb
Member

ashb commented Jan 24, 2022

Only failure is an actual timing error:

test_heartbeat_failed_fast: assert 0.5497589999999999 < 0.5

Good to go now!

@uranusjr uranusjr merged commit ff3bbc3 into apache:main Jan 24, 2022
@uranusjr uranusjr deleted the mapped-operator-bag branch January 24, 2022 13:52
@jedcunningham jedcunningham added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Mar 1, 2022
@jedcunningham jedcunningham added this to the Airflow 2.3.0 milestone Apr 26, 2022
Labels
area:dynamic-task-mapping AIP-42 changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) full tests needed We need to run full set of tests for this PR to merge

3 participants