Encapsulating secrets in Resource Managers #4072
Resource managers are a fairly recent (#2913) but very useful type of task template for managing things like API sessions or database connections (context managers are a great feature of Python, and I would miss being able to use them in flows). My most common use case for these would be something like this (which doesn't even work, given resource managers are not actually tasks):

    # shared between flows
    @resource_manager
    class PostgresConnection:
        def __init__(self, dsn: str, username: str, password: str):
            self.dsn = dsn
            self.username = username
            self.password = password

        def setup(self):
            conn = psycopg2.connect(self.dsn, user=self.username, password=self.password)
            return conn

        def cleanup(self, conn):
            conn.close()

    # individual flow
    with Flow("flow-with-database") as flow:
        pg_db1_dsn = Secret('pg-db1-dsn')
        pg_db1_username = Secret('pg-db1-username')
        pg_db1_password = Secret('pg-db1-password')
        with PostgresConnection(pg_db1_dsn, pg_db1_username, pg_db1_password) as pg_conn:
            pass

Since I'm always connecting to the same databases and these secrets always go together, I'm wondering whether I could encapsulate them in a resource manager specifically for that instead:

    # shared between flows
    @resource_manager
    class PgDb1Connection:
        def __init__(self):
            self.dsn = Secret('pg-db1-dsn')
            self.username = Secret('pg-db1-username')
            self.password = Secret('pg-db1-password')

        def setup(self):
            conn = psycopg2.connect(self.dsn, user=self.username, password=self.password)
            return conn

        def cleanup(self, conn):
            conn.close()

    # individual flow
    with Flow("flow-with-database") as flow:
        with PgDb1Connection() as pg1_conn:
            pass
        with PgDb2Connection() as pg2_conn:
            pass
        with OracleDb1Connection() as oci1_conn:
            pass

Now, naturally, since Secrets are tasks, they can't run inside another task; they have to run at the "surface level" to do anything, so that wouldn't work. What would be a practical/clean/idiomatic way of doing this in Prefect?
Replies: 2 comments
Having attempted a number of things and looked into the documentation further, it seems like
With that said, the warning notice in the |
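Right, so it seems like my entire premise was misguided. You can encapsulate secrets (`prefect.client.Secret`) inside any task just as well as you can have secret tasks (`prefect.tasks.secrets.PrefectSecret`) that feed into other tasks, and resource managers are no different from any other task. The problem is that using resource managers for non-serializable results (like API sessions or database connections) will run into the same problems as having non-serializable results for any other task: they can't be passed between tasks running on different processes, so the flow won't work on a `DaskExecutor` set to use processes.

Like the doc says, "This subtle and technical constraint actually informs a lot of design decisions in the Task library (as we will see shortly).", which explains why built-in tasks like …

To answer my own question, the correct way of doing this in Prefect is to never have an active object like a connection outlive a task. Everything that gets passed around tasks should be picklable or serializable in some way. Encapsulating secrets inside tasks is fine if you don't want to see them on the DAG (the errors will still pop up in the logs for that task), but databases are just a bad use for resource managers.

Ad hoc example:

    import psycopg2
    from prefect import Flow, task
    from prefect.client import Secret

    @task
    def adhoc_insert_and_get_ids(inserted_data):
        # If any of these secrets is missing, the corresponding line will raise
        dsn = Secret('pg-db1-dsn').get()
        username = Secret('pg-db1-username').get()
        password = Secret('pg-db1-password').get()
        # Connect before the try block so conn is always defined in finally
        conn = psycopg2.connect(dsn, user=username, password=password)
        try:
            # With psycopg2, using the connection as a context manager
            # commits on success and rolls back on error
            with conn:
                ...  # Insert rows
                id_list = ...  # Get ids
                return id_list
        finally:
            conn.close()

    with Flow('foo'):
        adhoc_insert_and_get_ids(['foo', 'bar', 'baz'])

Example with secret parameters:

    import psycopg2
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def insert_and_get_ids(dsn, username, password, inserted_data):
        # Connect before the try block so conn is always defined in finally
        conn = psycopg2.connect(dsn, user=username, password=password)
        try:
            with conn:
                ...  # Insert rows
                id_list = ...  # Get ids
                return id_list
        finally:
            conn.close()

    with Flow('foo'):
        dsn = PrefectSecret('pg-db1-dsn')
        username = PrefectSecret('pg-db1-username')
        password = PrefectSecret('pg-db1-password')
        insert_and_get_ids(dsn, username, password, ['foo', 'bar', 'baz'])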
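The serialization constraint behind this conclusion can be checked without Prefect at all. The following is a minimal sketch, assuming that `sqlite3` from the standard library is an acceptable stand-in for `psycopg2` (so it runs without a database server); `is_picklable` is a hypothetical helper written for this illustration, not part of any library.

```python
import pickle
import sqlite3


def is_picklable(obj) -> bool:
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


# Plain connection settings are ordinary data, so they can be pickled
# and handed from one task (or process) to another.
settings = {"dsn": "example-dsn", "user": "example-user"}

# A live connection holds state tied to this process and cannot be pickled.
# Assumption: sqlite3 stands in for psycopg2 here.
conn = sqlite3.connect(":memory:")
try:
    settings_ok = is_picklable(settings)
    conn_ok = is_picklable(conn)
finally:
    conn.close()

print("settings picklable:", settings_ok)
print("connection picklable:", conn_ok)
```

This suggests why passing credentials (or secret names) between tasks is fine, while passing the connection object itself is likely to fail once tasks run in separate processes.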
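For the original goal of keeping the three secrets bundled together, one option consistent with the answer above is an ordinary Python context manager that is opened and closed inside a single task. This is only a sketch under stated assumptions: `get_secret`, `FakeConnection`, `FAKE_SECRETS` and `opened` are hypothetical stand-ins so the snippet runs anywhere. In a real flow, `get_secret(name)` would presumably be `Secret(name).get()`, `connect` would be `psycopg2.connect`, and `insert_and_get_ids` would be decorated with `@task`.

```python
from contextlib import contextmanager

# Hypothetical stand-in for the secret store.
FAKE_SECRETS = {
    "pg-db1-dsn": "example-dsn",
    "pg-db1-username": "example-user",
    "pg-db1-password": "example-password",
}

opened = []  # records every fake connection, for inspection only


def get_secret(name: str) -> str:
    """Hypothetical lookup; raises KeyError if the secret is missing."""
    return FAKE_SECRETS[name]


class FakeConnection:
    """Hypothetical stand-in for a database connection object."""

    def __init__(self, dsn, user, password):
        self.dsn, self.user, self.closed = dsn, user, False
        opened.append(self)

    def close(self):
        self.closed = True


@contextmanager
def pg_db1_connection(connect=FakeConnection):
    """Open the 'pg-db1' connection and always close it on exit."""
    conn = connect(
        get_secret("pg-db1-dsn"),
        user=get_secret("pg-db1-username"),
        password=get_secret("pg-db1-password"),
    )
    try:
        yield conn
    finally:
        conn.close()


def insert_and_get_ids(inserted_data):
    # The connection lives and dies inside this function, so only
    # plain, picklable data (a list of ints) leaves it.
    with pg_db1_connection() as conn:
        assert not conn.closed
        return list(range(len(inserted_data)))  # placeholder for real ids


ids = insert_and_get_ids(["foo", "bar", "baz"])
print(ids)
```

The secret names live in one place, and the connection never becomes a task result, so nothing non-serializable has to cross a process boundary.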