Files
outline/server/queues/tasks/APIImportTask.ts
T

483 lines
15 KiB
TypeScript

import type { JobOptions } from "bull";
import { chunk, truncate, uniqBy } from "es-toolkit/compat";
import { Fragment, Node } from "prosemirror-model";
import type { WhereOptions } from "sequelize";
import { Transaction } from "sequelize";
import { randomUUID } from "node:crypto";
import type {
BaseImportTaskInput,
ImportTaskInput,
ImportTaskOutput,
} from "@shared/schema";
import type {
ImportableIntegrationService,
ProsemirrorData,
ProsemirrorDoc,
} from "@shared/types";
import {
AttachmentPreset,
ImportState,
ImportTaskPhase,
ImportTaskState,
} from "@shared/types";
import { createContext } from "@server/context";
import { schema } from "@server/editor";
import Logger from "@server/logging/Logger";
import type { User } from "@server/models";
import { Attachment, Import, ImportTask } from "@server/models";
import AttachmentHelper from "@server/models/helpers/AttachmentHelper";
import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
import { sequelize } from "@server/storage/database";
import { PagePerImportTask } from "../processors/ImportsProcessor";
import { BaseTask, TaskPriority } from "./base/BaseTask";
import UploadAttachmentsForImportTask from "./UploadAttachmentsForImportTask";
/**
 * Result produced by processing one import task (bootstrap or page phase).
 */
export type ProcessOutput<T extends ImportableIntegrationService> = {
  /** Input for follow-up work; APIImportTask chunks it into new page-phase import_tasks. */
  childTasksInput: ImportTaskInput<T>;
  /** Output stored on the processed import_task row. */
  taskOutput: ImportTaskOutput;
};
/** Job payload accepted by APIImportTask. */
type Props = {
  /** Primary key of the import_task row this job should process. */
  importTaskId: string;
};
export default abstract class APIImportTask<
  T extends ImportableIntegrationService,
> extends BaseTask<Props> {
  /**
   * Run the import task.
   *
   * Loads the import_task together with its parent Import. It returns early
   * when the row no longer exists, and marks the task canceled when the import
   * was canceled. Otherwise it dispatches on the task's state. Errors are
   * recorded on the task (truncated to 255 characters) and rethrown so the job
   * is retried according to `options`.
   *
   * @param importTaskId id of the import_task model.
   * @returns Promise that resolves once the task has completed.
   */
  public async perform({ importTaskId }: Props) {
    let importTask = await ImportTask.findByPk<ImportTask<T>>(importTaskId, {
      include: [
        {
          model: Import,
          as: "import",
          required: true,
        },
      ],
    });

    // The import_task row may have been deleted (e.g. its Import was removed)
    // between the job being enqueued and the worker picking it up. Nothing to do.
    if (!importTask) {
      return;
    }

    // Don't process any further when the associated import is canceled by the user.
    if (importTask.import.state === ImportState.Canceled) {
      importTask.state = ImportTaskState.Canceled;
      await importTask.save();
      return;
    }

    try {
      switch (importTask.state) {
        case ImportTaskState.Created: {
          importTask.state = ImportTaskState.InProgress;
          importTask = await importTask.save();
          return await this.onProcess(importTask);
        }
        case ImportTaskState.InProgress:
          return await this.onProcess(importTask);
        case ImportTaskState.Completed:
          return await this.onCompletion(importTask);
        default:
        // Other states (e.g. errored/canceled) require no further work.
      }
    } catch (err) {
      if (err instanceof Error) {
        importTask.error = truncate(err.message, { length: 255 });
        await importTask.save();
      }
      throw err; // throw error for retry.
    }
  }

  /**
   * Handle failure once every attempt of this APIImportTask has failed.
   *
   * Marks the import_task as errored. It then copies the task's last recorded
   * error onto the parent Import and marks the Import as errored, all within a
   * single transaction that holds a row lock on the import_task.
   *
   * @param importTaskId id of the import_task model.
   * @returns Promise that resolves once failure has been handled.
   */
  public async onFailed({ importTaskId }: Props) {
    await sequelize.transaction(async (transaction) => {
      const importTask = await ImportTask.findByPk<ImportTask<T>>(
        importTaskId,
        {
          include: [
            {
              model: Import,
              as: "import",
              required: true,
            },
          ],
          transaction,
          lock: Transaction.LOCK.UPDATE,
        }
      );

      if (!importTask) {
        return;
      }

      importTask.state = ImportTaskState.Errored;
      await importTask.save({ transaction });

      const associatedImport = importTask.import;
      associatedImport.error = importTask.error; // copy error from ImportTask that caused the failure.
      associatedImport.state = ImportState.Errored;
      await associatedImport.saveWithCtx(
        createContext({
          user: associatedImport.createdBy,
          transaction,
        })
      );
    });
  }

  /**
   * Creation flow for the task.
   *
   * Fetches data from the external source via the phase-specific hook, and
   * optionally uploads attachments per page. It then stores the task output,
   * creates the child import_tasks (chunked by `PagePerImportTask`), and bumps
   * the import's document count. Finally it schedules the next task.
   *
   * @param importTask import_task model to process.
   * @returns Promise that resolves once processing has completed.
   */
  private async onProcess(importTask: ImportTask<T>) {
    const { taskOutput, childTasksInput } =
      importTask.phase === ImportTaskPhase.Bootstrap
        ? await this.processBootstrap(importTask)
        : await this.processPage(importTask);

    // NOTE(review): uploadAttachments commits Attachment rows in its own
    // transaction before the transaction below runs. If that later transaction
    // fails, the task stays InProgress and a retry re-runs this method, creating
    // the attachment rows again. Consider sharing a single transaction.
    const taskOutputWithReplacements = this.shouldUploadAttachmentsPerPage()
      ? await Promise.all(
          taskOutput.map(async (item) => ({
            ...item,
            content: await this.uploadAttachments({
              doc: item.content,
              externalId: item.externalId,
              createdBy: importTask.import.createdBy,
            }),
          }))
        )
      : taskOutput;

    await sequelize.transaction(async (transaction) => {
      await Promise.all(
        chunk(childTasksInput as BaseImportTaskInput, PagePerImportTask).map(
          async (input) => {
            await ImportTask.create(
              {
                state: ImportTaskState.Created,
                phase: ImportTaskPhase.Page,
                input: input as ImportTaskInput<T>,
                importId: importTask.importId,
              },
              { transaction }
            );
          }
        )
      );

      importTask.output = taskOutputWithReplacements;
      importTask.state = ImportTaskState.Completed;
      importTask.error = null; // unset any error from previous attempts.
      await importTask.save({ transaction });

      const associatedImport = importTask.import;
      associatedImport.documentCount += taskOutputWithReplacements.length;
      await associatedImport.saveWithCtx(
        createContext({
          user: associatedImport.createdBy,
          transaction,
        }),
        undefined,
        {
          persist: false,
        }
      );
    });

    await this.scheduleNextTask(importTask);
  }

  /**
   * Completion flow for the task.
   *
   * If the import still has import_tasks in the `Created` state, the oldest one
   * (by `createdAt`) is scheduled. Otherwise the `onAllTasksCompleted` hook
   * runs, and then the Import is marked `Processed` with its scratch state
   * cleared.
   *
   * @param importTask import_task model to process.
   * @returns Promise that resolves once processing has completed.
   */
  private async onCompletion(importTask: ImportTask<T>) {
    const where: WhereOptions<ImportTask<T>> = {
      state: ImportTaskState.Created,
      importId: importTask.importId,
    };

    const nextImportTask = await ImportTask.findOne<ImportTask<T>>({
      where,
      order: [["createdAt", "ASC"]],
    });

    // Tasks available to process for this import.
    if (nextImportTask) {
      return await this.scheduleNextTask(nextImportTask);
    }

    // All tasks for this import have been processed. Run the post-completion
    // hook before flipping state so subclasses can perform work that must
    // happen before "imports.processed" downstream handlers fire.
    await this.onAllTasksCompleted(importTask);

    await sequelize.transaction(async (transaction) => {
      const associatedImport = importTask.import;
      associatedImport.state = ImportState.Processed;
      // Release any cross-phase scratch state — the import is done with it.
      associatedImport.scratch = null;
      await associatedImport.saveWithCtx(
        createContext({
          user: associatedImport.createdBy,
          transaction,
        }),
        undefined,
        { name: "processed" }
      );
    });
  }

  /**
   * Whether the base class should create Attachment rows and upload S3 blobs
   * per page during `onProcess`. Defaults to `true` for sources whose
   * attachments are addressable by per-task URLs (e.g. Notion). Sources where
   * attachments are shared across pages or live in a single archive may
   * override this and handle attachment persistence in `onAllTasksCompleted`.
   *
   * @returns true to enable the per-page attachment upload step.
   */
  protected shouldUploadAttachmentsPerPage(): boolean {
    return true;
  }

  /**
   * Hook invoked after the final import task has been processed but before the
   * associated `Import` state transitions to `Processed`. Subclasses can
   * override to perform cross-task finalization (e.g. uploading shared
   * attachments) that must happen before the persistence pass.
   *
   * @param lastImportTask The most recently completed ImportTask for the import.
   * @returns Promise that resolves when finalization is complete.
   */
  protected async onAllTasksCompleted(
    // oxlint-disable-next-line @typescript-eslint/no-unused-vars
    lastImportTask: ImportTask<T>
  ): Promise<void> {
    return;
  }

  /**
   * Bootstrap phase. Runs once per import on a worker that owns the source
   * artifact (e.g. extracts a zip, walks the file tree, schedules child page
   * tasks). Subclasses without a bootstrap step leave this unimplemented; the
   * base only invokes it when an `ImportTask` is created with
   * `phase === ImportTaskPhase.Bootstrap`.
   *
   * @param importTask ImportTask model to process.
   * @returns Promise with output that resolves once processing has completed.
   * @throws Error when the subclass does not override this method.
   */
  protected processBootstrap(
    // oxlint-disable-next-line @typescript-eslint/no-unused-vars
    importTask: ImportTask<T>
  ): Promise<ProcessOutput<T>> {
    throw new Error(
      `${this.constructor.name} does not implement processBootstrap()`
    );
  }

  /**
   * Page phase. Runs for every `ImportTask` row with
   * `phase === ImportTaskPhase.Page`, transforming a batch of source pages
   * into ProseMirror output and optionally cascading descendants as the next
   * wave of child tasks.
   *
   * @param importTask ImportTask model to process.
   * @returns Promise with output that resolves once processing has completed.
   */
  protected abstract processPage(
    importTask: ImportTask<T>
  ): Promise<ProcessOutput<T>>;

  /**
   * Schedule the next `APIImportTask`.
   *
   * @param importTask ImportTask model associated with the `APIImportTask`.
   * @returns Promise that resolves when the task is scheduled.
   */
  protected abstract scheduleNextTask(importTask: ImportTask<T>): Promise<void>;

  /**
   * Upload attachments found in the external document.
   *
   * Creates one Attachment row per distinct remote url (image, video and
   * attachment nodes) and schedules a best-effort background upload of the
   * remote files. It returns the document with those urls rewritten to the
   * attachments' redirect urls. Nodes without a remote url are left untouched.
   *
   * @param doc ProseMirrorDoc that represents collection (or) document content.
   * @param externalId id of the document in the external service.
   * @param createdBy user who created the import.
   * @returns Updated ProseMirrorDoc (the original `doc` when nothing needs uploading).
   */
  private async uploadAttachments({
    doc,
    externalId,
    createdBy,
  }: {
    doc: ProsemirrorDoc;
    externalId: string;
    createdBy: User;
  }): Promise<ProsemirrorDoc> {
    const docNode = ProsemirrorHelper.toProsemirror(doc);
    const nodes = [
      ...ProsemirrorHelper.getImages(docNode),
      ...ProsemirrorHelper.getVideos(docNode),
      ...ProsemirrorHelper.getAttachments(docNode),
    ];

    // perf: dedup url. Nodes without a url have nothing to upload.
    const attachmentsData = uniqBy(
      nodes.flatMap((node) => {
        const url = APIImportTask.getRemoteUrl(node);
        return url === undefined
          ? []
          : [{ url, name: APIImportTask.getAttachmentName(node) }];
      }),
      "url"
    );

    if (!attachmentsData.length) {
      return doc;
    }

    // A Map avoids prototype-key collisions for arbitrary remote urls.
    const urlToAttachment = new Map<string, Attachment>();
    const acl = AttachmentHelper.presetToAcl(
      AttachmentPreset.DocumentAttachment
    );

    await sequelize.transaction(async (transaction) => {
      await Promise.all(
        attachmentsData.map(async (item) => {
          const modelId = randomUUID();
          const key = AttachmentHelper.getKey({
            id: modelId,
            name: item.name,
            userId: createdBy.id,
          });
          const attachment = await Attachment.create(
            {
              id: modelId,
              key,
              acl,
              size: 0,
              expiresAt: AttachmentHelper.presetToExpiry(
                AttachmentPreset.DocumentAttachment
              ),
              contentType: "application/octet-stream",
              documentId: externalId,
              teamId: createdBy.teamId,
              userId: createdBy.id,
            },
            { transaction }
          );
          urlToAttachment.set(item.url, attachment);
        })
      );
    });

    try {
      const uploadItems = Array.from(urlToAttachment, ([url, attachment]) => ({
        attachmentId: attachment.id,
        url,
      }));
      await new UploadAttachmentsForImportTask().schedule(uploadItems);
    } catch (err) {
      // upload attachments failure is not critical enough to fail the whole import.
      Logger.error(
        `upload attachment task failed for externalId ${externalId}`,
        err
      );
    }

    return this.replaceAttachmentUrls(docNode, urlToAttachment).toJSON();
  }

  /**
   * Replace remote url to internal redirect url for attachments.
   *
   * Attachment, image and video nodes whose url has an entry in
   * `urlToAttachment` are rebuilt with the attachment's redirect url. Nodes
   * without a matching entry are kept unchanged instead of throwing.
   *
   * @param doc ProseMirror node that represents collection (or) document content.
   * @param urlToAttachment Map of remote url to attachment model.
   * @returns Updated Prosemirror node.
   */
  private replaceAttachmentUrls(
    doc: Node,
    urlToAttachment: Map<string, Attachment>
  ): Node {
    const attachmentTypes = ["attachment", "image", "video"];

    const transformAttachmentNode = (node: Node): Node => {
      const url = APIImportTask.getRemoteUrl(node);
      const attachmentModel =
        url === undefined ? undefined : urlToAttachment.get(url);
      if (!attachmentModel) {
        // No attachment was created for this node (e.g. missing url).
        return node;
      }

      const json = node.toJSON() as ProsemirrorData;
      const attrs = json.attrs ?? {};
      if (node.type.name === "attachment") {
        // attachment node uses 'href' attribute.
        attrs.href = attachmentModel.redirectUrl;
        // attachment node can have id.
        attrs.id = attachmentModel.id;
      } else if (node.type.name === "image" || node.type.name === "video") {
        // image & video nodes use 'src' attribute.
        attrs.src = attachmentModel.redirectUrl;
      }
      json.attrs = attrs;
      return Node.fromJSON(schema, json);
    };

    const transformFragment = (fragment: Fragment): Fragment => {
      const nodes: Node[] = [];
      fragment.forEach((node) => {
        nodes.push(
          attachmentTypes.includes(node.type.name)
            ? transformAttachmentNode(node)
            : node.copy(transformFragment(node.content))
        );
      });
      return Fragment.fromArray(nodes);
    };

    return doc.copy(transformFragment(doc.content));
  }

  /**
   * Remote url of an attachment-like node: `href` for attachment nodes and
   * `src` for everything else. Returns undefined when the attribute is
   * null, undefined or empty.
   *
   * @param node ProseMirror node to inspect.
   * @returns The url as a string, or undefined when absent.
   */
  private static getRemoteUrl(node: Node): string | undefined {
    const value: unknown =
      node.type.name === "attachment" ? node.attrs.href : node.attrs.src;
    return value === null || value === undefined || value === ""
      ? undefined
      : String(value);
  }

  /**
   * Display name for an attachment-like node: trimmed `alt` for images and
   * trimmed `title` otherwise. Falls back to the node type name when that
   * attribute is missing or blank, so the name is never "null"/"undefined".
   *
   * @param node ProseMirror node to inspect.
   * @returns Non-empty display name.
   */
  private static getAttachmentName(node: Node): string {
    const value: unknown =
      node.type.name === "image" ? node.attrs.alt : node.attrs.title;
    const name =
      value === null || value === undefined ? "" : String(value).trim();
    return name.length !== 0 ? name : node.type.name;
  }

  /**
   * Job options such as priority and retry strategy, as defined by Bull.
   * Three attempts with exponential backoff starting at one minute.
   */
  public get options(): JobOptions {
    return {
      priority: TaskPriority.Normal,
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 60 * 1000,
      },
    };
  }
}