Compare commits

...

6 Commits

Author SHA1 Message Date
Tom Moor bab2729669 Add optional chaining to avatarUrl check 2025-04-06 15:46:48 -07:00
Tom Moor 90f9721b40 Remove domain parameter and update avatarUrl check 2025-04-06 15:28:34 -07:00
codegen-sh[bot] dc474573c6 Remove avatars.ts, avatars.test.ts and update teamCreator.ts to remove generateAvatarUrl usage 2025-04-06 22:26:59 +00:00
codegen-sh[bot] a3910ce6d1 #8873: Remove usage of generateAvatarUrl and logo.clearbit.com API 2025-04-06 22:21:25 +00:00
Tom Moor f9476770ce fix: Collaboration server inaccurately counts connections (#8886)
* fix: Collaboration server inaccurately counts connections

* Add integration test

* docs
2025-04-06 21:55:10 +00:00
Hemachandar 2e018e74b8 Log fields that cause UniqueConstraintError in ImportsProcessor (#8887) 2025-04-06 11:10:59 -07:00
6 changed files with 203 additions and 92 deletions
@@ -0,0 +1,109 @@
import { Server } from "@hocuspocus/server";
import WebSocket from "ws";
import EDITOR_VERSION from "@shared/editor/version";
import { sleep } from "@server/utils/timers";
import { ConnectionLimitExtension } from "./ConnectionLimitExtension";
import { EditorVersionExtension } from "./EditorVersionExtension";
jest.mock("@server/env", () => ({
COLLABORATION_MAX_CLIENTS_PER_DOCUMENT: 2,
}));
describe("ConnectionLimitExtension", () => {
let server: typeof Server;
let extension: ConnectionLimitExtension;
const port = 12345;
const url = `ws://localhost:${port}`;
const documentName = "test";
beforeEach(async () => {
extension = new ConnectionLimitExtension();
server = Server.configure({
port,
extensions: [extension, new EditorVersionExtension()],
});
await server.listen();
});
afterEach(async () => {
await server.destroy();
});
const getConnections = () =>
extension.connectionsByDocument.get(documentName)?.size ?? 0;
const createWebSocket = (editorVersion = EDITOR_VERSION) =>
new Promise<WebSocket>((resolve, reject) => {
const ws = new WebSocket(
`${url}/${documentName}?editorVersion=${editorVersion}`
);
ws.on("open", () => resolve(ws));
ws.on("error", reject);
});
it("should allow connections within limit", async () => {
const ws1 = await createWebSocket();
const ws2 = await createWebSocket();
expect(ws1.readyState).toBe(WebSocket.OPEN);
expect(ws2.readyState).toBe(WebSocket.OPEN);
expect(getConnections()).toBe(2);
ws1.close();
ws2.close();
await sleep(250);
expect(getConnections()).toBe(0);
});
it("should close connections exceeding limit", async () => {
const ws1 = await createWebSocket();
const ws2 = await createWebSocket();
const ws3 = await createWebSocket();
await sleep(250);
expect(ws3.readyState).toBe(WebSocket.CLOSED);
expect(ws2.readyState).toBe(WebSocket.OPEN);
expect(ws1.readyState).toBe(WebSocket.OPEN);
expect(getConnections()).toBe(2);
ws1.close();
ws2.close();
await sleep(250);
expect(getConnections()).toBe(0);
});
it("should handle connections closed by other extensions", async () => {
const ws1 = await createWebSocket();
// Create a connection that will be closed by the EditorVersionExtension
const ws2 = await createWebSocket("1.0.0");
ws1.close();
ws2.close();
await sleep(250);
expect(getConnections()).toBe(0);
});
it("should allow new connection after disconnect", async () => {
const ws1 = await createWebSocket();
const ws2 = await createWebSocket();
ws1.close();
await sleep(250);
expect(getConnections()).toBe(1);
const ws3 = await createWebSocket();
expect(ws3.readyState).toBe(WebSocket.OPEN);
expect(getConnections()).toBe(2);
ws2.close();
ws3.close();
await sleep(250);
expect(getConnections()).toBe(0);
});
});
@@ -1,8 +1,10 @@
import {
Extension,
connectedPayload,
onConnectPayload,
onDisconnectPayload,
} from "@hocuspocus/server";
import pluralize from "pluralize";
import { TooManyConnections } from "@shared/collaboration/CloseEvents";
import env from "@server/env";
import Logger from "@server/logging/Logger";
@@ -14,7 +16,7 @@ export class ConnectionLimitExtension implements Extension {
/**
* Map of documentId -> connection count
*/
connectionsByDocument: Map<string, Set<string>> = new Map();
public connectionsByDocument: Map<string, Set<string>> = new Map();
/**
* On disconnect hook
@@ -34,23 +36,30 @@ export class ConnectionLimitExtension implements Extension {
}
}
const connectionCount = connections?.size ?? 0;
Logger.debug(
"multiplayer",
`${connections?.size} connections to "${documentName}"`
`${connectionCount} ${pluralize(
"connection",
connectionCount
)} to "${documentName}"`
);
return Promise.resolve();
}
/**
* On connect hook
* onConnect hook is called when a new connection has been established.
* This is where we can check if the document has reached the maximum number of
* connections and reject the connection if it has.
*
* @param data The connect payload
* @returns Promise, resolving will allow the connection, rejecting will drop it
* @param data The onConnect payload
* @returns Promise, resolving will allow the connection, rejecting will drop.
*/
onConnect({ documentName, socketId }: withContext<onConnectPayload>) {
onConnect({ documentName }: withContext<onConnectPayload>) {
const connections =
this.connectionsByDocument.get(documentName) || new Set();
if (connections?.size >= env.COLLABORATION_MAX_CLIENTS_PER_DOCUMENT) {
Logger.info(
"multiplayer",
@@ -61,12 +70,30 @@ export class ConnectionLimitExtension implements Extension {
return Promise.reject(TooManyConnections);
}
return Promise.resolve();
}
/**
* Connected hook is called after a new connection has been established.
* We can safely update the connection count for the document.
*
* @param data The onConnect payload
* @returns Promise
*/
connected({ documentName, socketId }: withContext<connectedPayload>) {
const connections =
this.connectionsByDocument.get(documentName) || new Set();
connections.add(socketId);
this.connectionsByDocument.set(documentName, connections);
const connectionCount = connections.size ?? 0;
Logger.debug(
"multiplayer",
`${connections.size} connections to "${documentName}"`
`${connectionCount} ${pluralize(
"connection",
connectionCount
)} to "${documentName}"`
);
return Promise.resolve();
+2 -9
View File
@@ -3,7 +3,6 @@ import slugify from "slugify";
import { RESERVED_SUBDOMAINS } from "@shared/utils/domains";
import { traceFunction } from "@server/logging/tracing";
import { Team, Event } from "@server/models";
import { generateAvatarUrl } from "@server/utils/avatars";
type Props = {
/** The displayed name of the team */
@@ -29,20 +28,14 @@ type Props = {
async function teamCreator({
name,
domain,
subdomain,
avatarUrl,
authenticationProviders,
ip,
transaction,
}: Props): Promise<Team> {
// If the service did not provide a logo/avatar then we attempt to generate
// one via ClearBit, or fallback to colored initials in worst case scenario
if (!avatarUrl || !avatarUrl.startsWith("http")) {
avatarUrl = await generateAvatarUrl({
domain,
id: subdomain,
});
if (!avatarUrl?.startsWith("http")) {
avatarUrl = null;
}
const team = await Team.create(
+58 -39
View File
@@ -3,7 +3,12 @@ import chunk from "lodash/chunk";
import keyBy from "lodash/keyBy";
import truncate from "lodash/truncate";
import { Fragment, Node } from "prosemirror-model";
import { CreateOptions, CreationAttributes, Transaction } from "sequelize";
import {
CreateOptions,
CreationAttributes,
Transaction,
UniqueConstraintError,
} from "sequelize";
import { v4 as uuidv4 } from "uuid";
import { randomElement } from "@shared/random";
import { ImportInput, ImportTaskInput } from "@shared/schema";
@@ -154,45 +159,59 @@ export default abstract class ImportsProcessor<
* @returns Promise that resolves when mapping and persistence is completed.
*/
private async onProcessed(importModel: Import<T>, transaction: Transaction) {
const { collections } = await this.createCollectionsAndDocuments({
importModel,
transaction,
});
// Once all collections and documents are created, update collection's document structure.
// This ensures the root documents have the whole subtree available in the structure.
for (const collection of collections) {
await Document.unscoped().findAllInBatches<Document>(
{
where: { parentDocumentId: null, collectionId: collection.id },
order: [
["createdAt", "DESC"],
["id", "ASC"],
],
transaction,
},
async (documents) => {
for (const document of documents) {
await collection.addDocumentToStructure(document, 0, {
save: false,
silent: true,
transaction,
});
}
}
);
await collection.save({ silent: true, transaction });
}
importModel.state = ImportState.Completed;
importModel.error = null; // unset any error from previous attempts.
await importModel.saveWithCtx(
createContext({
user: importModel.createdBy,
try {
const { collections } = await this.createCollectionsAndDocuments({
importModel,
transaction,
})
);
});
// Once all collections and documents are created, update collection's document structure.
// This ensures the root documents have the whole subtree available in the structure.
for (const collection of collections) {
await Document.unscoped().findAllInBatches<Document>(
{
where: { parentDocumentId: null, collectionId: collection.id },
order: [
["createdAt", "DESC"],
["id", "ASC"],
],
transaction,
},
async (documents) => {
for (const document of documents) {
await collection.addDocumentToStructure(document, 0, {
save: false,
silent: true,
transaction,
});
}
}
);
await collection.save({ silent: true, transaction });
}
importModel.state = ImportState.Completed;
importModel.error = null; // unset any error from previous attempts.
await importModel.saveWithCtx(
createContext({
user: importModel.createdBy,
transaction,
})
);
} catch (err) {
if (err instanceof UniqueConstraintError) {
Logger.error(
"ImportsProcessor persistence failed due to unique constraint error",
err,
{
fields: err.fields,
}
);
}
throw err;
}
}
/**
-9
View File
@@ -1,9 +0,0 @@
import { generateAvatarUrl } from "./avatars";
it("should return clearbit url if available", async () => {
const url = await generateAvatarUrl({
id: "google",
domain: "google.com",
});
expect(url).toBe("https://logo.clearbit.com/google.com");
});
-28
View File
@@ -1,28 +0,0 @@
import crypto from "crypto";
import fetch from "./fetch";
export async function generateAvatarUrl({
id,
domain,
}: {
id: string;
domain?: string;
}) {
// attempt to get logo from Clearbit API. If one doesn't exist then
// fall back to using tiley to generate a placeholder logo
const hash = crypto.createHash("sha256");
hash.update(id);
let cbResponse, cbUrl;
if (domain) {
cbUrl = `https://logo.clearbit.com/${domain}`;
try {
cbResponse = await fetch(cbUrl);
} catch (err) {
// okay
}
}
return cbUrl && cbResponse && cbResponse.status === 200 ? cbUrl : null;
}