fix: Sync API changes to clients in realtime (#11186)

* fix: Sync API changes to clients in realtime

* Add editMode parameter

* rename

* Add error handling

* refactor
This commit is contained in:
Tom Moor
2026-01-19 13:28:44 -05:00
committed by GitHub
parent f009375fbc
commit 1cc4d879dc
10 changed files with 448 additions and 20 deletions
+210
View File
@@ -0,0 +1,210 @@
import type {
Extension,
afterLoadDocumentPayload,
onConfigurePayload,
onDestroyPayload,
onDisconnectPayload,
} from "@hocuspocus/server";
import type { Document as HocuspocusDocument } from "@hocuspocus/server";
import * as Y from "yjs";
import env from "@server/env";
import Logger from "@server/logging/Logger";
import { trace } from "@server/logging/tracing";
import Document from "@server/models/Document";
import RedisAdapter from "@server/storage/redis";
/**
* Redis channel prefix for API update notifications.
*/
const CHANNEL_PREFIX = "collaboration:api-update";
/**
* Extension that listens for document updates made through the API and syncs
* them to the collaborative editing state held in memory.
*
* When a document is updated via the API (e.g., documents.update endpoint),
* a message is published to Redis. This extension receives that message and
* reloads the document state from the database, then broadcasts the update
* to all connected clients.
*/
@trace()
export class APIUpdateExtension implements Extension {
/**
* Map of document names to their Hocuspocus Document instances.
*/
private documents: Map<string, HocuspocusDocument> = new Map();
/**
* Redis subscriber client for receiving update notifications.
*/
private subscriber: RedisAdapter | null = null;
/**
* Whether the extension has been configured.
*/
private configured = false;
async onConfigure(_data: onConfigurePayload): Promise<void> {
if (this.configured) {
return;
}
this.configured = true;
try {
// Create a dedicated subscriber for API update notifications
this.subscriber = new RedisAdapter(
env.REDIS_COLLABORATION_URL || env.REDIS_URL,
{
connectionNameSuffix: "collab-api-updates",
maxRetriesPerRequest: null,
}
);
// Handle Redis connection errors
this.subscriber.on("error", (err) => {
Logger.error("Redis subscriber error in APIUpdateExtension", err);
});
// Subscribe to the API update channel pattern
this.subscriber.psubscribe(`${CHANNEL_PREFIX}:*`, (err) => {
if (err) {
Logger.error("Failed to subscribe to API update channel", err);
return;
}
Logger.debug(
"multiplayer",
`Subscribed to ${CHANNEL_PREFIX}:* for API updates`
);
});
// Handle incoming messages
this.subscriber.on("pmessage", this.handleMessage);
} catch (error) {
Logger.error(
"Failed to configure APIUpdateExtension Redis subscriber",
error as Error
);
this.subscriber = null;
this.configured = false;
}
}
async afterLoadDocument({
documentName,
document,
}: afterLoadDocumentPayload): Promise<void> {
const [, documentId] = documentName.split(".");
this.documents.set(documentId, document);
}
async onDestroy(_data: onDestroyPayload): Promise<void> {
if (this.subscriber) {
await this.subscriber.punsubscribe(`${CHANNEL_PREFIX}:*`);
await this.subscriber.quit();
this.subscriber = null;
}
this.documents.clear();
}
/**
* Handle a document being disconnected (no more clients).
*/
async onDisconnect({
documentName,
clientsCount,
}: onDisconnectPayload): Promise<void> {
if (clientsCount === 0) {
const [, documentId] = documentName.split(".");
this.documents.delete(documentId);
}
}
/**
* Handle incoming Redis messages for API updates.
*/
private handleMessage = async (
_pattern: string,
channel: string,
message: string
): Promise<void> => {
try {
const documentId = channel.replace(`${CHANNEL_PREFIX}:`, "");
const document = this.documents.get(documentId);
if (!document) {
// Document not loaded in this instance, ignore
return;
}
const data = JSON.parse(message);
Logger.debug("multiplayer", `Received API update for document`, {
documentId,
actorId: data.actorId,
});
// Fetch the latest state from the database
const dbDocument = await Document.unscoped().findOne({
attributes: ["state", "content", "text"],
where: { id: documentId },
});
if (!dbDocument) {
Logger.warn(`Document ${documentId} not found in database`);
return;
}
if (!dbDocument.state) {
Logger.warn(`Document ${documentId} has no state in database`);
return;
}
// Create a Y.Doc from the database state
const dbYdoc = new Y.Doc();
Y.applyUpdate(dbYdoc, dbDocument.state);
// Calculate the diff between the current in-memory state and the database state
const currentStateVector = Y.encodeStateVector(document);
const update = Y.encodeStateAsUpdate(dbYdoc, currentStateVector);
// Apply the update if there are changes
if (update.length > 0) {
Y.applyUpdate(document, update);
Logger.info("multiplayer", `Applied API update to document`, {
documentId,
updateSize: update.length,
});
}
dbYdoc.destroy();
} catch (error) {
Logger.error("Failed to process API update message", error as Error);
}
};
/**
* Publish a notification that a document was updated via the API.
* This should be called from the document update command.
*
* @param documentId - the id of the document that was updated.
* @param actorId - the id of the user who made the update.
*/
static async notifyUpdate(
documentId: string,
actorId: string
): Promise<void> {
const channel = `${CHANNEL_PREFIX}:${documentId}`;
const message = JSON.stringify({
actorId,
timestamp: Date.now(),
});
await RedisAdapter.defaultClient.publish(channel, message);
Logger.debug("multiplayer", `Published API update notification`, {
documentId,
actorId,
});
}
}
@@ -97,6 +97,8 @@ export default async function documentCollaborativeUpdater({
},
{
transaction,
// Hooks MUST NOT be called or the AfterUpdate hook in Document model may
// result in infinite processing.
hooks: false,
}
);
+151 -5
View File
@@ -1,8 +1,12 @@
import { randomUUID } from "crypto";
import * as Y from "yjs";
import { TextEditMode } from "@shared/types";
import { APIUpdateExtension } from "@server/collaboration/APIUpdateExtension";
import { Event } from "@server/models";
import { ProsemirrorHelper } from "@server/models/helpers/ProsemirrorHelper";
import { buildDocument, buildUser } from "@server/test/factories";
import { withAPIContext } from "@server/test/support";
import documentUpdater from "./documentUpdater";
import { randomUUID } from "crypto";
describe("documentUpdater", () => {
it("should change lastModifiedById", async () => {
@@ -83,7 +87,7 @@ describe("documentUpdater", () => {
documentUpdater(ctx, {
text: "Appended",
document,
append: true,
editMode: TextEditMode.Append,
})
);
@@ -110,7 +114,7 @@ describe("documentUpdater", () => {
documentUpdater(ctx, {
text: "Appended",
document,
append: true,
editMode: TextEditMode.Append,
})
);
@@ -162,7 +166,7 @@ describe("documentUpdater", () => {
documentUpdater(ctx, {
text: "Appended",
document,
append: true,
editMode: TextEditMode.Append,
})
);
@@ -198,7 +202,7 @@ describe("documentUpdater", () => {
documentUpdater(ctx, {
text: "\n\nAppended",
document,
append: true,
editMode: TextEditMode.Append,
})
);
@@ -217,4 +221,146 @@ describe("documentUpdater", () => {
],
});
});
it("should prepend document content when requested", async () => {
const user = await buildUser();
let document = await buildDocument({
teamId: user.teamId,
text: "Existing",
});
document = await withAPIContext(user, (ctx) =>
documentUpdater(ctx, {
text: "Prepended",
document,
editMode: TextEditMode.Prepend,
})
);
expect(document.text).toEqual("PrependedExisting");
expect(document.content).toMatchObject({
type: "doc",
content: [
{
type: "paragraph",
content: [{ type: "text", text: "PrependedExisting" }],
},
],
});
});
it("should preserve rich content when prepending", async () => {
const user = await buildUser();
let document = await buildDocument({
teamId: user.teamId,
text: "**Bold**",
});
document = await withAPIContext(user, (ctx) =>
documentUpdater(ctx, {
text: "Prepended",
document,
editMode: TextEditMode.Prepend,
})
);
expect(document.content).toMatchObject({
type: "doc",
content: [
{
type: "paragraph",
content: [
{
type: "text",
text: "Prepended",
},
{
type: "text",
marks: [{ type: "strong" }],
text: "Bold",
},
],
},
],
});
});
it("should create new paragraph when prepending with newline", async () => {
const user = await buildUser();
let document = await buildDocument({
teamId: user.teamId,
text: "Existing",
});
document = await withAPIContext(user, (ctx) =>
documentUpdater(ctx, {
text: "Prepended\n\n",
document,
editMode: TextEditMode.Prepend,
})
);
expect(document.text).toEqual("Prepended\n\nExisting");
expect(document.content).toMatchObject({
type: "doc",
content: [
{
type: "paragraph",
content: [{ type: "text", text: "Prepended" }],
},
{
type: "paragraph",
content: [{ type: "text", text: "Existing" }],
},
],
});
});
it("should notify collaboration server when text changes", async () => {
const notifyUpdateSpy = jest
.spyOn(APIUpdateExtension, "notifyUpdate")
.mockResolvedValue(undefined);
const user = await buildUser();
let document = await buildDocument({
teamId: user.teamId,
text: "Initial text",
});
// Create initial collaborative state (simulating an active collaboration session)
const ydoc = ProsemirrorHelper.toYDoc("Initial text");
document.state = Buffer.from(Y.encodeStateAsUpdate(ydoc));
await document.save();
document = await withAPIContext(user, (ctx) =>
documentUpdater(ctx, {
text: "Changed content",
document,
})
);
expect(notifyUpdateSpy).toHaveBeenCalledWith(document.id, user.id);
notifyUpdateSpy.mockRestore();
});
it("should not notify collaboration server when only title changes", async () => {
const notifyUpdateSpy = jest
.spyOn(APIUpdateExtension, "notifyUpdate")
.mockResolvedValue(undefined);
const user = await buildUser();
let document = await buildDocument({
teamId: user.teamId,
});
document = await withAPIContext(user, (ctx) =>
documentUpdater(ctx, {
title: "New Title",
document,
})
);
expect(notifyUpdateSpy).not.toHaveBeenCalled();
notifyUpdateSpy.mockRestore();
});
});
+5 -4
View File
@@ -1,3 +1,4 @@
import type { TextEditMode } from "@shared/types";
import { Event, Document } from "@server/models";
import { DocumentHelper } from "@server/models/helpers/DocumentHelper";
import { TextHelper } from "@server/models/helpers/TextHelper";
@@ -24,8 +25,8 @@ type Props = {
fullWidth?: boolean;
/** Whether insights should be visible on the document */
insightsEnabled?: boolean;
/** Whether the text be appended to the end instead of replace */
append?: boolean;
/** The edit mode: "replace", "append", or "prepend" */
editMode?: TextEditMode;
/** Whether the document should be published to the collection */
publish?: boolean;
/** The ID of the collection to publish the document to */
@@ -51,7 +52,7 @@ export default async function documentUpdater(
templateId,
fullWidth,
insightsEnabled,
append,
editMode,
publish,
collectionId,
done,
@@ -88,7 +89,7 @@ export default async function documentUpdater(
await TextHelper.replaceImagesWithAttachments(ctx, text, user, {
base64Only: true,
}),
append
editMode
);
}
+12
View File
@@ -70,6 +70,7 @@ import { DocumentHelper } from "./helpers/DocumentHelper";
import IsHexColor from "./validators/IsHexColor";
import Length from "./validators/Length";
import type { APIContext } from "@server/types";
import { APIUpdateExtension } from "@server/collaboration/APIUpdateExtension";
import { SkipChangeset } from "./decorators/Changeset";
import type { HookContext } from "./base/Model";
@@ -586,6 +587,17 @@ class Document extends ArchivableModel<
}
}
@AfterUpdate
static notifyCollaborationServer(model: Document, ctx: HookContext) {
if (model.changed("state") && ctx.transaction && ctx.auth?.user?.id) {
const actorId = ctx.auth.user.id;
const transaction = ctx.transaction.parent || ctx.transaction;
transaction.afterCommit(async () => {
await APIUpdateExtension.notifyUpdate(model.id, actorId);
});
}
}
// associations
@BelongsTo(() => FileOperation, "importId")
+29 -5
View File
@@ -10,7 +10,7 @@ import {
import textBetween from "@shared/editor/lib/textBetween";
import { EditorStyleHelper } from "@shared/editor/styles/EditorStyleHelper";
import type { NavigationNode, ProsemirrorData } from "@shared/types";
import { IconType } from "@shared/types";
import { IconType, TextEditMode } from "@shared/types";
import { determineIconType } from "@shared/utils/icon";
import { parser, serializer, schema } from "@server/editor";
import { addTags } from "@server/logging/tracer";
@@ -472,18 +472,17 @@ export class DocumentHelper {
*
* @param document The document to apply the changes to
* @param text The markdown to apply
* @param append If true appends the markdown instead of replacing existing
* content
* @param editMode The edit mode to use: "replace" (default), "append", or "prepend"
* @returns The document
*/
static applyMarkdownToDocument(
document: Document,
text: string,
append = false
editMode: TextEditMode = TextEditMode.Replace
) {
let doc: Node;
if (append) {
if (editMode === TextEditMode.Append) {
const existingDoc = DocumentHelper.toProsemirror(document);
const newDoc = parser.parse(text);
const lastChild = existingDoc.lastChild;
@@ -508,6 +507,31 @@ export class DocumentHelper {
} else {
doc = existingDoc.copy(existingDoc.content.append(newDoc.content));
}
} else if (editMode === TextEditMode.Prepend) {
const existingDoc = DocumentHelper.toProsemirror(document);
const newDoc = parser.parse(text);
const lastChild = newDoc.lastChild;
const firstChild = existingDoc.firstChild;
if (
!text.match(/\n\s*$/) &&
lastChild &&
firstChild &&
lastChild.type.name === "paragraph" &&
firstChild.type.name === "paragraph"
) {
const mergedPara = lastChild.copy(
lastChild.content.append(firstChild.content)
);
doc = existingDoc.copy(
newDoc.content
.cut(0, newDoc.content.size - lastChild.nodeSize)
.append(Fragment.from(mergedPara))
.append(existingDoc.content.cut(firstChild.nodeSize))
);
} else {
doc = existingDoc.copy(newDoc.content.append(existingDoc.content));
}
} else {
doc = parser.parse(text);
}
@@ -93,7 +93,7 @@ exports[`#documents.update should require authentication 1`] = `
exports[`#documents.update should require text while appending 1`] = `
{
"error": "validation_error",
"message": "text is required while appending",
"message": "text is required when using append, prepend, or editMode",
"ok": false,
"status": 400,
}
+26 -5
View File
@@ -1,7 +1,7 @@
import type formidable from "formidable";
import isEmpty from "lodash/isEmpty";
import { z } from "zod";
import { DocumentPermission, StatusFilter } from "@shared/types";
import { DocumentPermission, StatusFilter, TextEditMode } from "@shared/types";
import { BaseSchema } from "@server/routes/api/schema";
import { zodIconType, zodIdType, zodShareIdType } from "@server/utils/zod";
import { ValidateColor } from "@server/validation";
@@ -252,18 +252,39 @@ export const DocumentsUpdateSchema = BaseSchema.extend({
/** Doc collection Id */
collectionId: z.string().uuid().nullish(),
/** Boolean to denote if text should be appended */
/** @deprecated Use editMode instead */
append: z.boolean().optional(),
/** The edit mode for text updates: "replace", "append", or "prepend" */
editMode: z.nativeEnum(TextEditMode).optional(),
/** @deprecated Version of the API to be used, remove in a few releases */
apiVersion: z.number().optional(),
/** Whether the editing session is complete */
done: z.boolean().optional(),
}),
}).refine((req) => !(req.body.append && !req.body.text), {
message: "text is required while appending",
});
})
.refine(
(req) =>
!(
(req.body.append ||
req.body.editMode === TextEditMode.Append ||
req.body.editMode === TextEditMode.Prepend) &&
!req.body.text
),
{
message: "text is required when using append, prepend, or editMode",
}
)
.transform((req) => {
// Transform deprecated append to editMode for backwards compatibility
if (req.body.append && !req.body.editMode) {
req.body.editMode = TextEditMode.Append;
}
delete req.body.append;
return req;
});
export type DocumentsUpdateReq = z.infer<typeof DocumentsUpdateSchema>;
+2
View File
@@ -8,6 +8,7 @@ import { Server } from "@hocuspocus/server";
import type Koa from "koa";
import WebSocket from "ws";
import { DocumentValidation } from "@shared/validations";
import { APIUpdateExtension } from "@server/collaboration/APIUpdateExtension";
import { ConnectionLimitExtension } from "@server/collaboration/ConnectionLimitExtension";
import { ViewsExtension } from "@server/collaboration/ViewsExtension";
import env from "@server/env";
@@ -58,6 +59,7 @@ export default function init(
new EditorVersionExtension(),
new AuthenticationExtension(),
new PersistenceExtension(),
new APIUpdateExtension(),
new ViewsExtension(),
new LoggerExtension(),
new MetricsExtension(),
+10
View File
@@ -588,6 +588,16 @@ export enum IconType {
Custom = "custom",
}
/** Edit modes for document text updates. */
export enum TextEditMode {
/** Replace existing content with new content (default). */
Replace = "replace",
/** Append new content to the end of the document. */
Append = "append",
/** Prepend new content to the beginning of the document. */
Prepend = "prepend",
}
export enum EmojiCategory {
People = "People",
Nature = "Nature",