Skip to content

Commit 17760de

Browse files
author
Henry Fontanier
committed
feat: add ability to unpause connectors
1 parent bccfb51 commit 17760de

File tree

13 files changed

+243
-1
lines changed

13 files changed

+243
-1
lines changed
+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import type { WithConnectorsAPIErrorReponse } from "@dust-tt/types";
2+
import type { Request, Response } from "express";
3+
4+
import { UNPAUSE_CONNECTOR_BY_TYPE } from "@connectors/connectors";
5+
import { errorFromAny } from "@connectors/lib/error";
6+
import logger from "@connectors/logger/logger";
7+
import { apiError, withLogging } from "@connectors/logger/withlogging";
8+
import { ConnectorResource } from "@connectors/resources/connector_resource";
9+
10+
type ConnectorUnpauseResBody = WithConnectorsAPIErrorReponse<{
11+
connectorId: string;
12+
}>;
13+
14+
const _unpauseConnectorAPIHandler = async (
15+
req: Request<{ connector_id: string }, ConnectorUnpauseResBody>,
16+
res: Response<ConnectorUnpauseResBody>
17+
) => {
18+
try {
19+
const connector = await ConnectorResource.fetchById(
20+
req.params.connector_id
21+
);
22+
if (!connector) {
23+
return apiError(req, res, {
24+
api_error: {
25+
type: "connector_not_found",
26+
message: "Connector not found",
27+
},
28+
status_code: 404,
29+
});
30+
}
31+
const connectorUnpauser = UNPAUSE_CONNECTOR_BY_TYPE[connector.type];
32+
33+
const unpauseRes = await connectorUnpauser(connector.id);
34+
35+
if (unpauseRes.isErr()) {
36+
return apiError(req, res, {
37+
status_code: 500,
38+
api_error: {
39+
type: "internal_server_error",
40+
message: unpauseRes.error.message,
41+
},
42+
});
43+
}
44+
45+
return res.sendStatus(204);
46+
} catch (e) {
47+
logger.error(errorFromAny(e), "Failed to unpause the connector");
48+
return apiError(req, res, {
49+
status_code: 500,
50+
api_error: {
51+
type: "internal_server_error",
52+
message: "Could not unpause the connector",
53+
},
54+
});
55+
}
56+
};
57+
58+
export const unpauseConnectorAPIHandler = withLogging(
59+
_unpauseConnectorAPIHandler
60+
);

connectors/src/api_server.ts

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
} from "@connectors/api/slack_channels_linked_with_agent";
2222
import { stopConnectorAPIHandler } from "@connectors/api/stop_connector";
2323
import { syncConnectorAPIHandler } from "@connectors/api/sync_connector";
24+
import { unpauseConnectorAPIHandler } from "@connectors/api/unpause_connector";
2425
import { postConnectorUpdateAPIHandler } from "@connectors/api/update_connector";
2526
import { webhookGithubAPIHandler } from "@connectors/api/webhooks/webhook_github";
2627
import { webhookGoogleDriveAPIHandler } from "@connectors/api/webhooks/webhook_google_drive";
@@ -91,6 +92,7 @@ export function startServer(port: number) {
9192
app.post("/connectors/update/:connector_id/", postConnectorUpdateAPIHandler);
9293
app.post("/connectors/stop/:connector_id", stopConnectorAPIHandler);
9394
app.post("/connectors/pause/:connector_id", pauseConnectorAPIHandler);
95+
app.post("/connectors/unpause/:connector_id", unpauseConnectorAPIHandler);
9496
app.post("/connectors/resume/:connector_id", resumeConnectorAPIHandler);
9597
app.delete("/connectors/delete/:connector_id", deleteConnectorAPIHandler);
9698
app.get("/connectors/:connector_id", getConnectorAPIHandler);

connectors/src/connectors/confluence/index.ts

+32
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,38 @@ export async function resumeConfluenceConnector(
220220
}
221221
}
222222

223+
export async function pauseConfluenceConnector(
224+
connectorId: ModelId
225+
): Promise<Result<undefined, Error>> {
226+
const connector = await ConnectorResource.fetchById(connectorId);
227+
if (!connector) {
228+
logger.error({ connectorId }, "Connector not found.");
229+
return new Err(new Error("Connector not found"));
230+
}
231+
232+
await connector.markAsPaused();
233+
234+
return new Ok(undefined);
235+
}
236+
237+
export async function unpauseConfluenceConnector(
238+
connectorId: ModelId
239+
): Promise<Result<undefined, Error>> {
240+
const connector = await ConnectorResource.fetchById(connectorId);
241+
if (!connector) {
242+
logger.error({ connectorId }, "Connector not found.");
243+
return new Err(new Error("Connector not found"));
244+
}
245+
246+
await connector.markAsUnpaused();
247+
const r = await launchConfluenceSyncWorkflow(connectorId, null);
248+
if (r.isErr()) {
249+
return r;
250+
}
251+
252+
return new Ok(undefined);
253+
}
254+
223255
export async function cleanupConfluenceConnector(
224256
connectorId: ModelId
225257
): Promise<Result<undefined, Error>> {

connectors/src/connectors/github/index.ts

+17
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,23 @@ export async function pauseGithubConnector(
156156
return new Ok(undefined);
157157
}
158158

159+
export async function unpauseGithubConnector(
160+
connectorId: ModelId
161+
): Promise<Result<undefined, Error>> {
162+
const connector = await ConnectorResource.fetchById(connectorId);
163+
if (!connector) {
164+
logger.error({ connectorId }, "Connector not found");
165+
return new Err(new Error("Connector not found"));
166+
}
167+
await connector.markAsUnpaused();
168+
await launchGithubFullSyncWorkflow({
169+
connectorId,
170+
syncCodeOnly: false,
171+
});
172+
173+
return new Ok(undefined);
174+
}
175+
159176
export async function resumeGithubConnector(
160177
connectorId: ModelId
161178
): Promise<Result<undefined, Error>> {

connectors/src/connectors/google_drive/index.ts

+13
Original file line numberDiff line numberDiff line change
@@ -836,3 +836,16 @@ export async function pauseGoogleDriveConnector(connectorId: ModelId) {
836836
await terminateAllWorkflowsForConnectorId(connectorId);
837837
return new Ok(undefined);
838838
}
839+
840+
export async function unpauseGoogleDriveConnector(connectorId: ModelId) {
841+
const connector = await ConnectorResource.fetchById(connectorId);
842+
if (!connector) {
843+
return new Err(new Error(`Connector not found with id ${connectorId}`));
844+
}
845+
await connector.markAsUnpaused();
846+
const r = await launchGoogleDriveFullSyncWorkflow(connectorId, null);
847+
if (r.isErr()) {
848+
return r;
849+
}
850+
return new Ok(undefined);
851+
}

connectors/src/connectors/index.ts

+25-1
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import { Err, Ok } from "@dust-tt/types";
88
import {
99
cleanupConfluenceConnector,
1010
createConfluenceConnector,
11+
pauseConfluenceConnector,
1112
resumeConfluenceConnector,
1213
retrieveConfluenceConnectorPermissions,
1314
retrieveConfluenceContentNodeParents,
1415
retrieveConfluenceContentNodes,
1516
setConfluenceConnectorPermissions,
1617
stopConfluenceConnector,
18+
unpauseConfluenceConnector,
1719
updateConfluenceConnector,
1820
} from "@connectors/connectors/confluence";
1921
import { launchConfluenceSyncWorkflow } from "@connectors/connectors/confluence/temporal/client";
@@ -29,6 +31,7 @@ import {
2931
retrieveGithubReposContentNodes,
3032
setGithubConfig,
3133
stopGithubConnector,
34+
unpauseGithubConnector,
3235
updateGithubConnector,
3336
} from "@connectors/connectors/github";
3437
import {
@@ -42,6 +45,7 @@ import {
4245
retrieveGoogleDriveContentNodes,
4346
setGoogleDriveConfig,
4447
setGoogleDriveConnectorPermissions,
48+
unpauseGoogleDriveConnector,
4549
updateGoogleDriveConnector,
4650
} from "@connectors/connectors/google_drive";
4751
import { launchGoogleDriveFullSyncWorkflow } from "@connectors/connectors/google_drive/temporal/client";
@@ -56,6 +60,7 @@ import {
5660
retrieveIntercomContentNodes,
5761
setIntercomConnectorPermissions,
5862
stopIntercomConnector,
63+
unpauseIntercomConnector,
5964
updateIntercomConnector,
6065
} from "@connectors/connectors/intercom";
6166
import type {
@@ -71,6 +76,7 @@ import type {
7176
ConnectorProviderUpdateConfigurationMapping,
7277
ConnectorResumer,
7378
ConnectorStopper,
79+
ConnectorUnpauser,
7480
ConnectorUpdater,
7581
ContentNodeParentsRetriever,
7682
SyncConnector,
@@ -85,6 +91,7 @@ import {
8591
retrieveNotionContentNodeParents,
8692
retrieveNotionContentNodes,
8793
stopNotionConnector,
94+
unpauseNotionConnector,
8895
updateNotionConnector,
8996
} from "@connectors/connectors/notion";
9097
import {
@@ -96,6 +103,7 @@ import {
96103
retrieveSlackContentNodes,
97104
setSlackConfig,
98105
setSlackConnectorPermissions,
106+
unpauseSlackConnector,
99107
updateSlackConnector,
100108
} from "@connectors/connectors/slack";
101109
import { launchSlackSyncWorkflow } from "@connectors/connectors/slack/temporal/client";
@@ -110,6 +118,7 @@ import {
110118
retrieveWebCrawlerContentNodes,
111119
setWebcrawlerConfiguration,
112120
stopWebcrawlerConnector,
121+
unpauseWebcrawlerConnector,
113122
} from "./webcrawler";
114123
import { launchCrawlWebsiteWorkflow } from "./webcrawler/temporal/client";
115124

@@ -368,11 +377,26 @@ export const PAUSE_CONNECTOR_BY_TYPE: Record<
368377
ConnectorProvider,
369378
ConnectorPauser
370379
> = {
371-
confluence: stopConfluenceConnector,
380+
confluence: pauseConfluenceConnector,
372381
slack: pauseSlackConnector,
373382
notion: pauseNotionConnector,
374383
github: pauseGithubConnector,
375384
google_drive: pauseGoogleDriveConnector,
376385
intercom: pauseIntercomConnector,
377386
webcrawler: pauseWebcrawlerConnector,
378387
};
388+
389+
// If the connector has webhooks: resume processing them, and trigger a full sync.
390+
// If the connector has long-running workflows: resume them. If they support "partial resync" do that, otherwise trigger a full sync.
391+
export const UNPAUSE_CONNECTOR_BY_TYPE: Record<
392+
ConnectorProvider,
393+
ConnectorUnpauser
394+
> = {
395+
confluence: unpauseConfluenceConnector,
396+
slack: unpauseSlackConnector,
397+
notion: unpauseNotionConnector,
398+
github: unpauseGithubConnector,
399+
google_drive: unpauseGoogleDriveConnector,
400+
intercom: unpauseIntercomConnector,
401+
webcrawler: unpauseWebcrawlerConnector,
402+
};

connectors/src/connectors/intercom/index.ts

+19
Original file line numberDiff line numberDiff line change
@@ -756,3 +756,22 @@ export async function pauseIntercomConnector(connectorId: ModelId) {
756756

757757
return new Ok(undefined);
758758
}
759+
760+
export async function unpauseIntercomConnector(connectorId: ModelId) {
761+
const connector = await ConnectorResource.fetchById(connectorId);
762+
if (!connector) {
763+
logger.error({ connectorId }, "[Intercom] Connector not found.");
764+
return new Err(new Error("Connector not found"));
765+
}
766+
767+
await connector.markAsUnpaused();
768+
769+
const r = await launchIntercomSyncWorkflow({
770+
connectorId,
771+
});
772+
if (r.isErr()) {
773+
return r;
774+
}
775+
776+
return new Ok(undefined);
777+
}

connectors/src/connectors/interface.ts

+4
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ export type ConnectorGarbageCollector = (
9191
export type ConnectorPauser = (
9292
connectorId: ModelId
9393
) => Promise<Result<undefined, Error>>;
94+
export type ConnectorUnpauser = (
95+
connectorId: ModelId
96+
) => Promise<Result<undefined, Error>>;
97+
9498
export type ConnectorConfigurationSetter<T extends ConnectorConfiguration> = (
9599
connectorId: ModelId,
96100
configuration: T

connectors/src/connectors/notion/index.ts

+25
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,31 @@ export async function pauseNotionConnector(
230230
return new Ok(undefined);
231231
}
232232

233+
export async function unpauseNotionConnector(
234+
connectorId: ModelId
235+
): Promise<Result<undefined, Error>> {
236+
const connector = await ConnectorResource.fetchById(connectorId);
237+
238+
if (!connector) {
239+
logger.error(
240+
{
241+
connectorId,
242+
},
243+
"Notion connector not found."
244+
);
245+
246+
return new Err(new Error("Connector not found"));
247+
}
248+
249+
await connector.markAsUnpaused();
250+
const r = await resumeNotionConnector(connector.id);
251+
if (r.isErr()) {
252+
return r;
253+
}
254+
255+
return new Ok(undefined);
256+
}
257+
233258
export async function resumeNotionConnector(
234259
connectorId: ModelId
235260
): Promise<Result<undefined, Error>> {

connectors/src/connectors/slack/index.ts

+13
Original file line numberDiff line numberDiff line change
@@ -653,3 +653,16 @@ export async function pauseSlackConnector(connectorId: ModelId) {
653653
await terminateAllWorkflowsForConnectorId(connectorId);
654654
return new Ok(undefined);
655655
}
656+
657+
export async function unpauseSlackConnector(connectorId: ModelId) {
658+
const connector = await ConnectorResource.fetchById(connectorId);
659+
if (!connector) {
660+
return new Err(new Error(`Connector not found with id ${connectorId}`));
661+
}
662+
await connector.markAsUnpaused();
663+
const r = await launchSlackSyncWorkflow(connectorId, null);
664+
if (r.isErr()) {
665+
return r;
666+
}
667+
return new Ok(undefined);
668+
}

connectors/src/connectors/webcrawler/index.ts

+15
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,21 @@ export async function pauseWebcrawlerConnector(
224224
return new Ok(undefined);
225225
}
226226

227+
export async function unpauseWebcrawlerConnector(
228+
connectorId: ModelId
229+
): Promise<Result<undefined, Error>> {
230+
const connector = await ConnectorResource.fetchById(connectorId);
231+
if (!connector) {
232+
throw new Error("Connector not found.");
233+
}
234+
await connector.markAsUnpaused();
235+
const startRes = await launchCrawlWebsiteWorkflow(connectorId);
236+
if (startRes.isErr()) {
237+
return startRes;
238+
}
239+
return new Ok(undefined);
240+
}
241+
227242
export async function cleanupWebcrawlerConnector(
228243
connectorId: ModelId
229244
): Promise<Result<undefined, Error>> {

connectors/src/resources/connector_resource.ts

+4
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ export class ConnectorResource extends BaseResource<ConnectorModel> {
153153
return this.update({ pausedAt: new Date() });
154154
}
155155

156+
async markAsUnpaused() {
157+
return this.update({ pausedAt: null });
158+
}
159+
156160
get isAuthTokenRevoked() {
157161
return this.errorType === "oauth_token_revoked";
158162
}

types/src/front/lib/connectors_api.ts

+14
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,20 @@ export class ConnectorsAPI {
223223
return this._resultFromResponse(res);
224224
}
225225

226+
async unpauseConnector(
227+
connectorId: string
228+
): Promise<ConnectorsAPIResponse<undefined>> {
229+
const res = await fetch(
230+
`${CONNECTORS_API}/connectors/unpause/${encodeURIComponent(connectorId)}`,
231+
{
232+
method: "POST",
233+
headers: this.getDefaultHeaders(),
234+
}
235+
);
236+
237+
return this._resultFromResponse(res);
238+
}
239+
226240
async resumeConnector(
227241
connectorId: string
228242
): Promise<ConnectorsAPIResponse<undefined>> {

0 commit comments

Comments
 (0)