Skip to content

Commit

Permalink
Add TDEngineTarget (#516)
Browse files Browse the repository at this point in the history
* Add `TDEngineTarget`

[ML-6367](https://iguazio.atlassian.net/browse/ML-6367)

* Add support for websocket

* Fix TDEngine credentials check
  • Loading branch information
gtopper authored May 7, 2024
1 parent 19d0d11 commit 58ce43c
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 0 deletions.
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ redis~=4.3
sqlalchemy~=1.4
s3fs~=2023.9.2
adlfs~=2023.9.0
taospy[ws]>=2,<3
116 changes: 116 additions & 0 deletions integration/test_tdengine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import os
from datetime import datetime

import pytest
import taosrest
from taosrest import ConnectError
from taosws import QueryError

from storey import SyncEmitSource, build_flow
from storey.targets import TDEngineTarget

# Connection settings are taken from the environment so that the integration test can be
# pointed at any TDEngine instance without code changes.
url, user, password = map(os.getenv, ("TDENGINE_URL", "TDENGINE_USER", "TDENGINE_PASSWORD"))

# The test can run when the URL, user, and password are all set, or when the URL alone is a
# websocket (taosws) URL.
has_tdengine_credentials = all((url, user, password)) or (url and url.startswith("taosws"))


@pytest.fixture()
def tdengine():
    """Provide a TDEngine connection and a freshly created test table.

    Connects over websocket when the module-level ``url`` starts with ``taosws`` and over the
    REST API otherwise, makes sure the ``storey`` database exists, and (re)creates the ``test``
    table. The table is dropped after the test, and the connection is always closed, even when
    setup or teardown fails.

    Yields a tuple of ``(connection, url, user, password, db_name, table_name, db_prefix)``.
    ``db_prefix`` is empty for websocket connections (which issue ``USE <db>`` instead) and
    ``"<db_name>."`` for REST connections.
    """
    db_name = "storey"
    table_name = "test"

    # Setup
    if url.startswith("taosws"):
        import taosws

        connection = taosws.connect(url)
        db_prefix = ""
    else:
        db_prefix = db_name + "."
        connection = taosrest.connect(
            url=url,
            user=user,
            password=password,
            timeout=30,
        )

    # Everything after the connection is opened runs under try/finally so that a failure in
    # setup, in the test itself, or in the teardown DROP TABLE cannot leak the connection.
    try:
        try:
            connection.execute(f"CREATE DATABASE {db_name};")
        except (ConnectError, QueryError) as err:  # websocket connection raises QueryError
            if "Database already exists" not in str(err):
                raise

        if not db_prefix:
            connection.execute(f"USE {db_name}")

        try:
            connection.execute(f"DROP TABLE {db_prefix}{table_name};")
        except (ConnectError, QueryError) as err:  # websocket connection raises QueryError
            if "Table does not exist" not in str(err):
                raise

        connection.execute(f"CREATE TABLE {db_prefix}{table_name} (time TIMESTAMP, my_int INT, my_string NCHAR(10));")

        # Test runs
        yield connection, url, user, password, db_name, table_name, db_prefix

        # Teardown
        connection.execute(f"DROP TABLE {db_prefix}{table_name};")
    finally:
        connection.close()


@pytest.mark.skipif(not has_tdengine_credentials, reason="Missing TDEngine URL, user, and/or password")
def test_tdengine_target(tdengine):
    """Write nine events through TDEngineTarget and verify the rows that end up in the table."""
    connection, url, user, password, db_name, table_name, db_prefix = tdengine

    source = SyncEmitSource()
    target = TDEngineTarget(
        url=url,
        user=user,
        password=password,
        database=db_name,
        table=table_name,
        time_col="time",
        columns=["my_int", "my_string"],
        time_format="%d/%m/%y %H:%M:%S UTC%z",
        max_events=2,
    )
    controller = build_flow([source, target]).run()

    # Nine events, one second apart (seconds 10 through 18).
    timestamp_prefix = "18/09/19 01:55:1"
    for index in range(9):
        event = {
            "time": f"{timestamp_prefix}{index} UTC-0000",
            "my_int": index,
            "my_string": f"hello{index}",
        }
        controller.emit(event)

    controller.terminate()
    controller.await_termination()

    result = connection.query(f"SELECT * FROM {db_prefix}{table_name};")

    def as_naive_row(raw_row):
        # REST API returns a naive timestamp, but websocket returns a timestamp with a time zone,
        # so parse the websocket value and drop its time zone to make both comparable.
        values = list(raw_row)
        for position, field in enumerate(result.fields):
            if field.type() == "TIMESTAMP":
                values[position] = datetime.fromisoformat(values[position]).replace(tzinfo=None)
        return values

    if url.startswith("taosws"):
        rows = [as_naive_row(raw_row) for raw_row in result]
    else:
        rows = result.data

    # NOTE(review): events are emitted at 01:55 UTC but expected back at 09:55 -- presumably the
    # TDEngine instance used for this test renders timestamps in UTC+8; confirm.
    expected_rows = [[datetime(2019, 9, 18, 9, 55, 10 + index), index, f"hello{index}"] for index in range(9)]
    assert rows == expected_rows
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def load_deps(file_name):
"kafka": ["kafka-python~=2.0"],
"redis": ["redis~=4.3"],
"sqlalchemy": ["sqlalchemy~=1.3"],
"tdengine": ["taospy[ws]>=2,<3"],
}


Expand Down
125 changes: 125 additions & 0 deletions storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import random
import traceback
import uuid
from io import StringIO
from typing import Any, Callable, List, Optional, Tuple, Union
from urllib.parse import urlparse

Expand Down Expand Up @@ -773,6 +774,130 @@ async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_tim
self._frames_client.write("tsdb", self._path, df)


class TDEngineTarget(_Batching, _Writer):
    """Writes incoming events to a TDEngine table.

    Events are accumulated into batches, and each batch is written with a single INSERT statement.

    :param url: TDEngine Websocket or REST API URL. Must start with taosws://, http://, or https://.
    :param user: Username with which to connect. This is ignored when url is a Websocket URL, which should already
        contain the username.
    :param password: Password with which to connect. This is ignored when url is a Websocket URL, which should already
        contain the password.
    :param database: Name of the database where events will be written. If None, no database is selected or prefixed
        by this target, and the table name is used as given.
    :param table: Name of the table in the database where events will be written.
    :param time_col: Name of the time column.
    :param columns: List of column names to write, in addition to time_col. Use = notation for renaming fields
        (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time).
    :param timeout: REST API timeout in seconds. Defaults to 30 when not set.
    :param time_format: If time_col is a string column, and its format is not compatible with ISO-8601, use this
        parameter to determine the expected format.
    :param max_events: Maximum number of events to write at a time. If None (default), all events will be written on
        flow termination, or after flush_after_seconds (if flush_after_seconds is set).
    :type max_events: int
    :param flush_after_seconds: Maximum number of seconds to hold events before they are written. If None (default), all
        events will be written on flow termination, or after max_events are accumulated (if max_events is set).
    :type flush_after_seconds: int

    :raises ValueError: If url does not use the taosws, http, or https scheme.
    """

    def __init__(
        self,
        url: str,
        user: Optional[str],
        password: Optional[str],
        database: Optional[str],
        table: str,
        time_col: str,
        columns: List[str],
        timeout: Optional[int] = None,
        time_format: Optional[str] = None,
        **kwargs,
    ):
        parsed_url = urlparse(url)
        if parsed_url.scheme not in ("taosws", "http", "https"):
            raise ValueError("URL must start with taosws://, http://, or https://")

        # The constructor arguments are forwarded to the base step through kwargs.
        kwargs["url"] = url
        kwargs["user"] = user
        kwargs["password"] = password
        kwargs["database"] = database
        kwargs["table"] = table
        kwargs["time_col"] = time_col
        kwargs["columns"] = columns
        if timeout:
            kwargs["timeout"] = timeout
        if time_format:
            kwargs["time_format"] = time_format
        _Batching.__init__(self, **kwargs)
        self._time_col = time_col
        _Writer.__init__(
            self,
            [time_col] + columns,
            infer_columns_from_data=False,
            retain_dict=True,
            time_field=time_col,
            time_format=time_format,
        )

        self._url = url
        self._user = user
        self._password = password
        self._database = database
        self._table = table
        self._timeout = timeout

        # Both are set in _init(), once the connection is actually opened.
        self._connection = None
        self._using_websocket = None

    def _init(self):
        """Initialize the base classes and open the websocket or REST connection, depending on the URL scheme."""
        import taosrest

        _Batching._init(self)
        _Writer._init(self)

        parsed_url = urlparse(self._url)

        if parsed_url.scheme == "taosws":
            import taosws

            self._using_websocket = True
            self._connection = taosws.connect(self._url)
            # Select the database once per connection, so that _emit() can use the bare table name.
            # Skipped when no database was given, rather than issuing "USE None".
            if self._database:
                self._connection.execute(f"USE {self._database}")
        else:
            self._using_websocket = False
            self._connection = taosrest.connect(
                url=self._url,
                user=self._user,
                password=self._password,
                timeout=self._timeout or 30,
            )

    def _event_to_batch_entry(self, event):
        """Convert an event to the entry that is stored in the batch."""
        return self._event_to_writer_entry(event)

    @staticmethod
    def _to_sql_literal(value) -> str:
        """Render a single Python value as a literal for the VALUES clause of an INSERT statement.

        :param value: The value to render. None becomes NULL, datetimes become epoch milliseconds, strings are
            quoted and escaped, and anything else is rendered with str().
        :return: The SQL literal as a string.
        """
        if value is None:
            return "NULL"
        if isinstance(value, datetime.datetime):
            # NOTE(review): epoch milliseconds assumes the database uses millisecond precision -- confirm.
            return str(round(value.timestamp() * 1000))
        if isinstance(value, str):
            # Escape backslashes first, so that the backslashes added for quotes are not escaped again.
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            return f"'{escaped}'"
        return str(value)

    async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_time=None):
        """Write a batch of records to the table using a single INSERT statement.

        Columns that are missing from a record, or whose value is None, are written as NULL.
        """
        if not batch:
            # An INSERT with no rows is not a valid statement, so there is nothing to execute.
            return

        with StringIO() as b:
            b.write("INSERT INTO ")
            # A websocket connection has already selected the database with USE in _init().
            if not self._using_websocket and self._database:
                b.write(self._database)
                b.write(".")
            b.write(self._table)
            b.write(" VALUES ")
            for record in batch:
                b.write("(")
                b.write(",".join(self._to_sql_literal(record.get(column)) for column in self._columns))
                b.write(") ")
            b.write(";")
            insert_statement = b.getvalue()
        self._connection.execute(insert_statement)


class StreamTarget(Flow, _Writer):
"""Writes all incoming events into a V3IO stream.
Expand Down

0 comments on commit 58ce43c

Please sign in to comment.