Migrate to bcp for bulk insert performance #6

Open

wants to merge 78 commits into base: main

Commits (78)
9dfe892
add all properties inside brackets
arilton Nov 19, 2024
b901f74
cannot set key_properties
arilton Nov 19, 2024
b02c7e9
only applying brackets in reserved words in table and column names. r…
arilton Nov 19, 2024
83799f2
only bracket names in insert/merge statements
arilton Nov 19, 2024
10cb388
force drop table for testing
hsyyid Nov 19, 2024
a41f9c4
actually force drop the table
hsyyid Nov 19, 2024
90f7103
clean the code and improve against sql injection
arilton Nov 19, 2024
09af618
fix drop logic
hsyyid Nov 19, 2024
6a9b761
Revert "clean the code and improve against sql injection"
hsyyid Nov 19, 2024
9540822
turn off force drop logic
hsyyid Nov 19, 2024
c886bc8
alter varchar columns to max size before insert data
arilton Nov 20, 2024
30260f7
add BEGIN TRANSACTION
arilton Nov 20, 2024
bf9bb8b
execute the merge inside a transaction
arilton Nov 21, 2024
068e3ca
check if current_type.length is None at merge_sql_types
arilton Nov 22, 2024
49b0137
Revert "turn off force drop logic"
hsyyid Nov 22, 2024
8293ac5
change drop table
hsyyid Nov 22, 2024
06e5318
fix
hsyyid Nov 22, 2024
91e74eb
remove drop tables logic
hsyyid Nov 25, 2024
0ad21fc
Force TDS version
hsyyid Jan 10, 2025
c352f06
use pyodbc
hsyyid Jan 10, 2025
0cf60e4
fix
hsyyid Jan 10, 2025
d299a59
fix
hsyyid Jan 10, 2025
7d6a7f0
fix issue with passwords with special chars
hsyyid Jan 10, 2025
75415de
fix issues with schemas
hsyyid Jan 10, 2025
a873586
fix schema
hsyyid Jan 10, 2025
432d0bb
Merge branch 'hgi-6830' into feature/hgi-7043
hsyyid Jan 14, 2025
b858e9f
add default constraint copy
hsyyid Jan 14, 2025
6e752cd
bug fix
hsyyid Jan 14, 2025
12f489a
turn off alter varchar logic
hsyyid Jan 14, 2025
ecf3f43
update logic
hsyyid Jan 14, 2025
b7aaccd
fix
hsyyid Jan 14, 2025
699d96e
fix
hsyyid Jan 14, 2025
addea70
fix
hsyyid Jan 14, 2025
f5cad84
more debug
hsyyid Jan 14, 2025
6e6f3af
more debug
hsyyid Jan 14, 2025
a4f20f1
new approach
hsyyid Jan 14, 2025
0561126
stupid
hsyyid Jan 14, 2025
0c6525b
undo
hsyyid Jan 14, 2025
8c73291
escape
hsyyid Jan 14, 2025
833ae88
update default logic
hsyyid Jan 14, 2025
324ad02
fix
hsyyid Jan 14, 2025
4cd04f5
fix
hsyyid Jan 14, 2025
0f8d2a0
trunc
hsyyid Jan 14, 2025
2ee27ee
fix
hsyyid Jan 14, 2025
23d8cb3
Remove trunc
hsyyid Jan 14, 2025
408504e
what
hsyyid Jan 14, 2025
7ad375a
testing
hsyyid Jan 15, 2025
3feac5b
turn on speedy
hsyyid Jan 15, 2025
b57f923
HGI-7050 Create physical table instead of temporary table, this works…
izkapadia-hg Jan 15, 2025
8797f6b
HGI-7050 Create physical table instead of temporary table, this works…
izkapadia-hg Jan 15, 2025
f017f60
HGI-7050 int field are now created is big int, drop tmp table at the …
izkapadia-hg Jan 15, 2025
e771016
HGI-7050 Drop actual table before fisrt batch.
izkapadia-hg Jan 15, 2025
9331a53
HGI-7050 using the engine that explictly create to run connector drop…
izkapadia-hg Jan 15, 2025
ef1a8c7
fix
hsyyid Jan 15, 2025
2275da1
Switch to bcp
hsyyid Jan 15, 2025
9116bf6
handle int pks
hsyyid Jan 15, 2025
ed4763f
fix bcp path
hsyyid Jan 15, 2025
cb66637
fix drop
hsyyid Jan 15, 2025
ab01083
fix
hsyyid Jan 15, 2025
5ff35e8
handle nvarchar
hsyyid Jan 15, 2025
5f08005
fix
hsyyid Jan 15, 2025
8213af2
fix
hsyyid Jan 15, 2025
466fde4
add jenkinsfile
hsyyid Jan 15, 2025
c2be646
add log
hsyyid Jan 15, 2025
351db4e
fix table ref
hsyyid Jan 15, 2025
3beb506
fix
hsyyid Jan 15, 2025
9e4c232
turn off truncate
hsyyid Jan 16, 2025
f415c6e
add trunc back
hsyyid Jan 17, 2025
0ad1c82
escape drop
hsyyid Jan 17, 2025
4d78eca
recreate connection every hour to ensure freshness
hsyyid Jan 17, 2025
46b0579
HGI-7077 case is the internal keywork, need to put square brackets ar…
izkapadia-hg Jan 17, 2025
6e1f642
HGI-7077 split schema and table name, and enclose table name with squ…
izkapadia-hg Jan 17, 2025
dc4f6cb
HGI-7077 disable main table drop and recreate.
izkapadia-hg Jan 18, 2025
f31f0f4
fix missing dot
hsyyid Jan 20, 2025
605ba62
HGI-7077 i) If new columns are selected in singer schema, add them to…
izkapadia-hg Jan 21, 2025
2e33ebb
HGI-7077 i) Field name in tables are differet from field names in sch…
izkapadia-hg Jan 21, 2025
97e5e0b
HGI-7101 added a metadata.json.
izkapadia-hg Jan 23, 2025
afa364b
fix import
hsyyid Jan 23, 2025
Binary file added .DS_Store
Binary file not shown.
3 changes: 2 additions & 1 deletion .gitignore
@@ -140,4 +140,5 @@ dmypy.json
.devcontainer/
plugins/
config.json
poetry.lock
poetry.lock
.history
10 changes: 10 additions & 0 deletions Jenkinsfile
@@ -0,0 +1,10 @@
pipeline {
agent {label 'linux'}
stages {
stage('Deploy connector') {
steps {
sh('cd /var/lib/jenkins/ && ./deploy-connector.sh')
}
}
}
}
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -13,7 +13,9 @@ license = "Apache 2.0"
python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "==0.13.0"
pymssql = ">=2.2.5"
pyodbc = ">=4.0.34"
numpy = "^1.26.4"
pandas = "^1.3.5"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
280 changes: 256 additions & 24 deletions target_mssql/connector.py
@@ -3,9 +3,11 @@
from typing import Any, Dict, Iterable, List, Optional, cast

import sqlalchemy
import urllib.parse
from singer_sdk.helpers._typing import get_datelike_property_type
from singer_sdk.sinks import SQLConnector
from sqlalchemy.dialects import mssql
from target_mssql.metadata import write_event


class mssqlConnector(SQLConnector):
@@ -19,6 +21,162 @@ class mssqlConnector(SQLConnector):
allow_column_alter: bool = True # Whether altering column types is supported.
allow_merge_upsert: bool = True # Whether MERGE UPSERT is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.
dropped_tables = dict()


def create_sqlalchemy_engine(self) -> sqlalchemy.engine.Engine:
"""Return a new SQLAlchemy engine using the provided config.
Developers can generally override just one of the following:
`sqlalchemy_engine`, `sqlalchemy_url`.
Returns:
A newly created SQLAlchemy engine object.
"""
engine = sqlalchemy.create_engine(
self.sqlalchemy_url,
echo=False,
pool_pre_ping=True,
pool_recycle=3600
)

return engine
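The override pairs `pool_pre_ping` with `pool_recycle=3600`, matching the "recreate connection every hour to ensure freshness" commit: each pooled connection is probed before use, and connections older than an hour are discarded. A minimal sketch of the same settings, using an in-memory SQLite URL as a stand-in for the connector's real `mssql+pyodbc` URL:

```python
import sqlalchemy

# pool_pre_ping probes each pooled connection before handing it out;
# pool_recycle=3600 discards connections older than an hour, which avoids
# the server dropping idle sockets mid-run.
engine = sqlalchemy.create_engine(
    "sqlite://",  # stand-in URL; the connector builds an mssql+pyodbc URL
    pool_pre_ping=True,
    pool_recycle=3600,
)

with engine.connect() as conn:
    value = conn.execute(sqlalchemy.text("SELECT 1")).scalar()
```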

def table_exists(self, full_table_name: str) -> bool:
"""Determine if the target table already exists.

Args:
full_table_name: the target table name.

Returns:
True if table exists, False if not, None if unsure or undetectable.
"""
kwargs = dict()

if "." in full_table_name:
kwargs["schema"] = full_table_name.split(".")[0]
full_table_name = full_table_name.split(".")[1]

self.logger.info(f"Checking table exists: {full_table_name} kwargs={kwargs}")

return cast(
bool,
sqlalchemy.inspect(self._engine).has_table(full_table_name, **kwargs),
)

def prepare_column(
self,
full_table_name: str,
column_name: str,
sql_type: sqlalchemy.types.TypeEngine,
) -> None:

"""
Ensure a column exists in the table with the correct data type.
If the column does not exist, create it.

:param full_table_name: The full table name (schema.table_name).
:param column_name: The name of the column to ensure.
:param sql_type: The SQLAlchemy type to enforce.
"""
schema = full_table_name.split(".")[0] if "." in full_table_name else "dbo"
table_name = full_table_name.split(".")[-1]
# Check if the column exists
column_exists_query = f"""
SELECT COUNT(*)
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '{schema}'
AND TABLE_NAME = '{table_name}'
AND COLUMN_NAME = '{column_name}'
"""
result = self.connection.execute(column_exists_query).scalar()

if result == 0:
# Add the column if it does not exist
alter_sql = f"ALTER TABLE {schema}.[{table_name}] ADD [{column_name}] {sql_type.compile(dialect=mssql.dialect())}"
with self.connection.begin():
self.connection.execute(alter_sql)
self.logger.info(f"Added column {column_name} to {full_table_name}.")
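The existence check above interpolates `schema`, `table_name`, and `column_name` straight into the SQL string. A hedged sketch of a bind-parameter variant (variable names assumed from the method above), in the spirit of the reverted "improve against sql injection" commit:

```python
import sqlalchemy

# Hypothetical parameterized form of the INFORMATION_SCHEMA lookup; the
# driver substitutes :schema / :table_name / :column_name at execute time
# instead of relying on f-string interpolation.
column_exists_query = sqlalchemy.text(
    """
    SELECT COUNT(*)
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = :schema
      AND TABLE_NAME = :table_name
      AND COLUMN_NAME = :column_name
    """
)

# Inside prepare_column this would be executed as:
# result = self.connection.execute(
#     column_exists_query,
#     {"schema": schema, "table_name": table_name, "column_name": column_name},
# ).scalar()
```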

from sqlalchemy import create_engine

def get_column_order(self, full_table_name):

schema = full_table_name.split(".")[0] if "." in full_table_name else "dbo"
table_name = full_table_name.split(".")[-1]

# SQL query to get the column order
query = f"""
SELECT
COLUMN_NAME
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_SCHEMA = '{schema}' AND
TABLE_NAME = '{table_name}'
ORDER BY
ORDINAL_POSITION;
"""

with self.connection.begin():
result = self.connection.execute(query)

# Fetch all the results and return the column names in order
column_order = [row['COLUMN_NAME'] for row in result]

# Return the column order as a list
return column_order

def prepare_table(
self,
full_table_name: str,
schema: dict,
primary_keys: list[str],
partition_keys: list[str] | None = None,
as_temp_table: bool = False,
) -> None:
"""Adapt target table to provided schema if possible.

Args:
full_table_name: the target table name.
schema: the JSON Schema for the table.
primary_keys: list of key properties.
partition_keys: list of partition keys.
as_temp_table: True to create a temp table.
"""
# NOTE: Force create the table
# TODO: remove this
# if not self.dropped_tables.get(full_table_name, False):
# self.logger.info(f"Force dropping the table {full_table_name}!")
# with self.connection.begin(): # Starts a transaction
# drop_table = '.'.join([f'[{x}]' for x in full_table_name.split('.')])
# self.connection.execute(f"DROP TABLE IF EXISTS {drop_table};")
# self.dropped_tables[full_table_name] = True

if not self.table_exists(full_table_name=full_table_name):
self.create_empty_table(
full_table_name=full_table_name,
schema=schema,
primary_keys=primary_keys,
partition_keys=partition_keys,
as_temp_table=as_temp_table,
)
return

for property_name, property_def in schema["properties"].items():
self.prepare_column(
full_table_name, property_name, self.to_sql_type(property_def)
)

def prepare_schema(self, schema_name: str) -> None:
"""Create the target database schema.

Args:
schema_name: The target schema name.
"""
schema_exists = self.schema_exists(schema_name)
if not schema_exists:
write_event({"event": "SCHEMA_CREATED", "name": schema_name})
self.create_schema(schema_name)

def create_table_with_records(
self,
@@ -42,6 +200,7 @@ def create_table_with_records(
if primary_keys is None:
primary_keys = self.key_properties
partition_keys = partition_keys or None

self.connector.prepare_table(
full_table_name=full_table_name,
primary_keys=primary_keys,
@@ -60,13 +219,19 @@ def get_sqlalchemy_url(self, config: dict) -> str:
"""

connection_url = sqlalchemy.engine.url.URL.create(
drivername="mssql+pymssql",
username=config["user"],
password=config["password"],
drivername="mssql+pyodbc",
username=config['user'],
password=urllib.parse.quote_plus(config["password"]),
host=config["host"],
port=config["port"],
database=config["database"],
query={
"driver": "ODBC Driver 17 for SQL Server", # Use Microsoft's ODBC driver
"Encrypt": "yes", # Ensures SSL encryption for Azure SQL
"TrustServerCertificate": "yes", # Accepts the server certificate without validation
}
)

return str(connection_url)
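The `quote_plus` on the password matches the "fix issue with passwords with special chars" commit, and the ODBC options ride along in `query`. A sketch of the resulting URL shape with hypothetical credentials (`URL.create` requires SQLAlchemy 1.4+, which the diff itself assumes):

```python
import urllib.parse
import sqlalchemy

config = {  # hypothetical values for illustration only
    "user": "etl_user",
    "password": "p@ss word!",
    "host": "db.example.com",
    "port": 1433,
    "database": "warehouse",
}

url = sqlalchemy.engine.url.URL.create(
    drivername="mssql+pyodbc",
    username=config["user"],
    password=urllib.parse.quote_plus(config["password"]),  # escapes @, space, !
    host=config["host"],
    port=config["port"],
    database=config["database"],
    query={"driver": "ODBC Driver 17 for SQL Server"},
)

rendered = str(url)  # mssql+pyodbc://etl_user:...@db.example.com:1433/warehouse?...
```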

def create_empty_table(
@@ -107,9 +272,12 @@ def create_empty_table(

columntype = self.to_sql_type(property_jsonschema)

# In MSSQL, Primary keys can not be more than 900 bytes. Setting at 255
if isinstance(columntype, sqlalchemy.types.VARCHAR) and is_primary_key:
columntype = sqlalchemy.types.VARCHAR(255)
if is_primary_key:
# In MSSQL, Primary keys can not be more than 900 bytes. Setting at 255
if isinstance(columntype, sqlalchemy.types.VARCHAR) or isinstance(columntype, sqlalchemy.types.NVARCHAR):
columntype = sqlalchemy.types.VARCHAR(255)
# elif isinstance(columntype, sqlalchemy.types.BIGINT):
# columntype = sqlalchemy.types.INTEGER()

columns.append(
sqlalchemy.Column(
Expand All @@ -119,9 +287,20 @@ def create_empty_table(
)
)

_ = sqlalchemy.Table(full_table_name, meta, *columns)
kwargs = dict()

schema = full_table_name.split(".")[0] if "." in full_table_name else "dbo"
full_table_name = full_table_name.split(".")[-1]

kwargs["schema"] = schema

_ = sqlalchemy.Table(full_table_name, meta, *columns, **kwargs)
meta.create_all(self._engine)

write_event({"event": "TABLE_CREATED", "name": full_table_name, "schema": schema})

# self.logger.info(f"Create table with cols = {columns}")

def merge_sql_types( # noqa
self, sql_types: list[sqlalchemy.types.TypeEngine]
) -> sqlalchemy.types.TypeEngine: # noqa
@@ -176,6 +355,7 @@ def merge_sql_types( # noqa
if (
(opt_len is None)
or (opt_len == 0)
or (current_type.length is None)
or (opt_len >= current_type.length)
):
return opt
@@ -187,6 +367,7 @@ def merge_sql_types( # noqa
if (
(opt_len is None)
or (opt_len == 0)
or (current_type.length is None)
or (opt_len >= current_type.length)
):
return opt
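The `current_type.length is None` guard added in both hunks (the "check if current_type.length is None at merge_sql_types" commit) prevents `opt_len >= current_type.length` from raising a `TypeError` when the current VARCHAR has no declared length. The guard in isolation, as a standalone sketch:

```python
from typing import Optional

def should_widen(opt_len: Optional[int], current_len: Optional[int]) -> bool:
    """Mirror the length check in merge_sql_types: an unbounded candidate
    (None or 0) or an unbounded current type (None) always allows the
    replacement; otherwise compare lengths numerically."""
    return (
        opt_len is None
        or opt_len == 0
        or current_len is None  # the added guard: avoids comparing against None
        or opt_len >= current_len
    )
```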
@@ -340,37 +521,88 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: #

maxlength = jsonschema_type.get("maxLength")
return cast(
sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR(maxlength)
sqlalchemy.types.TypeEngine, sqlalchemy.types.NVARCHAR(maxlength)
)

if self._jsonschema_type_check(jsonschema_type, ("integer",)):
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.INTEGER())
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.BIGINT())
if self._jsonschema_type_check(jsonschema_type, ("number",)):
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.NUMERIC(29, 16))
if self._jsonschema_type_check(jsonschema_type, ("boolean",)):
return cast(sqlalchemy.types.TypeEngine, mssql.VARCHAR(1))
return cast(sqlalchemy.types.TypeEngine, mssql.NVARCHAR(1))

if self._jsonschema_type_check(jsonschema_type, ("object",)):
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR())
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.NVARCHAR())

if self._jsonschema_type_check(jsonschema_type, ("array",)):
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR())
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.NVARCHAR())

return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR())

def create_temp_table_from_table(self, from_table_name):
"""Temp table from another table."""
def drop_temp_table_from_table(self, temp_table):
"""Drop the temp table if it exists."""

try:
self.logger.info("Dropping existing temp table.")
self.connection.execute(f"DROP TABLE IF EXISTS #{from_table_name};")
except:
self.logger.info("No temp table to drop.")

ddl = f"""
SELECT TOP 0 *
into #{from_table_name}
FROM {from_table_name}
self.logger.info(f"Dropping existing temp table {temp_table}")
with self.connection.begin(): # Starts a transaction
temp_table = '.'.join([f'[{x}]' for x in temp_table.split('.')])
self.connection.execute(f"DROP TABLE IF EXISTS {temp_table};")
except Exception as e:
self.logger.info(f"No temp table to drop. Error: {e}")

def create_temp_table_from_table(self, from_table_name):
"""Create a temp table from an existing table, preserving identity columns and default values."""
parts = from_table_name.split(".")
schema = parts[0] + "." if "." in from_table_name else ""
table = "temp_" + parts[-1]

self.drop_temp_table_from_table(f"{schema}{table}")

# Query to get column definitions, including identity property
get_columns_query = f"""
SELECT
c.name AS COLUMN_NAME,
t.name AS DATA_TYPE,
c.max_length AS COLUMN_LENGTH,
c.precision AS PRECISION_VALUE,
c.scale AS SCALE_VALUE,
d.definition AS COLUMN_DEFAULT,
COLUMNPROPERTY(c.object_id, c.name, 'IsIdentity') AS IS_IDENTITY
FROM sys.columns c
JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.default_constraints d ON c.default_object_id = d.object_id
WHERE c.object_id = OBJECT_ID('{from_table_name}')
"""

columns = self.connection.execute(get_columns_query).fetchall()
# self.logger.info(f"Fetched columns: {columns}")

# Construct the CREATE TABLE statement
column_definitions = []
for col in columns:
col_name = col[0]
col_type = col[1]
col_length = col[2]
precision_value = col[3]
scale_value = col[4]
col_default = f"DEFAULT {col[5]}" if col[5] else ""
is_identity = col[6]

identity_str = "IDENTITY(1,1)" if is_identity else ""

# Apply length only if it's a varchar/nvarchar type
if col_type.lower() in ["varchar", "nvarchar"]:
col_length_str = "(MAX)" if col_length == -1 else f"({col_length})"
else:
col_length_str = ""

column_definitions.append(f"[{col_name}] {col_type}{col_length_str} {identity_str} {col_default}")

create_temp_table_sql = f"""
CREATE TABLE {schema}{table} (
{", ".join(column_definitions)}
);
"""

self.connection.execute(ddl)
# self.logger.info(f"Generated SQL for temp table:\n{create_temp_table_sql}")
self.connection.execute(create_temp_table_sql)
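The bcp step named in the PR title (the "Switch to bcp" commit) is not visible in this hunk. For orientation, a sketch of assembling the kind of `bcp` invocation a bulk load into the temp table might use; the flags (`-c`, `-t`, `-S`, `-d`, `-U`, `-P`) are bcp's documented basics, while the table, file, and credential values are illustrative:

```python
def build_bcp_command(table, data_file, server, database, user, password):
    """Assemble a bcp bulk-load invocation (illustrative helper, not from the PR)."""
    return [
        "bcp", table, "in", data_file,
        "-c",        # character-mode data file
        "-t", "|",   # field terminator
        "-S", server,
        "-d", database,
        "-U", user,
        "-P", password,
    ]

cmd = build_bcp_command(
    "dbo.temp_orders", "/tmp/orders.dat",
    "db.example.com", "warehouse", "etl_user", "secret",
)
# subprocess.run(cmd, check=True) would perform the actual load
```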