Skip to content

Commit

Permalink
Rename bootstrap_servers -> brokers (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper authored Apr 8, 2024
1 parent fb668cf commit d17950f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
20 changes: 10 additions & 10 deletions integration/test_kafka_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
from storey import AsyncEmitSource, Event, Reduce, SyncEmitSource, build_flow
from storey.targets import KafkaTarget

bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
kafka_brokers = os.getenv("KAFKA_BROKERS")
topic = "test_kafka_integration"

if bootstrap_servers:
if kafka_brokers:
import kafka


Expand All @@ -38,8 +38,8 @@ def append_return(lst, x):
@pytest.fixture()
def kafka_topic_setup_teardown():
# Setup
kafka_admin_client = kafka.KafkaAdminClient(bootstrap_servers=bootstrap_servers)
kafka_consumer = kafka.KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset="earliest")
kafka_admin_client = kafka.KafkaAdminClient(bootstrap_servers=kafka_brokers)
kafka_consumer = kafka.KafkaConsumer(topic, bootstrap_servers=kafka_brokers, auto_offset_reset="earliest")
try:
kafka_admin_client.delete_topics([topic])
sleep(1)
Expand All @@ -57,16 +57,16 @@ def kafka_topic_setup_teardown():


@pytest.mark.skipif(
not bootstrap_servers,
reason="KAFKA_BOOTSTRAP_SERVERS must be defined to run kafka tests",
not kafka_brokers,
reason="KAFKA_BROKERS must be defined to run kafka tests",
)
def test_kafka_target(kafka_topic_setup_teardown):
kafka_consumer = kafka_topic_setup_teardown

controller = build_flow(
[
SyncEmitSource(),
KafkaTarget(bootstrap_servers, topic, sharding_func=0, full_event=False),
KafkaTarget(kafka_brokers, topic, sharding_func=0, full_event=False),
]
).run()
events = []
Expand Down Expand Up @@ -98,7 +98,7 @@ async def async_test_write_to_kafka_full_event_readback(kafka_topic_setup_teardo
controller = build_flow(
[
AsyncEmitSource(),
KafkaTarget(bootstrap_servers, topic, sharding_func=lambda _: 0, full_event=True),
KafkaTarget(kafka_brokers, topic, sharding_func=lambda _: 0, full_event=True),
]
).run()
events = []
Expand Down Expand Up @@ -140,8 +140,8 @@ async def async_test_write_to_kafka_full_event_readback(kafka_topic_setup_teardo


@pytest.mark.skipif(
not bootstrap_servers,
reason="KAFKA_BOOTSTRAP_SERVERS must be defined to run kafka tests",
not kafka_brokers,
reason="KAFKA_BROKERS must be defined to run kafka tests",
)
def test_async_test_write_to_kafka_full_event_readback(kafka_topic_setup_teardown):
asyncio.run(async_test_write_to_kafka_full_event_readback(kafka_topic_setup_teardown))
12 changes: 6 additions & 6 deletions storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ class KafkaTarget(Flow, _Writer):
"""Writes all incoming events into a Kafka stream.
:param topic: Kafka topic.
:param bootstrap_servers: Kafka bootstrap servers (brokers).
:param brokers: List of kafka brokers, each in the form of host:port.
:param producer_options: Extra options to be passed as kwargs to kafka.KafkaProducer.
:param sharding_func: Partition, sharding key field, or function from event to partition or sharding key. Optional.
If not set, event key will be used as the sharding key.
Expand All @@ -1014,7 +1014,7 @@ class KafkaTarget(Flow, _Writer):

def __init__(
self,
bootstrap_servers: Union[str, List[str]],
brokers: Union[str, List[str]],
topic: str,
producer_options: Optional[dict] = None,
sharding_func: Union[None, int, str, Callable[[Event], Any]] = None,
Expand All @@ -1023,12 +1023,12 @@ def __init__(
full_event: Optional[bool] = None,
**kwargs,
):
if not bootstrap_servers:
raise ValueError("bootstrap_servers must be defined")
if not brokers:
raise ValueError("brokers must be defined")
if not topic:
raise ValueError("topic must be defined")

self._bootstrap_servers = bootstrap_servers
self._brokers = brokers
self._topic = topic
self._producer_options = producer_options

Expand Down Expand Up @@ -1061,7 +1061,7 @@ async def _lazy_init(self):

if not self._initialized:
kwargs = self._producer_options or {}
self._producer = KafkaProducer(bootstrap_servers=self._bootstrap_servers, **kwargs)
self._producer = KafkaProducer(bootstrap_servers=self._brokers, **kwargs)
self._initialized = True

async def _do(self, event):
Expand Down

0 comments on commit d17950f

Please sign in to comment.