Package to allow observability of SSIS and MSSQL jobs
This package simplifies tracking SSIS and MSSQL jobs in SQL Server so they can be used as upstream dependencies for other Dagster assets. It builds assets that track the successful completion of SSIS and MSSQL jobs in a SQL Server instance.
You need the ODBC drivers installed on the machine that runs the Dagster pipeline. See Microsoft's documentation for more information.
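As a quick check that the drivers are visible, you can list them with pyodbc (a minimal sketch; it assumes pyodbc is installed, which the `mssql+pyodbc` driver below requires):

```python
import pyodbc

# The driver named in query_props below (e.g. "ODBC Driver 18 for SQL Server")
# should appear in this list.
print(pyodbc.drivers())
```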
For SSIS, the user must be a member of the `ssis_logreader` role or have permission to read from the `catalog.executions` view.
For MSSQL jobs, the user must be able to select from `msdb.dbo.sysjobs` and `msdb.dbo.sysjobhistory`.
The easiest method is to grant `SELECT` on these two tables, as the default roles grant much higher levels of privilege than required.
Create an instance of the `SQLServerResource` to connect to the database. This is then used in the sensor to check for completion of a job/task. The driver defaults to `mssql+pyodbc`, but this can be changed with the `py_driver` property.
```python
from dagster_ssis import SQLServerResource

my_db_resource = SQLServerResource(
    host='localhost',
    database='MyDB',
    username='...',
    password='...',
    query_props={
        "driver": "ODBC Driver 18 for SQL Server",
        "TrustServerCertificate": "yes",
    }
)
```
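The resource exposes a `connect` function that returns a SQLAlchemy connection (the sensors below rely on this). As a quick connectivity check, a minimal sketch, assuming the returned object is a standard SQLAlchemy `Connection`:

```python
import sqlalchemy as sa

# Open a connection and run a trivial query to confirm
# that the credentials and ODBC driver work.
with my_db_resource.connect() as conn:
    print(conn.execute(sa.text("SELECT @@VERSION")).scalar())
```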
`SSISAsset` creates an asset to represent an SSIS package and the assets (tables, etc.) that are generated by it. This captures executions that happen outside of Dagster, but reports materialization events that downstream assets, such as dbt models, can be triggered by and respond to.
There are two primary ways to use the assets. The asset key will be `Folder + Project Name + Package Name`.
```python
from dagster import AssetSpec
from dagster_ssis import SSISAsset, build_ssis_assets

# Create a single SSIS asset
ssis_asset = SSISAsset(
    project_name='Project',
    package_name='Package.dtsx'
)

# get a list with a single asset spec for the SSIS package
asset_spec = ssis_asset.asset_specs

# Create a single SSIS asset representing multiple other assets,
# such as a table in MSSQL or a stored procedure.
# This pattern is good if the asset should be defined differently than the package itself.
table_assets = [
    AssetSpec(key='my_table'), AssetSpec(key='my_other_table')
]

ssis_asset = SSISAsset(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=table_assets
)

# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs

# Using the helper function `build_ssis_assets` produces the same as above.
# This assigns the key of the sub-assets to the SSIS path,
# which is good for composing the SSIS package and all related assets together.
ssis_asset = build_ssis_assets(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=['my_table', 'my_other_table']
)

# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs
```
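To see the keys that were generated, for example to reference them from downstream assets, you can inspect the specs directly. A small sketch:

```python
# Print the generated asset keys; they follow the
# Folder + Project Name + Package Name convention described above.
for spec in ssis_asset.asset_specs:
    print(spec.key.to_user_string())
```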
To pass anything along to the created asset specs, pass a dictionary of arguments to `asset_spec_kwargs`. These will be sent along to the `AssetSpec` for the asset and any sub-assets.
```python
ssis_asset = build_ssis_assets(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=['my_table', 'my_other_table'],
    asset_spec_kwargs={
        # Dagster expects `owners` as a list of emails or "team:<name>" strings
        'owners': ['team:someone'],
        'skippable': True,
        'metadata': {
            'my_meta': 'data'
        }
    }
)
```
To report the materialization events, a sensor can be used to check the `ssisdb` for completed or successful executions in the `catalog.executions` view. You will need a resource defined that connects to the database and exposes a `connect` function, which returns a SQLAlchemy connection object. The sensor can be created with `build_ssis_asset_sensor`.
Continuing from the example above:
```python
from dagster_ssis import build_ssis_asset_sensor

ssis_sensor = build_ssis_asset_sensor(
    ssis_assets=[ssis_asset],
    sensor_name='ssis_sensor',
    database_resource_key='my_db_resource'
)
```
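The `database_resource_key` must match the key the resource is registered under. A minimal sketch of wiring the specs, sensor, and resource into a `Definitions` object (assuming `build_ssis_asset_sensor` returns a regular Dagster sensor definition):

```python
from dagster import Definitions

defs = Definitions(
    # AssetSpecs can be passed directly as external assets
    assets=ssis_asset.asset_specs,
    sensors=[ssis_sensor],
    resources={'my_db_resource': my_db_resource},
)
```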
The execution of an MSSQL job is represented with an `MSSQLJobAsset`.
The user must be able to select from `msdb.dbo.sysjobs` and `msdb.dbo.sysjobhistory`. The easiest method is to grant `SELECT` on these two tables.
There are two primary ways to use the assets. The asset key will be either `MSSQLJob + Job Name` or, if using the helper function, `MSSQLJob + Job Name + Job Name` to allow other assets to sit alongside it.
```python
from dagster import AssetSpec
from dagster_ssis import MSSQLJobAsset, build_mssql_job_assets

# single asset
job_asset = MSSQLJobAsset(
    job_name='job_name',
)

# or adding specific specs to capture alongside
job_asset = MSSQLJobAsset(
    job_name='job_name', asset_list=[AssetSpec('a'), AssetSpec('b')]
)

# single asset from helper with child assets
job_asset = build_mssql_job_assets(
    "job_name", asset_list=["other_asset"]
)
```
As above, you can also pass keyword arguments to the `AssetSpec` using `asset_spec_kwargs`.
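For example, a small sketch mirroring the SSIS example above:

```python
job_asset = build_mssql_job_assets(
    "job_name",
    asset_list=["other_asset"],
    # forwarded to the AssetSpec for the job asset and its children
    asset_spec_kwargs={'metadata': {'my_meta': 'data'}},
)
```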
To report the materialization events, a sensor can be used to check the `msdb` database for completed or successful executions in the `dbo.sysjobhistory` table. You will need a resource defined that connects to the database and exposes a `connect` function, which returns a SQLAlchemy connection object. The sensor can be created with `build_mssql_job_asset_sensor`.
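A sketch mirroring the SSIS sensor above; the `mssql_assets` argument name is an assumption by analogy, so check the signature of `build_mssql_job_asset_sensor`:

```python
from dagster_ssis import build_mssql_job_asset_sensor

# Argument names are assumed by analogy with build_ssis_asset_sensor above.
mssql_sensor = build_mssql_job_asset_sensor(
    mssql_assets=[job_asset],
    sensor_name='mssql_job_sensor',
    database_resource_key='my_db_resource'
)
```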
See `examples/full_example.py` for a complete example.