Commit

Improve the documentation and add MySQL external source (#11)
* Improve the README and add examples
* Add the MySQL external source in the Python binding
fvaleye committed Jan 13, 2022
1 parent 2d9edd1 commit ba6706c
Showing 34 changed files with 847 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

9 changes: 5 additions & 4 deletions README.adoc
@@ -23,9 +23,10 @@ Using Rust, it makes blazing fast multi-regex matching.
 - Deltalake
 - GCP: BigQuery
 - Snowflake
+- Kafka Schema Registry

 == Data Rules
-The available data rules are: *https://github.com/fvaleye/metadata-guardian/blob/main/python/metadata_guardian/rules/pii_rules.yaml[PII]* and *https://github.com/fvaleye/metadata-guardian/blob/main/python/metadata_guardian/rules/inclusion_rules.yaml[INCLUSION]*. But it aims to be extended with custom data rules that could serve multiple purposes (for example: detect data that may contain IA biais, detect credentials...).
+The available data rules are here: *https://github.com/fvaleye/metadata-guardian/blob/main/python/metadata_guardian/rules/pii_rules.yaml[PII]* and *https://github.com/fvaleye/metadata-guardian/blob/main/python/metadata_guardian/rules/inclusion_rules.yaml[INCLUSION]*. They can also be extended with custom data rules serving multiple purposes (for example: detecting data that may contain AI bias, or detecting credentials...).

 == Where to get it

@@ -35,12 +36,12 @@ pip install 'metadata_guardian[all]'
 ```

 ```sh
-# Install with one data source
-pip install 'metadata_guardian[snowflake,avro,aws,gcp,deltalake]'
+# Install with one metadata source in the list
+pip install 'metadata_guardian[snowflake,avro,aws,gcp,deltalake,kafka_schema_registry]'
 ```

 == Licence
 https://raw.githubusercontent.com/fvaleye/metadata-guardian/main/LICENSE.txt[Apache License 2.0]

 == Documentation
 The documentation is hosted here: https://fvaleye.github.io/metadata-guardian/python/
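
Note on the Data Rules paragraph above: a minimal sketch of the two ways to build `DataRules` in the Python binding, using only calls that appear elsewhere in this commit; `custom_rules.yaml` is a hypothetical placeholder for your own specification file:

```python
from metadata_guardian import AvailableCategory, DataRules

# Built-in rules shipped with the library (PII or INCLUSION categories).
pii_rules = DataRules.from_available_category(category=AvailableCategory.PII)

# Custom rules loaded from your own YAML specification;
# "custom_rules.yaml" is a hypothetical placeholder path.
custom_rules = DataRules(path="custom_rules.yaml")
```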
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "metadata_guardian-python"
-version = "0.1.0"
+version = "0.1.1"
 authors = ["Florian Valeye <fvaleye@github.com>"]
 homepage = "https://fvaleye.github.io/metadata-guardian/python"
 license = "Apache-2.0"
4 changes: 2 additions & 2 deletions python/docs/source/installation.rst
@@ -8,5 +8,5 @@ Using Pip
 # Install all the metadata sources
 pip install 'metadata_guardian[all]'
-# Install one metadata source in the list
-pip install 'metadata_guardian[snowflake,avro,aws,gcp,deltalake,devel]'
+# Install with one metadata source in the list
+pip install 'metadata_guardian[snowflake,avro,aws,gcp,deltalake,kafka_schema_registry]'
42 changes: 30 additions & 12 deletions python/docs/source/usage.rst
@@ -4,16 +4,15 @@ Usage
 Metadata Guardian
 -----------------

-Scan the column names of a local source:
-
->>> from metadata_guardian import DataRules, ColumnScanner, AvailableCategory
->>> from metadata_guardian.source import ParquetSource
->>>
->>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
->>> source = ParquetSource("file.parquet")
->>> column_scanner = ColumnScanner(data_rules=data_rules)
->>> report = column_scanner.scan_local(source)
->>> report.to_console()
+**Workflow:**
+
+1. Create the Data Rules
+2. Create the Metadata Source
+3. Scan the Metadata Source
+4. Analyze the reports
+
+Scan an external Metadata Source
+--------------------------------

 Scan the column names of an external source on a table:

@@ -23,7 +22,8 @@ Scan the column names of an external source on a table:
 >>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
 >>> source = SnowflakeSource(sf_account="account", sf_user="sf_user", sf_password="sf_password", warehouse="warehouse", schema_name="schema_name")
 >>> column_scanner = ColumnScanner(data_rules=data_rules)
->>> report = column_scanner.scan_external(source, database_name="database_name", table_name="table_name", include_comment=True)
+>>> with source:
+...     report = column_scanner.scan_external(source, database_name="database_name", table_name="table_name", include_comment=True)
 >>> report.to_console()

Scan the column names of an external source on a database:
@@ -34,7 +34,8 @@ Scan the column names of an external source on a database:
 >>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
 >>> source = SnowflakeSource(sf_account="account", sf_user="sf_user", sf_password="sf_password", warehouse="warehouse", schema_name="schema_name")
 >>> column_scanner = ColumnScanner(data_rules=data_rules)
->>> report = column_scanner.scan_external(source, database_name="database_name", include_comment=True)
+>>> with source:
+...     report = column_scanner.scan_external(source, database_name="database_name", include_comment=True)
 >>> report.to_console()

Scan the column names of an external source for a database asynchronously with asyncio:
@@ -46,7 +47,23 @@ Scan the column names of an external source for a database asynchronously with asyncio:
 >>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
 >>> source = SnowflakeSource(sf_account="account", sf_user="sf_user", sf_password="sf_password", warehouse="warehouse", schema_name="schema_name")
 >>> column_scanner = ColumnScanner(data_rules=data_rules)
->>> report = asyncio.run(column_scanner.scan_external_async(source, database_name="database_name", include_comment=True))
+>>> with source:
+...     report = asyncio.run(column_scanner.scan_external_async(source, database_name="database_name", include_comment=True))
 >>> report.to_console()
+
+
+Scan an internal Metadata Source
+--------------------------------
+
+Scan the column names of a local source:
+
+>>> from metadata_guardian import DataRules, ColumnScanner, AvailableCategory
+>>> from metadata_guardian.source import ParquetSource
+>>>
+>>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
+>>> column_scanner = ColumnScanner(data_rules=data_rules)
+>>> with ParquetSource("file.parquet") as source:
+...     report = column_scanner.scan_local(source)
+>>> report.to_console()

Scan the column names of a local source:
@@ -57,6 +74,7 @@ Scan the column names of a local source:
 >>> data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
 >>> column_scanner = ColumnScanner(data_rules=data_rules)
 >>> report = MetadataGuardianReport()
+>>> paths = ["first_path", "second_path"]
 >>> for path in paths:
 ...     source = ParquetSource(path)
 ...     report.append(column_scanner.scan_local(source))
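
To tie the four workflow steps together with the MySQL source added by this commit, a minimal end-to-end sketch; the connection values and database name are placeholders, and the `MySQLSource` constructor arguments follow the `get_mysql()` helper in the examples below:

```python
from metadata_guardian import AvailableCategory, ColumnScanner, DataRules
from metadata_guardian.source import MySQLSource

# 1. Create the Data Rules.
data_rules = DataRules.from_available_category(category=AvailableCategory.PII)
# 2. Create the Metadata Source (placeholder credentials).
source = MySQLSource(user="user", password="password", host="localhost")
# 3. Scan the Metadata Source.
column_scanner = ColumnScanner(data_rules=data_rules)
with source:
    report = column_scanner.scan_external(
        source, database_name="database_name", include_comment=True
    )
# 4. Analyze the reports.
report.to_console()
```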
3 changes: 3 additions & 0 deletions python/examples/README.md
@@ -0,0 +1,3 @@
Examples
This directory contains various examples of the Metadata Guardian features.
Make sure Metadata Guardian is installed, then run the examples from the command line with Python.
87 changes: 87 additions & 0 deletions python/examples/scan_external_sources_custom_data_rules.py
@@ -0,0 +1,87 @@
import argparse
import os

from metadata_guardian import (
    AvailableCategory,
    ColumnScanner,
    DataRules,
    ExternalMetadataSource,
)
from metadata_guardian.source import (
    AthenaSource,
    BigQuerySource,
    DeltaTableSource,
    GlueSource,
    KafkaSchemaRegistrySource,
    SnowflakeSource,
)


def get_snowflake() -> ExternalMetadataSource:
    return SnowflakeSource(
        sf_account=os.environ["SNOWFLAKE_ACCOUNT"],
        sf_user=os.environ["SNOWFLAKE_USER"],
        sf_password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        schema_name=os.environ["SNOWFLAKE_SCHEMA_NAME"],
    )


def get_gcp_bigquery() -> ExternalMetadataSource:
    return BigQuerySource(
        service_account_json_path=os.environ["BIGQUERY_SERVICE_ACCOUNT"],
        project=os.environ["BIGQUERY_PROJECT"],
        location=os.environ["BIGQUERY_LOCATION"],
    )


def get_kafka_schema_registry() -> ExternalMetadataSource:
    return KafkaSchemaRegistrySource(url=os.environ["KAFKA_SCHEMA_REGISTRY_URL"])


def get_delta_table() -> ExternalMetadataSource:
    return DeltaTableSource(uri=os.environ["DELTA_TABLE_URI"])


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--data-rules-path",
        required=True,
        help="The Data Rules specification yaml file path to use for creating the Data Rules",
    )
    parser.add_argument(
        "--external-source",
        choices=["Snowflake", "GCP BigQuery", "Kafka Schema Registry", "Delta Table"],
        required=True,
        help="The External Metadata Source to use",
    )
    parser.add_argument(
        "--scanner", choices=["ColumnScanner"], help="The scanner to use"
    )
    parser.add_argument(
        "--database_name", required=True, help="The database name to scan"
    )
    parser.add_argument(
        "--include_comments", default=True, help="Include the comments in the scan"
    )
    args = parser.parse_args()

    # Create the Data Rules from the custom YAML specification.
    data_rules = DataRules(path=args.data_rules_path)
    column_scanner = ColumnScanner(data_rules=data_rules)

    # Create the requested External Metadata Source.
    if args.external_source == "Snowflake":
        source = get_snowflake()
    elif args.external_source == "GCP BigQuery":
        source = get_gcp_bigquery()
    elif args.external_source == "Kafka Schema Registry":
        source = get_kafka_schema_registry()
    elif args.external_source == "Delta Table":
        source = get_delta_table()

    # Scan the database and display the report on the console.
    with source:
        report = column_scanner.scan_external(
            source,
            database_name=args.database_name,
            include_comment=args.include_comments,
        )
        report.to_console()
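
A possible invocation of this example, assuming the `SNOWFLAKE_*` environment variables used above are exported; the YAML path and database name are placeholders:

```sh
python scan_external_sources_custom_data_rules.py \
    --data-rules-path custom_rules.yaml \
    --external-source Snowflake \
    --database_name MY_DATABASE
```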
107 changes: 107 additions & 0 deletions python/examples/scan_external_sources_database.py
@@ -0,0 +1,107 @@
import argparse
import os

from metadata_guardian import (
    AvailableCategory,
    ColumnScanner,
    DataRules,
    ExternalMetadataSource,
)
from metadata_guardian.source import (
    AthenaSource,
    BigQuerySource,
    DeltaTableSource,
    GlueSource,
    KafkaSchemaRegistrySource,
    MySQLSource,
    SnowflakeSource,
)


def get_snowflake() -> ExternalMetadataSource:
    return SnowflakeSource(
        sf_account=os.environ["SNOWFLAKE_ACCOUNT"],
        sf_user=os.environ["SNOWFLAKE_USER"],
        sf_password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        schema_name=os.environ["SNOWFLAKE_SCHEMA_NAME"],
    )


def get_gcp_bigquery() -> ExternalMetadataSource:
    return BigQuerySource(
        service_account_json_path=os.environ["BIGQUERY_SERVICE_ACCOUNT"],
        project=os.environ["BIGQUERY_PROJECT"],
        location=os.environ["BIGQUERY_LOCATION"],
    )


def get_kafka_schema_registry() -> ExternalMetadataSource:
    return KafkaSchemaRegistrySource(url=os.environ["KAFKA_SCHEMA_REGISTRY_URL"])


def get_delta_table() -> ExternalMetadataSource:
    return DeltaTableSource(uri=os.environ["DELTA_TABLE_URI"])


def get_mysql() -> ExternalMetadataSource:
    return MySQLSource(
        user=os.environ["MYSQL_USER"],
        password=os.environ["MYSQL_PASSWORD"],
        host=os.environ["MYSQL_HOST"],
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--data-rules",
        choices=["PII", "INCLUSION"],
        default="PII",
        help="The Data Rules to use",
    )
    parser.add_argument(
        "--external-source",
        choices=[
            "Snowflake",
            "GCP BigQuery",
            "Kafka Schema Registry",
            "Delta Table",
            "MySQL",
        ],
        required=True,
        help="The External Metadata Source to use",
    )
    parser.add_argument(
        "--scanner", choices=["ColumnScanner"], help="The scanner to use"
    )
    parser.add_argument(
        "--database_name", required=True, help="The database name to scan"
    )
    parser.add_argument(
        "--include_comments", default=True, help="Include the comments in the scan"
    )
    args = parser.parse_args()

    # Create the Data Rules from the built-in category (PII or INCLUSION).
    data_rules = DataRules.from_available_category(
        category=AvailableCategory[args.data_rules]
    )
    column_scanner = ColumnScanner(data_rules=data_rules)

    # Create the requested External Metadata Source.
    if args.external_source == "Snowflake":
        source = get_snowflake()
    elif args.external_source == "GCP BigQuery":
        source = get_gcp_bigquery()
    elif args.external_source == "Kafka Schema Registry":
        source = get_kafka_schema_registry()
    elif args.external_source == "Delta Table":
        source = get_delta_table()
    elif args.external_source == "MySQL":
        source = get_mysql()

    # Scan the database and display the report on the console.
    with source:
        report = column_scanner.scan_external(
            source,
            database_name=args.database_name,
            include_comment=args.include_comments,
        )
        report.to_console()
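
A possible invocation against the newly added MySQL source, assuming the `MYSQL_*` environment variables used above are exported; the database name is a placeholder:

```sh
python scan_external_sources_database.py \
    --data-rules PII \
    --external-source MySQL \
    --database_name my_database
```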