Covalent dispatcher requires all of a workflow's package dependencies. #748

Closed
cjao opened this issue Jun 30, 2022 · 9 comments · Fixed by #754
Labels: Team West

@cjao
Contributor

cjao commented Jun 30, 2022

This issue goes hand in hand with the import dependencies issue #674.

The current implementation of the Covalent dispatcher requires that the dispatcher's environment satisfy all of a workflow's package dependencies. However, different workflows may have conflicting dependencies, and worse, some of their dependencies might conflict with Covalent's own. For instance, our own Quantum Chemistry tutorial is incompatible with the version of numpy required by Covalent. In addition, in an on-prem deployment with a central Covalent server, it is impractical (not to mention bad practice) for each end user to modify the server's environment; the server should ideally run in a lean and fixed environment.

To avoid this limitation, Covalent must process a workflow without unpickling any workflow-specific data, including:

  • workflow functions
  • tasks (electrons)
  • inputs and outputs

These must remain serialized in the server process. In particular, the server process must never directly execute any code submitted by the user, which it currently does during post-processing and when building sublattice graphs.

To reproduce the problem:

  1. Create two conda environments, covalent-server and covalent-client, and install Covalent in both.
  2. Start Covalent in covalent-server with the --no-cluster option.
  3. In covalent-client, install pandas.

The following snippet fails when run from covalent-client:

import covalent as ct
from covalent.executor import DaskExecutor
from dask.distributed import LocalCluster

import pandas as pd

def run_exp(dask_exec):
    # Construct tasks as "electrons"
    @ct.electron(executor=dask_exec)
    def create_arr():
        arr = pd.Series([1, 2, 3])
        return arr

    @ct.lattice(executor=dask_exec)
    def simple_workflow():
        return create_arr()
    # simple_workflow.set_metadata("workflow_executor", dask_exec)
    return ct.dispatch(simple_workflow)()

if __name__ == "__main__":
    cluster = LocalCluster()
    dask_exec = DaskExecutor(scheduler_address=cluster.scheduler_address)
    dispatch_id = run_exp(dask_exec)
    print(dispatch_id)
    res = ct.get_result(dispatch_id, wait=True)
    print(res.get_all_node_outputs())
    print("Result: ", res.result)
    print("Result status:", res.status)

covalent logs:

[2022-06-30 12:20:47,806] ERROR in app: Exception on /api/submit [POST]
Traceback (most recent call last):
  File "/var/home/casey/.conda/envs/covaenv/lib/python3.8/site-packages/flask/app.py", line 2070, in wsgi_app
    response = self.full_dispatch_request()
  File "/var/home/casey/.conda/envs/covaenv/lib/python3.8/site-packages/flask/app.py", line 1515, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/var/home/casey/.conda/envs/covaenv/lib/python3.8/site-packages/flask_cors/extension.py", line 165, in wrapped_function
    return cors_after_request(app.make_response(f(*args, **kwargs)))
  File "/var/home/casey/.conda/envs/covaenv/lib/python3.8/site-packages/flask/app.py", line 1513, in full_dispatch_request
    rv = self.dispatch_request()
  File "/var/home/casey/.conda/envs/covaenv/lib/python3.8/site-packages/flask/app.py", line 1499, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/var/home/casey/Agnostiq/code/covalent/covalent_dispatcher/_service/app.py", line 56, in submit
    result_object = pickle.loads(data)
  File "/var/home/casey/.conda/envs/covaenv/lib/python3.8/site-packages/cloudpickle/cloudpickle.py", line 679, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'pandas'

Indeed, the Covalent server currently handles the raw (unserialized) workflow data in several places, and covalent-server doesn't have pandas.

@cjao
Contributor Author

cjao commented Jun 30, 2022

The encoded-workflow-processing-exp branch (work in progress but essentially complete modulo sublattices) eliminates the intermediate unpickling mentioned above; in particular, workflows are now submitted to the server purely as JSON. If you install that branch into covalent-server using pip install -e and uncomment the commented-out line in the snippet above, the workflow runs successfully from covalent-client:

a4f10fd6-0c35-413e-86bc-86a30a438dfc
{'create_arr(0)': <covalent.TransportableObject object at 0x7f7ac5348a30>}
Result:  0    1
1    2
2    3
dtype: int64
Result status: COMPLETED

While a proper design doc is forthcoming, the basic idea is that all data related to the workflow is now decoded and processed in the covalent-client conda environment and not in covalent-server. For example, post-processing (which requires executing the workflow function) is offloaded to the Dask cluster running in covalent-client.

If one instead replaces the "workflow_executor" line with

simple_workflow.set_metadata("workflow_executor", "client")

then one gets the following output:

6cd6abd4-c99a-4c2f-b48f-db72c146407c
{'create_arr(0)': <covalent.TransportableObject object at 0x7ff139745970>}
Result:  None
Result status: PENDING_POSTPROCESSING

In this case, the dispatcher has halted workflow execution just before postprocessing. One can then postprocess the workflow "offline" (without the Covalent server running) by calling res.post_process():

0    1
1    2
2    3
dtype: int64
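
A minimal sketch of that offline flow, assuming the PR branch's API (the PENDING_POSTPROCESSING status and Result.post_process() shown above); dispatch_id comes from ct.dispatch() as in the earlier snippet:

import covalent as ct

# Sketch only: assumes workflow_executor was set to "client", so the
# dispatcher stopped just before postprocessing.
res = ct.get_result(dispatch_id, wait=True)

# Status objects print as strings such as PENDING_POSTPROCESSING.
if str(res.status) == "PENDING_POSTPROCESSING":
    # Deserialize the encoded node outputs and replay the workflow
    # function locally; this is the step the dispatcher skipped.
    final_result = res.post_process()
    print(final_result)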

@santoshkumarradha
Member

Hey @cjao, thanks for the detailed issue. Is there any way to relax the assumption that the client side calling get_result has the same environment? Ideally we want those to be different (at least just to look at the result).

I suspect we can somehow already get the "return" electron/type by post-processing while preprocessing happens, and just set the result to that. Maybe I am missing something. Thoughts, @kessler-frost?

@cjao
Contributor Author

cjao commented Jul 1, 2022

Hi @santoshkumarradha , the workflow is constructed by the client, so the client environment is necessarily able to understand the type of the decoded result.

Perhaps you are referring to the possibility that the client environment changes after workflow submission. In that case, the client can still view a string or JSON representation of the result (if the result is JSON-serializable); results are encoded as TransportableObjects, which have grown two properties: object_string and json.
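
For example (a sketch; encoded_result is the accessor for the encoded result described later in this thread, and dispatch_id comes from an earlier dispatch):

import covalent as ct

res = ct.get_result(dispatch_id, wait=True)

# Inspect the result without importing its underlying type (e.g. pandas):
encoded = res.encoded_result          # a TransportableObject
print(encoded.object_string)          # the str() form of the result
print(encoded.json)                   # JSON form, if the result is JSON-serializable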

@santoshkumarradha
Member

True, it's on the client side, but it need not be the same client nor have the same environment. A string representation is good, but I may be missing something: don't we need postprocessing to happen for us to know what a lattice result is, even just to get the string representation? If so, where does this post-processing happen for the string?

@santoshkumarradha
Member

santoshkumarradha commented Jul 1, 2022 via email

@cjao
Contributor Author

cjao commented Jul 1, 2022

The string representation of the workflow result is indeed computed during post-processing. However, the post-processing doesn't have to take place client-side. The only requirement is that the post-processing environment satisfies the dependencies of the workflow. For instance, one can perform the post-processing on a different computer; internally, post-processing simply becomes another task that can be run using any executor.

@kessler-frost
Member

kessler-frost commented Jul 1, 2022

Is there any way to relax the assumption of client side get result having the same environment? Ideally we want those to be different (at-least just to look at the result).

@santoshkumarradha So, to the best of my understanding, the only assumption on the client side is that it should have sufficient packages installed to understand each node's result and the final lattice's result; it is not necessary for the client to have all the packages installed that were used during the actual execution, even if it is doing post-processing on the client-side (assuming there are no non-electron executions happening inside the lattice definition).
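
For instance (an illustrative sketch, not from the repo; torch stands in for any heavy dependency needed only where the task runs, and dask_exec is defined as in the snippet at the top of this issue):

import covalent as ct

@ct.electron(executor=dask_exec)
def train_and_score():
    # Imported only inside the executor's environment; the client that
    # later fetches the result never needs torch installed.
    import torch
    return float(torch.rand(1).item())

@ct.lattice(executor=dask_exec)
def workflow():
    # The lattice body contains no non-electron execution, so the client
    # only needs to understand the plain float returned by the electron.
    return train_and_score()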

Is that what you were asking or am I misunderstanding something?

@cjao
Contributor Author

cjao commented Jul 1, 2022

To elaborate on @kessler-frost's reply:

There are several types of dependencies:

  1. Deps needed to run a task
  2. Deps needed to interpret each task's output.
  3. Other workflow deps; for instance, if the workflow accepts a Pandas Dataframe as input, then the workflow function would require Pandas even if no statements in the body of the workflow depend on Pandas.

Each class of deps is dealt with differently.

  1. The first is already handled by the recent addition of the wrapper_fn during execution.
  2. The linked PR handles the second type by modifying the wrapper_fn to serialize the output before passing it back to the dispatcher.
  3. Finally, the third type is also handled in the PR by ensuring that the dispatcher never deserializes the workflow function or workflow-level inputs.

In develop, the dispatcher unpickles workflow data in several places:

  • Post-processing
  • Building sublattice transport graphs
  • When receiving a workflow

The first two steps are now moved outside the server process and can be run using any executor whose environment satisfies the dependencies. As for the last point, the PR modifies the SDK so that the server can reconstruct a Lattice purely using json.loads().
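
To make the first two concrete, here is a rough sketch of the wrapper idea (not the exact implementation; get_deserialized() is assumed to be the accessor on TransportableObject, and call_before/call_after deps are omitted):

from covalent import TransportableObject

def wrapper_fn(serialized_fn, serialized_args, serialized_kwargs):
    # Runs inside the executor's environment, which satisfies the task deps.
    fn = serialized_fn.get_deserialized()
    args = [arg.get_deserialized() for arg in serialized_args]
    kwargs = {k: v.get_deserialized() for k, v in serialized_kwargs.items()}

    output = fn(*args, **kwargs)

    # Re-encode before handing the result back, so the dispatcher never
    # needs the packages required to unpickle it.
    return TransportableObject(output)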

@cjao
Contributor Author

cjao commented Jul 2, 2022

Design doc

Terms

  • Workflow function
  • Workflow input -- arguments passed when dispatching the workflow
  • Workflow output -- return value of the workflow function
  • Task
  • Task input
  • Task output
  • Task dependencies -- required to execute a task.
  • Workflow-level dependencies -- required to step through the workflow function while substituting each task call with the task output. For instance, if a task returns a Pandas dataframe, then Pandas would be a workflow-level dependency.

Rationale

The basic design of Covalent already allows the dispatcher to manage a workflow without being fully aware of the workflow data; the essential information is the input-output relationships between tasks, in other words, the structure of the transport graph rather than the raw contents of each task.

To actually process a workflow without requiring its dependencies, the Covalent server must observe two basic principles:

  • It must not handle any raw (unserialized) data types during workflow processing.
  • It must not unpickle any data submitted by users. Unpickling not only requires that all the dependencies of the underlying objects be satisfied but also could destabilize the server; the Python pickle documentation warns:

The pickle module is not secure. Only unpickle data you trust.

It is possible to construct malicious pickle data which will execute arbitrary code during unpickling. Never unpickle data that could have come from an untrusted source, or that could have been tampered with.

Safer serialization formats such as json may be more appropriate if you are processing untrusted data. See Comparison with json.

Presently, the Covalent server violates the first principle in several ways:

  1. The server processes the raw inputs and outputs of each task.
  2. The server executes a lattice's workflow function, both during postprocessing and when building sublattice workflow graphs.

It also unpickles data during several steps of the workflow processing pipeline:

  1. Upon receiving a workflow; the SDK submits a pickled result object which is unpickled by the dispatcher.
  2. Dispatching sublattices

The proposed change moves all unpickling and pickling out of the server process. In addition, the server process only handles data types defined by Covalent:

  • Structures describing a workflow, such as electrons and lattices, and the TransportGraph
  • A single type representing serialized data: any user-defined data, which could be of arbitrary type, is encoded as a TransportableObject. When an object is encoded as a TransportableObject, its string representation is stored in TransportableObject.object_string.
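
To illustrate the encoding idea, here is a standalone sketch (not Covalent's actual class); as noted below, the real TransportableObject stores str(obj) alongside a base64-encoded cloudpickle payload:

import base64
import cloudpickle

class EncodedObject:
    # Illustrative stand-in for TransportableObject.

    def __init__(self, obj):
        # A printable representation that the server and UI can show safely.
        self.object_string = str(obj)
        # The pickled payload, kept as text so the server only handles strings.
        self._data = base64.b64encode(cloudpickle.dumps(obj)).decode("ascii")

    def get_deserialized(self):
        return cloudpickle.loads(base64.b64decode(self._data.encode("ascii")))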

Main modifications

  • Executors now accept serialized inputs, including the underlying callable of a task, its inputs, and any call_before/call_after deps. In addition, they return output encoded as a TransportableObject. This is achieved by a wrapper function which performs the following steps in the executor's environment:

    • Deserialize the task callable and task inputs.
    • Invoke the callable.
    • Serialize its return value.

    Since the dispatcher now only shuttles TransportableObjects between tasks, some magic methods invoked during workflow construction require special handling. For instance, Electron.__getitem__ was previously converted to a simple dictionary key access output[key] during workflow execution; this expression no longer makes sense if output or even key are TransportableObjects. The following magic methods on Electrons were adjusted to work with encoded inputs: __getitem__, __getattr__, __iter__, __add__, __sub__, __mul__, __div__.
  • A consequence of the above is that the workflow result is returned and stored in Result._result as a TransportableObject. Calling Result.result automatically attempts to deserialize the result; the new property Result.encoded_result returns the encoded result.

  • The UI displays TransportableObjects using their JSON form (if the underlying object is JSON-serializable) or their string representations (TransportableObject(obj) stores str(obj) together with the base64-encoded, pickled form of obj).

  • Lattices always store their underlying workflow function in serialized form (a TransportableObject). During build_graph, or when the lattice is called as a regular Python function, the workflow inputs are passed to a deserialized copy of the workflow function. All inputs are serialized before storing them in the lattice or transport graph nodes.

  • Postprocessing, which involves deserializing and executing the workflow function on the deserialized workflow inputs, becomes a task to be run in a designated executor whose environment satisfies the workflow dependencies. Lattices now expose a workflow_executor property analogous to the existing executor property. In addition to the usual executors, workflow_executor supports a special value called "client". Setting workflow_executor="client" causes the dispatcher to halt workflow execution just before postprocessing and set the workflow status to PENDING_POSTPROCESSING. The client can take the Result, which contains only encoded node outputs, and postprocess the workflow client-side by calling Result.post_process(). This returns the deserialized workflow result.

  • Since postprocessing is now decoupled from the main workflow execution, the Result has acquired some new statuses: POSTPROCESSING, PENDING_POSTPROCESSING, FAILED_POSTPROCESSING. The last one would be encountered if the workflow_executor specified by the user doesn't actually satisfy the workflow dependencies, or if the postprocessing fails for any other reason; for instance, a remote executor might time out if the workflow contains a long sleep() statement. Whatever the reason, one can still reattempt postprocessing client-side.

  • Sublattices are also deserialized and built using the workflow_executor. They are presently handled by deserializing the sublattice and calling dispatch_sync from the Covalent server. This issue is a bit more involved than postprocessing since the server needs the sublattice transport graph to execute the sublattice. If the graph-building executor were to simply return the transport graph as a TransportableObject, the Covalent server would need to unpickle the return value and violate the second Basic Principle. This is robustly handled by the next change:

  • For all intents and purposes, lattices can now be reconstructed from a JSON string purely using json.loads(). This is accomplished by teaching TransportGraph, and associated metadata objects (Deps and Executors), to serialize themselves to JSON and rehydrate themselves from JSON using json.loads(). This task is made easier by the fact that we are serializing most non-JSONable data as TransportableObjects, whose data attributes consist entirely of strings. The main benefits of this change are:

    • An external process to build sublattice graphs can return the sublattice as a JSON string. The Covalent server can then rehydrate the sublattice using json.loads, which unlike pickle.loads doesn't involve arbitrary code execution or require any external dependencies. The sublattice can then be dispatched in the same process (or even the same thread if one uses an asyncio event loop), all without any web API calls.
    • The /api/submit endpoint now loads user-submitted workflows purely using json.loads. The client SDK has been updated to submit a JSON-serialized lattice after building the graph.

Assuming that a workflow doesn't use the local executor, the Covalent server no longer unpickles any data.
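
The change on the submit path boils down to the following contrast (a sketch; the function and helper names are illustrative, not the actual Covalent handler):

import json

def schedule_workflow(workflow_doc: dict) -> str:
    # Hypothetical stand-in for handing the parsed workflow to the dispatcher.
    return workflow_doc.get("dispatch_id", "")

def submit(request_body: bytes) -> str:
    # Old behavior (see the traceback above): result_object = pickle.loads(data),
    # which executes arbitrary code and requires every workflow dependency.
    # New behavior: parse JSON only; no code execution, no extra packages.
    workflow_doc = json.loads(request_body)
    return schedule_workflow(workflow_doc)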

Some Implications

  • Each executor needs to understand TransportableObject, which presently means a dependency on Covalent. An easy mitigation would be to split the PyPI Covalent package into covalent-server and covalent-sdk, the latter of which would only contain core data types. The TransportableObject abstraction allows us to extend the data encoding scheme in the future.

  • @FyzHsn The ongoing work to persist Result using the new data store is essentially unaffected and maybe even simplified a bit using the Transport Graph's new method to serialize itself to JSON. For instance,

json.loads(lattice.transport_graph.serialize_to_json())

returns the transport graph in node-link form, with the additional benefit that all metadata objects are in their JSON representations. Thus one can save and load the data using json.dump and json.load; a short sketch follows this list.

  • Wrapping the task output in a TransportableObject before returning from the executor should in principle allow us to side-step issues with Dask's internal serialization protocol, as Dask itself would only need to serialize and deserialize TransportableObject, which is essentially a dictionary of text strings. Thus we can concentrate all efforts on TransportableObject's own serialization protocol (currently cloudpickle + base64 encoding).
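
Expanding on the serialize_to_json() note above, a minimal persistence sketch (assuming only that serialize_to_json() returns a JSON string and that lattice has already been built via build_graph):

import json

# Node-link form of the graph, with metadata already in JSON representation.
graph_doc = json.loads(lattice.transport_graph.serialize_to_json())

# Save and reload with the standard library; nothing here requires
# unpickling or any workflow dependencies.
with open("transport_graph.json", "w") as f:
    json.dump(graph_doc, f)

with open("transport_graph.json") as f:
    restored = json.load(f)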

Status

  • Implementation is complete to the best of my knowledge.
  • All existing tests pass. So the implementation at least doesn't introduce obvious regressions as a drop-in replacement. Some unit tests for the new JSON-serialization/deserialization code have been added.
  • I have successfully run both the Astronomy and MNIST tutorials, both from the same conda environment as the Covalent server and from a different environment. By the latter I mean the following:
    1. Create two conda environments, covalent-server and covalent-client, and install the PR branch in each using pip install -e.
    2. Start Covalent from covalent-server with the --no-cluster option, since we want everything to run out-of-process whenever possible.
    3. In covalent-client, install the dependencies for the tutorial (such as PyTorch).
    4. Run the tutorial from covalent-client, using a Dask cluster running in that environment for both executor and workflow_executor (the latter is set on the lattice).
