Skip to content

fix: allow calling Actor.reboot() from migrating handler, align reboot behavior with JS SDK #361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/03-concepts/04-actor-events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ During its runtime, the Actor receives Actor events sent by the Apify platform o
{' '}to another worker server soon.</p>
You can use it to persist the state of the Actor so that once it is executed again on the new server,
it doesn't have to start over from the beginning.
Once you have persisted the state of your Actor, you can call <a href="../../reference/class/Actor#reboot"><code>Actor.reboot()</code></a>
to reboot the Actor and trigger the migration immediately, to speed up the process.
</td>
</tr>
<tr>
Expand Down
25 changes: 22 additions & 3 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
from datetime import timedelta
from itertools import chain
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast

from lazy_object_proxy import Proxy
Expand All @@ -13,7 +14,7 @@
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
from crawlee import service_container
from crawlee.events._types import Event, EventPersistStateData
from crawlee.events._types import Event, EventMigratingData, EventPersistStateData

from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
Expand Down Expand Up @@ -48,6 +49,7 @@ class _ActorType:
_apify_client: ApifyClientAsync
_configuration: Configuration
_is_exiting = False
_is_rebooting = False

def __init__(
self,
Expand Down Expand Up @@ -839,12 +841,29 @@ async def reboot(
self.log.error('Actor.reboot() is only supported when running on the Apify platform.')
return

if self._is_rebooting:
self.log.debug('Actor is already rebooting, skipping the additional reboot call.')
return

self._is_rebooting = True

if not custom_after_sleep:
custom_after_sleep = self._configuration.metamorph_after_sleep

self._event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=True))
# Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
# We can't just emit the events and wait for all listeners to finish,
# because this method might be called from an event listener itself, and we would deadlock.
persist_state_listeners = chain.from_iterable(
(self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001
)
migrating_listeners = chain.from_iterable(
(self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing something, but the chain.from_iterable doesn't seem to have a purpose here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, _listeners_to_wrappers is a dict of dicts of lists of wrappers (because technically you can have one listener wrapped multiple times, even though it doesn't make practical sense).

Imagine this:

def my_event_listener(event_data):
    print(event_data)

Actor.on(Event.MIGRATING, my_event_listener)
Actor.on(Event.MIGRATING, my_event_listener)

Then _listeners_to_wrappers looks like this:

{
    Event.MIGRATING: {
        event_listener: [
            wrapper_for_my_event_listener_1,
            wrapper_for_my_event_listener_2,
        ],
    }
}

So _listeners_to_wrappers[Event.PERSIST_STATE]).values() is a list of lists, and with chain.from_iterable() I flatten it to a single-level list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, thanks. SDK already depends on Crawlee and that depends on more_itertools which contain flatten - could you add that dependency to SDK as well and use that instead? I know it serves no practical purpose other than readability.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.


await self._event_manager.__aexit__(None, None, None)
await asyncio.gather(
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
*[listener(EventMigratingData()) for listener in migrating_listeners],
)

if not self._configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')
Expand Down
Loading