Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support /extract and /crawl for self-hosted #1137

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/api/src/controllers/v1/crawl-status-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async function crawlStatusWS(
.filter((x) => x[1] === "completed" || x[1] === "failed")
.map((x) => x[0]);
const newlyDoneJobs: Job[] = (
await Promise.all(newlyDoneJobIDs.map((x) => getJob(x)))
await Promise.all(newlyDoneJobIDs.map((x) => getJob(getScrapeQueue, x)))
).filter((x) => x !== undefined) as Job[];

for (const job of newlyDoneJobs) {
Expand Down Expand Up @@ -154,7 +154,7 @@ async function crawlStatusWS(

jobIDs = validJobIDs; // Use validJobIDs instead of jobIDs for further processing

const doneJobs = await getJobs(doneJobIDs);
const doneJobs = await getJobs(getScrapeQueue, doneJobIDs);
const data = doneJobs.map((x) => x.returnvalue);

await send(ws, {
Expand Down
16 changes: 8 additions & 8 deletions apps/api/src/controllers/v1/crawl-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import {
getThrottledJobs,
isCrawlKickoffFinished,
} from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getScrapeQueue, QueueFunction } from "../../services/queue-service";
import {
supabaseGetJobById,
supabaseGetJobsById,
} from "../../lib/supabase-jobs";
import { configDotenv } from "dotenv";
import type { Job, JobState } from "bullmq";
import type { Job, JobState, Queue } from "bullmq";
import { logger } from "../../lib/logger";
import { supabase_service } from "../../services/supabase";
configDotenv();
Expand All @@ -38,9 +38,9 @@ export type PseudoJob<T> = {

export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null }

export async function getJob(id: string): Promise<PseudoJob<any> | null> {
export async function getJob(queueFunction: QueueFunction, id: string): Promise<PseudoJob<any> | null> {
const [bullJob, dbJob] = await Promise.all([
getScrapeQueue().getJob(id),
queueFunction().getJob(id),
(process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
]);

Expand All @@ -64,9 +64,9 @@ export async function getJob(id: string): Promise<PseudoJob<any> | null> {
return job;
}

export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
export async function getJobs(queueFunction: QueueFunction, ids: string[]): Promise<PseudoJob<any>[]> {
const [bullJobs, dbJobs] = await Promise.all([
Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job<any, any, string> & { id: string })[]>,
Promise.all(ids.map((x) => queueFunction().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job<any, any, string> & { id: string })[]>,
process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
]);

Expand Down Expand Up @@ -190,7 +190,7 @@ export async function crawlStatusController(
) {
// get current chunk and retrieve jobs
const currentIDs = doneJobsOrder.slice(i, i + factor);
const jobs = await getJobs(currentIDs);
const jobs = await getJobs(getScrapeQueue, currentIDs);

// iterate through jobs and add them one by one to the byte counter
// both loops will break once we cross the byte counter
Expand Down Expand Up @@ -222,7 +222,7 @@ export async function crawlStatusController(
} else {
doneJobs = (
await Promise.all(
(await getJobs(doneJobsOrder)).map(async (x) =>
(await getJobs(getScrapeQueue, doneJobsOrder)).map(async (x) =>
(await x.getState()) === "failed" ? null : x,
),
)
Expand Down
20 changes: 13 additions & 7 deletions apps/api/src/controllers/v1/extract-status.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { Response } from "express";
import { supabaseGetJobsById } from "../../lib/supabase-jobs";
import { RequestWithAuth } from "./types";
import { getExtract, getExtractExpiry } from "../../lib/extract/extract-redis";
import { getJob, PseudoJob } from "./crawl-status";
import { getExtractQueue } from "../../services/queue-service";
import { ExtractResult } from "../../lib/extract/extraction-service";



/**
 * Fetches an extract job as a PseudoJob, resolving it through the extract
 * queue (rather than the scrape queue used elsewhere in crawl-status).
 *
 * @param id - The extract job ID to look up.
 * @returns The matching PseudoJob, or null when no job exists for the ID.
 */
export async function getExtractJob(id: string): Promise<PseudoJob<ExtractResult> | null> {
  const job = await getJob(getExtractQueue, id);
  return job;
}

export async function extractStatusController(
req: RequestWithAuth<{ jobId: string }, any, any>,
Expand All @@ -16,21 +24,19 @@ export async function extractStatusController(
});
}

let data: any[] = [];
let data: ExtractResult | [] = [];

if (extract.status === "completed") {
const jobData = await supabaseGetJobsById([req.params.jobId]);
if (!jobData || jobData.length === 0) {
const jobData = await getExtractJob(req.params.jobId);
if (!jobData) {
return res.status(404).json({
success: false,
error: "Job not found",
});
}

data = jobData[0].docs;
data = jobData.returnvalue?.data ?? [];
}

// console.log(extract.sources);
return res.status(200).json({
success: extract.status === "failed" ? false : true,
data: data,
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/lib/extract/extraction-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ interface ExtractServiceOptions {
cacheKey?: string;
}

interface ExtractResult {
export interface ExtractResult {
success: boolean;
data?: any;
extractId: string;
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/services/queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { Queue } from "bullmq";
import { logger } from "../lib/logger";
import IORedis from "ioredis";

/** Zero-arg factory that yields a BullMQ Queue — e.g. the scrape or extract queue accessors defined below — so callers can select which queue a job lookup targets. */
export type QueueFunction = () => Queue<any, any, string, any, any, string>;

let scrapeQueue: Queue;
let extractQueue: Queue;
let loggingQueue: Queue;
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/services/queue-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
if (!job.data.v1) {
const jobIDs = await getCrawlJobs(job.data.crawl_id);

const jobs = (await getJobs(jobIDs)).sort(
const jobs = (await getJobs(getScrapeQueue, jobIDs)).sort(
(a, b) => a.timestamp - b.timestamp,
);
// const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
Expand Down