Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace asyncio-redis with redis-py #867

Merged
merged 27 commits into from
Feb 4, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions bot/kodiak/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def __call__(
REDIS_URL = config("REDIS_URL", cast=databases.DatabaseURL, default=None) or config(
"REDISCLOUD_URL", cast=databases.DatabaseURL
)
REDIS_POOL_SIZE = config("REDIS_POOL_SIZE", cast=int, default=20)
SECRET_KEY = config("SECRET_KEY")
GITHUB_APP_ID = config("GITHUB_APP_ID")
GITHUB_PRIVATE_KEY = config("GITHUB_PRIVATE_KEY", default=None)
Expand All @@ -55,11 +54,19 @@ def __call__(
default=["pull_request", "pull_request_review", "pull_request_comment"],
)
)
USAGE_REPORTING_POOL_SIZE = config("USAGE_REPORTING_POOL_SIZE", cast=int, default=50)
USAGE_REPORTING_QUEUE_LENGTH = config(
"USAGE_REPORTING_QUEUE_LENGTH", cast=int, default=10_000
)
INGEST_QUEUE_LENGTH = config("INGEST_QUEUE_LENGTH", cast=int, default=1_000)
REDIS_BLOCKING_POP_TIMEOUT_SEC = config(
"REDIS_BLOCKING_POP_TIMEOUT_SEC", cast=int, default=10
)
# if we don't get a reply from Redis within a short period, we have an error because we always expect short response times from redis. We specify a timeout for blocking oeprations
chdsbd marked this conversation as resolved.
Show resolved Hide resolved
REDIS_SOCKET_TIMEOUT_SEC = config("REDIS_SOCKET_TIMEOUT_SEC", cast=int, default=90)
# if we can't open a TCP connection quickly, we should raise a timeout.
REDIS_SOCKET_CONNECT_TIMEOUT_SEC = config(
"REDIS_SOCKET_CONNECT_TIMEOUT_SEC", cast=int, default=30
)

SUBSCRIPTIONS_ENABLED = config("SUBSCRIPTIONS_ENABLED", cast=bool, default=False)

Expand Down
31 changes: 7 additions & 24 deletions bot/kodiak/entrypoints/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import hmac
from typing import Any

import asyncio_redis
import structlog
import uvicorn
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
Expand All @@ -21,6 +20,7 @@
from kodiak.entrypoints.worker import PubsubIngestQueueSchema
from kodiak.logging import configure_logging
from kodiak.queue import INGEST_QUEUE_NAMES, QUEUE_PUBSUB_INGEST, get_ingest_queue
from kodiak.redis_client import redis_bot
from kodiak.schemas import RawWebhookEvent

configure_logging()
Expand All @@ -30,23 +30,6 @@
app = Starlette()
app.add_middleware(SentryAsgiMiddleware)

# TODO(sbdchd): should this be a pool?
_redis: asyncio_redis.Pool | None = None


async def get_redis() -> asyncio_redis.Pool:
global _redis # pylint: disable=global-statement
if _redis is None:
_redis = await asyncio_redis.Pool.create(
host=conf.REDIS_URL.hostname or "localhost",
port=conf.REDIS_URL.port or 6379,
password=conf.REDIS_URL.password,
# XXX: which var?
poolsize=conf.USAGE_REPORTING_POOL_SIZE,
ssl=conf.REDIS_URL.scheme == "rediss",
)
return _redis


@app.route("/", methods=["GET"])
async def root(_: Request) -> Response:
Expand Down Expand Up @@ -98,15 +81,15 @@ async def github_webhook_event(request: Request) -> Response:
return JSONResponse({"ok": True})

ingest_queue = get_ingest_queue(installation_id)
redis = await get_redis()
await redis.rpush(

await redis_bot.rpush(
ingest_queue,
[RawWebhookEvent(event_name=github_event, payload=event).json()],
RawWebhookEvent(event_name=github_event, payload=event).json(),
)

await redis.ltrim(ingest_queue, 0, conf.INGEST_QUEUE_LENGTH)
await redis.sadd(INGEST_QUEUE_NAMES, [ingest_queue])
await redis.publish(
await redis_bot.ltrim(ingest_queue, 0, conf.INGEST_QUEUE_LENGTH)
await redis_bot.sadd(INGEST_QUEUE_NAMES, ingest_queue)
await redis_bot.publish(
QUEUE_PUBSUB_INGEST,
PubsubIngestQueueSchema(installation_id=installation_id).json(),
)
Expand Down
33 changes: 20 additions & 13 deletions bot/kodiak/entrypoints/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import sentry_sdk
import structlog

from kodiak import redis_client
from kodiak import app_config as conf
from kodiak.assertions import assert_never
from kodiak.logging import configure_logging
from kodiak.queue import (
Expand All @@ -22,6 +22,7 @@
get_ingest_queue,
handle_webhook_event,
)
from kodiak.redis_client import redis_bot
from kodiak.schemas import RawWebhookEvent

configure_logging()
Expand All @@ -30,13 +31,17 @@


async def work_ingest_queue(queue: WebhookQueueProtocol, queue_name: str) -> NoReturn:
redis = await redis_client.create_connection()
log = logger.bind(queue_name=queue_name, task="work_ingest_queue")

log.info("start working ingest_queue")
while True:
raw_event = await redis.blpop([queue_name])
parsed_event = RawWebhookEvent.parse_raw(raw_event.value)
res = await redis_bot.blpop(
[queue_name], timeout=conf.REDIS_BLOCKING_POP_TIMEOUT_SEC
)
if res is None:
continue
_, value = res
parsed_event = RawWebhookEvent.parse_raw(value)
await handle_webhook_event(
queue=queue,
event_name=parsed_event.event_name,
Expand All @@ -55,14 +60,17 @@ async def ingest_queue_starter(
"""
Listen on Redis Pubsub and start queue worker if we don't have one already.
"""
redis = await redis_client.create_connection()
subscriber = await redis.start_subscribe()
await subscriber.subscribe([QUEUE_PUBSUB_INGEST])
pubsub = redis_bot.pubsub()
await pubsub.subscribe(QUEUE_PUBSUB_INGEST)
log = logger.bind(task="ingest_queue_starter")
log.info("start watch for ingest_queues")
while True:
reply = await subscriber.next_published()
installation_id = PubsubIngestQueueSchema.parse_raw(reply.value).installation_id
reply = await pubsub.get_message(ignore_subscribe_messages=True, timeout=10)
if reply is None:
continue
installation_id = PubsubIngestQueueSchema.parse_raw(
reply["data"]
).installation_id
queue_name = get_ingest_queue(installation_id)
if queue_name not in ingest_workers:
ingest_workers[queue_name] = asyncio.create_task(
Expand All @@ -77,12 +85,11 @@ async def main() -> NoReturn:

ingest_workers = dict()

redis = await redis_client.create_connection()
ingest_queue_names = await redis.smembers(INGEST_QUEUE_NAMES)
ingest_queue_names = await redis_bot.smembers(INGEST_QUEUE_NAMES)
log = logger.bind(task="main_worker")

for queue_result in ingest_queue_names:
queue_name = await queue_result
for queue_name_bytes in ingest_queue_names:
queue_name = queue_name_bytes.decode()
if queue_name not in ingest_workers:
log.info("start ingest_queue_worker", queue_name=queue_name)
ingest_workers[queue_name] = asyncio.create_task(
Expand Down
17 changes: 5 additions & 12 deletions bot/kodiak/queries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from kodiak.queries.commits import Commit, CommitConnection, GitActor
from kodiak.queries.commits import User as PullRequestCommitUser
from kodiak.queries.commits import get_commits
from kodiak.redis_client import redis_web_api
from kodiak.throttle import get_thottler_for_installation

logger = structlog.get_logger()
Expand Down Expand Up @@ -1327,20 +1328,12 @@ async def get_subscription(self) -> Optional[Subscription]:
"""
Get subscription information for installation.
"""
from kodiak.queue import get_redis

redis = await get_redis()
res = await redis.hgetall(
res = await redis_web_api.hgetall(
f"kodiak:subscription:{self.installation_id}".encode()
)
if not res:
return None
real_response = await res.asdict()
if not real_response:
return None
subscription_blocker_kind = (
real_response.get(b"subscription_blocker") or b""
).decode()
subscription_blocker_kind = (res.get(b"subscription_blocker") or b"").decode()
subscription_blocker: Optional[
Union[SubscriptionExpired, TrialExpired, SeatsExceeded]
] = None
Expand All @@ -1350,7 +1343,7 @@ async def get_subscription(self) -> Optional[Subscription]:
subscription_blocker = SeatsExceeded.parse_raw(
# Pydantic says it doesn't allow Nones, but passing a None
# raises a ValidationError which is fine.
real_response.get(b"data") # type: ignore
res.get(b"data") # type: ignore
)
except pydantic.ValidationError:
logger.exception("failed to parse seats_exceeded data", exc_info=True)
Expand All @@ -1360,7 +1353,7 @@ async def get_subscription(self) -> Optional[Subscription]:
if subscription_blocker_kind == "subscription_expired":
subscription_blocker = SubscriptionExpired()
return Subscription(
account_id=real_response[b"account_id"].decode(),
account_id=res[b"account_id"].decode(),
subscription_blocker=subscription_blocker,
)

Expand Down
Loading