Skip to content

Commit

Permalink
[o11y] Support for Streaming Tail Worker outcome instrumentation for …
Browse files Browse the repository at this point in the history
…most event types, initial testing
  • Loading branch information
fhanau committed Feb 19, 2025
1 parent 1cf5f6f commit 61cdeeb
Show file tree
Hide file tree
Showing 14 changed files with 471 additions and 107 deletions.
28 changes: 28 additions & 0 deletions src/workerd/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,34 @@ wd_test(
data = ["actor-alarms-test.js"],
)

wd_test(
src = "tail-worker-test.wd-test",
args = [
"--experimental",
],
data = [
# To reduce complexity, tail-worker-test calls into other tests to get traces from them.
"actor-alarms-test.js",
"http-test.js",
"queue-test.js",
"tail-worker-test.js",
"tail-worker-test-receiver.js",
"tail-worker-test-dummy.js",
"tests/websocket-hibernation.js",
],
)

wd_test(
src = "tail-worker-test-invalid.wd-test",
args = [
"--experimental",
],
data = [
"http-test.js",
"tail-worker-test-invalid.js",
],
)

wd_test(
src = "analytics-engine-test.wd-test",
args = ["--experimental"],
Expand Down
10 changes: 8 additions & 2 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,20 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
t.setEventInfo(context.now(), tracing::HibernatableWebSocketEventInfo(getType()));
}

// TODO(streaming-tail-workers): Support Hibernate and Resume events properly.
context.getMetrics().reportTailEvent(context.getInvocationSpanContext(), [&] {
return tracing::Onset(
tracing::HibernatableWebSocketEventInfo(getType()), tracing::Onset::WorkerInfo{}, kj::none);
});

auto outcomeObserver = kj::rc<OutcomeObserver>(
kj::addRef(incomingRequest->getMetrics()), context.getInvocationSpanContext());

try {
co_await context.run(
[entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters),
props = kj::mv(props)](Worker::Lock& lock) mutable {
props = kj::mv(props),
outcomeObserver = outcomeObserver.addRef()](Worker::Lock& lock) mutable {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(text, HibernatableSocketParams::Text) {
return lock.getGlobalScope().sendHibernatableWebSocketMessage(kj::mv(text.message),
Expand Down Expand Up @@ -139,7 +144,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
outcome = EventOutcome::EXCEPTION;
}

waitUntilTasks.add(incomingRequest->drain().attach(kj::mv(incomingRequest)));
waitUntilTasks.add(
incomingRequest->drain().attach(kj::mv(incomingRequest)).attach(outcomeObserver.addRef()));

co_return Result{
.outcome = outcome,
Expand Down
22 changes: 15 additions & 7 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
IoPtr<QueueEventResult> result,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler,
const jsg::TypeHandler<QueueExportedHandler>& handlerHandler) {
const jsg::TypeHandler<QueueExportedHandler>& handlerHandler,
kj::Rc<OutcomeObserver> outcomeObserver) {
jsg::Lock& js = lock;
// Start a queue event (called from C++, not JS). Similar to startScheduled(), the caller must
// wait for waitUntil()s to produce the final QueueResult.
Expand All @@ -485,9 +486,12 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
KJ_IF_SOME(f, queueHandler.queue) {
auto promise = f(lock, jsg::alloc<QueueController>(event.addRef()),
jsg::JsValue(h.env.getHandle(js)).addRef(js), h.getCtx());
event->waitUntil(promise.then([event = event.addRef()]() mutable {
event->waitUntil(promise.then(
[outcomeObserver = outcomeObserver.addRef(), event = event.addRef()]() mutable {
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
}, [event = event.addRef()](kj::Exception&& e) mutable {
},
[outcomeObserver = outcomeObserver.addRef(), event = event.addRef()](
kj::Exception&& e) mutable {
event->setCompletionStatus(QueueEvent::CompletedWithError{kj::cp(e)});
return kj::mv(e);
}));
Expand Down Expand Up @@ -540,6 +544,9 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
tracing::Onset::WorkerInfo{}, kj::none);
});

auto outcomeObserver = kj::rc<OutcomeObserver>(
kj::addRef(incomingRequest->getMetrics()), context.getInvocationSpanContext());

// Create a custom refcounted type for holding the queueEvent so that we can pass it to the
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
struct QueueEventHolder: public kj::Refcounted {
Expand All @@ -552,14 +559,15 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
// can't just wait on their addEventListener handler to resolve because it can't be async).
context.addWaitUntil(context.run(
[this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
&metrics = incomingRequest->getMetrics(),
&metrics = incomingRequest->getMetrics(), outcomeObserver = kj::mv(outcomeObserver),
props = kj::mv(props)](Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApi().getQueueTypeHandler(lock);
queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
context.addObject(result), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler);
queueEvent->event =
startQueueEvent(lock.getGlobalScope(), kj::mv(params), context.addObject(result), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler,
kj::mv(outcomeObserver));
}));

// TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/api/tail-worker-test-dummy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
export default {
// https://developers.cloudflare.com/workers/observability/logs/tail-workers/
tail(...args) {
return (...args) => {};
},
};
29 changes: 29 additions & 0 deletions src/workerd/api/tail-worker-test-invalid.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
import * as assert from 'node:assert';

let resposeMap = new Map();

export default {
// https://developers.cloudflare.com/workers/observability/logs/tail-workers/
tailStream(args) {
// Invalid log statement, causes worker to not return a valid handler
console.log(args.map((t) => t.logs));
return (args) => {
console.log(args);
};
},
};

export const test = {
async test() {
await scheduler.wait(100);
// Tests for a bug where we tried to report an outcome event to a stream after setting up the
// stream handler with the onset event failed – with an invalid tail handler, we should not
// report any further events.
// TODO: How to test this better? What we want to see here is there not being an
// "Expected only a single onset event" error.
assert.ok(resposeMap.size == 0);
},
};
33 changes: 33 additions & 0 deletions src/workerd/api/tail-worker-test-invalid.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "http-test",
worker = (
modules = [
( name = "worker", esModule = embed "http-test.js" )
],
bindings = [
( name = "SERVICE", service = "http-test" ),
( name = "CACHE_ENABLED", json = "false" ),
],
compatibilityDate = "2023-08-01",
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled"],
tails = ["log"],
),
),
# tail worker with tests
( name = "log",
worker = (
modules = [
(name = "worker", esModule = embed "tail-worker-test-invalid.js")
],
compatibilityDate = "2024-10-14",
compatibilityFlags = ["experimental", "nodejs_compat"],
),
),
],
autogates = [
"workerd-autogate-streaming-tail-workers",
],
);
36 changes: 36 additions & 0 deletions src/workerd/api/tail-worker-test-receiver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
import * as assert from 'node:assert';

let resposeMap = new Map();

export default {
// https://developers.cloudflare.com/workers/observability/logs/tail-workers/
tailStream(...args) {
// Onset event, must be singleton
resposeMap.set(args[0].traceId, JSON.stringify(args[0].event));
return (...args) => {
let cons = resposeMap.get(args[0].traceId);
resposeMap.set(args[0].traceId, cons + JSON.stringify(args[0].event));
};
},
};

export const test = {
async test() {
// HACK: The prior tests terminates once the scheduled() invocation has returned a response, but
// propagating the outcome of the invocation may take longer. Wait briefly so this can go ahead.
await scheduler.wait(100);

// The shared tail worker we configured only produces onset and outcome events, so every trace is identical here.
// Number of traces based on how often main tail worker is invoked from previous tests
let numTraces = 21;
let basicTrace =
'{"type":"onset","info":{"type":"trace","traces":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}';
assert.deepStrictEqual(
Array.from(resposeMap.values()),
Array(numTraces).fill(basicTrace)
);
},
};
69 changes: 69 additions & 0 deletions src/workerd/api/tail-worker-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
import * as assert from 'node:assert';

let resposeMap = new Map();

export default {
// https://developers.cloudflare.com/workers/observability/logs/tail-workers/
tailStream(...args) {
// Onset event, must be singleton
resposeMap.set(args[0].traceId, JSON.stringify(args[0].event));
return (...args) => {
// TODO(streaming-tail-worker): Support several queued elements
let cons = resposeMap.get(args[0].traceId);
resposeMap.set(args[0].traceId, cons + JSON.stringify(args[0].event));
};
},
};

export const test = {
async test() {
// HACK: The prior tests terminates once the scheduled() invocation has returned a response, but
// propagating the outcome of the invocation may take longer. Wait briefly so this can go ahead.
await scheduler.wait(100);
// This test verifies that we're able to receive tail stream events for various handlers.

// Recorded streaming tail worker events, in insertion order.
let response = Array.from(resposeMap.values());

let expected = [
// http-test.js: fetch and scheduled events get reported correctly.
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":""}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":"* * * * 30"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://placeholder/not-found","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://placeholder/web-socket","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"exception","cpuTime":0,"wallTime":0}',

// queue-test.js: queue events
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/batch","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"queue","queueName":"test-queue","batchSize":5}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',

// actor-alarms-test.js: alarm events
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://foo/test","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"alarm","scheduledTime":"1970-01-01T00:00:00.000Z"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',

// legacy tail worker, triggered via alarm test. It would appear that these being recorded
// after the onsets above is not guaranteed, but since the streaming tail worker is invoked
// when the main invocation starts whereas the legacy tail worker is only invoked when it ends
// this should be fine in practice.
'{"type":"onset","info":{"type":"trace","traces":[""]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"trace","traces":[""]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"trace","traces":[""]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',

// tests/websocket-hibernation.js: hibernatableWebSocket events
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://example.com/","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://example.com/hibernation","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"hibernatableWebSocket","info":{"type":"message"}}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","info":{"type":"hibernatableWebSocket","info":{"type":"close","code":1000,"wasClean":true}}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
];

assert.deepStrictEqual(response, expected);
},
};
Loading

0 comments on commit 61cdeeb

Please sign in to comment.