perf: Remove row lock on collection when mutating collection structure

This commit is contained in:
Tom Moor
2026-05-12 20:28:46 -04:00
parent 871cb52a23
commit daf88ad0ec
5 changed files with 288 additions and 14 deletions
+91
View File
@@ -1,3 +1,4 @@
import Collection from "@server/models/Collection";
import Pin from "@server/models/Pin";
import {
buildDocument,
@@ -234,4 +235,94 @@ describe("documentMover", () => {
expect(response.documents[0].updatedBy.id).toEqual(user.id);
expect(response.documents[0].publishedAt).toBeNull();
});
it("bumps documentStructureVersion on a successful move", async () => {
const team = await buildTeam();
const user = await buildUser({ teamId: team.id });
const collection = await buildCollection({
userId: user.id,
teamId: team.id,
});
const document = await buildDocument({
userId: user.id,
collectionId: collection.id,
teamId: team.id,
});
const before = await Collection.findByPk(collection.id, {
rejectOnEmpty: true,
});
const versionBefore = before.documentStructureVersion;
await withAPIContext(user, (ctx) =>
documentMover(ctx, {
document,
collectionId: collection.id,
index: 0,
})
);
const after = await Collection.findByPk(collection.id, {
rejectOnEmpty: true,
});
expect(after.documentStructureVersion).toBeGreaterThan(versionBefore);
});
it("serialises concurrent moves on the same collection via retry", async () => {
const team = await buildTeam();
const user = await buildUser({ teamId: team.id });
const sourceA = await buildCollection({
userId: user.id,
teamId: team.id,
});
const sourceB = await buildCollection({
userId: user.id,
teamId: team.id,
});
const target = await buildCollection({
userId: user.id,
teamId: team.id,
});
const docA = await buildDocument({
userId: user.id,
collectionId: sourceA.id,
teamId: team.id,
});
const docB = await buildDocument({
userId: user.id,
collectionId: sourceB.id,
teamId: team.id,
});
// Two moves targeting the same destination collection run in independent
// transactions. With optimistic concurrency one will conflict on the
// version bump and retry; both should ultimately succeed.
const [resultA, resultB] = await Promise.all([
withAPIContext(user, (ctx) =>
documentMover(ctx, {
document: docA,
collectionId: target.id,
index: 0,
})
),
withAPIContext(user, (ctx) =>
documentMover(ctx, {
document: docB,
collectionId: target.id,
index: 0,
})
),
]);
expect(resultA.collectionChanged).toBe(true);
expect(resultB.collectionChanged).toBe(true);
const after = await Collection.findByPk(target.id, {
includeDocumentStructure: true,
rejectOnEmpty: true,
});
const ids = (after.documentStructure ?? []).map((node) => node.id);
expect(ids).toContain(docA.id);
expect(ids).toContain(docB.id);
});
});
+105 -13
View File
@@ -1,6 +1,7 @@
import { Transaction } from "sequelize";
import { sleep } from "@shared/utils/timers";
import { traceFunction } from "@server/logging/tracing";
import { Document, Collection, Pin } from "@server/models";
import { sequelize } from "@server/storage/database";
import type { APIContext } from "@server/types";
type Props = {
@@ -20,15 +21,29 @@ type Result = {
collectionChanged: boolean;
};
async function documentMover(
const MAX_RETRIES = 3;
function isOptimisticLockError(err: unknown): boolean {
return (
typeof err === "object" &&
err !== null &&
(err as { id?: string }).id === "optimistic_lock"
);
}
async function runMove(
ctx: APIContext,
{
document,
collectionId = null,
parentDocumentId = null,
// convert undefined to null so parentId comparison treats them as equal
collectionId,
parentDocumentId,
index,
}: Props
}: {
document: Document;
collectionId: string | null;
parentDocumentId: string | null;
index?: number;
}
): Promise<Result> {
const { user } = ctx.state.auth;
const { transaction } = ctx.state;
@@ -41,11 +56,12 @@ async function documentMover(
collectionChanged,
};
// Load the current and the next collection upfront and lock them
// Load the current and next collection. No FOR UPDATE lock: concurrent
// moves on the same collection are coordinated via optimistic concurrency
// on `documentStructureVersion` and retried at the savepoint boundary.
const collection = await Collection.findByPk(document.collectionId!, {
includeDocumentStructure: true,
transaction,
lock: Transaction.LOCK.UPDATE,
paranoid: false,
});
@@ -54,17 +70,17 @@ async function documentMover(
newCollection = await Collection.findByPk(collectionId, {
includeDocumentStructure: true,
transaction,
lock: Transaction.LOCK.UPDATE,
});
} else if (!collectionId) {
newCollection = null;
}
if (document.publishedAt) {
// Remove the document from the current collection
// Mutate source structure in memory without saving; we'll persist via the
// version-checked path below.
const response = await collection?.removeDocumentInStructure(document, {
transaction,
save: collectionChanged,
save: false,
});
let documentJson = response?.[0];
@@ -94,12 +110,24 @@ async function documentMover(
document.updatedBy = user;
if (newCollection) {
// Add the document and it's tree to the new collection
// When source and destination are the same instance, both the remove and
// add mutate `this.documentStructure`; the single save below persists
// them together with one version bump.
await newCollection.addDocumentToStructure(document, toIndex, {
documentJson,
transaction,
save: false,
});
}
await Promise.all([
collectionChanged && collection
? collection.saveDocumentStructure({ transaction })
: null,
newCollection
? newCollection.saveDocumentStructure({ transaction })
: null,
]);
} else {
document.collectionId = collectionId;
document.parentDocumentId = parentDocumentId;
@@ -186,7 +214,6 @@ async function documentMover(
collectionId: previousCollectionId,
},
transaction,
lock: Transaction.LOCK.UPDATE,
});
await pin?.destroyWithCtx(ctx);
@@ -206,6 +233,71 @@ async function documentMover(
return result;
}
async function documentMover(ctx: APIContext, props: Props): Promise<Result> {
const {
document,
collectionId = null,
parentDocumentId = null,
index,
} = props;
// Snapshot the document fields we mutate inside `runMove`. On retry the
// in-memory instance keeps the writes from the previous attempt, so we
// restore the original values before re-entering the savepoint to keep
// each attempt deterministic.
const initial = {
collectionId: document.collectionId,
parentDocumentId: document.parentDocumentId,
lastModifiedById: document.lastModifiedById,
publishedAt: document.publishedAt,
updatedBy: document.updatedBy,
collection: document.collection,
};
const outerTransaction = ctx.state.transaction;
let lastError: unknown;
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
if (attempt > 0) {
document.collectionId = initial.collectionId;
document.parentDocumentId = initial.parentDocumentId;
document.lastModifiedById = initial.lastModifiedById;
document.publishedAt = initial.publishedAt;
document.updatedBy = initial.updatedBy;
document.collection = initial.collection;
await sleep(5 * 2 ** (attempt - 1) + Math.random() * 5);
}
try {
return await sequelize.transaction(
{ transaction: outerTransaction },
async (savepoint) => {
ctx.state.transaction = savepoint;
try {
return await runMove(ctx, {
document,
collectionId,
parentDocumentId,
index,
});
} finally {
ctx.state.transaction = outerTransaction;
}
}
);
} catch (err) {
if (isOptimisticLockError(err) && attempt < MAX_RETRIES) {
lastError = err;
continue;
}
throw err;
}
}
throw lastError;
}
export default traceFunction({
spanName: "documentMover",
})(documentMover);
+9
View File
@@ -250,3 +250,12 @@ export function ClientClosedRequestError(
isReportable: false,
});
}
export function OptimisticLockError(
message = "The resource was modified by another request, please retry"
) {
return httpErrors(409, message, {
id: "optimistic_lock",
isReportable: false,
});
}
@@ -0,0 +1,18 @@
"use strict";
module.exports = {
async up(queryInterface, Sequelize) {
await queryInterface.addColumn("collections", "documentStructureVersion", {
type: Sequelize.INTEGER,
allowNull: false,
defaultValue: 0,
});
},
async down(queryInterface) {
await queryInterface.removeColumn(
"collections",
"documentStructureVersion"
);
},
};
+65 -1
View File
@@ -55,7 +55,7 @@ import { UrlHelper } from "@shared/utils/UrlHelper";
import { sortNavigationNodes } from "@shared/utils/collections";
import slugify from "@shared/utils/slugify";
import { CollectionValidation } from "@shared/validations";
import { ValidationError } from "@server/errors";
import { OptimisticLockError, ValidationError } from "@server/errors";
import type { APIContext } from "@server/types";
import { CacheHelper } from "@server/utils/CacheHelper";
import { RedisPrefixHelper } from "@server/utils/RedisPrefixHelper";
@@ -266,6 +266,15 @@ class Collection extends ParanoidModel<
@Column(DataType.JSONB)
documentStructure: NavigationNode[] | null;
/**
* Monotonically increasing counter bumped on every write to
* `documentStructure`. Used for optimistic concurrency so that concurrent
* structural changes detect conflicts instead of serializing on a row lock.
*/
@Default(0)
@Column
documentStructureVersion: number;
@Default(true)
@Column
sharing: boolean;
@@ -1055,6 +1064,61 @@ class Collection extends ParanoidModel<
return this;
};
/**
* Persist the in-memory `documentStructure` using optimistic concurrency.
* An alternative to `save({ fields: ["documentStructure"] })` for callers
* that coordinate concurrent structural mutations via the
* `documentStructureVersion` column. The version is incremented atomically;
* if it has already moved since this instance was loaded the update affects
* zero rows and an `OptimisticLockError` is thrown for the caller to retry.
*
* @param options optional Sequelize options (currently only `transaction` is read).
* @throws OptimisticLockError if another writer has bumped the version.
*/
saveDocumentStructure = async function (
this: Collection,
options?: { transaction?: Transaction | null }
): Promise<void> {
if (!this.changed("documentStructure")) {
return;
}
const expectedVersion = this.documentStructureVersion;
const nextVersion = expectedVersion + 1;
const [affectedCount] = await (
this.constructor as typeof Collection
).update(
{
documentStructure: this.documentStructure,
documentStructureVersion: nextVersion,
},
{
where: {
id: this.id,
documentStructureVersion: expectedVersion,
},
transaction: options?.transaction,
// Mirror the prior `paranoid: false` load semantics; documents may be
// moved out of soft-deleted collections during archive flows.
paranoid: false,
}
);
if (affectedCount === 0) {
throw OptimisticLockError();
}
this.setDataValue("documentStructureVersion", nextVersion);
// `Model.update` bypasses instance hooks, so invoke the AfterSave cache
// hook manually. The hook keys off `changed("documentStructure")`, which
// is still true on this instance.
await Collection.cacheDocumentStructure(this, {
transaction: options?.transaction ?? undefined,
});
};
/**
* Get all of the document ids that are in this collection by
* recursively iterating through `documentStructure`.