Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fleet improvements #3438

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
curl -sS --fail-with-body --request POST "https://$NANGO_API_HOSTNAME/internal/fleet/nango_runners/rollout" \
--header "authorization: Bearer $INTERNAL_API_KEY"\
--header "content-type: application/json"\
--data "{ \"commitHash\": \"${{ github.sha }}\" }"
--data "{ \"image\": \"nangohq/nango-runner:${{ github.sha }}\" }"
deploy_persist:
if: inputs.service == 'persist'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import type { Knex } from 'knex';
import { NODE_CONFIG_OVERRIDES_TABLE } from '../../models/node_config_overrides.js';
import { DEPLOYMENTS_TABLE } from '../../models/deployments.js';

/**
 * Forward migration: introduces an `image` column on the deployments table and
 * loosens/extends the node config overrides table.
 *
 * The statements run sequentially, and their order matters:
 *  1. add `image` to deployments as a nullable column with a temporary default,
 *  2. backfill `image` on every row from the existing `commit_id`,
 *  3. drop the temporary default and make `image` mandatory,
 *  4. make the override columns optional on node config overrides,
 *  5. add a free-form `notes` column to node config overrides.
 *
 * NOTE(review): the SQL uses PostgreSQL-style syntax (`character varying`,
 * `ADD COLUMN IF NOT EXISTS`) — confirm the target database.
 *
 * @param knex - Knex instance used to execute the raw SQL statements.
 */
export async function up(knex: Knex): Promise<void> {
// 1. Add the column; IF NOT EXISTS makes this step a no-op when the column is already there.
await knex.raw(`
ALTER TABLE ${DEPLOYMENTS_TABLE}
ADD COLUMN IF NOT EXISTS "image" character varying(255)
DEFAULT 'nangohq/nango-runner:production'
`);
// 2. Backfill: there is no WHERE clause, so every row gets 'nangohq/nango-runner:<commit_id>'.
await knex.raw(`
UPDATE ${DEPLOYMENTS_TABLE}
SET image = CONCAT('nangohq/nango-runner:', commit_id)
`);
// 3. From now on an image must be provided explicitly: no default, no NULL.
await knex.raw(`
ALTER TABLE ${DEPLOYMENTS_TABLE}
ALTER COLUMN "image" DROP DEFAULT,
ALTER COLUMN "image" SET NOT NULL
`);

// 4. Allow an override row to leave image, CPU, memory and storage unset.
await knex.raw(`
ALTER TABLE ${NODE_CONFIG_OVERRIDES_TABLE}
ALTER COLUMN "image" DROP NOT NULL,
ALTER COLUMN "cpu_milli" DROP NOT NULL,
ALTER COLUMN "memory_mb" DROP NOT NULL,
ALTER COLUMN "storage_mb" DROP NOT NULL
`);

// 5. Add a nullable text column for notes on an override.
await knex.raw(`
ALTER TABLE ${NODE_CONFIG_OVERRIDES_TABLE}
ADD COLUMN IF NOT EXISTS "notes" text
`);
}

/**
 * Reverse migration. The body is empty, so rolling back is a no-op: none of
 * the schema changes applied by `up` are undone.
 */
export async function down(): Promise<void> {}
23 changes: 12 additions & 11 deletions packages/fleet/lib/fleet.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect, describe, it, beforeAll, afterAll } from 'vitest';
import { Fleet } from './fleet.js';
import { getTestDbClient, testDbUrl } from './db/helpers.test.js';
import { generateCommitHash } from './models/helpers.js';
import { generateImage } from './models/helpers.js';
import { noopNodeProvider } from './node-providers/noop.js';
import * as nodeConfigOverrides from './models/node_config_overrides.js';
import { createNodeWithAttributes } from './models/helpers.test.js';
Expand All @@ -27,9 +27,10 @@ describe('fleet', () => {

describe('rollout', () => {
it('should create a new deployment', async () => {
const commitId = generateCommitHash().unwrap();
const deployment = (await fleet.rollout(commitId)).unwrap();
expect(deployment.commitId).toBe(commitId);
const image = generateImage().unwrap();
const deployment = (await fleet.rollout(image, { verifyImage: false })).unwrap();
expect(deployment.commitId).toBe(image.split(':')[1]);
expect(deployment.image).toBe(image);
expect(deployment.createdAt).toBeInstanceOf(Date);
expect(deployment.supersededAt).toBe(null);
});
Expand All @@ -43,13 +44,13 @@ describe('fleet', () => {
storageMb: 100
};
await nodeConfigOverrides.create(dbClient.db, props);
const commitId = generateCommitHash().unwrap();
await fleet.rollout(commitId);
const image = generateImage().unwrap();
await fleet.rollout(image, { verifyImage: false });
const nodeConfigOverride = (await nodeConfigOverrides.search(dbClient.db, { routingIds: [props.routingId] })).unwrap();
expect(nodeConfigOverride.get('test')).toStrictEqual({
id: expect.any(Number),
routingId: props.routingId,
image: nodeProvider.defaultNodeConfig.image,
image,
cpuMilli: props.cpuMilli,
memoryMb: props.memoryMb,
storageMb: props.storageMb,
Expand All @@ -61,8 +62,8 @@ describe('fleet', () => {

describe('getRunningNode', () => {
it('should return a running node', async () => {
const commitId = generateCommitHash().unwrap();
const deployment = (await fleet.rollout(commitId)).unwrap();
const image = generateImage().unwrap();
const deployment = (await fleet.rollout(image, { verifyImage: false })).unwrap();
const routingId = nanoid();
const runningNode = await createNodeWithAttributes(dbClient.db, {
state: 'RUNNING',
Expand All @@ -73,8 +74,8 @@ describe('fleet', () => {
expect(res.unwrap()).toStrictEqual(runningNode);
});
it('should return an outdated node', async () => {
const commitId = generateCommitHash().unwrap();
const deployment = (await fleet.rollout(commitId)).unwrap();
const image = generateImage().unwrap();
const deployment = (await fleet.rollout(image, { verifyImage: false })).unwrap();
const routingId = nanoid();
await createNodeWithAttributes(dbClient.db, {
state: 'PENDING',
Expand Down
19 changes: 15 additions & 4 deletions packages/fleet/lib/fleet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as deployments from './models/deployments.js';
import * as nodes from './models/nodes.js';
import * as nodeConfigOverrides from './models/node_config_overrides.js';
import type { Node } from './types.js';
import type { CommitHash, Deployment, RoutingId } from '@nangohq/types';
import type { Deployment, RoutingId } from '@nangohq/types';
import { FleetError } from './utils/errors.js';
import { setTimeout } from 'node:timers/promises';
import { Supervisor } from './supervisor/supervisor.js';
Expand Down Expand Up @@ -57,15 +57,26 @@ export class Fleet {
}
}

public async rollout(commitId: CommitHash): Promise<Result<Deployment>> {
public async rollout(image: string, options?: { verifyImage?: boolean }): Promise<Result<Deployment>> {
if (options?.verifyImage !== false) {
const [name, tag] = image.split(':');
if (!name || !tag) {
return Err(new FleetError('fleet_rollout_invalid_image', { context: { image } }));
}
const res = await fetch(`https://hub.docker.com/v2/repositories/${name}/tags/${tag}`);
if (!res.ok) {
return Err(new FleetError('fleet_rollout_image_not_found', { context: { image } }));
}
}

return this.dbClient.db.transaction(async (trx) => {
const deployment = await deployments.create(trx, commitId);
const deployment = await deployments.create(trx, image);
if (deployment.isErr()) {
throw deployment.error;
}

// rolling out cancels all nodeConfigOverrides images
await nodeConfigOverrides.resetImage(trx, { image: this.nodeProvider.defaultNodeConfig.image });
await nodeConfigOverrides.resetImage(trx, { image });

return deployment;
});
Expand Down
17 changes: 9 additions & 8 deletions packages/fleet/lib/models/deployments.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect, describe, it, beforeEach, afterEach } from 'vitest';
import * as deployments from './deployments.js';
import { getTestDbClient } from '../db/helpers.test.js';
import { generateCommitHash } from './helpers.js';
import { generateImage } from './helpers.js';

describe('Deployments', () => {
const dbClient = getTestDbClient('deployments');
Expand All @@ -16,19 +16,20 @@ describe('Deployments', () => {

describe('create', () => {
it('should create a deployment', async () => {
const commitId = generateCommitHash().unwrap();
const deployment = (await deployments.create(db, commitId)).unwrap();
expect(deployment.commitId).toBe(commitId);
const image = generateImage().unwrap();
const deployment = (await deployments.create(db, image)).unwrap();
expect(deployment.commitId).toBe(image.split(':')[1]);
expect(deployment.image).toBe(image);
expect(deployment.createdAt).toBeInstanceOf(Date);
expect(deployment.supersededAt).toBe(null);
});

it('should supersede any active deployments', async () => {
const commitId1 = generateCommitHash().unwrap();
const commitId2 = generateCommitHash().unwrap();
const image1 = generateImage().unwrap();
const image2 = generateImage().unwrap();

const deployment1 = (await deployments.create(db, commitId1)).unwrap();
const deployment2 = (await deployments.create(db, commitId2)).unwrap();
const deployment1 = (await deployments.create(db, image1)).unwrap();
const deployment2 = (await deployments.create(db, image2)).unwrap();

expect((await deployments.get(db, deployment1.id)).unwrap().supersededAt).not.toBe(null);
expect((await deployments.get(db, deployment2.id)).unwrap().supersededAt).toBe(null);
Expand Down
20 changes: 14 additions & 6 deletions packages/fleet/lib/models/deployments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const DEPLOYMENTS_TABLE = 'deployments';
/**
 * Shape of a row in the deployments table, using the database's snake_case
 * column names. All fields are readonly.
 */
interface DBDeployment {
// Row identifier; omitted from the insert payload (`Omit<DBDeployment, 'id'>`) in `create`.
readonly id: number;
// Commit hash of the deployment; `create` derives it from the part of `image` after ':'.
readonly commit_id: CommitHash;
// Image reference for the deployment, e.g. 'nangohq/nango-runner:<commit>'.
readonly image: string;
// Timestamp set when the deployment row is created.
readonly created_at: Date;
// null on a newly inserted deployment; set to a timestamp when `create` supersedes it.
readonly superseded_at: Date | null;
}
Expand All @@ -18,29 +19,31 @@ const DBDeployment = {
return {
id: dbDeployment.id,
commitId: dbDeployment.commit_id,
image: dbDeployment.image,
createdAt: dbDeployment.created_at,
supersededAt: dbDeployment.superseded_at
};
},
from(deployment: Deployment): DBDeployment {
return {
id: deployment.id,
image: deployment.image,
commit_id: deployment.commitId,
created_at: deployment.createdAt,
superseded_at: deployment.supersededAt
};
}
};

export async function create(db: knex.Knex, commitId: CommitHash): Promise<Result<Deployment>> {
export async function create(db: knex.Knex, image: string): Promise<Result<Deployment>> {
try {
return await db.transaction(async (trx) => {
// do nothing if commitId is already active deployment
// do nothing if already active deployment
const active = await getActive(db);
if (active.isErr()) {
return Err(active.error);
}
if (active.value?.commitId === commitId) {
if (active.value?.image === image) {
return Ok(active.value);
}

Expand All @@ -53,19 +56,24 @@ export async function create(db: knex.Knex, commitId: CommitHash): Promise<Resul
})
.update({ superseded_at: now });
// insert new deployment
const commitId = image.split(':')[1];
if (!commitId || commitId.length !== 40) {
return Err(new Error(`Error: invalid image '${image}'`));
}
const dbDeployment: Omit<DBDeployment, 'id'> = {
commit_id: commitId,
commit_id: commitId as CommitHash,
image,
created_at: now,
superseded_at: null
};
const [inserted] = await trx.into<DBDeployment>(DEPLOYMENTS_TABLE).insert(dbDeployment).returning('*');
if (!inserted) {
return Err(new Error(`Error: no deployment '${commitId}' created`));
return Err(new Error(`Error: no deployment for '${image}' created`));
}
return Ok(DBDeployment.to(inserted));
});
} catch (err) {
return Err(new FleetError(`deployment_creation_failed`, { cause: err, context: { commitId } }));
return Err(new FleetError(`deployment_creation_failed`, { cause: err, context: { image } }));
}
}

Expand Down
9 changes: 4 additions & 5 deletions packages/fleet/lib/models/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import type { Result } from '@nangohq/utils';
import { Err, Ok } from '@nangohq/utils';
import type { CommitHash } from '@nangohq/types';
import crypto from 'crypto';

export function generateCommitHash(): Result<CommitHash> {
export function generateImage(): Result<string> {
const charset = '0123456789abcdef';
const length = 40;
const randomBytes = new Uint8Array(length);
crypto.getRandomValues(randomBytes);

const value = Array.from(randomBytes)
const commitHash = Array.from(randomBytes)
.map((byte) => charset[byte % charset.length])
.join('');
if (value.length !== 40) {
if (commitHash.length !== 40) {
return Err('CommitHash must be exactly 40 characters');
}
return Ok(value as CommitHash);
return Ok(`generated/image:${commitHash}`);
}
6 changes: 3 additions & 3 deletions packages/fleet/lib/models/nodes.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { nodeStates } from '../types.js';
import type { NodeState } from '../types.js';
import type { Deployment } from '@nangohq/types';
import { getTestDbClient } from '../db/helpers.test.js';
import { generateCommitHash } from './helpers.js';
import { generateImage } from './helpers.js';
import { createNodeWithAttributes } from './helpers.test.js';

describe('Nodes', () => {
Expand All @@ -16,8 +16,8 @@ describe('Nodes', () => {
let activeDeployment: Deployment;
beforeEach(async () => {
await dbClient.migrate();
previousDeployment = (await deployments.create(db, generateCommitHash().unwrap())).unwrap();
activeDeployment = (await deployments.create(db, generateCommitHash().unwrap())).unwrap();
previousDeployment = (await deployments.create(db, generateImage().unwrap())).unwrap();
activeDeployment = (await deployments.create(db, generateImage().unwrap())).unwrap();
});

afterEach(async () => {
Expand Down
6 changes: 3 additions & 3 deletions packages/fleet/lib/supervisor/supervisor.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { getTestDbClient } from '../db/helpers.test.js';
import * as deployments from '../models/deployments.js';
import * as nodes from '../models/nodes.js';
import * as nodeConfigOverrides from '../models/node_config_overrides.js';
import { generateCommitHash } from '../models/helpers.js';
import { generateImage } from '../models/helpers.js';
import { createNodeWithAttributes } from '../models/helpers.test.js';
import type { Deployment } from '@nangohq/types';
import { FleetError } from '../utils/errors.js';
Expand Down Expand Up @@ -34,8 +34,8 @@ describe('Supervisor', () => {

beforeEach(async () => {
await dbClient.migrate();
previousDeployment = (await deployments.create(dbClient.db, generateCommitHash().unwrap())).unwrap();
activeDeployment = (await deployments.create(dbClient.db, generateCommitHash().unwrap())).unwrap();
previousDeployment = (await deployments.create(dbClient.db, generateImage().unwrap())).unwrap();
activeDeployment = (await deployments.create(dbClient.db, generateImage().unwrap())).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.unwrap().unwrap().unwrap().unwrap() 😆

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

});

afterEach(async () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/fleet/lib/utils/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type FleetErrorCode =
| 'supervisor_unknown_action'
| 'supervisor_tick_failed'
| 'fleet_misconfigured'
| 'fleet_rollout_invalid_image'
| 'fleet_rollout_image_not_found'
| 'fleet_node_not_ready_timeout'
| 'fleet_node_url_not_found'
| 'fleet_node_outdate_failed'
Expand Down
10 changes: 5 additions & 5 deletions packages/jobs/lib/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import db from '@nangohq/database';
import { getOtlpRoutes } from '@nangohq/shared';
import { otlp } from '@nangohq/logs';
import { runnersFleet } from './runner/fleet.js';
import { generateCommitHash } from '@nangohq/fleet';
import { generateImage } from '@nangohq/fleet';

const logger = getLogger('Jobs');

Expand Down Expand Up @@ -78,11 +78,11 @@ try {
// when running locally, the runners (running as processes) are being killed
// when the main process is killed and the fleet entries are therefore not associated with any running process
// we then must fake a new deployment so fleet replaces runners with new ones
const commitHash = generateCommitHash();
if (commitHash.isErr()) {
logger.error(`Unable to generate commit hash`, commitHash.error);
const image = generateImage();
if (image.isErr()) {
logger.error(`Unable to generate commit hash`, image.error);
} else {
await runnersFleet.rollout(commitHash.value);
await runnersFleet.rollout(image.value, { verifyImage: false });
}
}
runnersFleet.start();
Expand Down
13 changes: 4 additions & 9 deletions packages/server/lib/controllers/fleet/postRollout.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { z } from 'zod';
import { requireEmptyQuery, zodErrorToHTTP } from '@nangohq/utils';
import type { PostRollout, CommitHash } from '@nangohq/types';
import type { PostRollout } from '@nangohq/types';
import { asyncWrapper } from '../../utils/asyncWrapper.js';
import { runnersFleet } from '../../fleet.js';

const bodyValidation = z
.object({
commitHash: z
.string()
.length(40)
.transform((data) => {
return data as CommitHash;
})
image: z.string().regex(/^[a-z0-9-]+\/[a-z0-9-]+:[a-f0-9]{40}$/, { message: "Invalid image. Must be 'repository/image:commit'" })
})
.strict();

Expand Down Expand Up @@ -41,14 +36,14 @@ export const postRollout = asyncWrapper<PostRollout>(async (req, res) => {
}

const { fleetId }: PostRollout['Params'] = params.data;
const { commitHash }: PostRollout['Body'] = body.data;
const { image }: PostRollout['Body'] = body.data;

if (fleetId !== runnersFleet.fleetId) {
res.status(404).send({ error: { code: 'invalid_uri_params', message: 'Unknown fleet' } });
return;
}

const rollout = await runnersFleet.rollout(commitHash);
const rollout = await runnersFleet.rollout(image);
if (rollout.isErr()) {
res.status(500).send({ error: { code: 'rollout_failed', message: rollout.error.message } });
} else {
Expand Down
Loading
Loading