From 82d7041b6b94fec7b6bc3dce75b8ba6d141bdd02 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Sat, 16 May 2026 14:10:15 -0400 Subject: [PATCH] chore: Refactor Markdown importer to use new import pipeline (#12361) * chore: Refactor Markdown importer to use new import pipeline --------- Co-authored-by: Claude Opus 4.7 --- .../Settings/components/DropToImport.tsx | 23 +- .../processors/NotionImportsProcessor.ts | 4 + .../server/tasks/NotionAPIImportTask.ts | 25 +- ...00000-make-imports-integration-nullable.js | 16 + ...dd-import-task-phase-and-import-scratch.js | 21 + server/models/Import.ts | 12 +- server/models/ImportTask.ts | 6 +- .../FileOperationCreatedProcessor.ts | 10 +- server/queues/processors/ImportsProcessor.ts | 70 +- .../processors/MarkdownImportsProcessor.ts | 37 + server/queues/tasks/APIImportTask.ts | 127 +++- .../tasks/ImportMarkdownZipTask.test.ts | 139 ---- server/queues/tasks/ImportMarkdownZipTask.ts | 286 -------- .../tasks/MarkdownAPIImportTask.test.ts | 116 +++ server/queues/tasks/MarkdownAPIImportTask.ts | 672 ++++++++++++++++++ server/routes/api/collections/schema.ts | 6 +- server/routes/api/imports/imports.ts | 47 +- server/routes/api/imports/schema.ts | 6 + shared/schema.ts | 90 ++- shared/types.ts | 19 +- 20 files changed, 1224 insertions(+), 508 deletions(-) create mode 100644 server/migrations/20260515000000-make-imports-integration-nullable.js create mode 100644 server/migrations/20260516000000-add-import-task-phase-and-import-scratch.js create mode 100644 server/queues/processors/MarkdownImportsProcessor.ts delete mode 100644 server/queues/tasks/ImportMarkdownZipTask.test.ts delete mode 100644 server/queues/tasks/ImportMarkdownZipTask.ts create mode 100644 server/queues/tasks/MarkdownAPIImportTask.test.ts create mode 100644 server/queues/tasks/MarkdownAPIImportTask.ts diff --git a/app/scenes/Settings/components/DropToImport.tsx b/app/scenes/Settings/components/DropToImport.tsx index f4fd62e0ba..cbc675d92b 100644 --- a/app/scenes/Settings/components/DropToImport.tsx +++ b/app/scenes/Settings/components/DropToImport.tsx @@ -6,7 +6,12 @@ import { useTranslation } from "react-i18next"; import { toast } from "sonner"; import styled from "styled-components"; import { s } from "@shared/styles"; -import { AttachmentPreset, CollectionPermission } from "@shared/types"; +import { + AttachmentPreset, + CollectionPermission, + FileOperationFormat, + IntegrationService, +} from "@shared/types"; import { bytesToHumanReadable } from "@shared/utils/files"; import Button from "~/components/Button"; import Flex from "~/components/Flex"; @@ -27,7 +32,7 @@ type Props = { function DropToImport({ disabled, onSubmit, children, format }: Props) { const { t } = useTranslation(); - const { collections } = useStores(); + const { collections, imports } = useStores(); const [file, setFile] = useState(null); const [isImporting, setImporting] = useState(false); const [permission, setPermission] = useState( @@ -53,7 +58,19 @@ function DropToImport({ disabled, onSubmit, children, format }: Props) { name: file.name, preset: AttachmentPreset.WorkspaceImport, }); - await collections.import(attachment.id, { format, permission }); + + if (format === FileOperationFormat.MarkdownZip) { + await imports.create( + { service: IntegrationService.Markdown }, + { + attachmentId: attachment.id, + permission: permission ?? undefined, + } + ); + } else { + await collections.import(attachment.id, { format, permission }); + } + onSubmit(); toast.message(file.name, { description: t( diff --git a/plugins/notion/server/processors/NotionImportsProcessor.ts b/plugins/notion/server/processors/NotionImportsProcessor.ts index e9f72cf45e..ee078b3cce 100644 --- a/plugins/notion/server/processors/NotionImportsProcessor.ts +++ b/plugins/notion/server/processors/NotionImportsProcessor.ts @@ -30,6 +30,10 @@ export class NotionImportsProcessor extends ImportsProcessor, transaction: Transaction ): Promise { + if (!importModel.integrationId) { + throw new Error("Notion import is missing integrationId"); + } + const integration = await Integration.scope("withAuthentication").findByPk( importModel.integrationId, { rejectOnEmpty: true } diff --git a/plugins/notion/server/tasks/NotionAPIImportTask.ts b/plugins/notion/server/tasks/NotionAPIImportTask.ts index 58f4b83f35..d66dc1329c 100644 --- a/plugins/notion/server/tasks/NotionAPIImportTask.ts +++ b/plugins/notion/server/tasks/NotionAPIImportTask.ts @@ -29,15 +29,19 @@ export default class NotionAPIImportTask extends APIImportTask ): Promise> { + if (!importTask.import.integrationId) { + throw new Error("Notion import is missing integrationId"); + } + const integration = await Integration.scope("withAuthentication").findByPk( importTask.import.integrationId, { rejectOnEmpty: true } @@ -47,7 +51,7 @@ export default class NotionAPIImportTask extends APIImportTask ({ externalId: parsedPage.externalId, title: parsedPage.title, - emoji: parsedPage.emoji, + icon: parsedPage.icon, content: parsedPage.content, author: parsedPage.author, createdAt: parsedPage.createdAt, @@ -96,7 +100,7 @@ export default class NotionAPIImportTask extends APIImportTask extends ParanoidModel< @Column(DataType.JSONB) input: ImportInput; + @AllowNull + @Column(DataType.JSONB) + scratch: ImportScratch | null; + @IsNumeric @Default(0) @Column(DataType.INTEGER) @@ -66,11 +71,12 @@ class Import extends ParanoidModel< // associations @BelongsTo(() => Integration, "integrationId") - integration: Integration; + integration: Integration | null; + @AllowNull @ForeignKey(() => Integration) @Column(DataType.UUID) - integrationId: string; + integrationId: string | null; @BelongsTo(() => User, "createdById") createdBy: User; diff --git a/server/models/ImportTask.ts b/server/models/ImportTask.ts index 6a7b5dd297..2e262aba3c 100644 --- a/server/models/ImportTask.ts +++ b/server/models/ImportTask.ts @@ -11,7 +11,7 @@ import { import type { ImportTaskOutput } from "@shared/schema"; import { type ImportTaskInput } from "@shared/schema"; import type { ImportableIntegrationService } from "@shared/types"; -import { ImportTaskState } from "@shared/types"; +import { ImportTaskPhase, ImportTaskState } from "@shared/types"; import Import from "./Import"; import IdModel from "./base/IdModel"; import Fix from "./decorators/Fix"; @@ -40,6 +40,10 @@ class ImportTask extends IdModel< @Column(DataType.STRING) state: ImportTaskState; + @IsIn([Object.values(ImportTaskPhase)]) + @Column(DataType.STRING) + phase: ImportTaskPhase; + @Column(DataType.JSONB) input: ImportTaskInput; diff --git a/server/queues/processors/FileOperationCreatedProcessor.ts b/server/queues/processors/FileOperationCreatedProcessor.ts index a16bb78fb8..59c6aff733 100644 --- a/server/queues/processors/FileOperationCreatedProcessor.ts +++ b/server/queues/processors/FileOperationCreatedProcessor.ts @@ -5,7 +5,6 @@ import ExportHTMLZipTask from "../tasks/ExportHTMLZipTask"; import ExportJSONTask from "../tasks/ExportJSONTask"; import ExportMarkdownZipTask from "../tasks/ExportMarkdownZipTask"; import ImportJSONTask from "../tasks/ImportJSONTask"; -import ImportMarkdownZipTask from "../tasks/ImportMarkdownZipTask"; import BaseProcessor from "./BaseProcessor"; export default class FileOperationCreatedProcessor extends BaseProcessor { @@ -19,14 +18,11 @@ export default class FileOperationCreatedProcessor extends BaseProcessor { } ); - // map file operation type and format to the appropriate task + // map file operation type and format to the appropriate task. Markdown + // zip imports flow through the API-import pipeline (`imports.create` → + // MarkdownAPIImportTask) and never reach this dispatcher. if (fileOperation.type === FileOperationType.Import) { switch (fileOperation.format) { - case FileOperationFormat.MarkdownZip: - await new ImportMarkdownZipTask().schedule({ - fileOperationId: event.modelId, - }); - break; case FileOperationFormat.JSON: await new ImportJSONTask().schedule({ fileOperationId: event.modelId, diff --git a/server/queues/processors/ImportsProcessor.ts b/server/queues/processors/ImportsProcessor.ts index 65bcabc02b..c245483b0a 100644 --- a/server/queues/processors/ImportsProcessor.ts +++ b/server/queues/processors/ImportsProcessor.ts @@ -5,14 +5,24 @@ import type { CreateOptions, CreationAttributes, Transaction } from "sequelize"; import { UniqueConstraintError } from "sequelize"; import { randomUUID } from "node:crypto"; import { randomElement } from "@shared/random"; -import type { ImportInput, ImportTaskInput } from "@shared/schema"; +import type { + BaseImportInput, + BaseImportTaskInput, + ImportInput, + ImportTaskInput, +} from "@shared/schema"; import type { ImportableIntegrationService, ProsemirrorData, ProsemirrorDoc, SourceMetadata, } from "@shared/types"; -import { ImportState, ImportTaskState, MentionType } from "@shared/types"; +import { + ImportState, + ImportTaskPhase, + ImportTaskState, + MentionType, +} from "@shared/types"; import { colorPalette } from "@shared/utils/collections"; import { CollectionValidation } from "@shared/validations"; import { createContext } from "@server/context"; @@ -118,22 +128,26 @@ export default abstract class ImportsProcessor< } const tasksInput = await this.buildTasksInput(importModel, transaction); + const phase = this.getInitialPhase(); const importTasks = await Promise.all( - chunk(tasksInput, PagePerImportTask).map((input) => { - const attrs = { - state: ImportTaskState.Created, - input, - importId: importModel.id, - } as ImportTaskCreationAttributes; + chunk(tasksInput as BaseImportTaskInput, PagePerImportTask).map( + (input) => { + const attrs = { + state: ImportTaskState.Created, + phase, + input, + importId: importModel.id, + } as ImportTaskCreationAttributes; - return ImportTask.create< - ImportTask, - CreateOptions> - >(attrs as unknown as CreationAttributes>, { - transaction, - }); - }) + return ImportTask.create< + ImportTask, + CreateOptions> + >(attrs as unknown as CreationAttributes>, { + transaction, + }); + } + ) ); importModel.state = ImportState.InProgress; @@ -271,8 +285,12 @@ export default abstract class ImportsProcessor< const createdCollections: Collection[] = []; // External id to internal model id. const idMap: Record = {}; - // These will be imported as collections. - const importInput = keyBy(importModel.input, "externalId"); + // These will be imported as collections. Widened to the base input shape + // because the abstract class has no narrowed view of T. + const importInput = keyBy( + importModel.input as BaseImportInput, + "externalId" + ); const ctx = createContext({ user: importModel.createdBy, transaction }); const firstCollection = await Collection.findFirstCollectionForUser( @@ -361,8 +379,8 @@ export default abstract class ImportsProcessor< const collection = Collection.build({ id: internalId, name: output.title, - icon: output.emoji ?? "collection", - color: output.emoji ? undefined : randomElement(colorPalette), + icon: output.icon ?? "collection", + color: output.icon ? undefined : randomElement(colorPalette), content: transformedContent, description: truncate(description, { length: CollectionValidation.maxDescriptionLength, @@ -403,7 +421,7 @@ export default abstract class ImportsProcessor< const defaults = { title: output.title, - icon: output.emoji, + icon: output.icon, content: transformedContent, text: await DocumentHelper.toMarkdown(transformedContent, { includeTitle: false, @@ -602,6 +620,18 @@ export default abstract class ImportsProcessor< */ protected abstract canProcess(importModel: Import): boolean; + /** + * Phase assigned to the initial ImportTask rows created from + * `buildTasksInput`. Sources that begin with a bootstrap step (e.g. + * Markdown zip extraction) override this to return `Bootstrap`. Sources + * that fan out directly into page work (e.g. Notion) leave the default. + * + * @returns Phase for the first wave of ImportTask rows. + */ + protected getInitialPhase(): ImportTaskPhase { + return ImportTaskPhase.Page; + } + /** * Build task inputs which will be used for `APIImportTask`s. * diff --git a/server/queues/processors/MarkdownImportsProcessor.ts b/server/queues/processors/MarkdownImportsProcessor.ts new file mode 100644 index 0000000000..5f6a851836 --- /dev/null +++ b/server/queues/processors/MarkdownImportsProcessor.ts @@ -0,0 +1,37 @@ +import type { Transaction } from "sequelize"; +import type { ImportTaskInput } from "@shared/schema"; +import { ImportTaskPhase, IntegrationService } from "@shared/types"; +import type { Import, ImportTask } from "@server/models"; +import MarkdownAPIImportTask from "../tasks/MarkdownAPIImportTask"; +import ImportsProcessor from "./ImportsProcessor"; + +export default class MarkdownImportsProcessor extends ImportsProcessor { + protected canProcess( + importModel: Import + ): boolean { + return importModel.service === IntegrationService.Markdown; + } + + protected getInitialPhase(): ImportTaskPhase { + return ImportTaskPhase.Bootstrap; + } + + protected async buildTasksInput( + importModel: Import, + _transaction: Transaction + ): Promise> { + if (!importModel.scratch?.storageKey) { + throw new Error( + "Markdown import is missing scratch.storageKey for the bootstrap phase" + ); + } + + return [{ externalId: importModel.input[0].externalId }]; + } + + protected async scheduleTask( + importTask: ImportTask + ): Promise { + await new MarkdownAPIImportTask().schedule({ importTaskId: importTask.id }); + } +} diff --git a/server/queues/tasks/APIImportTask.ts b/server/queues/tasks/APIImportTask.ts index 41e3e3749b..5572bee3e3 100644 --- a/server/queues/tasks/APIImportTask.ts +++ b/server/queues/tasks/APIImportTask.ts @@ -4,13 +4,22 @@ import { Fragment, Node } from "prosemirror-model"; import type { WhereOptions } from "sequelize"; import { Transaction } from "sequelize"; import { randomUUID } from "node:crypto"; -import type { ImportTaskInput, ImportTaskOutput } from "@shared/schema"; +import type { + BaseImportTaskInput, + ImportTaskInput, + ImportTaskOutput, +} from "@shared/schema"; import type { ImportableIntegrationService, ProsemirrorData, ProsemirrorDoc, } from "@shared/types"; -import { AttachmentPreset, ImportState, ImportTaskState } from "@shared/types"; +import { + AttachmentPreset, + ImportState, + ImportTaskPhase, + ImportTaskState, +} from "@shared/types"; import { createContext } from "@server/context"; import { schema } from "@server/editor"; import Logger from "@server/logging/Logger"; @@ -134,31 +143,39 @@ export default abstract class APIImportTask< * @returns Promise that resolves once processing has completed. */ private async onProcess(importTask: ImportTask) { - const { taskOutput, childTasksInput } = await this.process(importTask); + const { taskOutput, childTasksInput } = + importTask.phase === ImportTaskPhase.Bootstrap + ? await this.processBootstrap(importTask) + : await this.processPage(importTask); - const taskOutputWithReplacements = await Promise.all( - taskOutput.map(async (item) => ({ - ...item, - content: await this.uploadAttachments({ - doc: item.content, - externalId: item.externalId, - createdBy: importTask.import.createdBy, - }), - })) - ); + const taskOutputWithReplacements = this.shouldUploadAttachmentsPerPage() + ? await Promise.all( + taskOutput.map(async (item) => ({ + ...item, + content: await this.uploadAttachments({ + doc: item.content, + externalId: item.externalId, + createdBy: importTask.import.createdBy, + }), + })) + ) + : taskOutput; await sequelize.transaction(async (transaction) => { await Promise.all( - chunk(childTasksInput, PagePerImportTask).map(async (input) => { - await ImportTask.create( - { - state: ImportTaskState.Created, - input, - importId: importTask.importId, - }, - { transaction } - ); - }) + chunk(childTasksInput as BaseImportTaskInput, PagePerImportTask).map( + async (input) => { + await ImportTask.create( + { + state: ImportTaskState.Created, + phase: ImportTaskPhase.Page, + input: input as ImportTaskInput, + importId: importTask.importId, + }, + { transaction } + ); + } + ) ); importTask.output = taskOutputWithReplacements; @@ -206,10 +223,16 @@ export default abstract class APIImportTask< return await this.scheduleNextTask(nextImportTask); } - // All tasks for this import have been processed. + // All tasks for this import have been processed. Run the post-completion + // hook before flipping state so subclasses can perform work that must + // happen before "imports.processed" downstream handlers fire. + await this.onAllTasksCompleted(importTask); + await sequelize.transaction(async (transaction) => { const associatedImport = importTask.import; associatedImport.state = ImportState.Processed; + // Release any cross-phase scratch state — the import is done with it. + associatedImport.scratch = null; await associatedImport.saveWithCtx( createContext({ user: associatedImport.createdBy, @@ -222,13 +245,63 @@ export default abstract class APIImportTask< } /** - * Process the import task. - * This fetches data from external source and converts it to task output. + * Whether the base class should create Attachment rows and upload S3 blobs + * per page during `onProcess`. Defaults to `true` for sources whose + * attachments are addressable by per-task URLs (e.g. Notion). Sources where + * attachments are shared across pages or live in a single archive may + * override this and handle attachment persistence in `onAllTasksCompleted`. + * + * @returns true to enable the per-page attachment upload step. + */ + protected shouldUploadAttachmentsPerPage(): boolean { + return true; + } + + /** + * Hook invoked after the final import task has been processed but before the + * associated `Import` state transitions to `Processed`. Subclasses can + * override to perform cross-task finalization (e.g. uploading shared + * attachments) that must happen before the persistence pass. + * + * @param lastImportTask The most recently completed ImportTask for the import. + * @returns Promise that resolves when finalization is complete. + */ + protected async onAllTasksCompleted( + // oxlint-disable-next-line @typescript-eslint/no-unused-vars + lastImportTask: ImportTask + ): Promise { + return; + } + + /** + * Bootstrap phase. Runs once per import on a worker that owns the source + * artifact (e.g. extracts a zip, walks the file tree, schedules child page + * tasks). Subclasses without a bootstrap step leave this unimplemented; the + * base only invokes it when an `ImportTask` is created with + * `phase === ImportTaskPhase.Bootstrap`. * * @param importTask ImportTask model to process. * @returns Promise with output that resolves once processing has completed. */ - protected abstract process( + protected processBootstrap( + // oxlint-disable-next-line @typescript-eslint/no-unused-vars + importTask: ImportTask + ): Promise> { + throw new Error( + `${this.constructor.name} does not implement processBootstrap()` + ); + } + + /** + * Page phase. Runs for every `ImportTask` row with + * `phase === ImportTaskPhase.Page`, transforming a batch of source pages + * into ProseMirror output and optionally cascading descendants as the next + * wave of child tasks. + * + * @param importTask ImportTask model to process. + * @returns Promise with output that resolves once processing has completed. + */ + protected abstract processPage( importTask: ImportTask ): Promise>; diff --git a/server/queues/tasks/ImportMarkdownZipTask.test.ts b/server/queues/tasks/ImportMarkdownZipTask.test.ts deleted file mode 100644 index d4bb91f3dc..0000000000 --- a/server/queues/tasks/ImportMarkdownZipTask.test.ts +++ /dev/null @@ -1,139 +0,0 @@ -/* oxlint-disable @typescript-eslint/no-empty-function */ -import path from "node:path"; -import { FileOperation } from "@server/models"; -import { buildFileOperation } from "@server/test/factories"; -import ImportMarkdownZipTask from "./ImportMarkdownZipTask"; - -describe("ImportMarkdownZipTask", () => { - it("should import the documents, attachments", async () => { - const fileOperation = await buildFileOperation(); - Object.defineProperty(fileOperation, "handle", { - get() { - return { - path: path.resolve( - __dirname, - "..", - "..", - "test", - "fixtures", - "outline-markdown.zip" - ), - cleanup: async () => {}, - }; - }, - }); - vi.spyOn(FileOperation, "findByPk").mockResolvedValue(fileOperation); - - const props = { - fileOperationId: fileOperation.id, - }; - - const task = new ImportMarkdownZipTask(); - const response = await task.perform(props); - - expect(response.collections.size).toEqual(1); - expect(response.documents.size).toEqual(8); - expect(response.attachments.size).toEqual(6); - }, 10000); - - it("should import the documents, public attachments", async () => { - const fileOperation = await buildFileOperation(); - Object.defineProperty(fileOperation, "handle", { - get() { - return { - path: path.resolve( - __dirname, - "..", - "..", - "test", - "fixtures", - "outline-markdown-public.zip" - ), - cleanup: async () => {}, - }; - }, - }); - vi.spyOn(FileOperation, "findByPk").mockResolvedValue(fileOperation); - - const props = { - fileOperationId: fileOperation.id, - }; - - const task = new ImportMarkdownZipTask(); - const response = await task.perform(props); - - expect(response.collections.size).toEqual(1); - expect(response.documents.size).toEqual(2); - expect(response.attachments.size).toEqual(1); - }, 10000); - - it("should throw an error with corrupt zip", async () => { - const fileOperation = await buildFileOperation(); - Object.defineProperty(fileOperation, "handle", { - get() { - return { - path: path.resolve( - __dirname, - "..", - "..", - "test", - "fixtures", - "corrupt.zip" - ), - cleanup: async () => {}, - }; - }, - }); - vi.spyOn(FileOperation, "findByPk").mockResolvedValue(fileOperation); - - const props = { - fileOperationId: fileOperation.id, - }; - - let error; - try { - const task = new ImportMarkdownZipTask(); - await task.perform(props); - } catch (err) { - error = err; - } - - expect(error && error.message).toBeTruthy(); - }); - - it("should throw an error with empty collection in zip", async () => { - const fileOperation = await buildFileOperation(); - Object.defineProperty(fileOperation, "handle", { - get() { - return { - path: path.resolve( - __dirname, - "..", - "..", - "test", - "fixtures", - "empty.zip" - ), - cleanup: async () => {}, - }; - }, - }); - vi.spyOn(FileOperation, "findByPk").mockResolvedValue(fileOperation); - - const props = { - fileOperationId: fileOperation.id, - }; - - let error; - try { - const task = new ImportMarkdownZipTask(); - await task.perform(props); - } catch (err) { - error = err; - } - - expect(error && error.message).toContain( - "Uploaded file does not contain any valid collections" - ); - }); -}); diff --git a/server/queues/tasks/ImportMarkdownZipTask.ts b/server/queues/tasks/ImportMarkdownZipTask.ts deleted file mode 100644 index 88883c09de..0000000000 --- a/server/queues/tasks/ImportMarkdownZipTask.ts +++ /dev/null @@ -1,286 +0,0 @@ -import path from "node:path"; -import fs from "fs-extra"; -import { escapeRegExp } from "es-toolkit/compat"; -import mime from "mime-types"; -import { randomUUID } from "node:crypto"; -import documentImporter from "@server/commands/documentImporter"; -import { createContext } from "@server/context"; -import Logger from "@server/logging/Logger"; -import type { FileOperation } from "@server/models"; -import { User } from "@server/models"; -import { Buckets } from "@server/models/helpers/AttachmentHelper"; -import { sequelize } from "@server/storage/database"; -import type { FileTreeNode } from "@server/utils/ImportHelper"; -import ImportHelper from "@server/utils/ImportHelper"; -import type { StructuredImportData } from "./ImportTask"; -import ImportTask from "./ImportTask"; - -export default class ImportMarkdownZipTask extends ImportTask { - public async parseData( - dirPath: string, - fileOperation: FileOperation - ): Promise { - const tree = await ImportHelper.toFileTree(dirPath); - if (!tree) { - throw new Error("Could not find valid content in zip file"); - } - - return this.parseFileTree(fileOperation, tree.children); - } - - /** - * Check if a folder contains only attachment files (no markdown documents). - * - * @param node The file tree node to check - * @returns true if the folder contains only non-markdown files - */ - private isAttachmentFolder(node: FileTreeNode): boolean { - if (node.children.length === 0) { - return false; - } - - if (node.title.toLowerCase() === "attachments") { - return true; - } - - return node.children.every((child) => { - // If child has children, it's a folder - recurse to check its contents - if (child.children.length > 0) { - return this.isAttachmentFolder(child); - } - - // Child has no children - could be a file or empty folder - const ext = path.extname(child.name).toLowerCase(); - - // If no extension, it's likely an empty folder, not a file. - // Be conservative and don't treat it as an attachment. - if (!ext) { - return false; - } - - // It's a file with an extension - check if it's NOT markdown - return ext !== ".md" && ext !== ".markdown"; - }); - } - - /** - * Recursively process all files in a folder as attachments. - * - * @param node The file tree node to process - * @param output The structured import data to add attachments to - */ - private parseAttachmentFolder( - node: FileTreeNode, - output: StructuredImportData - ): void { - for (const child of node.children) { - if (child.children.length > 0) { - this.parseAttachmentFolder(child, output); - } else { - const id = randomUUID(); - output.attachments.push({ - id, - name: child.name, - path: child.path, - mimeType: mime.lookup(child.path) || "application/octet-stream", - buffer: () => fs.readFile(child.path), - }); - } - } - } - - /** - * Converts the file structure from zipAsFileTree into documents, - * collections, and attachments. - * - * @param fileOperation The file operation - * @param tree An array of FileTreeNode representing root files in the zip - * @returns A StructuredImportData object - */ - private async parseFileTree( - fileOperation: FileOperation, - tree: FileTreeNode[] - ): Promise { - const user = await User.findByPk(fileOperation.userId, { - rejectOnEmpty: true, - }); - const output: StructuredImportData = { - collections: [], - documents: [], - attachments: [], - }; - - const docPathToIdMap = new Map(); - - const parseNodeChildren = async ( - children: FileTreeNode[], - collectionId: string, - parentDocumentId?: string - ): Promise => { - for (const child of children) { - // special case for folders of attachments - detect by content - if (child.children.length > 0 && this.isAttachmentFolder(child)) { - this.parseAttachmentFolder(child, output); - continue; - } - - const id = randomUUID(); - - const { title, icon, text } = await sequelize.transaction( - async (transaction) => - documentImporter({ - mimeType: "text/markdown", - fileName: child.name, - content: - child.children.length > 0 - ? "" - : await fs.readFile(child.path, "utf8"), - user, - ctx: createContext({ user, transaction }), - }) - ); - - const existingDocumentIndex = output.documents.findIndex( - (doc) => - doc.title === title && - doc.collectionId === collectionId && - doc.parentDocumentId === parentDocumentId - ); - - const existingDocument = output.documents[existingDocumentIndex]; - - // When there is a file and a folder with the same name this handles - // the case by combining the two into one document with nested children - if (existingDocument) { - docPathToIdMap.set(child.path, existingDocument.id); - - if (existingDocument.text === "") { - output.documents[existingDocumentIndex].text = text; - } - - await parseNodeChildren( - child.children, - collectionId, - existingDocument.id - ); - } else { - docPathToIdMap.set(child.path, id); - - output.documents.push({ - id, - title, - icon, - text, - collectionId, - parentDocumentId, - path: child.path, - mimeType: "text/markdown", - }); - - await parseNodeChildren(child.children, collectionId, id); - } - } - }; - - // All nodes in the root level should be collections - for (const node of tree) { - if (node.children.length > 0) { - // Check if this is an attachments-only folder at root level - if (this.isAttachmentFolder(node)) { - this.parseAttachmentFolder(node, output); - continue; - } - - const collectionId = randomUUID(); - output.collections.push({ - id: collectionId, - name: node.title, - }); - await parseNodeChildren(node.children, collectionId); - } else { - Logger.debug("task", `Unhandled file in zip: ${node.path}`, { - fileOperationId: fileOperation.id, - }); - } - } - - for (const document of output.documents) { - // Check all of the attachments we've created against urls in the text - // and replace them out with attachment redirect urls before continuing. - for (const attachment of output.attachments) { - const encodedPath = encodeURI(attachment.path); - const attachmentFileName = path.basename(attachment.path); - const reference = `<<${attachment.id}>>`; - - // Pull the collection and subdirectory out of the path name, upload - // folders in an export are relative to the document itself. - // Support both legacy bucket names (uploads/public) and generic attachment folders. - let normalizedAttachmentPath = encodedPath - .replace( - new RegExp(`(.*)/${Buckets.uploads}/`), - `${Buckets.uploads}/` - ) - .replace(new RegExp(`(.*)/${Buckets.public}/`), `${Buckets.public}/`); - - // Also try normalizing to just the folder containing the attachment - // This handles arbitrary folder names like "attachments/" - const attachmentDir = path.basename(path.dirname(attachment.path)); - const genericNormalizedPath = `${attachmentDir}/${encodeURI(attachmentFileName)}`; - - document.text = document.text - .replace(new RegExp(escapeRegExp(encodedPath), "g"), reference) - .replace( - new RegExp(`\\.?/?${escapeRegExp(normalizedAttachmentPath)}`, "g"), - reference - ) - .replace( - new RegExp(`\\.?/?${escapeRegExp(genericNormalizedPath)}`, "g"), - reference - ); - - // Handle markdown links that reference attachments via a path rooted - // at an "attachments" folder, optionally prefixed with "./", e.g. - // ./attachments/foo.png or ./attachments/sub/foo.png. - const segments = attachment.path.split(path.sep); - const attachmentsIdx = segments.findIndex( - (seg) => seg.toLowerCase() === "attachments" - ); - if (attachmentsIdx >= 0) { - const relFromAttachments = segments.slice(attachmentsIdx).join("/"); - document.text = document.text.replace( - new RegExp( - `\\.?/?${escapeRegExp(encodeURI(relFromAttachments))}`, - "g" - ), - reference - ); - } - } - - const basePath = path.dirname(document.path); - - // check internal document links in the text and replace them with placeholders. - // When persisting, the placeholders will be replaced with the right urls. - const internalLinks = [ - ...document.text.matchAll(/\[[^\]]+\]\(([^)]+\.md)\)/g), - ]; - - internalLinks.forEach((match) => { - const referredDocPath = match[1]; - const normalizedDocPath = decodeURI( - path.normalize(`${basePath}/${referredDocPath}`) - ); - - const referredDocId = docPathToIdMap.get(normalizedDocPath); - if (referredDocId) { - document.text = document.text.replace( - referredDocPath, - `<<${referredDocId}>>` - ); - } - }); - } - - return output; - } -} diff --git a/server/queues/tasks/MarkdownAPIImportTask.test.ts b/server/queues/tasks/MarkdownAPIImportTask.test.ts new file mode 100644 index 0000000000..5767f340b3 --- /dev/null +++ b/server/queues/tasks/MarkdownAPIImportTask.test.ts @@ -0,0 +1,116 @@ +import { + rewriteAttachmentPaths, + rewriteInternalLinks, +} from "./MarkdownAPIImportTask"; + +describe("rewriteAttachmentPaths", () => { + it("replaces a direct encoded path with the placeholder", () => { + const out = rewriteAttachmentPaths( + "![alt](My%20Collection/attachments/foo.png)", + [{ id: "att-1", pathInZip: "My Collection/attachments/foo.png" }] + ); + expect(out).toBe("![alt](<>)"); + }); + + it("normalizes legacy `uploads/` bucket layout", () => { + const out = rewriteAttachmentPaths("![x](./uploads/abc/file.png)", [ + { + id: "att-2", + pathInZip: "Some Collection/uploads/abc/file.png", + }, + ]); + expect(out).toBe("![x](<>)"); + }); + + it("normalizes legacy `public/` bucket layout", () => { + const out = rewriteAttachmentPaths("![x](./public/abc/file.png)", [ + { + id: "att-3", + pathInZip: "Some Collection/public/abc/file.png", + }, + ]); + expect(out).toBe("![x](<>)"); + }); + + it("handles arbitrary folder names like 'attachments/'", () => { + const out = rewriteAttachmentPaths("![x](./attachments/foo.png)", [ + { id: "att-4", pathInZip: "Collection/attachments/foo.png" }, + ]); + expect(out).toBe("![x](<>)"); + }); + + it("matches nested attachments folders", () => { + const out = rewriteAttachmentPaths("![x](./attachments/sub/bar.png)", [ + { + id: "att-5", + pathInZip: "Collection/Doc/attachments/sub/bar.png", + }, + ]); + expect(out).toBe("![x](<>)"); + }); + + it("substitutes multiple references in the same document", () => { + const out = rewriteAttachmentPaths( + "![a](./attachments/a.png) and ![b](./attachments/b.png)", + [ + { id: "id-a", pathInZip: "C/attachments/a.png" }, + { id: "id-b", pathInZip: "C/attachments/b.png" }, + ] + ); + expect(out).toBe("![a](<>) and ![b](<>)"); + }); + + it("is a no-op when no attachments match", () => { + const out = rewriteAttachmentPaths("![x](https://example.com/a.png)", [ + { id: "id-a", pathInZip: "C/attachments/a.png" }, + ]); + expect(out).toBe("![x](https://example.com/a.png)"); + }); +}); + +describe("rewriteInternalLinks", () => { + it("rewrites a sibling .md link to a placeholder", () => { + const out = rewriteInternalLinks( + "see [other](./other.md)", + "Collection/parent.md", + { "Collection/other.md": "doc-1" } + ); + expect(out).toBe("see [other](<>)"); + }); + + it("rewrites a nested .md link", () => { + const out = rewriteInternalLinks( + "see [child](./sub/child.md)", + "Collection/parent.md", + { "Collection/sub/child.md": "doc-2" } + ); + expect(out).toBe("see [child](<>)"); + }); + + it("leaves unresolved .md links untouched", () => { + const out = rewriteInternalLinks( + "see [missing](./missing.md)", + "Collection/parent.md", + {} + ); + expect(out).toBe("see [missing](./missing.md)"); + }); + + it("ignores non-md links", () => { + const out = rewriteInternalLinks( + "see [site](https://example.com)", + "Collection/parent.md", + { "Collection/parent.md": "doc-self" } + ); + expect(out).toBe("see [site](https://example.com)"); + }); + + it("decodes encoded path segments before lookup", () => { + const out = rewriteInternalLinks( + "see [other](./My%20Doc.md)", + "Collection/parent.md", + { "Collection/My Doc.md": "doc-3" } + ); + expect(out).toBe("see [other](<>)"); + }); +}); diff --git a/server/queues/tasks/MarkdownAPIImportTask.ts b/server/queues/tasks/MarkdownAPIImportTask.ts new file mode 100644 index 0000000000..5221fa77bf --- /dev/null +++ b/server/queues/tasks/MarkdownAPIImportTask.ts @@ -0,0 +1,672 @@ +import path from "node:path"; +import { randomUUID } from "node:crypto"; +import { escapeRegExp } from "es-toolkit/compat"; +import fs from "fs-extra"; +import mime from "mime-types"; +import { UniqueConstraintError } from "sequelize"; +import tmp from "tmp"; +import type { + ImportTaskInput, + ImportTaskOutput, + MarkdownAttachmentManifestItem, + MarkdownPageImportTaskInputItem, +} from "@shared/schema"; +import type { IntegrationService, ProsemirrorDoc } from "@shared/types"; +import { AttachmentPreset } from "@shared/types"; +import attachmentCreator from "@server/commands/attachmentCreator"; +import { createContext } from "@server/context"; +import env from "@server/env"; +import Logger from "@server/logging/Logger"; +import type { ImportTask } from "@server/models"; +import { Attachment } from "@server/models"; +import { Buckets } from "@server/models/helpers/AttachmentHelper"; +import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper"; +import { sequelize } from "@server/storage/database"; +import FileStorage from "@server/storage/files"; +import type { FileTreeNode } from "@server/utils/ImportHelper"; +import ImportHelper from "@server/utils/ImportHelper"; +import ZipHelper from "@server/utils/ZipHelper"; +import type { ProcessOutput } from "./APIImportTask"; +import APIImportTask from "./APIImportTask"; +import { DocumentConverter } from "@server/utils/DocumentConverter"; + +type Markdown = IntegrationService.Markdown; + +interface ExtractedZip { + dirPath: string; + cleanup: () => Promise; +} + +interface DiscoveredDocument { + id: string; + title: string; + pathInZip: string; + collectionId: string; + parentDocumentId?: string; + markdownText: string; + children: DiscoveredDocument[]; +} + +interface DiscoveredCollection { + id: string; + title: string; + children: DiscoveredDocument[]; +} + +interface AttachmentRef { + id: string; + pathInZip: string; +} + +/** + * Rewrites local attachment paths in markdown text into `<>` + * placeholders. Supports legacy bucket layouts (`uploads/`, `public/`), + * arbitrary nested folder names, and `./attachments/...` rooted paths. Both + * encoded and unencoded path forms are matched. + * + * Exported for tests; not part of the module's public surface. + * + * @param markdown The raw markdown text from a single document. + * @param attachments Attachment manifest entries to substitute. + * @returns Markdown text with local paths replaced by `<>` references. + */ +export function rewriteAttachmentPaths( + markdown: string, + attachments: AttachmentRef[] +): string { + let text = markdown; + + for (const attachment of attachments) { + const encodedPath = encodeURI(attachment.pathInZip); + const attachmentFileName = path.basename(attachment.pathInZip); + const reference = `<<${attachment.id}>>`; + + const normalizedAttachmentPath = encodedPath + .replace(new RegExp(`(.*)/${Buckets.uploads}/`), `${Buckets.uploads}/`) + .replace(new RegExp(`(.*)/${Buckets.public}/`), `${Buckets.public}/`); + + const attachmentDir = path.basename(path.dirname(attachment.pathInZip)); + const genericNormalizedPath = `${attachmentDir}/${encodeURI(attachmentFileName)}`; + + text = text + .replace(new RegExp(escapeRegExp(encodedPath), "g"), reference) + .replace( + new RegExp(`\\.?/?${escapeRegExp(normalizedAttachmentPath)}`, "g"), + reference + ); + + const segments = attachment.pathInZip.split(path.sep); + const attachmentsIdx = segments.findIndex( + (seg) => seg.toLowerCase() === "attachments" + ); + if (attachmentsIdx >= 0) { + const relFromAttachments = segments.slice(attachmentsIdx).join("/"); + text = text.replace( + new RegExp(`\\.?/?${escapeRegExp(encodeURI(relFromAttachments))}`, "g"), + reference + ); + } + + text = text.replace( + new RegExp(`\\.?/?${escapeRegExp(genericNormalizedPath)}`, "g"), + reference + ); + } + + return text; +} + +/** + * Rewrites internal markdown links (`[label](./relative.md)`) into + * `<>` placeholders, resolved against a path → id map built from + * the zip's full document tree. + * + * Exported for tests; not part of the module's public surface. + * + * @param markdown The raw markdown text from a single document. + * @param documentPath Zip-relative path of the document being rewritten + * (e.g. `Collection/parent.md`); used as the base for + * resolving relative link targets against docMap keys. + * @param docMap Map of document path (as it appeared in the zip) to its + * pre-assigned externalId. + * @returns Markdown text with internal `.md` link targets replaced by + * `<>` references. + */ +export function rewriteInternalLinks( + markdown: string, + documentPath: string, + docMap: Record +): string { + const basePath = path.dirname(documentPath); + const internalLinks = [...markdown.matchAll(/\[[^\]]+\]\(([^)]+\.md)\)/g)]; + + let text = markdown; + for (const match of internalLinks) { + const referredDocPath = match[1]; + const normalizedDocPath = decodeURI( + path.normalize(`${basePath}/${referredDocPath}`) + ); + + const referredDocId = docMap[normalizedDocPath]; + if (referredDocId) { + text = text.replace(referredDocPath, `<<${referredDocId}>>`); + } + } + + return text; +} + +export default class MarkdownAPIImportTask extends APIImportTask { + protected shouldUploadAttachmentsPerPage(): boolean { + return false; + } + + protected async scheduleNextTask(importTask: ImportTask) { + await new MarkdownAPIImportTask().schedule({ importTaskId: importTask.id }); + } + + protected async onAllTasksCompleted( + lastImportTask: ImportTask + ): Promise { + const scratch = lastImportTask.import.scratch; + if (!scratch?.storageKey || !scratch.manifest?.length) { + return; + } + + const { dirPath, cleanup } = await this.downloadAndExtract( + scratch.storageKey + ); + + try { + const createdBy = lastImportTask.import.createdBy; + + for (const item of scratch.manifest) { + const filePath = path.join(dirPath, item.pathInZip); + let buffer: Buffer; + try { + buffer = await fs.readFile(filePath); + } catch (err) { + Logger.warn( + `Markdown import attachment missing in zip, skipping: ${item.pathInZip}`, + err instanceof Error ? err : undefined + ); + continue; + } + + try { + await sequelize.transaction(async (transaction) => + attachmentCreator({ + source: "import", + preset: AttachmentPreset.DocumentAttachment, + id: item.id, + name: item.name, + type: item.mimeType, + buffer, + user: createdBy, + ctx: createContext({ user: createdBy, transaction }), + fetchOptions: { + timeout: env.FILE_STORAGE_IMPORT_TIMEOUT, + }, + }) + ); + } catch (err) { + // Each attachment commits in its own transaction, so a retry of + // this hook can re-encounter ids that already landed. Treat the + // unique-id collision as a no-op so the import remains resumable. + if (err instanceof UniqueConstraintError) { + continue; + } + throw err; + } + } + } finally { + await cleanup(); + } + } + + protected async processBootstrap( + importTask: ImportTask + ): Promise> { + const storageKey = importTask.import.scratch?.storageKey; + if (!storageKey) { + throw new Error("Markdown import is missing scratch.storageKey"); + } + + const { dirPath, cleanup } = await this.downloadAndExtract(storageKey); + + try { + const tree = await ImportHelper.toFileTree(dirPath); + if (!tree) { + throw new Error("Could not find valid content in zip file"); + } + + const collections: DiscoveredCollection[] = []; + const manifest: MarkdownAttachmentManifestItem[] = []; + + for (const node of tree.children) { + if (node.children.length === 0) { + Logger.debug("task", `Unhandled file in zip: ${node.path}`, { + importTaskId: importTask.id, + }); + continue; + } + + if (this.isAttachmentFolder(node)) { + this.collectAttachments(node, manifest, dirPath); + continue; + } + + const collection: DiscoveredCollection = { + id: randomUUID(), + title: node.title, + children: [], + }; + collections.push(collection); + + await this.collectDocumentsAndAttachments({ + children: node.children, + collectionId: collection.id, + out: collection.children, + manifest, + extractionRoot: dirPath, + }); + } + + // Build docMap (pathInZip -> externalId) for internal-link resolution. + // Walk the full document tree to collect every doc id, since internal + // markdown links can target any document regardless of depth. + const docMap: Record = {}; + const collectDocMap = (docs: DiscoveredDocument[]) => { + for (const d of docs) { + docMap[d.pathInZip] = d.id; + collectDocMap(d.children); + } + }; + for (const c of collections) { + collectDocMap(c.children); + } + + // Replace (not append) anything past the create-time placeholder with + // the freshly discovered collections so a retried bootstrap doesn't + // accumulate duplicate entries with fresh UUIDs from a previous + // partial run. ImportsProcessor's persistence pass treats these as + // collections. + const associatedImport = importTask.import; + const placeholder = associatedImport.input[0]; + associatedImport.input = [ + placeholder, + ...collections.map((c) => ({ + externalId: c.id, + permission: placeholder.permission, + })), + ]; + associatedImport.scratch = { storageKey, manifest }; + await associatedImport.save(); + + // Append collection placeholder items so ImportsProcessor iterates + // them during the bootstrap row (the earliest createdAt) — that + // guarantees collections land in the DB before any per-page document + // references them. + const collectionInputItems: MarkdownPageImportTaskInputItem[] = + collections.map((c) => ({ + externalId: c.id, + title: c.title, + path: c.title, + markdownText: "", + attachmentMap: [], + docMap: {}, + })); + + importTask.input = [importTask.input[0], ...collectionInputItems]; + + const collectionOutputs: ImportTaskOutput = collections.map((c) => ({ + externalId: c.id, + title: c.title, + content: ProsemirrorHelper.getEmptyDocument() as ProsemirrorDoc, + })); + + // First wave of document tasks: only top-level docs in each collection. + // Each carries its descendants in `children` and the per-page handler + // re-emits them as the next wave of childTasksInput, producing a strict + // depth-ordered cascade of ImportTask rows so parent FKs are always + // satisfied at child-doc creation time. + const childTasksInput: ImportTaskInput = collections.flatMap( + (c) => c.children.map((d) => this.toPageInput(d, manifest, docMap)) + ); + + return { taskOutput: collectionOutputs, childTasksInput }; + } finally { + await cleanup(); + } + } + + /** + * Converts a discovered document subtree into a per-page task input, + * recursively packing the doc's descendants into the `children` field so + * each tree-depth runs as its own task wave. + * + * @param doc The discovered document, including its descendants. + * @param manifest The full attachment manifest (used for per-page refs). + * @param docMap Path → externalId map for internal link rewriting. + * @returns A self-contained per-page task input. + */ + private toPageInput( + doc: DiscoveredDocument, + manifest: MarkdownAttachmentManifestItem[], + docMap: Record + ): MarkdownPageImportTaskInputItem { + return { + externalId: doc.id, + parentExternalId: doc.parentDocumentId, + collectionExternalId: doc.collectionId, + title: doc.title, + path: doc.pathInZip, + markdownText: doc.markdownText, + attachmentMap: this.attachmentsReferencedBy(doc.markdownText, manifest), + docMap, + children: doc.children.length + ? doc.children.map((c) => this.toPageInput(c, manifest, docMap)) + : undefined, + }; + } + + protected async processPage( + importTask: ImportTask + ): Promise> { + const taskOutput: ImportTaskOutput = []; + const childTasksInput: MarkdownPageImportTaskInputItem[] = []; + + const items = importTask.input as MarkdownPageImportTaskInputItem[]; + for (const item of items) { + // Empty markdown short-circuits — used by collection placeholders so + // ImportsProcessor sees their externalId paired with empty content and + // builds a Collection rather than a Document. (Currently collections + // are persisted via the bootstrap task itself, so this branch is only + // a defensive fallback.) + if (!item.markdownText) { + taskOutput.push({ + externalId: item.externalId, + title: item.title, + content: ProsemirrorHelper.getEmptyDocument() as ProsemirrorDoc, + }); + } else { + const transformedMarkdown = this.rewriteMarkdown(item); + const { doc, title, icon } = await DocumentConverter.convert( + transformedMarkdown, + path.basename(item.path), + "text/markdown" + ); + + taskOutput.push({ + externalId: item.externalId, + title: title || item.title, + icon, + content: doc.toJSON() as ProsemirrorDoc, + }); + } + + // Cascade this doc's direct descendants as the next task wave. Their + // ImportTask rows will be created after the current one returns, so + // their createdAt is strictly later — guaranteeing parent-before-child + // FK ordering during ImportsProcessor's persistence pass. + if (item.children?.length) { + childTasksInput.push(...item.children); + } + } + + return { taskOutput, childTasksInput }; + } + + /** + * Pre-rewrites a page's markdown text. Internal `.md` links become mention + * markdown so the editor parses them as Document mentions. Attachment paths + * are first reduced to `<>` placeholders by the shared rewriter, then + * — distinct from the prosemirror-tree walk we used to do — substituted + * with their final attachment redirect URLs in the markdown text. Doing + * the resolution at the text layer avoids markdown-it parsing `<>` as + * an angle-bracket-wrapped URL (which produced broken image src attrs). + * + * @param page The per-page task input. + * @returns Rewritten markdown text ready for DocumentConverter. + */ + private rewriteMarkdown(page: MarkdownPageImportTaskInputItem): string { + let text = rewriteInternalLinks(page.markdownText, page.path, page.docMap); + + // Convert `[label](<>)` links from rewriteInternalLinks into mention + // markdown the editor recognises: `@[label](mention:///document/)`. + text = text.replace( + /\[([^\]]+)\]\(<<([^>]+)>>\)/g, + (_full, label: string, externalId: string) => + `@[${label}](mention://${randomUUID()}/document/${externalId})` + ); + + text = rewriteAttachmentPaths( + text, + page.attachmentMap.map((m) => ({ id: m.id, pathInZip: m.pathInZip })) + ); + + // Resolve remaining `<>` placeholders to attachment redirect URLs. + text = text.replace(/<<([^>]+)>>/g, (_full, id: string) => + Attachment.getRedirectUrl(id) + ); + + return text; + } + + /** + * Returns the subset of the full manifest that is referenced anywhere in + * the given markdown text. Used to bound the per-page task input size. + * + * @param markdown Raw markdown text for a single document. + * @param manifest The full attachment manifest from the bootstrap phase. + * @returns Manifest entries that appear (by filename) in the markdown. + */ + private attachmentsReferencedBy( + markdown: string, + manifest: MarkdownAttachmentManifestItem[] + ): MarkdownAttachmentManifestItem[] { + return manifest.filter((item) => { + const fileName = path.basename(item.pathInZip); + return ( + markdown.includes(fileName) || markdown.includes(encodeURI(fileName)) + ); + }); + } + + /** + * Detects folders containing only attachments (no markdown documents). + * Recursively considers nested folders; mirrors the legacy heuristic. + * + * @param node FileTreeNode to inspect. + * @returns true when the folder appears to hold only attachments. + */ + private isAttachmentFolder(node: FileTreeNode): boolean { + if (node.children.length === 0) { + return false; + } + if (node.title.toLowerCase() === "attachments") { + return true; + } + return node.children.every((child) => { + if (child.children.length > 0) { + return this.isAttachmentFolder(child); + } + const ext = path.extname(child.name).toLowerCase(); + if (!ext) { + return false; + } + return ext !== ".md" && ext !== ".markdown"; + }); + } + + /** + * Recursively collects all files under an attachment-only folder into the + * manifest. `pathInZip` is stored as a path relative to the extraction + * root so it can be resolved again after the zip is re-extracted during + * the completion phase (which lands in a fresh tmp dir). + * + * @param node Attachment-folder FileTreeNode. + * @param manifest Manifest array to push entries into. + * @param extractionRoot Absolute path to the zip extraction root. + */ + private collectAttachments( + node: FileTreeNode, + manifest: MarkdownAttachmentManifestItem[], + extractionRoot: string + ): void { + for (const child of node.children) { + if (child.children.length > 0) { + this.collectAttachments(child, manifest, extractionRoot); + continue; + } + manifest.push({ + id: randomUUID(), + name: child.name, + pathInZip: path.relative(extractionRoot, child.path), + mimeType: mime.lookup(child.path) || "application/octet-stream", + }); + } + } + + /** + * Walks a collection subtree and gathers documents (markdown files) and + * loose attachments. Documents are appended to `out` as a tree — each + * entry's `children` holds its direct descendants. This is the shape the + * per-page task cascade consumes. + * + * @param children FileTreeNode children of the current folder. + * @param collectionId Pre-assigned id of the enclosing collection. + * @param parentDocumentId Optional parent document id when nested. + * @param out Sibling accumulator to push discovered documents into. + * @param manifest Attachment manifest accumulator. + * @returns Promise that resolves when the subtree has been processed. + */ + private async collectDocumentsAndAttachments({ + children, + collectionId, + parentDocumentId, + out, + manifest, + extractionRoot, + }: { + children: FileTreeNode[]; + collectionId: string; + parentDocumentId?: string; + out: DiscoveredDocument[]; + manifest: MarkdownAttachmentManifestItem[]; + extractionRoot: string; + }): Promise { + for (const child of children) { + if (child.children.length > 0 && this.isAttachmentFolder(child)) { + this.collectAttachments(child, manifest, extractionRoot); + continue; + } + + const ext = path.extname(child.name).toLowerCase(); + const isMarkdown = ext === ".md" || ext === ".markdown"; + const isFolder = child.children.length > 0; + + if (!isMarkdown && !isFolder) { + manifest.push({ + id: randomUUID(), + name: child.name, + pathInZip: path.relative(extractionRoot, child.path), + mimeType: mime.lookup(child.path) || "application/octet-stream", + }); + continue; + } + + const id = randomUUID(); + const markdownText = isFolder + ? "" + : await fs.readFile(child.path, "utf8"); + + // Folder-and-file with the same title (a "name.md" alongside a "name/" + // directory) is merged onto a single document: the folder body picks up + // the file's markdown text, and the folder's contents become children. + const sibling = out.find((d) => d.title === child.title); + + if (sibling) { + if (sibling.markdownText === "" && markdownText) { + sibling.markdownText = markdownText; + } + if (isFolder) { + await this.collectDocumentsAndAttachments({ + children: child.children, + collectionId, + parentDocumentId: sibling.id, + out: sibling.children, + manifest, + extractionRoot, + }); + } + continue; + } + + const node: DiscoveredDocument = { + id, + title: child.title, + pathInZip: path.relative(extractionRoot, child.path), + collectionId, + parentDocumentId, + markdownText, + children: [], + }; + out.push(node); + + if (isFolder) { + await this.collectDocumentsAndAttachments({ + children: child.children, + collectionId, + parentDocumentId: id, + out: node.children, + manifest, + extractionRoot, + }); + } + } + } + + /** + * Downloads the zip from object storage and extracts it into a temporary + * directory. + * + * @param storageKey Storage key for the uploaded zip. + * @returns The temp dir path and a cleanup callback. Caller must invoke + * cleanup() once finished. + */ + private async downloadAndExtract(storageKey: string): Promise { + const handle = await FileStorage.getFileHandle(storageKey); + + let dirPath: string | undefined; + try { + dirPath = await new Promise((resolve, reject) => { + tmp.dir({ unsafeCleanup: true }, (err, tmpDir) => { + if (err) { + reject(err); + return; + } + resolve(tmpDir); + }); + }); + + await ZipHelper.extract(handle.path, dirPath); + + return { + dirPath, + cleanup: async () => { + await fs + .rm(dirPath!, { recursive: true, force: true }) + .catch(() => {}); + await handle.cleanup().catch(() => {}); + }, + }; + } catch (err) { + if (dirPath) { + await fs.rm(dirPath, { recursive: true, force: true }).catch(() => {}); + } + await handle.cleanup().catch(() => {}); + throw err; + } + } +} diff --git a/server/routes/api/collections/schema.ts b/server/routes/api/collections/schema.ts index 10f01debcb..7a746bfd77 100644 --- a/server/routes/api/collections/schema.ts +++ b/server/routes/api/collections/schema.ts @@ -76,9 +76,11 @@ export const CollectionsImportSchema = BaseSchema.extend({ .nullish() .transform((val) => (isUndefined(val) ? null : val)), attachmentId: z.uuid(), + // Markdown zip imports now run through `imports.create` → + // MarkdownAPIImportTask, so only JSON is accepted here. format: z - .enum(FileOperationFormat) - .prefault(FileOperationFormat.MarkdownZip), + .literal(FileOperationFormat.JSON) + .prefault(FileOperationFormat.JSON), }), }); diff --git a/server/routes/api/imports/imports.ts b/server/routes/api/imports/imports.ts index 58d5d58f59..581ac9beab 100644 --- a/server/routes/api/imports/imports.ts +++ b/server/routes/api/imports/imports.ts @@ -1,15 +1,16 @@ import Router from "koa-router"; +import { randomUUID } from "node:crypto"; import { truncate } from "es-toolkit/compat"; import type { WhereOptions } from "sequelize"; import type { IntegrationType } from "@shared/types"; -import { ImportState, UserRole } from "@shared/types"; +import { ImportState, IntegrationService, UserRole } from "@shared/types"; import { ImportValidation } from "@shared/validations"; import { UnprocessableEntityError } from "@server/errors"; import auth from "@server/middlewares/authentication"; import { rateLimiter } from "@server/middlewares/rateLimiter"; import { transaction } from "@server/middlewares/transaction"; import validate from "@server/middlewares/validate"; -import { Integration } from "@server/models"; +import { Attachment, Integration } from "@server/models"; import Import from "@server/models/Import"; import { authorize } from "@server/policies"; import { presentImport, presentPolicies } from "@server/presenters"; @@ -27,7 +28,7 @@ router.post( validate(T.ImportsCreateSchema), transaction(), async (ctx: APIContext) => { - const { integrationId, service, input } = ctx.input.body; + const body = ctx.input.body; const { user } = ctx.state.auth; authorize(user, "createImport", user.team); @@ -47,9 +48,41 @@ router.post( throw UnprocessableEntityError("An import is already in progress"); } + if (body.service === IntegrationService.Markdown) { + const attachment = await Attachment.findByPk(body.attachmentId, { + rejectOnEmpty: true, + }); + authorize(user, "read", attachment); + + const importModel = await Import.createWithCtx(ctx, { + name: truncate(attachment.name, { + length: ImportValidation.maxNameLength, + }), + service: IntegrationService.Markdown, + state: ImportState.Created, + input: [ + { + externalId: randomUUID(), + permission: body.permission, + }, + ], + scratch: { storageKey: attachment.key }, + integrationId: null, + createdById: user.id, + teamId: user.teamId, + }); + importModel.createdBy = user; + + ctx.body = { + data: presentImport(importModel), + policies: presentPolicies(user, [importModel]), + }; + return; + } + const integration = await Integration.findByPk< Integration - >(integrationId, { + >(body.integrationId, { rejectOnEmpty: true, }); authorize(user, "read", integration); @@ -58,10 +91,10 @@ router.post( const importModel = await Import.createWithCtx(ctx, { name: truncate(name, { length: ImportValidation.maxNameLength }), - service, + service: body.service, state: ImportState.Created, - input, - integrationId, + input: body.input, + integrationId: body.integrationId, createdById: user.id, teamId: user.teamId, }); diff --git a/server/routes/api/imports/schema.ts b/server/routes/api/imports/schema.ts index 522695c8c2..e94b82801e 100644 --- a/server/routes/api/imports/schema.ts +++ b/server/routes/api/imports/schema.ts @@ -1,6 +1,7 @@ import { z } from "zod"; import { NotionImportInputItemSchema } from "@shared/schema"; import { + CollectionPermission, ImportableIntegrationService, IntegrationService, } from "@shared/types"; @@ -37,6 +38,11 @@ export const ImportsCreateSchema = BaseSchema.extend({ service: z.literal(IntegrationService.Notion), input: z.array(NotionImportInputItemSchema), }), + z.object({ + service: z.literal(IntegrationService.Markdown), + attachmentId: z.uuid(), + permission: z.enum(CollectionPermission).optional(), + }), ]), }); diff --git a/shared/schema.ts b/shared/schema.ts index d1e3953559..fdf08c0ef1 100644 --- a/shared/schema.ts +++ b/shared/schema.ts @@ -20,8 +20,20 @@ export const NotionImportInputItemSchema = BaseImportInputItemSchema.extend({ export type NotionImportInput = z.infer[]; +export const MarkdownImportInputItemSchema = BaseImportInputItemSchema.extend({ + externalId: z.string(), +}); + +export type MarkdownImportInput = z.infer< + typeof MarkdownImportInputItemSchema +>[]; + export type ImportInput = - T extends IntegrationService.Notion ? NotionImportInput : BaseImportInput; + T extends IntegrationService.Notion + ? NotionImportInput + : T extends IntegrationService.Markdown + ? MarkdownImportInput + : BaseImportInput; export const BaseImportTaskInputItemSchema = z.object({ externalId: z.string(), @@ -42,16 +54,88 @@ export type NotionImportTaskInput = z.infer< typeof NotionImportTaskInputItemSchema >[]; +/** + * Manifest entry describing a single attachment discovered during the + * Markdown zip bootstrap phase. The `id` is a pre-assigned UUID used both + * as the attachment node id in per-page prosemirror output and as the + * Attachment row id created during completion. + */ +export const MarkdownAttachmentManifestItemSchema = z.object({ + id: z.uuid(), + name: z.string(), + mimeType: z.string(), + pathInZip: z.string(), +}); + +export type MarkdownAttachmentManifestItem = z.infer< + typeof MarkdownAttachmentManifestItemSchema +>; + +/** + * Markdown importer scratch state. `storageKey` is set at import creation + * (it's the only durable handle on the uploaded zip). `manifest` is added + * by the bootstrap phase so the completion phase can re-download the zip + * and create Attachment rows without re-walking the tree. + */ +export interface MarkdownImportScratch { + storageKey: string; + manifest?: MarkdownAttachmentManifestItem[]; +} + +/** + * Per-importer scratch shape stored on `Import.scratch`. Holds cross-phase + * state that the importer needs between bootstrap and completion but that + * isn't part of any single task's input. Cleared when the import flips to + * `Processed`. + */ +export type ImportScratch = + T extends IntegrationService.Markdown ? MarkdownImportScratch : never; + +/** + * Per-page task input. Generated by the bootstrap task and consumed by + * subsequent MarkdownAPIImportTask runs. `children` carries this document's + * direct descendants so that each level of the document tree is scheduled + * as a separate task wave; this preserves parent-before-child ordering + * during persistence (createdAt of child tasks is strictly later than + * parents'). The type is defined as a TypeScript interface rather than via + * z.infer because it is only consumed internally — never validated at an + * API boundary — and zod's recursive-schema ergonomics aren't worth the + * cost here. + */ +export interface MarkdownPageImportTaskInputItem { + externalId: string; + parentExternalId?: string; + collectionExternalId?: string; + title: string; + path: string; + markdownText: string; + attachmentMap: MarkdownAttachmentManifestItem[]; + docMap: Record; + children?: MarkdownPageImportTaskInputItem[]; +} + +/** + * Markdown import task input — a bootstrap row carrying only the base + * placeholder item (the zip's `storageKey` lives on `Import.scratch`), or a + * page row carrying per-document content. + */ +export type MarkdownImportTaskInput = ( + | BaseImportTaskInput[number] + | MarkdownPageImportTaskInputItem +)[]; + export type ImportTaskInput = T extends IntegrationService.Notion ? NotionImportTaskInput - : BaseImportTaskInput; + : T extends IntegrationService.Markdown + ? MarkdownImportTaskInput + : BaseImportTaskInput; // No reason to be here except for co-location with import task input. export type ImportTaskOutput = { externalId: string; title: string; - emoji?: string; + icon?: string; author?: string; content: ProsemirrorDoc; createdAt?: Date; diff --git a/shared/types.ts b/shared/types.ts index 39860c05dd..4b8fc5dc31 100644 --- a/shared/types.ts +++ b/shared/types.ts @@ -97,6 +97,21 @@ export enum ImportTaskState { Canceled = "canceled", } +/** + * Classifies the work an `ImportTask` row represents. Set when the task is + * created and used by `APIImportTask` to dispatch to the right handler. + * + * - `Bootstrap` runs once per import on a worker that owns the source + * artifact (e.g. extracts a zip, discovers structure, schedules child + * tasks). Subclasses without a bootstrap step never produce these. + * - `Page` is the per-document work that the bootstrap (or `ImportsProcessor` + * for sources without a bootstrap, like Notion) fans out into. + */ +export enum ImportTaskPhase { + Bootstrap = "bootstrap", + Page = "page", +} + export enum MentionType { User = "user", Document = "document", @@ -151,15 +166,17 @@ export enum IntegrationService { Linear = "linear", Figma = "figma", Notion = "notion", + Markdown = "markdown", } export type ImportableIntegrationService = Extract< IntegrationService, - IntegrationService.Notion + IntegrationService.Notion | IntegrationService.Markdown >; export const ImportableIntegrationService = { Notion: IntegrationService.Notion, + Markdown: IntegrationService.Markdown, } as const; export type IssueTrackerIntegrationService = Extract<