Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskMan #159

Merged
merged 30 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
280ca69
Setup Task Schema
ingalls Mar 12, 2024
45a2679
Wire up Task API
ingalls Mar 12, 2024
654ffe8
Fix lints
ingalls Mar 12, 2024
bf835b0
Add Create & Query By Id
ingalls Mar 12, 2024
2bd62fb
Add output
ingalls Mar 12, 2024
d95a4ea
Setup Task Error Handling
ingalls Mar 13, 2024
c12c84f
Add stats Handler
ingalls Mar 13, 2024
57652b1
Sketch out Infra
ingalls Mar 13, 2024
37d266f
Setup SSM
ingalls Mar 13, 2024
8ba97e4
Update Stats TypeDef
ingalls Mar 13, 2024
1b79b33
Remove ImagesStats
ingalls Mar 13, 2024
139026c
Add Task API
ingalls Mar 13, 2024
031f5af
Remove Config
ingalls Mar 13, 2024
650764a
Setup SQS Submission
ingalls Mar 13, 2024
131647b
Give posting permissions to TaskQueue
ingalls Mar 13, 2024
a3a3e8b
Add Task Create Context
ingalls Mar 14, 2024
3b9c6e2
Task.output is now JSONObject type
ingalls Mar 14, 2024
a47ca96
QueryById
ingalls Mar 14, 2024
adb349f
Update Task Query endpoint
ingalls Mar 14, 2024
7767787
Update Async Task Handler
ingalls Mar 14, 2024
f1e8386
Task ToJson
ingalls Mar 14, 2024
6d7727d
Update Task
ingalls Mar 14, 2024
7d5e678
Pass around context
ingalls Mar 14, 2024
7806002
Setup GetStats Context
ingalls Mar 14, 2024
99b9b23
Update Deps
ingalls Mar 14, 2024
f871eb1
Removed instances of cognito:username and replaced with sub
ingalls Mar 18, 2024
2b5c555
Remove limit from countImages
ingalls Mar 18, 2024
e4f5979
Update mongoose
ingalls Mar 18, 2024
7459b1d
Merge branch 'main' into taskman
nathanielrindlaub Mar 19, 2024
e212d61
Update query task input
nathanielrindlaub Mar 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,621 changes: 812 additions & 809 deletions package-lock.json

Large diffs are not rendered by default.

34 changes: 33 additions & 1 deletion serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ provider:
- Fn::GetAtt:
- exportQueue
- Arn
- Fn::GetAtt:
- taskQueue
- Arn
- Effect: Allow
Action:
- sagemaker:InvokeEndpoint
Expand Down Expand Up @@ -181,6 +184,19 @@ functions:
- Arn
batchSize: 1
timeout: 120
task:
image:
name: base
command: src/task/handler.handler
reservedConcurrency: 10
events:
- sqs:
arn:
Fn::GetAtt:
- taskQueue
- Arn
batchSize: 1
timeout: 900

resources:
Resources:
Expand Down Expand Up @@ -215,7 +231,14 @@ resources:
QueueName: inferenceDLQ-${self:provider.stage}
MessageRetentionPeriod: 1209600 # 14 days in seconds

# Export SQS queue
# Task SQS queue
taskQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: taskQueue-${self:provider.stage}
VisibilityTimeout: 900

# Export SQS queue - will eventually be replaced by unified tasks queue
exportQueue:
Type: AWS::SQS::Queue
Properties:
Expand All @@ -240,6 +263,15 @@ resources:
Type: String
Value: !Ref inferenceQueue

# SSM Param - task queue url
SSMParameterTaskqueue:
Type: AWS::SSM::Parameter
Properties:
Description: Task queue url
Name: /tasks/task-queue-url-${opt:stage, self:provider.stage, 'dev'}
Type: String
Value: !Ref taskQueue

# SSM Param - export queue url
SSMParameterExportqueue:
Type: AWS::SSM::Parameter
Expand Down
2 changes: 2 additions & 0 deletions src/api/auth/roles.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const EXPORT_DATA_ROLES = [MANAGER, MEMBER];
const WRITE_OBJECTS_ROLES = [MANAGER, MEMBER];
const WRITE_VIEWS_ROLES = [MANAGER, MEMBER];
const WRITE_COMMENTS_ROLES = [MANAGER, MEMBER];
const READ_TASKS_ROLES = [MANAGER, MEMBER];
const WRITE_PROJECT_ROLES = [MANAGER];
const WRITE_IMAGES_ROLES = [MANAGER];
const DELETE_IMAGES_ROLES = [MANAGER];
Expand All @@ -15,6 +16,7 @@ const WRITE_AUTOMATION_RULES_ROLES = [MANAGER];
const WRITE_CAMERA_REGISTRATION_ROLES = [MANAGER];

export {
READ_TASKS_ROLES,
WRITE_COMMENTS_ROLES,
DELETE_IMAGES_ROLES,
EXPORT_DATA_ROLES,
Expand Down
8 changes: 7 additions & 1 deletion src/api/db/models/Image.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import GraphQLError, { InternalServerError, ForbiddenError, DuplicateImageError,
import crypto from 'node:crypto';
import mongoose from 'mongoose';
import MongoPaging from 'mongo-cursor-pagination';
import { TaskModel } from './Task.js';
import Image from '../schemas/Image.js';
import Project from '../schemas/Project.js';
import ImageError from '../schemas/ImageError.js';
Expand Down Expand Up @@ -972,7 +973,12 @@ export default class AuthedImageModel {
}

async getStats(input, context) {
return await ImageModel.getStats(input, context);
return await TaskModel.create({
type: 'GetStats',
projectId: context.user['curr_project'],
user: context.user['cognito:username'],
ingalls marked this conversation as resolved.
Show resolved Hide resolved
config: input
}, context);
}

async export(input, context) {
Expand Down
4 changes: 3 additions & 1 deletion src/api/db/models/Project.js
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,9 @@ export class ProjectModel {
const label = (project.labels || []).filter((p) => { return p._id.toString() === input._id.toString(); })[0];
if (!label) throw new DeleteLabelError('Label not found on project');

const count = await ImageModel.countImagesByLabel([input._id], context);
const count = await ImageModel.countImagesByLabel([input._id], {
ingalls marked this conversation as resolved.
Show resolved Hide resolved
limit: MAX_LABEL_DELETE
}, context);

if (count > MAX_LABEL_DELETE) {
const msg = `This label is already in extensive use (>${MAX_LABEL_DELETE} images) and cannot be ` +
Expand Down
101 changes: 101 additions & 0 deletions src/api/db/models/Task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { NotFoundError, ForbiddenError, AuthenticationError } from '../../errors.js';
import SQS from '@aws-sdk/client-sqs';
import MongoPaging from 'mongo-cursor-pagination';
import Task from '../schemas/Task.js';
import { hasRole } from './utils.js';
import {
READ_TASKS_ROLES
} from '../../auth/roles.js';

/**
* Tasks manage the state of async events (except for batch uploads) on the platform
* @class
*/
export class TaskModel {
/**
* Query Tasks by Filter, returning a paged list
*
* @param {Object} input
* @param {String} input.limit
* @param {String} input.paginatedField
* @param {String} input.sortAscending
* @param {String} input.next
* @param {String} input.previous
* @param {Object} context
*/
static async queryByFilter(input, context) {
return await MongoPaging.aggregate(Task.collection, {
nathanielrindlaub marked this conversation as resolved.
Show resolved Hide resolved
aggregation: [
{ '$match': { 'projectId': context.user['curr_project'] } },
{ '$match': { 'user': context.user.sub } }
ingalls marked this conversation as resolved.
Show resolved Hide resolved
],
limit: input.limit,
paginatedField: input.paginatedField,
sortAscending: input.sortAscending,
next: input.next,
previous: input.previous
});
}

static async queryById(_id, context) {
const query = { _id };
const task = await Task.findOne(query);
if (!task) throw new NotFoundError('Task not found');

if (task.projectId !== context.user['curr_project']) {
throw new NotFoundError('Task does not belong to current project');
}

return task;
}

static async create(input, context) {
const task = new Task({
user: input.user,
projectId: input.projectId,
type: input.type
});

const sqs = new SQS.SQSClient({ region: process.env.AWS_DEFAULT_REGION });

await task.save();

await sqs.send(new SQS.SendMessageCommand({
QueueUrl: context.config['/TASKS/TASK_QUEUE_URL'],
MessageBody: JSON.stringify({
config: input.config,
...task.toJSON()
})
}));
return task;
}

static async update(input, context) {
const task = await this.queryById(input._id, context);

input.updated = new Date();

Object.assign(task, input);
await task.save();
return task;
}
}

export default class AuthedTaskModel {
constructor(user) {
if (!user) throw new AuthenticationError('Authentication failed');
this.user = user;
}

async queryById(input, context) {
if (!hasRole(this.user, READ_TASKS_ROLES)) throw new ForbiddenError();

return await TaskModel.queryById(input, context);
}

async queryByFilter(input, context) {
if (!hasRole(this.user, READ_TASKS_ROLES)) throw new ForbiddenError();

return await TaskModel.queryByFilter(input, context);
}
}
27 changes: 27 additions & 0 deletions src/api/db/schemas/Task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import mongoose from 'mongoose';
import MongoPaging from 'mongo-cursor-pagination';

const Schema = mongoose.Schema;

const TaskSchema = new Schema({
user: { type: String, required: true },
projectId: { type: String, required: true, ref: 'Project' },
type: {
type: String,
required: true,
enum : ['GetStats']
},
status: {
type: String,
required: true,
enum : ['SUBMITTED', 'RUNNING', 'FAIL', 'COMPLETE'],
default: 'SUBMITTED'
},
created: { type: Date, default: Date.now, required: true },
updated: { type: Date, default: Date.now, required: true },
output: { type: Object }
nathanielrindlaub marked this conversation as resolved.
Show resolved Hide resolved
});

TaskSchema.plugin(MongoPaging.mongoosePlugin);

export default mongoose.model('Task', TaskSchema);
2 changes: 2 additions & 0 deletions src/api/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import AuthedMLModelModel from './db/models/MLModel.js';
import AuthedBatchModel from './db/models/Batch.js';
import AuthedBatchErrorModel from './db/models/BatchError.js';
import AuthedImageErrorModel from './db/models/ImageError.js';
import AuthedTaskModel from './db/models/Task.js';
import Query from './resolvers/Query.js';
import Mutation from './resolvers/Mutation.js';
import Scalars from './resolvers/Scalars.js';
Expand Down Expand Up @@ -52,6 +53,7 @@ const context = async ({ event, context }) => {
config,
models: {
User: new AuthedUserModel(user),
Task: new AuthedTaskModel(user),
Project: new AuthedProjectModel(user),
Image: new AuthedImageModel(user),
ImageError: new AuthedImageErrorModel(user),
Expand Down
18 changes: 18 additions & 0 deletions src/api/resolvers/Query.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ const Query = {
return await context.models.User.listUsers(input, context);
},

tasks: async (_, { input }, context) => {
const response = await context.models.Task.queryByFilter(input, context);
const { previous, hasPrevious, next, hasNext, results } = response;
return {
pageInfo: {
previous,
hasPrevious,
next,
hasNext
},
tasks: results
};
},

task: async (_, { input }, context) => {
return await context.models.Task.queryById(input, context);
},

batches: async (_, { input }, context) => {
const response = await context.models.Batch.queryByFilter(input, context);
const { previous, hasPrevious, next, hasNext, results } = response;
Expand Down
8 changes: 8 additions & 0 deletions src/api/type-defs/inputs/QueryTasksInput.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export default `
input QueryTasksInput {
paginatedField: String
sortAscending: Boolean
limit: Int
next: String
previous: String
}`;
11 changes: 11 additions & 0 deletions src/api/type-defs/objects/Task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export default `
type Task {
_id: ID!
user: String!
projectId: String!
type: String!
status: String!
created: String!
updated: String!
output: JSONObject
}`;
19 changes: 0 additions & 19 deletions src/api/type-defs/payloads/ImagesStats.js

This file was deleted.

5 changes: 5 additions & 0 deletions src/api/type-defs/payloads/TasksPayload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export default `
type TasksPayload {
tasks: [Task]!
}
`;
4 changes: 3 additions & 1 deletion src/api/type-defs/root/Query.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export default `
type Query {
users(input: QueryUsersInput): UsersPayload
tasks(input: QueryTasksInput): TasksPayload
task(input: String!): Task
projects(input: QueryProjectsInput): [Project]
image(input: QueryImageInput!): Image
images(input: QueryImagesInput!): ImagesConnection
Expand All @@ -9,10 +11,10 @@ export default `
wirelessCameras(input: QueryWirelessCamerasInput): [WirelessCamera]
mlModels(input: QueryMLModelsInput): [MLModel]
batches(input: QueryBatchesInput!): BatchesConnection
stats(input: QueryStatsInput!): ImagesStats
export(input: ExportInput!): ExportPayload
exportErrors(input: ExportErrorsInput!): ExportPayload
exportStatus(input: ExportStatusInput!): ExportStatusPayload
stats(input: QueryStatsInput!): Task
}
`;

1 change: 1 addition & 0 deletions src/config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const ssmNames = [
`/ml/inference-queue-url-${process.env.STAGE}`,
`/exports/exported-data-bucket-${process.env.STAGE}`,
`/exports/export-queue-url-${process.env.STAGE}`,
`/tasks/task-queue-url-${process.env.STAGE}`,
`/ml/megadetector-v5a-realtime-endpoint-${process.env.STAGE}`,
`/ml/megadetector-v5a-batch-endpoint-${process.env.STAGE}`,
`/ml/megadetector-v5b-realtime-endpoint-${process.env.STAGE}`,
Expand Down
Loading
Loading