perf: Move Markdown importer to zip stream (#12372)

* perf: Move Markdown importer to zip stream

* refactor

* refactor: Extract zip walk + tree builder into ZipHelper

Adds `ZipHelper.walk` and `ZipHelper.toFileTree` so other importers can
stream zip contents without extracting to disk. Tree construction uses
an O(1) path → node map; `./`-prefixed entries are normalized, while
dotfiles, `__MACOSX`, and `..` segments are filtered.

* PR feedback
This commit is contained in:
Tom Moor
2026-05-18 18:32:58 -04:00
committed by GitHub
parent 5d32db86cf
commit ee5164290d
3 changed files with 395 additions and 109 deletions
+86 -108
View File
@@ -1,10 +1,9 @@
import path from "node:path";
import { randomUUID } from "node:crypto";
import { escapeRegExp } from "es-toolkit/compat";
import fs from "fs-extra";
import mime from "mime-types";
import { UniqueConstraintError } from "sequelize";
import tmp from "tmp";
import { DocumentValidation } from "@shared/validations";
import type {
ImportTaskInput,
ImportTaskOutput,
@@ -19,12 +18,13 @@ import env from "@server/env";
import Logger from "@server/logging/Logger";
import type { ImportTask } from "@server/models";
import { Attachment } from "@server/models";
import { Buckets } from "@server/models/helpers/AttachmentHelper";
import AttachmentHelper, {
Buckets,
} from "@server/models/helpers/AttachmentHelper";
import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
import { sequelize } from "@server/storage/database";
import FileStorage from "@server/storage/files";
import type { FileTreeNode } from "@server/utils/ImportHelper";
import ImportHelper from "@server/utils/ImportHelper";
import type { ZipTreeNode } from "@server/utils/ZipHelper";
import ZipHelper from "@server/utils/ZipHelper";
import type { ProcessOutput } from "./APIImportTask";
import APIImportTask from "./APIImportTask";
@@ -32,11 +32,6 @@ import { DocumentConverter } from "@server/utils/DocumentConverter";
type Markdown = IntegrationService.Markdown;
interface ExtractedZip {
dirPath: string;
cleanup: () => Promise<void>;
}
interface DiscoveredDocument {
id: string;
title: string;
@@ -173,25 +168,31 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
return;
}
const { dirPath, cleanup } = await this.downloadAndExtract(
scratch.storageKey
);
const handle = await FileStorage.getFileHandle(scratch.storageKey);
try {
const createdBy = lastImportTask.import.createdBy;
const manifestByPath = new Map<string, MarkdownAttachmentManifestItem>(
scratch.manifest.map((item) => [item.pathInZip, item])
);
const maxAttachmentSize = AttachmentHelper.presetToMaxUploadSize(
AttachmentPreset.DocumentAttachment
);
const seen = new Set<string>();
for (const item of scratch.manifest) {
const filePath = path.join(dirPath, item.pathInZip);
let buffer: Buffer;
try {
buffer = await fs.readFile(filePath);
} catch (err) {
Logger.warn(
`Markdown import attachment missing in zip, skipping: ${item.pathInZip}`,
err instanceof Error ? err : undefined
);
continue;
await ZipHelper.walk(handle.path, async (entry) => {
if (entry.isDirectory) {
return;
}
// Normalize exactly the way ZipHelper.toFileTree does when it records
// `pathInZip` during the bootstrap phase: drop empty segments AND the
// no-op `.` segment, then rejoin with `/`. Filtering only empty segments
// would leave `./Collection/a.png` as-is, which never matches the manifest
// key `Collection/a.png`, so every attachment of a `./`-prefixed archive
// would be reported as missing.
const normalized = entry.fileName
  .split("/")
  .filter((segment) => segment !== "" && segment !== ".")
  .join("/");
const item = manifestByPath.get(normalized);
if (!item) {
return;
}
seen.add(item.pathInZip);
const buffer = await entry.readBuffer(maxAttachmentSize);
try {
await sequelize.transaction(async (transaction) =>
@@ -214,13 +215,21 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
// this hook can re-encounter ids that already landed. Treat the
// unique-id collision as a no-op so the import remains resumable.
if (err instanceof UniqueConstraintError) {
continue;
return;
}
throw err;
}
});
for (const item of scratch.manifest) {
if (!seen.has(item.pathInZip)) {
Logger.warn(
`Markdown import attachment missing in zip, skipping: ${item.pathInZip}`
);
}
}
} finally {
await cleanup();
await handle.cleanup().catch(() => {});
}
}
@@ -232,11 +241,28 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
throw new Error("Markdown import is missing scratch.storageKey");
}
const { dirPath, cleanup } = await this.downloadAndExtract(storageKey);
const handle = await FileStorage.getFileHandle(storageKey);
try {
const tree = await ImportHelper.toFileTree(dirPath);
if (!tree) {
// Side map of pre-loaded markdown text keyed by tree node identity.
// ZipHelper's tree carries only metadata; we capture text during the
// single walk so the downstream pass doesn't have to re-open the zip.
const markdownByNode = new Map<ZipTreeNode, string>();
const tree = await ZipHelper.toFileTree(
handle.path,
async (node, entry) => {
const ext = path.extname(node.name).toLowerCase();
if (ext === ".md" || ext === ".markdown") {
const buffer = await entry.readBuffer(
DocumentValidation.maxStateLength
);
markdownByNode.set(node, buffer.toString("utf8"));
}
}
);
if (tree.children.length === 0) {
throw new Error("Could not find valid content in zip file");
}
@@ -245,14 +271,14 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
for (const node of tree.children) {
if (node.children.length === 0) {
Logger.debug("task", `Unhandled file in zip: ${node.path}`, {
Logger.debug("task", `Unhandled file in zip: ${node.pathInZip}`, {
importTaskId: importTask.id,
});
continue;
}
if (this.isAttachmentFolder(node)) {
this.collectAttachments(node, manifest, dirPath);
this.collectAttachments(node, manifest);
continue;
}
@@ -263,12 +289,12 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
};
collections.push(collection);
await this.collectDocumentsAndAttachments({
this.collectDocumentsAndAttachments({
children: node.children,
collectionId: collection.id,
out: collection.children,
manifest,
extractionRoot: dirPath,
markdownByNode,
});
}
@@ -336,7 +362,7 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
return { taskOutput: collectionOutputs, childTasksInput };
} finally {
await cleanup();
await handle.cleanup().catch(() => {});
}
}
@@ -477,10 +503,10 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
* Detects folders containing only attachments (no markdown documents).
* Recursively considers nested folders; mirrors the legacy heuristic.
*
* @param node FileTreeNode to inspect.
* @param node ZipTreeNode to inspect.
* @returns true when the folder appears to hold only attachments.
*/
private isAttachmentFolder(node: FileTreeNode): boolean {
private isAttachmentFolder(node: ZipTreeNode): boolean {
if (node.children.length === 0) {
return false;
}
@@ -501,29 +527,26 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
/**
* Recursively collects all files under an attachment-only folder into the
* manifest. `pathInZip` is stored as a path relative to the extraction
* root so it can be resolved again after the zip is re-extracted during
* the completion phase (which lands in a fresh tmp dir).
* manifest. `pathInZip` is stored verbatim so the completion phase can find
* the same entry when re-walking the archive.
*
* @param node Attachment-folder FileTreeNode.
* @param node Attachment-folder ZipTreeNode.
* @param manifest Manifest array to push entries into.
* @param extractionRoot Absolute path to the zip extraction root.
*/
private collectAttachments(
node: FileTreeNode,
manifest: MarkdownAttachmentManifestItem[],
extractionRoot: string
node: ZipTreeNode,
manifest: MarkdownAttachmentManifestItem[]
): void {
for (const child of node.children) {
if (child.children.length > 0) {
this.collectAttachments(child, manifest, extractionRoot);
this.collectAttachments(child, manifest);
continue;
}
manifest.push({
id: randomUUID(),
name: child.name,
pathInZip: path.relative(extractionRoot, child.path),
mimeType: mime.lookup(child.path) || "application/octet-stream",
pathInZip: child.pathInZip,
mimeType: mime.lookup(child.name) || "application/octet-stream",
});
}
}
@@ -534,31 +557,31 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
* entry's `children` holds its direct descendants. This is the shape the
* per-page task cascade consumes.
*
* @param children FileTreeNode children of the current folder.
* @param children ZipTreeNode children of the current folder.
* @param collectionId Pre-assigned id of the enclosing collection.
* @param parentDocumentId Optional parent document id when nested.
* @param out Sibling accumulator to push discovered documents into.
* @param manifest Attachment manifest accumulator.
* @returns Promise that resolves when the subtree has been processed.
* @param markdownByNode Pre-loaded markdown text keyed by tree node.
*/
private async collectDocumentsAndAttachments({
private collectDocumentsAndAttachments({
children,
collectionId,
parentDocumentId,
out,
manifest,
extractionRoot,
markdownByNode,
}: {
children: FileTreeNode[];
children: ZipTreeNode[];
collectionId: string;
parentDocumentId?: string;
out: DiscoveredDocument[];
manifest: MarkdownAttachmentManifestItem[];
extractionRoot: string;
}): Promise<void> {
markdownByNode: Map<ZipTreeNode, string>;
}): void {
for (const child of children) {
if (child.children.length > 0 && this.isAttachmentFolder(child)) {
this.collectAttachments(child, manifest, extractionRoot);
this.collectAttachments(child, manifest);
continue;
}
@@ -570,16 +593,14 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
manifest.push({
id: randomUUID(),
name: child.name,
pathInZip: path.relative(extractionRoot, child.path),
mimeType: mime.lookup(child.path) || "application/octet-stream",
pathInZip: child.pathInZip,
mimeType: mime.lookup(child.name) || "application/octet-stream",
});
continue;
}
const id = randomUUID();
const markdownText = isFolder
? ""
: await fs.readFile(child.path, "utf8");
const markdownText = isFolder ? "" : (markdownByNode.get(child) ?? "");
// Folder-and-file with the same title (a "name.md" alongside a "name/"
// directory) is merged onto a single document: the folder body picks up
@@ -591,13 +612,13 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
sibling.markdownText = markdownText;
}
if (isFolder) {
await this.collectDocumentsAndAttachments({
this.collectDocumentsAndAttachments({
children: child.children,
collectionId,
parentDocumentId: sibling.id,
out: sibling.children,
manifest,
extractionRoot,
markdownByNode,
});
}
continue;
@@ -606,7 +627,7 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
const node: DiscoveredDocument = {
id,
title: child.title,
pathInZip: path.relative(extractionRoot, child.path),
pathInZip: child.pathInZip,
collectionId,
parentDocumentId,
markdownText,
@@ -615,58 +636,15 @@ export default class MarkdownAPIImportTask extends APIImportTask<Markdown> {
out.push(node);
if (isFolder) {
await this.collectDocumentsAndAttachments({
this.collectDocumentsAndAttachments({
children: child.children,
collectionId,
parentDocumentId: id,
out: node.children,
manifest,
extractionRoot,
markdownByNode,
});
}
}
}
/**
* Downloads the zip from object storage and extracts it into a temporary
* directory.
*
* @param storageKey Storage key for the uploaded zip.
* @returns The temp dir path and a cleanup callback. Caller must invoke
* cleanup() once finished.
*/
private async downloadAndExtract(storageKey: string): Promise<ExtractedZip> {
const handle = await FileStorage.getFileHandle(storageKey);
let dirPath: string | undefined;
try {
dirPath = await new Promise<string>((resolve, reject) => {
tmp.dir({ unsafeCleanup: true }, (err, tmpDir) => {
if (err) {
reject(err);
return;
}
resolve(tmpDir);
});
});
await ZipHelper.extract(handle.path, dirPath);
return {
dirPath,
cleanup: async () => {
await fs
.rm(dirPath!, { recursive: true, force: true })
.catch(() => {});
await handle.cleanup().catch(() => {});
},
};
} catch (err) {
if (dirPath) {
await fs.rm(dirPath, { recursive: true, force: true }).catch(() => {});
}
await handle.cleanup().catch(() => {});
throw err;
}
}
}
+89
View File
@@ -66,3 +66,92 @@ describe("ZipHelper.extract", () => {
expect(await fs.pathExists(path.join(scratchCwd, filename))).toBe(false);
});
});
describe("ZipHelper.toFileTree", () => {
  // Nested directories become nested nodes; every node carries its full,
  // slash-joined path inside the archive.
  it("builds a nested tree with normalized pathInZip", async () => {
    const archive = await writeZip({
      "Collection/sub/page.md": "# hi",
      "Collection/other.md": "other",
    });

    const tree = await ZipHelper.toFileTree(archive);
    expect(tree.children).toHaveLength(1);

    const [collection] = tree.children;
    expect(collection.name).toBe("Collection");
    expect(collection.pathInZip).toBe("Collection");

    const childNames = collection.children.map((child) => child.name);
    childNames.sort();
    expect(childNames).toEqual(["other.md", "sub"]);

    const subFolder = collection.children.find(
      (child) => child.name === "sub"
    );
    expect(subFolder?.children[0].pathInZip).toBe("Collection/sub/page.md");
  });

  // A leading `./` is a path no-op and must not hide the entry.
  it("normalizes `./`-prefixed entries instead of dropping them", async () => {
    const archive = await writeZip({
      "./Collection/page.md": "body",
    });

    const tree = await ZipHelper.toFileTree(archive);
    expect(tree.children).toHaveLength(1);

    const [collection] = tree.children;
    expect(collection.name).toBe("Collection");
    expect(collection.children[0].pathInZip).toBe("Collection/page.md");
  });

  // `__MACOSX` and dot-prefixed segments are rejected wherever they occur.
  it("filters macOS metadata and dotfiles at any depth", async () => {
    const archive = await writeZip({
      "__MACOSX/Collection/page.md": "junk",
      "Collection/.DS_Store": "junk",
      "Collection/page.md": "body",
    });

    const tree = await ZipHelper.toFileTree(archive);
    expect(tree.children).toHaveLength(1);

    const [collection] = tree.children;
    expect(collection.name).toBe("Collection");
    expect(collection.children.map((child) => child.name)).toEqual([
      "page.md",
    ]);
  });

  // The per-file hook receives a handle whose contents can be read on demand.
  it("invokes onFile for each file entry with a readable handle", async () => {
    const archive = await writeZip({
      "Collection/page.md": "hello world",
      "Collection/image.png": "binary",
    });

    const contentsByPath: Record<string, string> = {};
    await ZipHelper.toFileTree(archive, async (node, entry) => {
      if (!node.name.endsWith(".md")) {
        return;
      }
      const contents = await entry.readBuffer(100);
      contentsByPath[node.pathInZip] = contents.toString("utf8");
    });

    expect(contentsByPath).toEqual({ "Collection/page.md": "hello world" });
  });

  // "hello world" is 11 bytes, one more than the 10-byte limit.
  it("rejects reads larger than the provided max size", async () => {
    const archive = await writeZip({
      "Collection/page.md": "hello world",
    });

    const attempt = ZipHelper.toFileTree(archive, async (_node, entry) => {
      await entry.readBuffer(10);
    });

    await expect(attempt).rejects.toThrow("Collection/page.md is too large");
  });

  // Sizes come from the entry metadata, so no read is required.
  it("exposes entry sizes before the entry is read", async () => {
    const archive = await writeZip({
      "Collection/page.md": "hello world",
    });

    const sizeByPath: Record<string, number> = {};
    await ZipHelper.toFileTree(archive, (node, entry) => {
      sizeByPath[node.pathInZip] = entry.uncompressedSize;
    });

    expect(sizeByPath).toEqual({ "Collection/page.md": 11 });
  });
});
+220 -1
View File
@@ -5,12 +5,40 @@ import tmp from "tmp";
import type { Entry } from "yauzl";
import yauzl, { validateFileName } from "yauzl";
import { bytesToHumanReadable } from "@shared/utils/files";
import { ValidationError } from "@server/errors";
import Logger from "@server/logging/Logger";
import { trace } from "@server/logging/tracing";
import { trimFilenameAndExt } from "./fs";
import { deserializeFilename, trimFilenameAndExt } from "./fs";
const MAX_FILE_NAME_LENGTH = 255;
/**
 * A single zip entry as handed to `ZipHelper.walk` handlers (and forwarded to
 * the `onFile` hook of `ZipHelper.toFileTree`). It exposes the entry's
 * metadata up front and defers reading the contents until `readBuffer` is
 * called.
 */
export interface ZipEntryHandle {
/**
 * Entry name decoded as UTF-8 from the name recorded in the zip; directory
 * entries end with `/`. The name is passed through unmodified, so it may
 * still carry a leading `./`. Entries whose name fails yauzl's
 * `validateFileName` are logged and skipped by `walk`, so they never appear
 * here.
 */
fileName: string;
/** Size of the uncompressed entry in bytes, as reported by the zip entry. */
uncompressedSize: number;
/** True when `fileName` ends with `/`, i.e. a directory marker rather than a file. */
isDirectory: boolean;
/**
 * Read the entry's contents into memory. Safe to skip — entries the caller
 * does not read are simply advanced past.
 *
 * The returned promise rejects with a "too large" error when the declared
 * uncompressed size exceeds `maxSize`, or when more than `maxSize` bytes are
 * actually streamed; it also rejects if the read stream cannot be opened or
 * emits an error.
 *
 * @param maxSize Maximum uncompressed size to read into memory, in bytes.
 * @returns Promise resolving to the full entry contents.
 */
readBuffer(maxSize: number): Promise<Buffer>;
}
/**
 * A node in the in-memory tree produced by `ZipHelper.toFileTree`. Each node
 * corresponds to one path segment of an entry name; the tree carries only
 * metadata, never file contents. The synthetic root returned by
 * `toFileTree` uses empty strings for `name`, `title` and `pathInZip`.
 */
export interface ZipTreeNode {
/** The path segment this node represents (for files: the file name including extension). */
name: string;
/**
 * Title derived from `name`: the extension is stripped with `path.parse`
 * and the remainder is passed through `deserializeFilename`.
 */
title: string;
/**
 * Path within the zip: the node's segments joined with `/`, with no leading
 * slash. Empty segments and `.` segments of the original entry name are
 * dropped, so an entry written as `./a/b.md` is recorded as `a/b.md`.
 */
pathInZip: string;
/**
 * Nested children. Every node starts with an empty list; a node gains
 * children when later entries are resolved beneath its path, so in practice
 * this is populated for directories.
 */
children: ZipTreeNode[];
}
@trace()
export default class ZipHelper {
public static defaultStreamOptions: JSZip.JSZipGeneratorOptions<"nodebuffer"> =
@@ -104,6 +132,189 @@ export default class ZipHelper {
});
}
/**
 * Visit every entry of a zip archive in place, without unpacking it to disk.
 *
 * Entries are delivered one at a time, in the order they appear in the
 * archive. The handler may be synchronous or asynchronous; the next entry is
 * requested only after the current handler has settled. Entries whose name
 * fails yauzl's `validateFileName` are logged and skipped without reaching
 * the handler. If the handler throws or rejects, the archive is closed and
 * the returned promise rejects with that error.
 *
 * @param filePath The file path where the zip is located.
 * @param onEntry Handler invoked for each entry. To skip an entry, return
 * without calling `entry.readBuffer(maxSize)`.
 * @returns Promise that resolves once the archive has closed after a full
 * walk, or rejects on the first open, archive, read or handler error.
 */
public static walk(
  filePath: string,
  onEntry: (entry: ZipEntryHandle) => Promise<void> | void
): Promise<void> {
  return new Promise((resolve, reject) => {
    yauzl.open(
      filePath,
      { lazyEntries: true, autoClose: true, decodeStrings: false },
      (openError, archive) => {
        if (openError) {
          return reject(openError);
        }

        // Guards against settling the outer promise more than once.
        let finished = false;

        const abort = (reason: Error) => {
          if (finished) {
            return;
          }
          finished = true;
          archive.close();
          reject(reason);
        };

        // Buffers one entry's contents, enforcing the size limit both on the
        // declared size and on the bytes actually streamed.
        const readContents = (entry: Entry, name: string, limit: number) =>
          new Promise<Buffer>((fulfil, refuse) => {
            if (entry.uncompressedSize > limit) {
              return refuse(ZipHelper.entryTooLargeError(name, limit));
            }

            archive.openReadStream(entry, (streamError, stream) => {
              if (streamError) {
                return refuse(streamError);
              }

              const parts: Buffer[] = [];
              let total = 0;

              stream.on("data", (part: Buffer) => {
                total += part.length;
                if (total > limit) {
                  // Destroying with an error surfaces it via the "error"
                  // listener below.
                  stream.destroy(ZipHelper.entryTooLargeError(name, limit));
                  return;
                }
                parts.push(part);
              });
              stream.on("end", () => fulfil(Buffer.concat(parts)));
              stream.on("error", refuse);
            });
          });

        // Requests the next entry unless the walk has already been aborted.
        const advance = () => {
          if (!finished) {
            archive.readEntry();
          }
        };

        archive.on("entry", (entry: Entry) => {
          const fileName = Buffer.from(entry.fileName).toString("utf8");

          if (validateFileName(fileName)) {
            Logger.warn("Invalid zip entry", { fileName });
            archive.readEntry();
            return;
          }

          const handle: ZipEntryHandle = {
            fileName,
            uncompressedSize: entry.uncompressedSize,
            isDirectory: fileName.endsWith("/"),
            readBuffer: (maxSize) => readContents(entry, fileName, maxSize),
          };

          // Starting from a resolved promise routes synchronous throws from
          // the handler into `abort` as well.
          Promise.resolve()
            .then(() => onEntry(handle))
            .then(advance)
            .catch(abort);
        });

        archive.on("close", () => {
          if (finished) {
            return;
          }
          finished = true;
          resolve();
        });

        archive.on("error", abort);

        archive.readEntry();
      }
    );
  });
}
/**
 * Build a tree describing a zip's contents in a single pass over the
 * archive, without extracting anything to disk. Any entry containing a
 * `__MACOSX` segment or a segment starting with `.` (dotfiles, and `..`) is
 * rejected.
 *
 * When provided, `onFile` is called once for every accepted file entry, at
 * the moment it is encountered, with the tree node and a handle to the raw
 * entry. Use `entry.readBuffer(maxSize)` inside the hook to pre-load
 * contents (e.g. small text files); omit the hook when only the structure is
 * needed.
 *
 * @param filePath Local filesystem path to the zip.
 * @param onFile Optional per-file hook; not called for directory entries.
 * @returns A synthetic root node whose `children` are the zip's top-level
 * entries.
 */
public static async toFileTree(
  filePath: string,
  onFile?: (node: ZipTreeNode, entry: ZipEntryHandle) => Promise<void> | void
): Promise<ZipTreeNode> {
  const root: ZipTreeNode = { name: "", title: "", pathInZip: "", children: [] };

  // Lookup from normalized path to the node already created for it.
  const index = new Map<string, ZipTreeNode>();

  const shouldSkip = (part: string) =>
    part.startsWith(".") || part === "__MACOSX";

  const locate = (entryName: string): ZipTreeNode | null => {
    // Empty segments and the no-op `.` (as in `./Collection/page.md`) are
    // discarded. `..` is kept on purpose so that the dot-prefix check below
    // rejects the entry — path traversal is never resolved.
    const parts = entryName
      .split("/")
      .filter((part) => part.length > 0 && part !== ".");
    if (parts.length === 0) {
      return null;
    }

    let parent = root;
    let key = "";
    for (const part of parts) {
      if (shouldSkip(part)) {
        return null;
      }

      key = key === "" ? part : `${key}/${part}`;

      let node = index.get(key);
      if (!node) {
        node = {
          name: part,
          title: deserializeFilename(path.parse(part).name),
          pathInZip: key,
          children: [],
        };
        parent.children.push(node);
        index.set(key, node);
      }
      parent = node;
    }
    return parent;
  };

  await this.walk(filePath, async (entry) => {
    // Resolve first so that directory entries still materialize their nodes.
    const node = locate(entry.fileName);
    if (node && !entry.isDirectory && onFile) {
      await onFile(node, entry);
    }
  });

  return root;
}
/**
* Write a zip file to a disk location
*
@@ -231,4 +442,12 @@ export default class ZipHelper {
);
});
}
/**
 * Create the error reported when a zip entry exceeds the caller's read limit.
 *
 * @param fileName Name of the offending entry, used as the message prefix.
 * @param maxSize The limit that was exceeded, in bytes.
 * @returns A validation error whose message states the limit in
 * human-readable form.
 */
private static entryTooLargeError(fileName: string, maxSize: number): Error {
  const readableLimit = bytesToHumanReadable(maxSize);
  const message = `${fileName} is too large - the maximum size is ${readableLimit}`;
  return ValidationError(message);
}
}