Skip to content

Commit

Permalink
ref(notifications): add position information (#74)
Browse files Browse the repository at this point in the history
CHANGES
- add position information to status messages
- update `zadd` calls to prevent updating existing items in sets
  • Loading branch information
chdsbd authored and kodiakhq[bot] committed Jul 18, 2019
1 parent e3211b5 commit 9694937
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
47 changes: 41 additions & 6 deletions kodiak/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing

import asyncio_redis
import inflection
import sentry_sdk
import structlog
from asyncio_redis.connection import Connection as RedisConnection
Expand Down Expand Up @@ -39,6 +40,18 @@ def get_merge_target_queue_name(self) -> str:
return self.get_merge_queue_name() + ":target"


T = typing.TypeVar("T")


def find_position(x: typing.Iterable[T], v: T) -> typing.Optional[int]:
count = 0
for item in x:
if item == v:
return count
count += 1
return None


async def webhook_event_consumer(
*, connection: RedisConnection, webhook_queue: RedisWebhookQueue, queue_name: str
) -> typing.NoReturn:
Expand Down Expand Up @@ -88,14 +101,25 @@ async def webhook_event_consumer(
raise Exception("Unknown MergeabilityResponse")

# don't clobber statuses set in the merge loop
if not is_merging:
await pull_request.set_status("📦 enqueued for merge")
# The following responses are okay to add to merge queue:
# + NEEDS_UPDATE - okay for merging
# + NEED_REFRESH - assume okay
# + WAIT - assume checks pass
# + OK - we've got the green
await webhook_queue.enqueue_for_repo(event=webhook_event)
webhook_event_jsons = await webhook_queue.enqueue_for_repo(
event=webhook_event
)
if is_merging:
continue

position = find_position(webhook_event_jsons, webhook_event_json.value)
if position is None:
continue
# use 1-based indexing
humanized_position = inflection.ordinalize(position + 1)
await pull_request.set_status(
f"📦 enqueued for merge (position={humanized_position})"
)


# TODO(chdsbd): Generalize this event processor boilerplate
Expand Down Expand Up @@ -251,12 +275,14 @@ async def enqueue(self, *, event: WebhookEvent) -> None:
queue_name = get_webhook_queue_name(event)
transaction = await self.connection.multi()
await transaction.sadd(WEBHOOK_QUEUE_NAMES, [queue_name])
await transaction.zadd(queue_name, {event.json(): time.time()})
await transaction.zadd(
queue_name, {event.json(): time.time()}, only_if_not_exists=True
)
await transaction.exec()

self.start_webhook_worker(queue_name=queue_name)

async def enqueue_for_repo(self, *, event: WebhookEvent) -> None:
async def enqueue_for_repo(self, *, event: WebhookEvent) -> typing.List[str]:
"""
1. get the corresponding repo queue for event
2. add key to MERGE_QUEUE_NAMES so on restart we can recreate the
Expand All @@ -267,10 +293,19 @@ async def enqueue_for_repo(self, *, event: WebhookEvent) -> None:
key = get_merge_queue_name(event)
transaction = await self.connection.multi()
await transaction.sadd(MERGE_QUEUE_NAMES, [key])
await transaction.zadd(key, {event.json(): time.time()})
await transaction.zadd(
key, {event.json(): time.time()}, only_if_not_exists=True
)
future_results = await transaction.zrange(key, 0, 1000)
await transaction.exec()

self.start_repo_worker(key)
results = await future_results
dictionary = await results.asdict()
kvs = sorted(
((key, value) for key, value in dictionary.items()), key=lambda x: x[1]
)
return [key for key, value in kvs]


def get_merge_queue_name(event: WebhookEvent) -> str:
Expand Down
11 changes: 10 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ arrow = "^0.13.2"
databases = "^0.2.2"
asyncio_redis = {git = "https://github.com/chdsbd/asyncio-redis.git"}
asyncio-throttle = "^0.1.1"
inflection = "^0.3.1"

[tool.poetry.dev-dependencies]
pytest = "^4.4"
Expand Down

0 comments on commit 9694937

Please sign in to comment.