fix: use put_records for kinesis due to rate exceeded
RanbirAulakh committed Sep 23, 2024
1 parent bc3cdf7 commit f8034ec
Showing 3 changed files with 77 additions and 106 deletions.
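For context: Kinesis throttles per-shard API calls, so writing one feature per put_record call can raise ProvisionedThroughputExceededException ("rate exceeded") under load, while put_records batches up to 500 records into a single request. A minimal sketch of the two call shapes (the stream name and payload are illustrative, not taken from this commit):

import boto3

kinesis = boto3.client("kinesis")

# Before: one API call per record; easily trips the per-shard request rate limit.
kinesis.put_record(StreamName="my-stream", PartitionKey="job-1", Data=b"{}")

# After: up to 500 records per API call, amortizing the request rate.
kinesis.put_records(
    StreamName="my-stream",
    Records=[{"Data": b"{}", "PartitionKey": "job-1"}],
)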
4 changes: 3 additions & 1 deletion src/aws/osml/model_runner/app_config.py
@@ -61,7 +61,9 @@ class ServiceConfig:
throttling_retry_timeout: str = os.getenv("THROTTLING_RETRY_TIMEOUT", "10")

# constant configuration
kinesis_max_record_size: str = "1048576"
kinesis_max_record_per_batch: str = "500"
kinesis_max_record_size_batch: str = "5242880" # 5 MB in bytes
kinesis_max_record_size: str = "1048576" # 1 MB in bytes
ddb_max_item_size: str = "200000"
noop_bounds_model_name: str = "NOOP_BOUNDS_MODEL_NAME"
noop_geom_model_name: str = "NOOP_GEOM_MODEL_NAME"
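These constants mirror the documented PutRecords service limits: at most 500 records per request, 5 MiB total per request, and 1 MiB per record (data plus partition key). A minimal sketch of a batch guard built from the same limits (the names are illustrative, not part of this change):

MAX_RECORDS_PER_BATCH = 500            # PutRecords: records per request
MAX_BATCH_SIZE_BYTES = 5 * 1024 ** 2   # PutRecords: bytes per request
MAX_RECORD_SIZE_BYTES = 1024 ** 2      # PutRecords: bytes per record

def batch_is_full(record_count: int, batch_bytes: int, next_record_bytes: int) -> bool:
    """Return True if adding the next record would exceed either batch limit."""
    return (
        record_count + 1 > MAX_RECORDS_PER_BATCH
        or batch_bytes + next_record_bytes > MAX_BATCH_SIZE_BYTES
    )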
68 changes: 44 additions & 24 deletions src/aws/osml/model_runner/sink/kinesis_sink.py
@@ -12,6 +12,7 @@
from aws.osml.model_runner.app_config import BotoConfig, ServiceConfig
from aws.osml.model_runner.common import get_credentials_for_assumed_role

from .exceptions import InvalidKinesisStreamException
from .sink import Sink

logger = logging.getLogger(__name__)
@@ -41,42 +42,61 @@ def __init__(
# container will be sufficient to write to the Kinesis stream.
self.kinesis_client = boto3.client("kinesis", config=BotoConfig.default)

def _flush_stream(self, partition_key: str, features: List[Feature]) -> None:
record = geojson.dumps(FeatureCollection(features))
self.kinesis_client.put_record(
StreamName=self.stream,
PartitionKey=partition_key,
Data=record,
)
def _flush_stream(self, records: List[dict]) -> None:
"""
Flushes a batch of records to the Kinesis stream.
:param records: A list of records to be sent to the Kinesis stream.
"""
try:
self.kinesis_client.put_records(StreamName=self.stream, Records=records)
except Exception as err:
raise InvalidKinesisStreamException(f"Failed to write records to Kinesis stream '{self.stream}': {err}") from err
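Note that put_records can succeed as an API call while individual records fail: throttled records are reported through FailedRecordCount and a per-record ErrorCode rather than an exception, and the method above raises only on client errors. A minimal sketch of retrying just the failed records, assuming exponential backoff is acceptable (not part of this commit):

import time

def put_records_with_retry(client, stream: str, records: list, attempts: int = 3) -> None:
    # Retry only the records that Kinesis reports as failed (illustrative sketch).
    pending = records
    for attempt in range(attempts):
        response = client.put_records(StreamName=stream, Records=pending)
        if response.get("FailedRecordCount", 0) == 0:
            return
        # A failed entry carries ErrorCode/ErrorMessage instead of a SequenceNumber.
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        time.sleep(2 ** attempt)  # simple exponential backoff
    raise RuntimeError(f"{len(pending)} records still failing after {attempts} attempts")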

@property
def mode(self) -> SinkMode:
# Only aggregate mode is supported at the moment
return SinkMode.AGGREGATE

def write(self, job_id: str, features: List[Feature]) -> bool:
pending_features: List[Feature] = []
"""
Writes a list of features to the Kinesis stream. Each feature is serialized into its own
record. If adding a record would exceed the 5 MB batch size limit or the 500-record batch
count limit, the current batch is flushed first.
:param job_id: The ID of the job associated with the features.
:param features: A list of features to be written to the stream.
:returns: True if the features were successfully written, False otherwise.
"""
pending_features: List[dict] = []
pending_features_size: int = 0

if self.validate_kinesis_stream():
for feature in features:
if self.batch_size == 1:
self._flush_stream(job_id, [feature])
else:
feature_size = sys.getsizeof(geojson.dumps(feature))
if (
self.batch_size and pending_features and len(pending_features) % self.batch_size == 0
) or pending_features_size + feature_size > (int(ServiceConfig.kinesis_max_record_size)):
self._flush_stream(job_id, pending_features)
pending_features = []
pending_features_size = 0

pending_features.append(feature)
pending_features_size += feature_size

# Flush any remaining features
# Serialize the single feature to JSON, wrapped in a list so the
# FeatureCollection is valid GeoJSON
record_data = geojson.dumps(FeatureCollection([feature]))

# Create the record dict
record = {"Data": record_data, "PartitionKey": job_id}

# Approximate the size of the entire record (Data + PartitionKey);
# sys.getsizeof overestimates the byte count, which keeps batches safely under the limit
record_size = sys.getsizeof(geojson.dumps(record))

# If adding the next record would exceed the 5 MB batch limit, flush the current batch
if pending_features_size + record_size > int(ServiceConfig.kinesis_max_record_size_batch) or len(
pending_features
) >= int(ServiceConfig.kinesis_max_record_per_batch):
self._flush_stream(pending_features)
pending_features = []
pending_features_size = 0

pending_features.append(record)
pending_features_size += record_size

# Flush any remaining records
if pending_features:
self._flush_stream(job_id, pending_features)
self._flush_stream(pending_features)

logger.info(f"Wrote {len(features)} features for job '{job_id}' to Kinesis Stream '{self.stream}'")
return True
else:
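For reference, a minimal usage sketch of the sink after this change (the stream name, job ID, and features are illustrative):

from geojson import Feature, Point

from aws.osml.model_runner.sink.kinesis_sink import KinesisSink

# Hypothetical stream and features, for illustration only.
sink = KinesisSink(stream="my-results-stream")
features = [Feature(geometry=Point((-77.0, 38.9)), properties={"id": i}) for i in range(3)]

# write() validates the stream, batches the serialized features,
# and flushes them with put_records.
sink.write("job-1234", features)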
111 changes: 30 additions & 81 deletions test/aws/osml/model_runner/sink/test_kinesis_sink.py
@@ -9,14 +9,21 @@
import geojson
import pytest
from botocore.stub import ANY, Stubber
from geojson import FeatureCollection

TEST_JOB_ID = "test-job-id"
TEST_RESULTS_STREAM = "test-results-stream"
MOCK_KINESIS_RESPONSE = {
"ShardId": "shardId-000000000000",
"SequenceNumber": "49632155903354096944077309979289188168053675801607929858",
"FailedRecordCount": 1, # Bug where this has to be set to min-value of 1: https://github.com/boto/botocore/issues/2063
"Records": [
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49632155903354096944077309979289188168053675801607929858",
}
],
}


MOCK_KINESIS_DESCRIBE_STREAM_RESPONSE = {
"StreamDescription": {
"StreamName": TEST_RESULTS_STREAM,
@@ -98,84 +105,23 @@ def test_write_features_default_credentials(self):
"StreamName": TEST_RESULTS_STREAM,
},
)
kinesis_client_stub.add_response(
"put_record",
MOCK_KINESIS_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
"PartitionKey": TEST_JOB_ID,
"Data": geojson.dumps(geojson.FeatureCollection(self.test_feature_list)),
},
)
kinesis_sink.write(TEST_JOB_ID, self.test_feature_list)
kinesis_client_stub.assert_no_pending_responses()

def test_write_features_batch_size_one(self):
from aws.osml.model_runner.sink.kinesis_sink import KinesisSink
records = [
{"Data": geojson.dumps(FeatureCollection(feature)), "PartitionKey": TEST_JOB_ID}
for feature in self.test_feature_list
]

kinesis_sink = KinesisSink(stream=TEST_RESULTS_STREAM, batch_size=1)
kinesis_client_stub = Stubber(kinesis_sink.kinesis_client)
kinesis_client_stub.activate()
kinesis_client_stub.add_response(
"describe_stream",
MOCK_KINESIS_DESCRIBE_STREAM_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
},
)
for index, feature in enumerate(self.test_feature_list):
kinesis_client_stub.add_response(
"put_record",
{"ShardId": "shardId-000000000000", "SequenceNumber": str(index)},
{
"StreamName": TEST_RESULTS_STREAM,
"PartitionKey": TEST_JOB_ID,
"Data": geojson.dumps(geojson.FeatureCollection([feature])),
},
)
kinesis_sink.write(TEST_JOB_ID, self.test_feature_list)
kinesis_client_stub.assert_no_pending_responses()

def test_write_batch_size_three(self):
from aws.osml.model_runner.sink.kinesis_sink import KinesisSink

kinesis_sink = KinesisSink(stream=TEST_RESULTS_STREAM, batch_size=3)
kinesis_client_stub = Stubber(kinesis_sink.kinesis_client)
kinesis_client_stub.activate()
# We expect the test list to have 4 features because we're specifically
# testing the draining of the list here
assert len(self.test_feature_list) == 4

kinesis_client_stub.add_response(
"describe_stream",
MOCK_KINESIS_DESCRIBE_STREAM_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
},
)

kinesis_client_stub.add_response(
"put_record",
"put_records",
MOCK_KINESIS_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
"PartitionKey": TEST_JOB_ID,
"Data": geojson.dumps(geojson.FeatureCollection(self.test_feature_list[:3])),
},
)
kinesis_client_stub.add_response(
"put_record",
MOCK_KINESIS_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
"PartitionKey": TEST_JOB_ID,
"Data": geojson.dumps(geojson.FeatureCollection(self.test_feature_list[3:])),
},
{"StreamName": TEST_RESULTS_STREAM, "Records": records},
)

kinesis_sink.write(TEST_JOB_ID, self.test_feature_list)
kinesis_client_stub.assert_no_pending_responses()

def test_write_oversized_record(self):
from aws.osml.model_runner.sink.exceptions import InvalidKinesisStreamException
from aws.osml.model_runner.sink.kinesis_sink import KinesisSink

kinesis_sink = KinesisSink(TEST_RESULTS_STREAM)
@@ -190,21 +136,24 @@ def test_write_oversized_record(self):
},
)

records = [
{"Data": geojson.dumps(FeatureCollection(feature)), "PartitionKey": TEST_JOB_ID}
for feature in self.test_feature_list
]

kinesis_client_stub.add_client_error(
"put_record",
"put_records",
service_error_code="ValidationException",
service_message="""An error occurred (ValidationException) when calling the PutRecord
operation: 1 validation error detected: Value at 'data' failed to satisfy constraint:
service_message="""Failed to write records to Kinesis stream 'test-results-stream':
An error occurred (ValidationException) when calling the PutRecords operation:
1 validation error detected: Value at 'data' failed to satisfy constraint:
Member must have length less than or equal to 1048576.""",
expected_params={
"StreamName": TEST_RESULTS_STREAM,
"PartitionKey": TEST_JOB_ID,
"Data": geojson.dumps(geojson.FeatureCollection(self.test_feature_list)),
},
expected_params={"StreamName": TEST_RESULTS_STREAM, "Records": records},
)
with pytest.raises(Exception) as e_info:
with pytest.raises(InvalidKinesisStreamException) as e_info:
kinesis_sink.write(TEST_JOB_ID, self.test_feature_list)
assert str(e_info.value).startswith("An error occurred (ValidationException) when calling the PutRecord operation")

assert str(e_info.value).startswith("Failed to write records to Kinesis stream")
kinesis_client_stub.assert_no_pending_responses()

def test_bad_kinesis_stream_failure(self):
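The updated tests cover the single-batch happy path and a ValidationException on write; a sketch of a test for the 500-record batch split could look like the following (it reuses the fixtures above and assumes the 4-feature test_feature_list, so 126 repetitions produce 504 records flushed as 500 + 4):

def test_write_splits_batches_at_500_records(self):
    from aws.osml.model_runner.sink.kinesis_sink import KinesisSink

    kinesis_sink = KinesisSink(stream=TEST_RESULTS_STREAM)
    kinesis_client_stub = Stubber(kinesis_sink.kinesis_client)
    kinesis_client_stub.activate()
    kinesis_client_stub.add_response(
        "describe_stream",
        MOCK_KINESIS_DESCRIBE_STREAM_RESPONSE,
        {"StreamName": TEST_RESULTS_STREAM},
    )
    # Expect two put_records calls: a full batch of 500, then the remaining 4.
    for _ in range(2):
        kinesis_client_stub.add_response(
            "put_records",
            MOCK_KINESIS_RESPONSE,
            {"StreamName": TEST_RESULTS_STREAM, "Records": ANY},
        )
    kinesis_sink.write(TEST_JOB_ID, self.test_feature_list * 126)
    kinesis_client_stub.assert_no_pending_responses()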
