mirror of
https://github.com/outline/outline.git
synced 2026-06-13 11:25:03 +03:00
82d7041b6b
* chore: Refactor Markdown importer to use new import pipeline --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
475 lines
15 KiB
TypeScript
475 lines
15 KiB
TypeScript
import type { JobOptions } from "bull";
|
|
import { chunk, truncate, uniqBy } from "es-toolkit/compat";
|
|
import { Fragment, Node } from "prosemirror-model";
|
|
import type { WhereOptions } from "sequelize";
|
|
import { Transaction } from "sequelize";
|
|
import { randomUUID } from "node:crypto";
|
|
import type {
|
|
BaseImportTaskInput,
|
|
ImportTaskInput,
|
|
ImportTaskOutput,
|
|
} from "@shared/schema";
|
|
import type {
|
|
ImportableIntegrationService,
|
|
ProsemirrorData,
|
|
ProsemirrorDoc,
|
|
} from "@shared/types";
|
|
import {
|
|
AttachmentPreset,
|
|
ImportState,
|
|
ImportTaskPhase,
|
|
ImportTaskState,
|
|
} from "@shared/types";
|
|
import { createContext } from "@server/context";
|
|
import { schema } from "@server/editor";
|
|
import Logger from "@server/logging/Logger";
|
|
import type { User } from "@server/models";
|
|
import { Attachment, Import, ImportTask } from "@server/models";
|
|
import AttachmentHelper from "@server/models/helpers/AttachmentHelper";
|
|
import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
|
|
import { sequelize } from "@server/storage/database";
|
|
import { PagePerImportTask } from "../processors/ImportsProcessor";
|
|
import { BaseTask, TaskPriority } from "./base/BaseTask";
|
|
import UploadAttachmentsForImportTask from "./UploadAttachmentsForImportTask";
|
|
|
|
/**
 * Result returned by a phase handler (`processBootstrap` / `processPage`)
 * for a single import task.
 */
export type ProcessOutput<T extends ImportableIntegrationService> = {
  /** Output of the task itself; stored on the `ImportTask` row as `output`. */
  taskOutput: ImportTaskOutput;
  /**
   * Input for follow-up tasks; split into chunks of `PagePerImportTask`
   * items, each of which becomes a new page-phase `ImportTask`.
   */
  childTasksInput: ImportTaskInput<T>;
};
|
|
|
|
/** Job payload accepted by `APIImportTask`. */
type Props = {
  /** Primary key of the `import_task` row this job should work on. */
  importTaskId: string;
};
|
|
|
|
/**
 * Base class for queue tasks that import content from an external service
 * through the `ImportTask` state machine.
 *
 * Each job loads one `ImportTask` row and, depending on its state, either
 * processes it (`onProcess`) or moves the import forward (`onCompletion`).
 * Subclasses supply the service-specific work via `processPage`,
 * `scheduleNextTask` and, optionally, `processBootstrap`,
 * `onAllTasksCompleted` and `shouldUploadAttachmentsPerPage`.
 */
export default abstract class APIImportTask<
  T extends ImportableIntegrationService,
> extends BaseTask<Props> {
  /**
   * Run the import task.
   *
   * Any error raised while processing is recorded (truncated to 255
   * characters) on the `ImportTask` row and then rethrown so the queue can
   * retry the job.
   *
   * @param importTaskId id of the import_task model.
   * @returns Promise that resolves once the task has completed.
   */
  public async perform({ importTaskId }: Props) {
    let importTask = await ImportTask.findByPk<ImportTask<T>>(importTaskId, {
      rejectOnEmpty: true,
      include: [
        {
          model: Import,
          as: "import",
          required: true,
        },
      ],
    });

    // Don't process any further when the associated import is canceled by the user.
    if (importTask.import.state === ImportState.Canceled) {
      importTask.state = ImportTaskState.Canceled;
      await importTask.save();
      return;
    }

    try {
      switch (importTask.state) {
        case ImportTaskState.Created: {
          importTask.state = ImportTaskState.InProgress;
          importTask = await importTask.save();
          return await this.onProcess(importTask);
        }

        // A task that is already in progress is processed again (e.g. a retry).
        case ImportTaskState.InProgress:
          return await this.onProcess(importTask);

        case ImportTaskState.Completed:
          return await this.onCompletion(importTask);

        // Any other state is left untouched.
        default:
      }
    } catch (err) {
      if (err instanceof Error) {
        importTask.error = truncate(err.message, { length: 255 });
        await importTask.save();
      }

      throw err; // throw error for retry.
    }
  }

  /**
   * Handle failure when all attempts of APIImportTask have failed.
   *
   * Marks both the `ImportTask` and its parent `Import` as errored inside a
   * single transaction, copying the task's error message onto the import.
   *
   * @param importTaskId id of the import_task model.
   * @returns Promise that resolves once failure has been handled.
   */
  public async onFailed({ importTaskId }: Props) {
    await sequelize.transaction(async (transaction) => {
      const importTask = await ImportTask.findByPk<ImportTask<T>>(
        importTaskId,
        {
          rejectOnEmpty: true,
          include: [
            {
              model: Import,
              as: "import",
              required: true,
            },
          ],
          transaction,
          lock: Transaction.LOCK.UPDATE,
        }
      );

      importTask.state = ImportTaskState.Errored;
      await importTask.save({ transaction });

      const associatedImport = importTask.import;
      associatedImport.error = importTask.error; // copy error from ImportTask that caused the failure.
      associatedImport.state = ImportState.Errored;
      await associatedImport.saveWithCtx(
        createContext({
          user: associatedImport.createdBy,
          transaction,
        })
      );
    });
  }

  /**
   * Creation flow for the task.
   * This fetches data from external source, stores the task output and creates subsequent import_task models.
   *
   * @param importTask import_task model to process.
   * @returns Promise that resolves once processing has completed.
   */
  private async onProcess(importTask: ImportTask<T>) {
    // Dispatch on the task phase: bootstrap tasks and page tasks have
    // separate handlers but produce the same output shape.
    const { taskOutput, childTasksInput } =
      importTask.phase === ImportTaskPhase.Bootstrap
        ? await this.processBootstrap(importTask)
        : await this.processPage(importTask);

    // Optionally create attachment records for every output item and rewrite
    // the item's content to point at them.
    const taskOutputWithReplacements = this.shouldUploadAttachmentsPerPage()
      ? await Promise.all(
          taskOutput.map(async (item) => ({
            ...item,
            content: await this.uploadAttachments({
              doc: item.content,
              externalId: item.externalId,
              createdBy: importTask.import.createdBy,
            }),
          }))
        )
      : taskOutput;

    // Child task creation, task completion and the import's document count
    // are written in one transaction so they succeed or fail together.
    await sequelize.transaction(async (transaction) => {
      await Promise.all(
        chunk(childTasksInput as BaseImportTaskInput, PagePerImportTask).map(
          async (input) => {
            await ImportTask.create(
              {
                state: ImportTaskState.Created,
                phase: ImportTaskPhase.Page,
                input: input as ImportTaskInput<T>,
                importId: importTask.importId,
              },
              { transaction }
            );
          }
        )
      );

      importTask.output = taskOutputWithReplacements;
      importTask.state = ImportTaskState.Completed;
      importTask.error = null; // unset any error from previous attempts.
      await importTask.save({ transaction });

      const associatedImport = importTask.import;
      associatedImport.documentCount += taskOutputWithReplacements.length;
      await associatedImport.saveWithCtx(
        createContext({
          user: associatedImport.createdBy,
          transaction,
        }),
        undefined,
        {
          persist: false,
        }
      );
    });

    await this.scheduleNextTask(importTask);
  }

  /**
   * Completion flow for the task.
   * This determines if there are any more import_tasks to process (or) all tasks for the import have been processed, and schedules the next step.
   *
   * @param importTask import_task model to process.
   * @returns Promise that resolves once processing has completed.
   */
  private async onCompletion(importTask: ImportTask<T>) {
    const where: WhereOptions<ImportTask<T>> = {
      state: ImportTaskState.Created,
      importId: importTask.importId,
    };

    // Oldest not-yet-started task of the same import, if any.
    const nextImportTask = await ImportTask.findOne<ImportTask<T>>({
      where,
      order: [["createdAt", "ASC"]],
    });

    // Tasks available to process for this import.
    if (nextImportTask) {
      return await this.scheduleNextTask(nextImportTask);
    }

    // All tasks for this import have been processed. Run the post-completion
    // hook before flipping state so subclasses can perform work that must
    // happen before "imports.processed" downstream handlers fire.
    await this.onAllTasksCompleted(importTask);

    await sequelize.transaction(async (transaction) => {
      const associatedImport = importTask.import;
      associatedImport.state = ImportState.Processed;
      // Release any cross-phase scratch state — the import is done with it.
      associatedImport.scratch = null;
      await associatedImport.saveWithCtx(
        createContext({
          user: associatedImport.createdBy,
          transaction,
        }),
        undefined,
        { name: "processed" }
      );
    });
  }

  /**
   * Whether the base class should create Attachment rows and upload S3 blobs
   * per page during `onProcess`. Defaults to `true` for sources whose
   * attachments are addressable by per-task URLs (e.g. Notion). Sources where
   * attachments are shared across pages or live in a single archive may
   * override this and handle attachment persistence in `onAllTasksCompleted`.
   *
   * @returns true to enable the per-page attachment upload step.
   */
  protected shouldUploadAttachmentsPerPage(): boolean {
    return true;
  }

  /**
   * Hook invoked after the final import task has been processed but before the
   * associated `Import` state transitions to `Processed`. Subclasses can
   * override to perform cross-task finalization (e.g. uploading shared
   * attachments) that must happen before the persistence pass.
   *
   * The base implementation does nothing.
   *
   * @param lastImportTask The most recently completed ImportTask for the import.
   * @returns Promise that resolves when finalization is complete.
   */
  protected async onAllTasksCompleted(
    // oxlint-disable-next-line @typescript-eslint/no-unused-vars
    lastImportTask: ImportTask<T>
  ): Promise<void> {
    return;
  }

  /**
   * Bootstrap phase. Runs once per import on a worker that owns the source
   * artifact (e.g. extracts a zip, walks the file tree, schedules child page
   * tasks). Subclasses without a bootstrap step leave this unimplemented; the
   * base only invokes it when an `ImportTask` is created with
   * `phase === ImportTaskPhase.Bootstrap`.
   *
   * @param importTask ImportTask model to process.
   * @returns Promise with output that resolves once processing has completed.
   * @throws Error when the subclass has not overridden this method.
   */
  protected processBootstrap(
    // oxlint-disable-next-line @typescript-eslint/no-unused-vars
    importTask: ImportTask<T>
  ): Promise<ProcessOutput<T>> {
    throw new Error(
      `${this.constructor.name} does not implement processBootstrap()`
    );
  }

  /**
   * Page phase. Runs for every `ImportTask` row with
   * `phase === ImportTaskPhase.Page`, transforming a batch of source pages
   * into ProseMirror output and optionally cascading descendants as the next
   * wave of child tasks.
   *
   * @param importTask ImportTask model to process.
   * @returns Promise with output that resolves once processing has completed.
   */
  protected abstract processPage(
    importTask: ImportTask<T>
  ): Promise<ProcessOutput<T>>;

  /**
   * Schedule the next `APIImportTask`.
   *
   * @param importTask ImportTask model associated with the `APIImportTask`.
   * @returns Promise that resolves when the task is scheduled.
   */
  protected abstract scheduleNextTask(importTask: ImportTask<T>): Promise<void>;

  /**
   * Upload attachments found in the external document.
   *
   * Creates one `Attachment` row per distinct remote URL referenced by an
   * image, video or attachment node, schedules the upload of those URLs, and
   * returns the document with the remote URLs swapped for the attachments'
   * redirect URLs. A failure to schedule the upload is logged and does not
   * fail the import.
   *
   * @param doc ProseMirrorDoc that represents collection (or) document content.
   * @param externalId id of the document in the external service.
   * @param createdBy user who created the import.
   * @returns Updated ProseMirrorDoc.
   */
  private async uploadAttachments({
    doc,
    externalId,
    createdBy,
  }: {
    doc: ProsemirrorDoc;
    externalId: string;
    createdBy: User;
  }): Promise<ProsemirrorDoc> {
    const docNode = ProsemirrorHelper.toProsemirror(doc);
    const nodes = [
      ...ProsemirrorHelper.getImages(docNode),
      ...ProsemirrorHelper.getVideos(docNode),
      ...ProsemirrorHelper.getAttachments(docNode),
    ];

    if (!nodes.length) {
      return doc;
    }

    const urlToAttachment: Record<string, Attachment> = {};

    // perf: dedup url.
    const attachmentsData = uniqBy(
      nodes.map((node) => {
        // attachment nodes carry their url in 'href', image & video in 'src'.
        const url = String(
          node.type.name === "attachment" ? node.attrs.href : node.attrs.src
        );

        const rawName: unknown =
          node.type.name === "image" ? node.attrs.alt : node.attrs.title;
        // A missing alt/title must count as empty: String(null) would yield
        // the literal text "null" and defeat the fallback to the node type.
        const name = (rawName == null ? "" : String(rawName)).trim();

        return { url, name: name.length !== 0 ? name : node.type.name };
      }),
      "url"
    );

    await sequelize.transaction(async (transaction) => {
      const dbPromises = attachmentsData.map(async (item) => {
        // The id is generated up front because the storage key embeds it.
        const modelId = randomUUID();
        const acl = AttachmentHelper.presetToAcl(
          AttachmentPreset.DocumentAttachment
        );
        const key = AttachmentHelper.getKey({
          id: modelId,
          name: item.name,
          userId: createdBy.id,
        });

        const attachment = await Attachment.create(
          {
            id: modelId,
            key,
            acl,
            // Placeholder size and content type at creation time.
            // NOTE(review): presumably updated by the upload task — confirm.
            size: 0,
            expiresAt: AttachmentHelper.presetToExpiry(
              AttachmentPreset.DocumentAttachment
            ),
            contentType: "application/octet-stream",
            // NOTE(review): stores the external id in documentId; presumably
            // remapped to the internal document id later — confirm.
            documentId: externalId,
            teamId: createdBy.teamId,
            userId: createdBy.id,
          },
          { transaction }
        );

        urlToAttachment[item.url] = attachment;
      });

      return await Promise.all(dbPromises);
    });

    try {
      const uploadItems = Object.entries(urlToAttachment).map(
        ([url, attachment]) => ({ attachmentId: attachment.id, url })
      );
      await new UploadAttachmentsForImportTask().schedule(uploadItems);
    } catch (err) {
      // upload attachments failure is not critical enough to fail the whole import.
      Logger.error(
        `upload attachment task failed for externalId ${externalId}`,
        err
      );
    }

    return this.replaceAttachmentUrls(docNode, urlToAttachment).toJSON();
  }

  /**
   * Replace remote url to internal redirect url for attachments.
   *
   * Nodes whose url has no entry in `urlToAttachment` are returned unchanged.
   *
   * @param doc ProseMirror node that represents collection (or) document content.
   * @param urlToAttachment Map of remote url to attachment model.
   * @returns Updated Prosemirror node.
   */
  private replaceAttachmentUrls(
    doc: Node,
    urlToAttachment: Record<string, Attachment>
  ): Node {
    const attachmentTypes = ["attachment", "image", "video"];

    const transformAttachmentNode = (node: Node): Node => {
      const json = node.toJSON() as ProsemirrorData;
      const attrs = json.attrs ?? {};

      if (node.type.name === "attachment") {
        // Keys are looked up with the same String() coercion that was used
        // when the map was built.
        const attachmentModel = urlToAttachment[String(attrs.href)];
        if (!attachmentModel) {
          // No attachment was created for this url; keep the node as-is
          // instead of failing the whole task with a TypeError.
          return node;
        }
        // attachment node uses 'href' attribute.
        attrs.href = attachmentModel.redirectUrl;
        // attachment node can have id.
        attrs.id = attachmentModel.id;
      } else if (node.type.name === "image" || node.type.name === "video") {
        const attachmentModel = urlToAttachment[String(attrs.src)];
        if (!attachmentModel) {
          return node;
        }
        // image & video nodes use 'src' attribute.
        attrs.src = attachmentModel.redirectUrl;
      }

      json.attrs = attrs;
      return Node.fromJSON(schema, json);
    };

    // Rebuild the tree depth-first: attachment-like nodes are rewritten,
    // every other node is copied with its transformed children.
    const transformFragment = (fragment: Fragment): Fragment => {
      const nodes: Node[] = [];

      fragment.forEach((node) => {
        nodes.push(
          attachmentTypes.includes(node.type.name)
            ? transformAttachmentNode(node)
            : node.copy(transformFragment(node.content))
        );
      });

      return Fragment.fromArray(nodes);
    };

    return doc.copy(transformFragment(doc.content));
  }

  /**
   * Job options such as priority and retry strategy, as defined by Bull.
   *
   * @returns normal priority, up to 3 attempts, exponential backoff starting
   * from a 60 second delay.
   */
  public get options(): JobOptions {
    return {
      priority: TaskPriority.Normal,
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 60 * 1000,
      },
    };
  }
}
|