feat: End time #26

Merged 4 commits · Mar 3, 2023
9 changes: 9 additions & 0 deletions README.md
@@ -24,6 +24,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
| aws_endpoint_url | False | None | The complete URL to use for the constructed client. |
| aws_region_name | False | None | The AWS region name (e.g. us-east-1) |
| start_date | True | None | The earliest record date to sync |
| end_date | False | None | The last record date to sync. The tap uses a 5-minute buffer to allow CloudWatch logs to arrive in full; if you request data up to the current time, it will automatically adjust your end_date to now minus 5 minutes. |
| log_group_name | True | None | The log group on which to perform the query. |
| query | True | None | The query string to use. For more information, see [CloudWatch Logs Insights Query Syntax](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html). |
| batch_increment_s | False | 3600 | The size of the time window to query by; default 3,600 seconds (i.e. 1 hour). If the result set for a batch exceeds the max limit of 10,000 records, the tap queries the same window again starting where >= the most recent record received. This means the same data is potentially scanned more than once but less than twice, depending on how far the result set went over the 10k max. For example, a batch window with 15k records would be scanned once to receive 10k results, then ~5k would be scanned again to get the rest; the net result is that the data was scanned ~1.5 times for that batch. To avoid this, set the batch window so the result set stays under the 10k limit (see the sketch below). |
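
To make the re-scan arithmetic concrete, the sketch below shows the overflow behaviour described for `batch_increment_s`. It is a simplified illustration, not the tap's actual code; `run_query` is a hypothetical helper that returns records sorted ascending by timestamp.

```python
MAX_RESULTS = 10_000  # CloudWatch Logs Insights per-query result limit


def scan_window(run_query, start_s, end_s):
    """Collect all records in [start_s, end_s], re-querying from the
    newest record seen whenever a query returns the 10k maximum."""
    records = run_query(start_s, end_s)
    batch = records
    while len(batch) >= MAX_RESULTS:
        last_ts = batch[-1]["timestamp"]
        # Everything from last_ts onward is scanned a second time;
        # already-seen records are filtered out below.
        batch = run_query(last_ts, end_s)
        records.extend(r for r in batch if r["timestamp"] > last_ts)
    return records
```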
@@ -34,6 +35,14 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).

A full list of supported settings and capabilities is available by running: `tap-cloudwatch --about`
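
For illustration, a minimal JSON config exercising the new `end_date` setting might look like the following (key names come from the settings table above; values are placeholders):

```json
{
  "log_group_name": "my_log_group_name",
  "query": "fields @timestamp, @message | sort @timestamp asc",
  "start_date": "2022-12-29T00:00:00Z",
  "end_date": "2022-12-30T00:00:00Z",
  "batch_increment_s": 3600,
  "aws_region_name": "us-east-1"
}
```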

### Implementation Details

1. The tap always leaves a 5-minute buffer from real time to handle any late or out-of-order logs on the CloudWatch side, guaranteeing that all data is replicated.
   Challenges related to this were first observed and discussed in https://github.com/MeltanoLabs/tap-cloudwatch/issues/25.
   This means that if you run the tap with no `end_date` configured, it will retrieve data up to the current time minus 5 minutes.
2. The tap currently runs at most 20 queries at a time: it sends a `start_query` API call for each batch window, then comes back to retrieve the results once the query has completed. A sketch of both behaviours follows this list.
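
Below is a minimal sketch of these two mechanisms, assuming a boto3 CloudWatch Logs client. `clamp_end_time` mirrors the new `_alter_end_ts` helper from this PR, while `run_windows` and its polling loop are illustrative rather than the tap's exact implementation.

```python
from collections import deque
from datetime import datetime, timedelta, timezone
from time import sleep

MAX_CONCURRENT_QUERIES = 20


def clamp_end_time(end_time=None):
    """Point 1: never query closer than 5 minutes to real time."""
    buffered_now = datetime.now(timezone.utc) - timedelta(minutes=5)
    return min(end_time, buffered_now) if end_time else buffered_now


def wait_for_results(client, query_id):
    """Poll a started query until CloudWatch reports it Complete."""
    while True:
        resp = client.get_query_results(queryId=query_id)
        if resp["status"] == "Complete":
            return resp["results"]
        sleep(1)


def run_windows(client, log_group, query, windows):
    """Point 2: keep at most 20 queries in flight, yielding oldest first."""
    in_flight = deque()
    for start_s, end_s in windows:
        if len(in_flight) >= MAX_CONCURRENT_QUERIES:
            yield wait_for_results(client, in_flight.popleft())
        resp = client.start_query(
            logGroupName=log_group,
            queryString=query,
            startTime=start_s,
            endTime=end_s,
            limit=10000,
        )
        in_flight.append(resp["queryId"])
    while in_flight:
        yield wait_for_results(client, in_flight.popleft())


# Illustrative usage:
# import boto3
# client = boto3.client("logs", region_name="us-east-1")
# for batch in run_windows(client, "my_log_group", "fields @timestamp", [(0, 3600)]):
#     print(len(batch))
```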


### Configure using environment variables

This Singer tap will automatically import any environment variables within the working directory's
1 change: 1 addition & 0 deletions tap_cloudwatch/client.py
@@ -64,6 +64,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[dict]:
self.config.get("log_group_name"),
self.config.get("query"),
self.config.get("batch_increment_s"),
self.config.get("end_date"),
)
for batch in cloudwatch_iter:
for record in batch:
33 changes: 24 additions & 9 deletions tap_cloudwatch/cloudwatch_api.py
@@ -2,7 +2,7 @@

import os
from collections import deque
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from math import ceil
from typing import Deque

@@ -64,17 +64,25 @@ def _create_client(self, config):
return logs

def _split_batch_into_windows(self, start_time, end_time, batch_increment_s):
-        diff_s = end_time - start_time
+        start_time_epoch = start_time.timestamp()
+        end_time_epoch = end_time.timestamp()
+        diff_s = end_time_epoch - start_time_epoch
total_batches = ceil(diff_s / batch_increment_s)
batch_windows = []
for batch_num in range(total_batches):
if batch_num != 0:
# Inclusive start and end date, so on second iteration
# we can skip one second.
-                query_start = int(start_time + (batch_increment_s * batch_num) + 1)
+                query_start = int(
+                    start_time_epoch + (batch_increment_s * batch_num) + 1
+                )
else:
-                query_start = int(start_time + (batch_increment_s * batch_num))
-            query_end = int(start_time + (batch_increment_s * (batch_num + 1)))
+                query_start = int(start_time_epoch + (batch_increment_s * batch_num))
+            # Never exceed the end_time
+            query_end = min(
+                int(start_time_epoch + (batch_increment_s * (batch_num + 1))),
+                int(end_time_epoch),
+            )
batch_windows.append((query_start, query_end))
return batch_windows

@@ -117,13 +125,20 @@ def _iterate_batches(self, batch_windows, log_group, query):
query_obj = self._get_completed_query(queue)
yield query_obj.get_results()

-    def get_records_iterator(self, bookmark, log_group, query, batch_increment_s):
+    def _alter_end_ts(self, end_time):
+        default_end_time = datetime.now(timezone.utc) - timedelta(minutes=5)
+        if end_time:
+            return min([end_time, default_end_time])
+        else:
+            return default_end_time
+
+    def get_records_iterator(
+        self, bookmark, log_group, query, batch_increment_s, end_time
+    ):
"""Retrieve records from Cloudwatch."""
-        end_time = datetime.now(timezone.utc).timestamp()
-        start_time = bookmark.timestamp()
self._validate_query(query)
batch_windows = self._split_batch_into_windows(
-            start_time, end_time, batch_increment_s
+            bookmark, self._alter_end_ts(end_time), batch_increment_s
)

yield from self._iterate_batches(batch_windows, log_group, query)
10 changes: 10 additions & 0 deletions tap_cloudwatch/tap.py
@@ -63,6 +63,16 @@ class TapCloudWatch(Tap):
required=True,
description="The earliest record date to sync",
),
th.Property(
"end_date",
th.DateTimeType,
description=(
"The last record date to sync. This tap uses a 5 minute buffer to "
"allow Cloudwatch logs to arrive in full. If you request data from "
"current time it will automatically adjust your end_date to now -"
" 5 mins."
),
),
th.Property(
"log_group_name",
th.StringType,
68 changes: 59 additions & 9 deletions tap_cloudwatch/tests/test_cloudwatch_api.py
@@ -3,29 +3,59 @@
from contextlib import nullcontext as does_not_raise

import pytest
from freezegun import freeze_time

from tap_cloudwatch.cloudwatch_api import CloudwatchAPI
from tap_cloudwatch.exception import InvalidQueryException
from tap_cloudwatch.tests.utils import datetime_from_str


@pytest.mark.parametrize(
"start,end,batch,expected",
[
-        [1672272000, 1672275600, 3600, [(1672272000, 1672275600)]],
        [
-            1672272000,
-            1672275601,
+            datetime_from_str("2022-12-29 00:00:00"),
+            datetime_from_str("2022-12-29 01:00:00"),
            3600,
-            [(1672272000, 1672275600), (1672275601, 1672279200)],
+            [
+                (
+                    datetime_from_str("2022-12-29 00:00:00").timestamp(),
+                    datetime_from_str("2022-12-29 01:00:00").timestamp(),
+                )
+            ],
        ],
        [
-            1672272000,
-            1672282800,
+            datetime_from_str("2022-12-29 00:00:00"),
+            datetime_from_str("2022-12-29 01:00:01"),
            3600,
            [
-                (1672272000, 1672275600),
-                (1672275601, 1672279200),
-                (1672279201, 1672282800),
+                (
+                    datetime_from_str("2022-12-29 00:00:00").timestamp(),
+                    datetime_from_str("2022-12-29 01:00:00").timestamp(),
+                ),
+                (
+                    datetime_from_str("2022-12-29 01:00:01").timestamp(),
+                    datetime_from_str("2022-12-29 01:00:01").timestamp(),
+                ),
            ],
        ],
+        [
+            datetime_from_str("2022-12-29 00:00:00"),
+            datetime_from_str("2022-12-29 03:00:00"),
+            3600,
+            [
+                (
+                    datetime_from_str("2022-12-29 00:00:00").timestamp(),
+                    datetime_from_str("2022-12-29 01:00:00").timestamp(),
+                ),
+                (
+                    datetime_from_str("2022-12-29 01:00:01").timestamp(),
+                    datetime_from_str("2022-12-29 02:00:00").timestamp(),
+                ),
+                (
+                    datetime_from_str("2022-12-29 02:00:01").timestamp(),
+                    datetime_from_str("2022-12-29 03:00:00").timestamp(),
+                ),
+            ],
+        ],
    ],
@@ -55,3 +85,23 @@ def test_validate_query(query, expectation):
api = CloudwatchAPI(None)
with expectation:
api._validate_query(query)


@pytest.mark.parametrize(
"input_end_ts,expectation",
[
[None, datetime_from_str("2022-12-29 23:55:00")],
[
datetime_from_str("2022-12-29 00:00:00"),
datetime_from_str("2022-12-29 00:00:00"),
],
[
datetime_from_str("2022-12-29 23:59:00"),
datetime_from_str("2022-12-29 23:55:00"),
],
],
)
@freeze_time("2022-12-30")
def test_alter_end_ts(input_end_ts, expectation):
api = CloudwatchAPI(None)
assert api._alter_end_ts(input_end_ts) == expectation
5 changes: 3 additions & 2 deletions tap_cloudwatch/tests/test_core.py
@@ -9,6 +9,7 @@

from tap_cloudwatch.cloudwatch_api import CloudwatchAPI
from tap_cloudwatch.tap import TapCloudWatch
from tap_cloudwatch.tests.utils import datetime_from_str

SAMPLE_CONFIG = {
"log_group_name": "my_log_group_name",
@@ -31,11 +32,11 @@ def test_standard_tap_tests(patch_client):
"start_query",
{"queryId": "123"},
{
"endTime": 1672358400,
"endTime": int(datetime_from_str("2022-12-29 23:55:00").timestamp()),
"limit": 10000,
"logGroupName": "my_log_group_name",
"queryString": "fields @timestamp, @message | sort @timestamp asc",
"startTime": 1672272000,
"startTime": int(datetime_from_str("2022-12-29 00:00:00").timestamp()),
},
)
stubber.add_response(
8 changes: 8 additions & 0 deletions tap_cloudwatch/tests/utils.py
@@ -0,0 +1,8 @@
"""Utility functions for tests."""

from datetime import datetime, timezone


def datetime_from_str(date_str):
"""Convert string to datetime."""
return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
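
For reference, a quick doctest-style check of the helper (illustrative):

```python
>>> datetime_from_str("2022-12-29 00:00:00")
datetime.datetime(2022, 12, 29, 0, 0, tzinfo=datetime.timezone.utc)
```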