-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncJob.cs
158 lines (139 loc) · 6.62 KB
/
AsyncJob.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Net.Http;
using System.Threading;
using System.Net;
using Newtonsoft.Json.Linq;
namespace gbelenky.monitor
{
public static class AsyncJob
{
[FunctionName("AsyncJobOrchestrator")]
public static async Task AsyncJobOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{
(double, double) orchParams = context.GetInput<(double, double)>();
double queuedParam = orchParams.Item1;
double inProgressParam = orchParams.Item2;
context.SetCustomStatus("Queued");
DateTime queuedTime = context.CurrentUtcDateTime.AddSeconds(queuedParam);
await context.CreateTimer(queuedTime, CancellationToken.None);
context.SetCustomStatus("InProgress");
DateTime inProgressTime = context.CurrentUtcDateTime.AddSeconds(inProgressParam);
await context.CreateTimer(inProgressTime, CancellationToken.None);
context.SetCustomStatus("Completed");
return;
}
[FunctionName("AsyncJobTrigger")]
public static async Task<HttpResponseMessage> AsyncJobTrigger(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "job-start/{jobName}")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
[DurableClient] IDurableEntityClient client,
string jobName,
ILogger log)
{
string instanceId = $"job-{jobName}";
// create mock jobId
string jobId = Guid.NewGuid().ToString();
var entityId = new EntityId(nameof(Job), jobId);
await client.SignalEntityAsync(entityId, "SetJobName", jobName);
if (!String.IsNullOrEmpty(instanceId))
{
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
var existingInstance = await starter.GetStatusAsync(instanceId);
if (existingInstance == null
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
// packing all together into one orchParams to avoid "Environment.GetEnvironmentVariable' violates the orchestrator deterministic code constraint"
double queueDuration = Double.Parse(Environment.GetEnvironmentVariable("ASYNC_JOB_QUEUED_DURATION_SEC"));
double inProgressDuration = Double.Parse(Environment.GetEnvironmentVariable("ASYNC_JOB_INPROGRESS_DURATION_SEC"));
(double, double) orchParams = (queueDuration, inProgressDuration);
await starter.StartNewAsync("AsyncJobOrchestrator", instanceId, orchParams);
log.LogInformation($"Started AsyncJobOrchestrator with jobName = '{jobName}'");
JobResult result = new JobResult()
{
JobId = jobId,
JobName = jobName,
JobStatus = "Initializing"
};
return new HttpResponseMessage(HttpStatusCode.Accepted)
{
Content = new StringContent(JsonConvert.SerializeObject(result))
};
}
else
{
// An instance with the specified ID exists or an existing one still running, don't create one.
return new HttpResponseMessage(HttpStatusCode.Conflict)
{
Content = new StringContent($"An instance with Id '{instanceId}' already exists."),
};
}
}
else
{
return new HttpResponseMessage(HttpStatusCode.UnprocessableEntity)
{
Content = new StringContent($"Please provide instance Id in your request payload"),
};
}
}
[FunctionName("AsyncJobStatus")]
public static async Task<HttpResponseMessage> AsyncJobStatus(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "job-status/{jobId}")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
[DurableClient] IDurableEntityClient client,
string jobId,
ILogger log)
{
var entityId = new EntityId(nameof(Job), jobId);
var entityState = await client.ReadEntityStateAsync<Job>(entityId);
string jobName = entityState.EntityState.GetJobName();
string instanceId = $"job-{jobName}";
DurableOrchestrationStatus orchStatus = await starter.GetStatusAsync(instanceId);
JobResult result = new JobResult()
{
JobId = jobId,
JobName = jobName,
JobStatus = orchStatus.CustomStatus.ToString()
};
HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK)
{
Content = new StringContent(JsonConvert.SerializeObject(result))
};
log.LogTrace($"AsyncJobStatus: current status for jobName '{jobName}' jobId '{jobId}' is '{orchStatus.CustomStatus.ToString()}'");
return httpResponseMessage;
}
}
[JsonObject(MemberSerialization.OptIn)]
public class Job
{
[JsonProperty("jobName")]
public string JobName { get; set; }
public void SetJobName(string jobName) => this.JobName = jobName;
public string GetJobName() => this.JobName;
[FunctionName(nameof(Job))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Job>();
}
public class JobResult
{
[JsonProperty("jobId")]
public string JobId { get; set; }
[JsonProperty("jobName")]
public string JobName { get; set; }
[JsonProperty("jobStatus")]
public string JobStatus { get; set; }
}
}