Skip to content
This repository has been archived by the owner on Jul 17, 2023. It is now read-only.

Commit

Permalink
refactor: delete aws credentials
Browse files Browse the repository at this point in the history
  • Loading branch information
dirodriguezm committed Feb 17, 2023
1 parent e92dc3c commit 67b915e
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 95 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/unittest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:

runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
Expand All @@ -22,7 +22,7 @@ jobs:
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Test with pytest
run: |
coverage run --source s3_step -m pytest -x -s tests/unittest/
coverage run --source s3_step -m pytest -x tests/unittest/
coverage xml
- name: Codecov
uses: codecov/codecov-action@v1
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
- None

## Previous conditions

Credentials of AWS.
None

## Version
- **1.0.0:**
Expand All @@ -38,9 +37,11 @@ Credentials of AWS.
- `ES_NETWORK_PORT`: Elasticsearch port.

### S3 setup
- `BUCKET_NAME`: Mapping of bucket name(s) to topic prefix, e.g., `bucket1:topic1,bucket2:topic2`. The example will send the inputs from topics with names starting with `topic1` to `bucket1` and analogously for `topic2` and `bucket2`.
- `REGION_NAME`: AWS region where the bucket(s) are located.
### Step metadata
- `STEP_VERSION`: Current version of the step. e.g: `1.0.0`
Expand Down
4 changes: 1 addition & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ services:
- CONSUMER_SERVER=
- CONSUMER_GROUP_ID=
- BUCKET_NAME=
- AWS_ACCESS_KEY_ID=
- AWS_SECRET_ACCESS_KEY=
- DB_ENGINE=
- DB_HOST=
- DB_USER=
Expand All @@ -21,4 +19,4 @@ services:
- STEP_VERSION=
- STEP_ID=
- STEP_NAME=
-STEP_COMMENTS=
- STEP_COMMENTS=
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
boto3==1.24.15
apf-base==1.0.7
apf-base==1.0.9
18 changes: 3 additions & 15 deletions s3_step/step.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import io
import logging

import boto3
from apf.core.step import GenericStep

Expand All @@ -14,12 +13,7 @@ class S3Step(GenericStep):
Description of parameter `consumer`.
"""

def __init__(
self,
consumer=None,
config=None,
level=logging.INFO,
):
def __init__(self, consumer=None, config=None, level=logging.INFO, **step_args):
super().__init__(consumer, config=config, level=level)

def get_object_url(self, bucket_name, candid):
Expand Down Expand Up @@ -48,8 +42,6 @@ def upload_file(self, f, candid, bucket_name):
STEP_CONFIG = {
"STORAGE": {
"AWS_ACCESS_KEY": "",
"AWS_SECRET_ACCESS_KEY": "",
"REGION_NAME": "",
}
}
Expand All @@ -65,8 +57,6 @@ def upload_file(self, f, candid, bucket_name):
"""
s3 = boto3.client(
"s3",
aws_access_key_id=self.config["STORAGE"]["AWS_ACCESS_KEY"],
aws_secret_access_key=self.config["STORAGE"]["AWS_SECRET_ACCESS_KEY"],
region_name=self.config["STORAGE"]["REGION_NAME"],
)
reverse_candid = self.reverse_candid(candid)
Expand All @@ -92,9 +82,7 @@ def _upload_message(self, message, serialized):
try:
bucket = self._find_bucket(serialized.topic())
except KeyError as err:
self.logger.error(
f"{err}"
)
self.logger.error(f"{err}")
raise
else:
self.upload_file(file, message["candidate"]["candid"], bucket)
Expand All @@ -109,7 +97,7 @@ def _find_bucket(self, topic):

def execute(self, message):
try:
serialized, = self.consumer.messages
(serialized,) = self.consumer.messages
except ValueError:
for msg, serialized in zip(message, self.consumer.messages):
self._upload_message(msg, serialized)
Expand Down
9 changes: 4 additions & 5 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"group.id": os.environ["CONSUMER_GROUP_ID"],
"auto.offset.reset": "beginning",
"enable.partition.eof": os.getenv("ENABLE_PARTITION_EOF", False),
'max.poll.interval.ms': 3600000
"max.poll.interval.ms": 3600000,
},
}

Expand Down Expand Up @@ -84,11 +84,10 @@

STORAGE_CONFIG = {
# BUCKET_NAME is mapping from topic prefix to s3 bucket name
"BUCKET_NAME": dict([pair.split(':')[::-1]
for pair in os.environ["BUCKET_NAME"].split(',')]),
"BUCKET_NAME": dict(
[pair.split(":")[::-1] for pair in os.environ["BUCKET_NAME"].split(",")]
),
"REGION_NAME": os.environ["REGION_NAME"],
"AWS_ACCESS_KEY": os.environ["AWS_ACCESS_KEY"],
"AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
}

LOGGING_DEBUG = os.getenv("LOGGING_DEBUG", False)
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ def docker_compose_file(pytestconfig):
)


@pytest.fixture(scope="session")
def docker_compose_command():
version = os.getenv("COMPOSE", "v1")
return "docker compose" if version == "v2" else "docker-compose"


def is_responsive_kafka(url):
client = AdminClient({"bootstrap.servers": url})
topics = ["test_topic"]
Expand Down
28 changes: 13 additions & 15 deletions tests/integration/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
- ALLOW_ANONYMOUS_LOGIN=yes

kafka:
image: confluentinc/cp-kafka:5.5.1
depends_on:
- zookeeper
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
ports:
- "9092:9092"
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
57 changes: 19 additions & 38 deletions tests/integration/test_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,35 @@
"PARAMS": {
"bootstrap.servers": "localhost:9092",
"group.id": "test_consumer",
'auto.offset.reset': 'beginning',
'enable.partition.eof': True
"auto.offset.reset": "beginning",
"enable.partition.eof": True,
},
"consume.messages": 1,
"TOPICS": ["test_topic1", "test_topic2", "test_survey"]
"TOPICS": ["test_topic1", "test_topic2", "test_survey"],
}

STEP_CONFIG = {
"CONSUMER_CONFIG": CONSUMER_CONFIG,
"STORAGE": {
"BUCKET_NAME": {
"test_topic": "test_bucket1",
"test_survey": "test_bucket2"
},
"AWS_ACCESS_KEY": "test_key",
"AWS_SECRET_ACCESS_KEY": "test_key",
"REGION_NAME": "us-east1"
}
"BUCKET_NAME": {"test_topic": "test_bucket1", "test_survey": "test_bucket2"},
"REGION_NAME": "us-east1",
},
}

schema = {
"type": "record",
"name": "test",
"fields": [
{
"name": "objectId",
"type": "string"
},
{"name": "objectId", "type": "string"},
{
"name": "candidate",
"type": {
"type": "record",
"name": "candidateRecord",
"fields": [
{
"name": "candid",
"type": "int"
}
]
}
}
]
"fields": [{"name": "candid", "type": "int"}],
},
},
],
}


Expand All @@ -75,21 +62,15 @@ def setUp(self):
self.messages = [
{
"objectId": "oid1",
"candidate": {
"candid": 123
},
"candidate": {"candid": 123},
},
{
"objectId": "oid2",
"candidate": {
"candid": 124
},
"candidate": {"candid": 124},
},
{
"objectId": "oid3",
"candidate": {
"candid": 125
},
"candidate": {"candid": 125},
},
]
self.serialized = [serialize_message(message) for message in self.messages]
Expand All @@ -101,7 +82,7 @@ def tearDown(self):
del self.conn

def test_step_execution(self):
external = Producer({'bootstrap.servers': 'localhost:9092'})
external = Producer({"bootstrap.servers": "localhost:9092"})
consumer = KafkaConsumer(CONSUMER_CONFIG)
step = S3Step(consumer=consumer, config=STEP_CONFIG)

Expand All @@ -113,8 +94,8 @@ def test_step_execution(self):
step.start()

o123, o125 = list(self.conn.Bucket("test_bucket1").objects.all())
self.assertEqual(o123.key, '321.avro')
self.assertEqual(o125.key, '521.avro')
self.assertEqual(o123.key, "321.avro")
self.assertEqual(o125.key, "521.avro")

o124, = list(self.conn.Bucket("test_bucket2").objects.all())
self.assertEqual(o124.key, '421.avro')
(o124,) = list(self.conn.Bucket("test_bucket2").objects.all())
self.assertEqual(o124.key, "421.avro")
15 changes: 4 additions & 11 deletions tests/unittest/test_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@


STORAGE_CONFIG = {
"BUCKET_NAME": {
'svy1': SVY1_BUCKET,
'svy2': SVY2_BUCKET
},
"AWS_ACCESS_KEY": "fake",
"AWS_SECRET_ACCESS_KEY": "fake",
"BUCKET_NAME": {"svy1": SVY1_BUCKET, "svy2": SVY2_BUCKET},
"REGION_NAME": "fake",
}

Expand Down Expand Up @@ -43,10 +38,8 @@ def test_upload_file(self, mock_client):
self.step.upload_file(f, candid, SVY1_BUCKET)
mock_client.assert_called_with(
"s3",
aws_access_key_id=self.step_config["STORAGE"]["AWS_ACCESS_KEY"],
aws_secret_access_key=self.step_config["STORAGE"]["AWS_SECRET_ACCESS_KEY"],
region_name=self.step_config["STORAGE"]["REGION_NAME"]
)
region_name=self.step_config["STORAGE"]["REGION_NAME"],
)
mock_client().upload_fileobj.assert_called_with(f, SVY1_BUCKET, "321.avro")

@mock.patch("s3_step.S3Step.upload_file")
Expand All @@ -60,7 +53,7 @@ def test_execute_with_unknown_topic_for_bucket(self, mock_upload):
message = {"objectId": "obj", "candidate": {"candid": 123}}
self.mock_message.topic.return_value = "svy3_topic"
self.mock_consumer.messages = [self.mock_message]
with self.assertRaisesRegex(KeyError, 'svy3_topic'):
with self.assertRaisesRegex(KeyError, "svy3_topic"):
self.step.execute(message)
mock_upload.assert_not_called()

Expand Down

0 comments on commit 67b915e

Please sign in to comment.