Skip to content

Commit

Permalink
Make migration to redis as a broker possible. (#16784)
Browse files Browse the repository at this point in the history
Temporarily hard-codes a BROKER_URL in a separate worker process.

If not present, REDIS_URL is used for broker.
  • Loading branch information
ewdurbin committed Sep 24, 2024
1 parent 2c337ea commit 85bb371
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 4 deletions.
1 change: 1 addition & 0 deletions Procfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ web-uploads: bin/start-web ddtrace-run python -m gunicorn.app.wsgiapp -c gunicor
worker: bin/start-worker celery -A warehouse worker -Q default -l info --max-tasks-per-child 32
worker-beat: bin/start-worker celery -A warehouse beat -S redbeat.RedBeatScheduler -l info
worker-traced: env DD_SERVICE=warehouse-worker bin/start-worker ddtrace-run celery -A warehouse worker -Q default -l info --max-tasks-per-child 32
worker-drain-sqs: env BROKER_URL=sqs:///?region=us-east-2&queue_name_prefix=pypi-worker bin/start-worker celery -A warehouse worker -Q default -l info --max-tasks-per-child 32
1 change: 0 additions & 1 deletion dev/environment
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ WAREHOUSE_IP_SALT="insecure himalayan pink salt"

AWS_ACCESS_KEY_ID=foo
AWS_SECRET_ACCESS_KEY=foo
BROKER_URL=sqs://localstack:4566/?region=us-east-1&queue_name_prefix=warehouse-dev

DATABASE_URL=postgresql+psycopg://postgres@db/warehouse

Expand Down
44 changes: 42 additions & 2 deletions tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,26 +460,36 @@ def test_make_celery_app():


@pytest.mark.parametrize(
("env", "ssl", "broker_url", "expected_url", "transport_options"),
(
"env",
"ssl",
"broker_url",
"broker_redis_url",
"expected_url",
"transport_options",
),
[
(
Environment.development,
False,
"amqp://guest@rabbitmq:5672//",
None,
"amqp://guest@rabbitmq:5672//",
{},
),
(
Environment.production,
True,
"amqp://guest@rabbitmq:5672//",
None,
"amqp://guest@rabbitmq:5672//",
{},
),
(
Environment.development,
False,
"sqs://",
None,
"sqs://",
{
"client-config": {"tcp_keepalive": True},
Expand All @@ -489,6 +499,7 @@ def test_make_celery_app():
Environment.production,
True,
"sqs://",
None,
"sqs://",
{
"client-config": {"tcp_keepalive": True},
Expand All @@ -498,6 +509,7 @@ def test_make_celery_app():
Environment.development,
False,
"sqs://?queue_name_prefix=warehouse",
None,
"sqs://",
{
"queue_name_prefix": "warehouse-",
Expand All @@ -508,6 +520,7 @@ def test_make_celery_app():
Environment.production,
True,
"sqs://?queue_name_prefix=warehouse",
None,
"sqs://",
{
"queue_name_prefix": "warehouse-",
Expand All @@ -518,6 +531,7 @@ def test_make_celery_app():
Environment.development,
False,
"sqs://?region=us-east-2",
None,
"sqs://",
{
"region": "us-east-2",
Expand All @@ -528,6 +542,7 @@ def test_make_celery_app():
Environment.production,
True,
"sqs://?region=us-east-2",
None,
"sqs://",
{
"region": "us-east-2",
Expand All @@ -538,6 +553,7 @@ def test_make_celery_app():
Environment.development,
False,
"sqs:///?region=us-east-2&queue_name_prefix=warehouse",
None,
"sqs://",
{
"region": "us-east-2",
Expand All @@ -549,16 +565,39 @@ def test_make_celery_app():
Environment.production,
True,
"sqs:///?region=us-east-2&queue_name_prefix=warehouse",
None,
"sqs://",
{
"region": "us-east-2",
"queue_name_prefix": "warehouse-",
"client-config": {"tcp_keepalive": True},
},
),
(
Environment.production,
True,
"sqs:///?region=us-east-2&queue_name_prefix=warehouse",
"redis://127.0.0.1:6379/10",
"sqs://",
{
"region": "us-east-2",
"queue_name_prefix": "warehouse-",
"client-config": {"tcp_keepalive": True},
},
),
(
Environment.production,
True,
None,
"redis://127.0.0.1:6379/10",
"redis://127.0.0.1:6379/10",
{},
),
],
)
def test_includeme(env, ssl, broker_url, expected_url, transport_options):
def test_includeme(
env, ssl, broker_url, broker_redis_url, expected_url, transport_options
):
registry_dict = {}
config = pretend.stub(
action=pretend.call_recorder(lambda *a, **kw: None),
Expand All @@ -570,6 +609,7 @@ def test_includeme(env, ssl, broker_url, expected_url, transport_options):
settings={
"warehouse.env": env,
"celery.broker_url": broker_url,
"celery.broker_redis_url": broker_redis_url,
"celery.result_url": pretend.stub(),
"celery.scheduler_url": pretend.stub(),
},
Expand Down
1 change: 1 addition & 0 deletions warehouse/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def configure(settings=None):
)
maybe_set(settings, "warehouse.downloads_table", "WAREHOUSE_DOWNLOADS_TABLE")
maybe_set(settings, "celery.broker_url", "BROKER_URL")
maybe_set_redis(settings, "celery.broker_redis_url", "REDIS_URL", db=10)
maybe_set_redis(settings, "celery.result_url", "REDIS_URL", db=12)
maybe_set_redis(settings, "celery.scheduler_url", "REDIS_URL", db=0)
maybe_set_redis(settings, "oidc.jwk_cache_url", "REDIS_URL", db=1)
Expand Down
5 changes: 4 additions & 1 deletion warehouse/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ def includeme(config):

broker_transport_options = {}

broker_url = s["celery.broker_url"]
broker_url = s.get("celery.broker_url")
if broker_url is None:
broker_url = s["celery.broker_redis_url"]

if broker_url.startswith("sqs://"):
parsed_url = parse_url(broker_url)
parsed_query = urllib.parse.parse_qs(parsed_url.query)
Expand Down

0 comments on commit 85bb371

Please sign in to comment.