Commit 26ee465

Merge branch 'main' into jenkins/zshkoor/setup-py-updated-5a91ddb

BilalQamar95 authored Dec 10, 2024
2 parents fdc6d00 + cb5dac8

Showing 29 changed files with 818 additions and 712 deletions.
15 changes: 8 additions & 7 deletions .github/workflows/ci.yml

@@ -14,14 +14,14 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        os: [ubuntu-20.04]
-        python-version: ['3.8']
-        toxenv: [quality, docs, pii_check, django32, django40]
+        os: [ubuntu-latest]
+        python-version: ['3.12']
+        toxenv: [quality, docs, pii_check, django42]
 
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: setup python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}

@@ -37,8 +37,9 @@ jobs:
         run: tox
 
       - name: Run coverage
-        if: matrix.python-version == '3.8' && matrix.toxenv == 'django32'
-        uses: codecov/codecov-action@v3
+        if: matrix.python-version == '3.12' && matrix.toxenv == 'django42'
+        uses: codecov/codecov-action@v4
         with:
+          token: ${{ secrets.CODECOV_TOKEN }}
           flags: unittests
           fail_ci_if_error: true
10 changes: 5 additions & 5 deletions .github/workflows/pypi-publish.yml

@@ -7,15 +7,15 @@ on:
 jobs:
 
   push:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
 
     steps:
       - name: Checkout
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
       - name: setup python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
-          python-version: 3.8
+          python-version: 3.12
 
       - name: Install pip
         run: pip install -r requirements/pip.txt

@@ -24,7 +24,7 @@ jobs:
         run: python setup.py sdist bdist_wheel
 
       - name: Publish to PyPi
-        uses: pypa/gh-action-pypi-publish@master
+        uses: pypa/gh-action-pypi-publish@release/v1
         with:
           user: __token__
           password: ${{ secrets.PYPI_UPLOAD_TOKEN }}
1 change: 1 addition & 0 deletions .gitignore

@@ -19,6 +19,7 @@ __pycache__
 /.installed.cfg
 /lib
 /lib64
+venv
 
 # Installer logs
 pip-log.txt
41 changes: 41 additions & 0 deletions CHANGELOG.rst

@@ -14,6 +14,47 @@ Change Log
 Unreleased
 **********
 
+[6.0.0] - 2024-08-24
+********************
+Added
+=======
+* Added support for Python 3.12.
+
+Removed
+=======
+* Removed support for Python 3.8.
+
+[5.8.1] - 2024-08-02
+********************
+Changed
+=======
+* Monitoring: Add a custom attribute, ``kafka_received_message``, to track whether a message was processed or not.
+
+[5.8.0] - 2024-08-01
+********************
+Changed
+=======
+* Monitoring: Ensure that we have a root span for each iteration of the consume loop; renamed the trace to ``consumer.consume``.
+
+[5.7.0] - 2024-03-22
+********************
+Changed
+=======
+* Dropped support for Django 3.2, Django 4.0, and Django 4.1.
+* Added support for Python 3.12.
+
+[5.6.0] - 2024-01-25
+********************
+Changed
+=======
+* Added ``client.id`` to the base configuration.
+
+[5.5.0] - 2023-09-21
+********************
+Changed
+=======
+* Reset the edx-django-utils RequestCache before handling each event.
+
 [5.4.0] - 2023-08-28
 ********************
 Changed
2 changes: 1 addition & 1 deletion README.rst

@@ -132,7 +132,7 @@ can find it at `ISSUE_TEMPLATE.md <.github/ISSUE_TEMPLATE.md>`_.
 Reporting Security Issues
 *************************
 
-Please do not report security issues in public. Please email security@edx.org.
+Please do not report security issues in public. Please email security@openedx.org.
 
 Getting Help
 ************
20 changes: 20 additions & 0 deletions catalog-info.yaml

@@ -0,0 +1,20 @@
+# This file records information about this repo. Its use is described in OEP-55:
+# https://open-edx-proposals.readthedocs.io/en/latest/processes/oep-0055-proc-project-maintainers.html
+
+apiVersion: backstage.io/v1alpha1
+kind: Component
+metadata:
+  name: 'event-bus-kafka'
+  description: "Kafka implementation for Open edX event bus."
+  links:
+    - url: "https://github.com/openedx/event-bus-kafka"
+      title: "Event Bus Kafka"
+      icon: "Web"
+  annotations:
+    openedx.org/arch-interest-groups: ""
+spec:
+  owner: group:2u-arch-bom
+  type: 'library'
+  lifecycle: 'production'
+  dependsOn:
+    - 'openedx-events'
10 changes: 10 additions & 0 deletions docs/conf.py

@@ -495,6 +495,16 @@ def get_version(*file_paths):
 #
 # epub_use_index = True
 
+# -- Read the Docs Specific Configuration
+# Define the canonical URL if you are using a custom domain on Read the Docs
+html_baseurl = os.environ.get("READTHEDOCS_CANONICAL_URL", "")
+
+# Tell Jinja2 templates the build is running on Read the Docs
+if os.environ.get("READTHEDOCS", "") == "True":
+    if "html_context" not in globals():
+        html_context = {}
+    html_context["READTHEDOCS"] = True
+
 
 # Example configuration for intersphinx: refer to the Python standard library.
 intersphinx_mapping = {
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py

@@ -9,4 +9,4 @@
 from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
 from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer
 
-__version__ = '5.4.0'
+__version__ = '6.0.0'
7 changes: 6 additions & 1 deletion edx_event_bus_kafka/internal/config.py

@@ -3,14 +3,14 @@
 This module is for internal use only.
 """
 
-import warnings
 from functools import lru_cache
 from typing import Optional
 
 from django.conf import settings
 from django.dispatch import receiver
 from django.test.signals import setting_changed
+from openedx_events.data import get_service_name
 
 # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
 try:

@@ -110,6 +110,11 @@ def load_common_settings() -> Optional[dict]:
             'sasl.password': secret,
         })
 
+    client_id = get_service_name()
+    if client_id:
+        base_settings.update({
+            'client.id': client_id
+        })
    return base_settings
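As a usage sketch of the new ``client.id`` behavior: with ``EVENTS_SERVICE_NAME`` set (the Django setting read via ``get_service_name()``, as exercised in the test_config.py change below), the merged settings can be passed straight to a confluent-kafka client. The ``Producer`` wiring here is illustrative only, not this repo's own ``create_producer``:

    # Illustrative sketch; assumes a configured Django environment and a reachable broker.
    from confluent_kafka import Producer

    from edx_event_bus_kafka.internal.config import load_common_settings

    conf = load_common_settings()  # Optional[dict]: None when the event bus is not configured
    if conf is not None:
        # With EVENTS_SERVICE_NAME set, conf includes 'client.id', letting broker-side
        # logs and metrics attribute connections to this service.
        producer = Producer(conf)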
95 changes: 66 additions & 29 deletions edx_event_bus_kafka/internal/consumer.py

@@ -10,6 +10,7 @@
 from django.db import connection
 from django.dispatch import receiver
 from django.test.signals import setting_changed
+from edx_django_utils.cache import RequestCache
 from edx_django_utils.monitoring import function_trace, record_exception, set_custom_attribute
 from edx_toggles.toggles import SettingToggle
 from openedx_events.event_bus import EventBusConsumer

@@ -106,6 +107,31 @@ def _reconnect_to_db_if_needed():
         connection.connect()
 
 
+def _clear_request_cache():
+    """
+    Clear the RequestCache so that each event consumption starts fresh.
+
+    Signal handlers may be written with the assumption that they are called in the context
+    of a web request, so we clear the request cache just in case.
+    """
+    RequestCache.clear_all_namespaces()
+
+
+def _prepare_for_new_work_cycle():
+    """
+    Ensure that the application state is appropriate for performing a new unit of work.
+
+    This mimics some setup/teardown that is normally performed by Django in its
+    request/response based architecture and that is needed for ensuring a clean and
+    usable state in this worker-based application.
+    """
+    # Ensure that the database connection is active and usable.
+    _reconnect_to_db_if_needed()
+
+    # Clear the request cache, in case anything in the signal handlers relies on it.
+    _clear_request_cache()
+
+
 class KafkaEventConsumer(EventBusConsumer):
     """
     Construct consumer for the given topic and group. The consumer can then
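An aside on the RequestCache change above (5.5.0): signal handlers often memoize per-request data, which would leak from one consumed event to the next without Django's request/response cycle to reset it. A minimal sketch of the failure mode being prevented (the namespace and helper below are hypothetical, not part of this repo):

    from edx_django_utils.cache import RequestCache

    # Hypothetical handler-side memoization, keyed on user id.
    lookup_cache = RequestCache('hypothetical_namespace')

    def expensive_lookup(user_id):
        # Stand-in for a database or API call.
        return {'user_id': user_id, 'enrolled': True}

    def handle_event(user_id):
        cached = lookup_cache.get_cached_response(user_id)
        if cached.is_found:
            return cached.value  # would be stale if a previous event populated it
        value = expensive_lookup(user_id)
        lookup_cache.set(user_id, value)
        return value

    # The consumer resets this state explicitly between events:
    RequestCache.clear_all_namespaces()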
@@ -273,38 +299,41 @@ def _consume_indefinitely(self):
                 if CONSECUTIVE_ERRORS_LIMIT and consecutive_errors >= CONSECUTIVE_ERRORS_LIMIT:
                     raise Exception(f"Too many consecutive errors, exiting ({consecutive_errors} in a row)")
 
-                msg = None
-                try:
-                    msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
-                    if msg is not None:
-                        with function_trace('_consume_indefinitely_consume_single_message'):
-                            # Before processing, make sure our db connection is still active
-                            _reconnect_to_db_if_needed()
-
-                            signal = self.determine_signal(msg)
-                            msg.set_value(self._deserialize_message_value(msg, signal))
-                            self.emit_signals_from_message(msg, signal)
-                            consecutive_errors = 0
-
-                        self._add_message_monitoring(run_context=run_context, message=msg)
-                except Exception as e:  # pylint: disable=broad-except
-                    consecutive_errors += 1
-                    self.record_event_consuming_error(run_context, e, msg)
-                    # Kill the infinite loop if the error is fatal for the consumer
-                    _, kafka_error = self._get_kafka_message_and_error(message=msg, error=e)
-                    if kafka_error and kafka_error.fatal():
-                        raise e
-                    # Prevent fast error-looping when no event received from broker. Because
-                    # DeserializingConsumer raises rather than returning a Message when it has an
-                    # error() value, this may be triggered even when a Message *was* returned,
-                    # slowing down the queue. This is probably close enough, though.
-                    if msg is None:
-                        time.sleep(POLL_FAILURE_SLEEP)
-                    if msg:
-                        # theoretically we could just call consumer.commit() without passing the specific message
-                        # to commit all this consumer's current offset across all partitions since we only process one
-                        # message at a time, but limit it to just the offset/partition of the specified message
-                        # to be super safe
-                        self.consumer.commit(message=msg)
+                with function_trace('consumer.consume'):
+                    msg = None
+                    try:
+                        msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
+                        if msg is not None:
+                            # Before processing, try to make sure our application state is cleaned
+                            # up as would happen at the start of a Django request/response cycle.
+                            # See https://github.com/openedx/openedx-events/issues/236 for details.
+                            _prepare_for_new_work_cycle()
+
+                            signal = self.determine_signal(msg)
+                            msg.set_value(self._deserialize_message_value(msg, signal))
+                            self.emit_signals_from_message(msg, signal)
+                            consecutive_errors = 0
+
+                        self._add_message_monitoring(run_context=run_context, message=msg)
+                    except Exception as e:
+                        consecutive_errors += 1
+                        self.record_event_consuming_error(run_context, e, msg)
+                        # Kill the infinite loop if the error is fatal for the consumer
+                        _, kafka_error = self._get_kafka_message_and_error(message=msg, error=e)
+                        if kafka_error and kafka_error.fatal():
+                            raise e
+                        # Prevent fast error-looping when no event received from broker. Because
+                        # DeserializingConsumer raises rather than returning a Message when it has an
+                        # error() value, this may be triggered even when a Message *was* returned,
+                        # slowing down the queue. This is probably close enough, though.
+                        if msg is None:
+                            time.sleep(POLL_FAILURE_SLEEP)
+                        if msg:
+                            # theoretically we could just call consumer.commit() without passing the specific message
+                            # to commit all this consumer's current offset across all partitions since we only process one
+                            # message at a time, but limit it to just the offset/partition of the specified message
+                            # to be super safe
+                            self.consumer.commit(message=msg)
         finally:
             self.consumer.close()

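For readers skimming the new loop: the commit pattern it uses is standard confluent-kafka. A stripped-down sketch of the same poll/commit cycle against a bare ``Consumer`` (this repo actually uses ``DeserializingConsumer``; the broker address, topic, and group id below are placeholders):

    from confluent_kafka import Consumer

    def process(msg):
        # Stand-in for deserializing the message and emitting signals.
        print(msg.topic(), msg.partition(), msg.offset())

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',  # placeholder
        'group.id': 'example-group',            # placeholder
        'enable.auto.commit': False,            # commit manually, as the loop above does
    })
    consumer.subscribe(['example-topic'])       # placeholder

    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                continue  # a real consumer would inspect and log the error
            process(msg)
            # Commit only this message's offset/partition, mirroring the
            # "to be super safe" commit above.
            consumer.commit(message=msg)
    finally:
        consumer.close()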
Expand Down Expand Up @@ -486,7 +515,7 @@ def _log_message_received(self, msg):
f'offset={msg.offset()}, message_id={message_id}, key={msg.key()}, '
f'event_timestamp_ms={timestamp_info}'
)
except Exception as e: # pragma: no cover pylint: disable=broad-except
except Exception as e: # pragma: no cover
# Use this to fix any bugs in what should be benign logging code
set_custom_attribute('kafka_logging_error', repr(e))

Expand Down Expand Up @@ -546,6 +575,10 @@ def _add_message_monitoring(self, run_context, message, error=None):
set_custom_attribute('kafka_topic', run_context['full_topic'])

if kafka_message:
# .. custom_attribute_name: kafka_received_message
# .. custom_attribute_description: True if we are processing a message with this span, False otherwise.
set_custom_attribute('kafka_received_message', True)

# .. custom_attribute_name: kafka_partition
# .. custom_attribute_description: The partition of the message.
set_custom_attribute('kafka_partition', kafka_message.partition())
Expand All @@ -565,6 +598,10 @@ def _add_message_monitoring(self, run_context, message, error=None):
# .. custom_attribute_description: The event type of the message. Note that the header in the logs
# will use 'ce_type'.
set_custom_attribute('kafka_event_type', ",".join(event_types))
else:
# .. custom_attribute_name: kafka_received_message
# .. custom_attribute_description: True if we are processing a message with this span.
set_custom_attribute('kafka_received_message', False)

if kafka_error:
# .. custom_attribute_name: kafka_error_fatal
Expand All @@ -574,7 +611,7 @@ def _add_message_monitoring(self, run_context, message, error=None):
# .. custom_attribute_description: Boolean describing if the error is retriable.
set_custom_attribute('kafka_error_retriable', kafka_error.retriable())

except Exception as e: # pragma: no cover pylint: disable=broad-except
except Exception as e: # pragma: no cover
# Use this to fix any bugs in what should be benign monitoring code
set_custom_attribute('kafka_monitoring_error', repr(e))

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/internal/producer.py

@@ -316,7 +316,7 @@ def send(
             # would never get a delivery callback. That's why there's also a thread calling
             # poll(0) on a regular interval (see `poll_indefinitely`).
             self.producer.poll(0)
-        except Exception as e:  # pylint: disable=broad-except
+        except Exception as e:
             # Errors caused by the produce call should be handled by the on_delivery callback.
             # Here we might expect serialization errors, or any errors from preparing to produce.
             record_producing_error(e, context)
2 changes: 2 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_config.py

@@ -50,13 +50,15 @@ def test_full(self):
             EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
             EVENT_BUS_KAFKA_API_KEY='some_other_key',
             EVENT_BUS_KAFKA_API_SECRET='some_other_secret',
+            EVENTS_SERVICE_NAME='my_service',
         ):
             assert config.load_common_settings() == {
                 'bootstrap.servers': 'localhost:54321',
                 'sasl.mechanism': 'PLAIN',
                 'security.protocol': 'SASL_SSL',
                 'sasl.username': 'some_other_key',
                 'sasl.password': 'some_other_secret',
+                'client.id': 'my_service',
             }
7 changes: 7 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_consumer.py

@@ -231,6 +231,7 @@ def raise_exception():
                 call("kafka_partition", 2),
                 call("kafka_offset", 12345),
                 call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"),
+                call("kafka_received_message", True),
             ] * len(mock_emit_side_effects),
             any_order=True,
         )

@@ -431,7 +432,13 @@ def poll_side_effect(*args, **kwargs):
             expected_custom_attribute_calls += [
                 call("kafka_message_id", "1111-1111"),
                 call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"),
+                call("kafka_received_message", True),
             ]
+        else:
+            expected_custom_attribute_calls += [
+                call("kafka_received_message", False),
+            ]
 
         if has_kafka_error:
             expected_custom_attribute_calls += [
                 call('kafka_error_fatal', is_fatal),