-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathworker.py
81 lines (65 loc) · 2.36 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio
from datetime import timedelta
import opentelemetry.context
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.runtime import OpenTelemetryConfig, Runtime, TelemetryConfig
from temporalio.worker import Worker
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
compose_greeting,
name,
start_to_close_timeout=timedelta(seconds=10),
)
@activity.defn
async def compose_greeting(name: str) -> str:
return f"Hello, {name}!"
interrupt_event = asyncio.Event()
def init_runtime_with_telemetry() -> Runtime:
# Setup global tracer for workflow traces
provider = TracerProvider(resource=Resource.create({SERVICE_NAME: "my-service"}))
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
# Setup SDK metrics to OTel endpoint
return Runtime(
telemetry=TelemetryConfig(
metrics=OpenTelemetryConfig(url="http://localhost:4317")
)
)
async def main():
runtime = init_runtime_with_telemetry()
# Connect client
client = await Client.connect(
"localhost:7233",
# Use OpenTelemetry interceptor
interceptors=[TracingInterceptor()],
runtime=runtime,
)
# Run a worker for the workflow
async with Worker(
client,
task_queue="open_telemetry-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
# Wait until interrupted
print("Worker started, ctrl+c to exit")
await interrupt_event.wait()
print("Shutting down")
if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())