
dagster-ssis

Package to allow observability of SSIS and MSSQL jobs

This package simplifies tracking SSIS and MSSQL jobs in SQL Server so they can be used as upstream dependencies for other dagster assets.

It builds assets that track the successful completion of SSIS and MSSQL jobs in a SQL Server instance.

Requirements

Drivers

You need the Microsoft ODBC Driver for SQL Server installed on the machine running the dagster pipeline.

See Microsoft's documentation for more information.

Permissions

For SSIS, the user must be a member of the ssis_logreader role or have permission to read from the catalog.executions view.

For MSSQL Jobs, the user must be able to select from msdb.dbo.sysjobs and msdb.dbo.sysjobhistory. The easiest method is to grant SELECT on these two tables, as the default roles grant much higher levels of privilege than required.

Resources

Create an instance of the SQLServerResource to connect to the database. This is then used in the sensor to check for completion of a job/task.

The connection defaults to mssql+pyodbc, but this can be changed with the py_driver prop (see the sketch after the example below).

from dagster_ssis import SQLServerResource

my_db_resource = SQLServerResource(
    host='localhost',
    database='MyDB',
    username='...',
    password='...',
    query_props={
        "driver": "ODBC Driver 18 for SQL Server",
        "TrustServerCertificate": "yes",
    }
)
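
As a hedged sketch of the py_driver prop: the exact value it accepts is an assumption here (it is read as the SQLAlchemy dialect+driver string, based on the mssql+pyodbc default above), and pymssql is shown purely for illustration.

from dagster_ssis import SQLServerResource

# Assumption: py_driver is the SQLAlchemy dialect+driver string used to build
# the connection URL; 'mssql+pymssql' is illustrative only.
pymssql_resource = SQLServerResource(
    host='localhost',
    database='MyDB',
    username='...',
    password='...',
    py_driver='mssql+pymssql'
)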

SSIS

Creates an asset to represent an SSIS Package and the assets (tables, etc.) that are generated by it.

This captures executions that occur outside of dagster and reports events that downstream assets, such as dbt models, can be triggered by and respond to.

SSISAsset Usage

There are two primary ways to use the assets. The asset key will be Folder + Project Name + Package Name.

from dagster import AssetSpec
from dagster_ssis import SSISAsset, build_ssis_assets

# Create a single SSIS Asset
ssis_asset = SSISAsset(
    project_name='Project',
    package_name='Package.dtsx'
)

# get a list with a single asset spec for the SSIS package
asset_spec = ssis_asset.asset_specs

# Create a single SSIS Asset representing multiple other assets, such as a table in MSSQL or a stored procedure
# This pattern is good if the asset should be defined differently than the package itself.

table_assets = [
    AssetSpec(key='my_table'), AssetSpec(key='my_other_table')
]

ssis_asset = SSISAsset(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=table_assets
)
# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs
# using the helper function `build_ssis_assets` produces the same as above
# this places the keys of the sub assets under the ssis path

# Good for composing the ssis package and all related assets together
ssis_asset = build_ssis_assets(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=['my_table', 'my_other_table']
)

# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs

To pass additional arguments to the created asset specs, provide a dictionary of keyword arguments as asset_spec_kwargs. These are forwarded to the AssetSpec for the package asset and any sub assets.

ssis_asset = build_ssis_assets(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=['my_table', 'my_other_table'],
    asset_spec_kwargs={
        'owners': ['team:someone'],
        'skippable': True,
        'metadata': {
            'my_meta': 'data'
        }
    }
)

Sensor

To report the materialization events, a sensor can be used to check the ssisdb for completed or successful events from the catalog.executions view.

You will need a resource defined that connects to the database and exposes a connect function, which returns a SQLAlchemy connection object.
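
Any resource that fulfills this contract will work; the provided SQLServerResource already satisfies it. A minimal sketch of a custom alternative (an illustration, not part of the package) could look like this:

import sqlalchemy
from dagster import ConfigurableResource

class MyCustomDBResource(ConfigurableResource):
    # hypothetical custom resource exposing the connect() contract described above
    connection_string: str

    def connect(self):
        # return a SQLAlchemy connection to the SQL Server instance
        engine = sqlalchemy.create_engine(self.connection_string)
        return engine.connect()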

The sensor can be created with build_ssis_asset_sensor.

Continuing from the example above:

ssis_sensor = build_ssis_asset_sensor(
    ssis_assets=[ssis_asset],
    sensor_name='ssis_sensor',
    database_resource_key='my_db_resource'
)
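
A minimal sketch of wiring these pieces together, assuming a recent dagster version that accepts AssetSpec objects in Definitions (the variable names come from the examples above):

from dagster import Definitions

defs = Definitions(
    # the package asset plus any sub assets defined above
    assets=ssis_asset.asset_specs,
    sensors=[ssis_sensor],
    # the key must match database_resource_key on the sensor
    resources={"my_db_resource": my_db_resource},
)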

Jobs

The execution of an MSSQL Job is represented by defining it through a MSSQLJobSpec.

The user must be able to select from msdb.dbo.sysjobs and msdb.dbo.sysjobhistory. The easiest method is to grant SELECT on these two tables.

MSSQL Job Asset

There are two primary ways to use the assets. The asset key will either be MSSQLJob + Job Name or, if using the helper function, MSSQLJob + Job Name + Job Name to allow other assets to sit alongside it.

from dagster import AssetSpec
from dagster_ssis import MSSQLJobAsset, build_mssql_job_assets

# single asset
job_asset = MSSQLJobAsset(
    job_name='job_name',
)

# or adding specific specs to capture alongside
job_asset = MSSQLJobAsset(
    job_name='job_name', asset_list=[AssetSpec('a'), AssetSpec('b')]
)
# single asset from helper with child assets
job_asset = build_mssql_job_assets(
    "job_name", asset_list=["other_asset"]
)

As above, you can also pass keyword args to the AssetSpec using asset_spec_kwargs.

MSSQL Job Sensor

To report the materialization events, a sensor can be used to check the msdb for completed or successful events from the dbo.sysjobhistory table.

You will need a resource defined that connects to the database and exposes a connect function, which returns a SQLAlchemy connection object.

The sensor can be created with build_mssql_job_asset_sensor.
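
As a hedged sketch, assuming its arguments mirror build_ssis_asset_sensor shown earlier (the parameter names here are placeholders, not confirmed by the package):

from dagster_ssis import build_mssql_job_asset_sensor

# Assumption: the signature mirrors build_ssis_asset_sensor above;
# check the package for the exact parameter names.
job_sensor = build_mssql_job_asset_sensor(
    job_assets=[job_asset],
    sensor_name='mssql_job_sensor',
    database_resource_key='my_db_resource'
)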

Complete Example

See the file examples/full_example.py for a complete example.
