From daf88ad0ec29468b38e0858acb4345317c1decf8 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Tue, 12 May 2026 20:28:46 -0400 Subject: [PATCH] perf: Remove row lock on collection when mutating collection structure --- server/commands/documentMover.test.ts | 91 ++++++++++++++ server/commands/documentMover.ts | 118 ++++++++++++++++-- server/errors.ts | 9 ++ ...cument-structure-version-to-collections.js | 18 +++ server/models/Collection.ts | 66 +++++++++- 5 files changed, 288 insertions(+), 14 deletions(-) create mode 100644 server/migrations/20260512000000-add-document-structure-version-to-collections.js diff --git a/server/commands/documentMover.test.ts b/server/commands/documentMover.test.ts index 6530fbba68..5497de96f4 100644 --- a/server/commands/documentMover.test.ts +++ b/server/commands/documentMover.test.ts @@ -1,3 +1,4 @@ +import Collection from "@server/models/Collection"; import Pin from "@server/models/Pin"; import { buildDocument, @@ -234,4 +235,94 @@ describe("documentMover", () => { expect(response.documents[0].updatedBy.id).toEqual(user.id); expect(response.documents[0].publishedAt).toBeNull(); }); + + it("bumps documentStructureVersion on a successful move", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const collection = await buildCollection({ + userId: user.id, + teamId: team.id, + }); + const document = await buildDocument({ + userId: user.id, + collectionId: collection.id, + teamId: team.id, + }); + + const before = await Collection.findByPk(collection.id, { + rejectOnEmpty: true, + }); + const versionBefore = before.documentStructureVersion; + + await withAPIContext(user, (ctx) => + documentMover(ctx, { + document, + collectionId: collection.id, + index: 0, + }) + ); + + const after = await Collection.findByPk(collection.id, { + rejectOnEmpty: true, + }); + expect(after.documentStructureVersion).toBeGreaterThan(versionBefore); + }); + + it("serialises concurrent moves on the same collection via retry", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const sourceA = await buildCollection({ + userId: user.id, + teamId: team.id, + }); + const sourceB = await buildCollection({ + userId: user.id, + teamId: team.id, + }); + const target = await buildCollection({ + userId: user.id, + teamId: team.id, + }); + const docA = await buildDocument({ + userId: user.id, + collectionId: sourceA.id, + teamId: team.id, + }); + const docB = await buildDocument({ + userId: user.id, + collectionId: sourceB.id, + teamId: team.id, + }); + + // Two moves targeting the same destination collection run in independent + // transactions. With optimistic concurrency one will conflict on the + // version bump and retry; both should ultimately succeed. + const [resultA, resultB] = await Promise.all([ + withAPIContext(user, (ctx) => + documentMover(ctx, { + document: docA, + collectionId: target.id, + index: 0, + }) + ), + withAPIContext(user, (ctx) => + documentMover(ctx, { + document: docB, + collectionId: target.id, + index: 0, + }) + ), + ]); + + expect(resultA.collectionChanged).toBe(true); + expect(resultB.collectionChanged).toBe(true); + + const after = await Collection.findByPk(target.id, { + includeDocumentStructure: true, + rejectOnEmpty: true, + }); + const ids = (after.documentStructure ?? []).map((node) => node.id); + expect(ids).toContain(docA.id); + expect(ids).toContain(docB.id); + }); }); diff --git a/server/commands/documentMover.ts b/server/commands/documentMover.ts index ae13f11ea8..18aca5b03d 100644 --- a/server/commands/documentMover.ts +++ b/server/commands/documentMover.ts @@ -1,6 +1,7 @@ -import { Transaction } from "sequelize"; +import { sleep } from "@shared/utils/timers"; import { traceFunction } from "@server/logging/tracing"; import { Document, Collection, Pin } from "@server/models"; +import { sequelize } from "@server/storage/database"; import type { APIContext } from "@server/types"; type Props = { @@ -20,15 +21,29 @@ type Result = { collectionChanged: boolean; }; -async function documentMover( +const MAX_RETRIES = 3; + +function isOptimisticLockError(err: unknown): boolean { + return ( + typeof err === "object" && + err !== null && + (err as { id?: string }).id === "optimistic_lock" + ); +} + +async function runMove( ctx: APIContext, { document, - collectionId = null, - parentDocumentId = null, - // convert undefined to null so parentId comparison treats them as equal + collectionId, + parentDocumentId, index, - }: Props + }: { + document: Document; + collectionId: string | null; + parentDocumentId: string | null; + index?: number; + } ): Promise { const { user } = ctx.state.auth; const { transaction } = ctx.state; @@ -41,11 +56,12 @@ async function documentMover( collectionChanged, }; - // Load the current and the next collection upfront and lock them + // Load the current and next collection. No FOR UPDATE lock: concurrent + // moves on the same collection are coordinated via optimistic concurrency + // on `documentStructureVersion` and retried at the savepoint boundary. const collection = await Collection.findByPk(document.collectionId!, { includeDocumentStructure: true, transaction, - lock: Transaction.LOCK.UPDATE, paranoid: false, }); @@ -54,17 +70,17 @@ async function documentMover( newCollection = await Collection.findByPk(collectionId, { includeDocumentStructure: true, transaction, - lock: Transaction.LOCK.UPDATE, }); } else if (!collectionId) { newCollection = null; } if (document.publishedAt) { - // Remove the document from the current collection + // Mutate source structure in memory without saving; we'll persist via the + // version-checked path below. const response = await collection?.removeDocumentInStructure(document, { transaction, - save: collectionChanged, + save: false, }); let documentJson = response?.[0]; @@ -94,12 +110,24 @@ async function documentMover( document.updatedBy = user; if (newCollection) { - // Add the document and it's tree to the new collection + // When source and destination are the same instance, both the remove and + // add mutate `this.documentStructure`; the single save below persists + // them together with one version bump. await newCollection.addDocumentToStructure(document, toIndex, { documentJson, transaction, + save: false, }); } + + await Promise.all([ + collectionChanged && collection + ? collection.saveDocumentStructure({ transaction }) + : null, + newCollection + ? newCollection.saveDocumentStructure({ transaction }) + : null, + ]); } else { document.collectionId = collectionId; document.parentDocumentId = parentDocumentId; @@ -186,7 +214,6 @@ async function documentMover( collectionId: previousCollectionId, }, transaction, - lock: Transaction.LOCK.UPDATE, }); await pin?.destroyWithCtx(ctx); @@ -206,6 +233,71 @@ async function documentMover( return result; } +async function documentMover(ctx: APIContext, props: Props): Promise { + const { + document, + collectionId = null, + parentDocumentId = null, + index, + } = props; + + // Snapshot the document fields we mutate inside `runMove`. On retry the + // in-memory instance keeps the writes from the previous attempt, so we + // restore the original values before re-entering the savepoint to keep + // each attempt deterministic. + const initial = { + collectionId: document.collectionId, + parentDocumentId: document.parentDocumentId, + lastModifiedById: document.lastModifiedById, + publishedAt: document.publishedAt, + updatedBy: document.updatedBy, + collection: document.collection, + }; + + const outerTransaction = ctx.state.transaction; + let lastError: unknown; + + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + if (attempt > 0) { + document.collectionId = initial.collectionId; + document.parentDocumentId = initial.parentDocumentId; + document.lastModifiedById = initial.lastModifiedById; + document.publishedAt = initial.publishedAt; + document.updatedBy = initial.updatedBy; + document.collection = initial.collection; + + await sleep(5 * 2 ** (attempt - 1) + Math.random() * 5); + } + + try { + return await sequelize.transaction( + { transaction: outerTransaction }, + async (savepoint) => { + ctx.state.transaction = savepoint; + try { + return await runMove(ctx, { + document, + collectionId, + parentDocumentId, + index, + }); + } finally { + ctx.state.transaction = outerTransaction; + } + } + ); + } catch (err) { + if (isOptimisticLockError(err) && attempt < MAX_RETRIES) { + lastError = err; + continue; + } + throw err; + } + } + + throw lastError; +} + export default traceFunction({ spanName: "documentMover", })(documentMover); diff --git a/server/errors.ts b/server/errors.ts index c27e53b210..49ee534088 100644 --- a/server/errors.ts +++ b/server/errors.ts @@ -250,3 +250,12 @@ export function ClientClosedRequestError( isReportable: false, }); } + +export function OptimisticLockError( + message = "The resource was modified by another request, please retry" +) { + return httpErrors(409, message, { + id: "optimistic_lock", + isReportable: false, + }); +} diff --git a/server/migrations/20260512000000-add-document-structure-version-to-collections.js b/server/migrations/20260512000000-add-document-structure-version-to-collections.js new file mode 100644 index 0000000000..fd11ea8ae6 --- /dev/null +++ b/server/migrations/20260512000000-add-document-structure-version-to-collections.js @@ -0,0 +1,18 @@ +"use strict"; + +module.exports = { + async up(queryInterface, Sequelize) { + await queryInterface.addColumn("collections", "documentStructureVersion", { + type: Sequelize.INTEGER, + allowNull: false, + defaultValue: 0, + }); + }, + + async down(queryInterface) { + await queryInterface.removeColumn( + "collections", + "documentStructureVersion" + ); + }, +}; diff --git a/server/models/Collection.ts b/server/models/Collection.ts index 4af1dc579f..aac547f954 100644 --- a/server/models/Collection.ts +++ b/server/models/Collection.ts @@ -55,7 +55,7 @@ import { UrlHelper } from "@shared/utils/UrlHelper"; import { sortNavigationNodes } from "@shared/utils/collections"; import slugify from "@shared/utils/slugify"; import { CollectionValidation } from "@shared/validations"; -import { ValidationError } from "@server/errors"; +import { OptimisticLockError, ValidationError } from "@server/errors"; import type { APIContext } from "@server/types"; import { CacheHelper } from "@server/utils/CacheHelper"; import { RedisPrefixHelper } from "@server/utils/RedisPrefixHelper"; @@ -266,6 +266,15 @@ class Collection extends ParanoidModel< @Column(DataType.JSONB) documentStructure: NavigationNode[] | null; + /** + * Monotonically increasing counter bumped on every write to + * `documentStructure`. Used for optimistic concurrency so that concurrent + * structural changes detect conflicts instead of serializing on a row lock. + */ + @Default(0) + @Column + documentStructureVersion: number; + @Default(true) @Column sharing: boolean; @@ -1055,6 +1064,61 @@ class Collection extends ParanoidModel< return this; }; + /** + * Persist the in-memory `documentStructure` using optimistic concurrency. + * An alternative to `save({ fields: ["documentStructure"] })` for callers + * that coordinate concurrent structural mutations via the + * `documentStructureVersion` column. The version is incremented atomically; + * if it has already moved since this instance was loaded the update affects + * zero rows and an `OptimisticLockError` is thrown for the caller to retry. + * + * @param options optional Sequelize options (currently only `transaction` is read). + * @throws OptimisticLockError if another writer has bumped the version. + */ + saveDocumentStructure = async function ( + this: Collection, + options?: { transaction?: Transaction | null } + ): Promise { + if (!this.changed("documentStructure")) { + return; + } + + const expectedVersion = this.documentStructureVersion; + const nextVersion = expectedVersion + 1; + + const [affectedCount] = await ( + this.constructor as typeof Collection + ).update( + { + documentStructure: this.documentStructure, + documentStructureVersion: nextVersion, + }, + { + where: { + id: this.id, + documentStructureVersion: expectedVersion, + }, + transaction: options?.transaction, + // Mirror the prior `paranoid: false` load semantics; documents may be + // moved out of soft-deleted collections during archive flows. + paranoid: false, + } + ); + + if (affectedCount === 0) { + throw OptimisticLockError(); + } + + this.setDataValue("documentStructureVersion", nextVersion); + + // `Model.update` bypasses instance hooks, so invoke the AfterSave cache + // hook manually. The hook keys off `changed("documentStructure")`, which + // is still true on this instance. + await Collection.cacheDocumentStructure(this, { + transaction: options?.transaction ?? undefined, + }); + }; + /** * Get all of the document ids that are in this collection by * recursively iterating through `documentStructure`.