From 0c0facc2a187f2e9caea95cf18115d8fc5ad6761 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Fri, 5 Jun 2026 17:53:40 -0400 Subject: [PATCH] perf: Avoid empty webhook processor work via cached subscription lookup (#12593) * Avoid empty webhook processor work via cached subscription lookup WebhookProcessor ran for every event but most teams have no matching webhook subscription, costing an empty processor job and a database query per event. Cache a team's enabled subscriptions ({ id, events }) in Redis, invalidated by model lifecycle hooks, and add an optional BaseProcessor.shouldQueue hook consulted by the global event queue so the webhook processor only enqueues a job when a matching subscription exists. Co-Authored-By: Claude Opus 4.8 * feedback --------- Co-authored-by: Claude Opus 4.8 --- .../processors/WebhookProcessor.test.ts | 47 ++++++++ .../server/processors/WebhookProcessor.ts | 35 ++++-- server/models/WebhookSubscription.test.ts | 94 ++++++++++++++++ server/models/WebhookSubscription.ts | 104 +++++++++++++++--- server/queues/processors/BaseProcessor.ts | 23 ++++ server/services/worker.ts | 9 +- server/utils/RedisPrefixHelper.ts | 10 ++ 7 files changed, 299 insertions(+), 23 deletions(-) create mode 100644 server/models/WebhookSubscription.test.ts diff --git a/plugins/webhooks/server/processors/WebhookProcessor.test.ts b/plugins/webhooks/server/processors/WebhookProcessor.test.ts index e7c34edf97..dae000d664 100644 --- a/plugins/webhooks/server/processors/WebhookProcessor.test.ts +++ b/plugins/webhooks/server/processors/WebhookProcessor.test.ts @@ -86,4 +86,51 @@ describe("WebhookProcessor", () => { subscriptionId: subscriptionTwo.id, }); }); + + describe("shouldQueue", () => { + it("returns true when a matching subscription exists", async () => { + const subscription = await buildWebhookSubscription({ + url: "http://example.com", + events: ["users"], + }); + const event: UserEvent = { + name: "users.signin", + userId: subscription.createdById, + teamId: subscription.teamId, + actorId: subscription.createdById, + ip, + }; + + expect(await WebhookProcessor.shouldQueue(event)).toBe(true); + }); + + it("returns false when no subscription matches the event", async () => { + const subscription = await buildWebhookSubscription({ + url: "http://example.com", + events: ["documents.create"], + }); + const event: UserEvent = { + name: "users.signin", + userId: subscription.createdById, + teamId: subscription.teamId, + actorId: subscription.createdById, + ip, + }; + + expect(await WebhookProcessor.shouldQueue(event)).toBe(false); + }); + + it("returns false when the team has no subscriptions", async () => { + const user = await buildUser(); + const event: UserEvent = { + name: "users.signin", + userId: user.id, + teamId: user.teamId, + actorId: user.id, + ip, + }; + + expect(await WebhookProcessor.shouldQueue(event)).toBe(false); + }); + }); }); diff --git a/plugins/webhooks/server/processors/WebhookProcessor.ts b/plugins/webhooks/server/processors/WebhookProcessor.ts index a9c410cf02..1a4b14097f 100644 --- a/plugins/webhooks/server/processors/WebhookProcessor.ts +++ b/plugins/webhooks/server/processors/WebhookProcessor.ts @@ -6,20 +6,39 @@ import DeliverWebhookTask from "../tasks/DeliverWebhookTask"; export default class WebhookProcessor extends BaseProcessor { static applicableEvents: ["*"] = ["*"]; + /** + * Only queue an event when the team has an enabled webhook subscription that + * matches it. The vast majority of events belong to teams with no applicable + * subscriptions, so this avoids creating and running an empty job for them. + * + * @param event The event about to be queued. + * @returns true if a matching subscription exists. + */ + static async shouldQueue(event: Event): Promise { + if (!event.teamId) { + return false; + } + + const subscriptions = await WebhookSubscription.findEnabledByTeamId( + event.teamId + ); + + return subscriptions.some((subscription) => + WebhookSubscription.matchEvent(subscription.events, event.name) + ); + } + async perform(event: Event) { if (!event.teamId) { return; } - const webhookSubscriptions = await WebhookSubscription.findAll({ - where: { - enabled: true, - teamId: event.teamId, - }, - }); + const subscriptions = await WebhookSubscription.findEnabledByTeamId( + event.teamId + ); - const applicableSubscriptions = webhookSubscriptions.filter((webhook) => - webhook.validForEvent(event) + const applicableSubscriptions = subscriptions.filter((subscription) => + WebhookSubscription.matchEvent(subscription.events, event.name) ); await Promise.all( diff --git a/server/models/WebhookSubscription.test.ts b/server/models/WebhookSubscription.test.ts new file mode 100644 index 0000000000..a6665f27a3 --- /dev/null +++ b/server/models/WebhookSubscription.test.ts @@ -0,0 +1,94 @@ +import { buildTeam, buildWebhookSubscription } from "@server/test/factories"; +import WebhookSubscription from "./WebhookSubscription"; + +describe("WebhookSubscription", () => { + describe("matchEvent", () => { + it("matches everything for a wildcard subscription", () => { + expect(WebhookSubscription.matchEvent(["*"], "users.signin")).toBe(true); + }); + + it("matches an exact event name", () => { + expect( + WebhookSubscription.matchEvent(["users.signin"], "users.signin") + ).toBe(true); + }); + + it("matches a namespace prefix", () => { + expect(WebhookSubscription.matchEvent(["users"], "users.signin")).toBe( + true + ); + }); + + it("does not match unrelated events", () => { + expect( + WebhookSubscription.matchEvent(["documents"], "users.signin") + ).toBe(false); + }); + }); + + describe("findEnabledByTeamId", () => { + it("returns only enabled subscriptions for the team", async () => { + const subscription = await buildWebhookSubscription({ + events: ["users"], + }); + const disabled = await buildWebhookSubscription({ + teamId: subscription.teamId, + events: ["documents"], + }); + await disabled.disable(); + + const result = await WebhookSubscription.findEnabledByTeamId( + subscription.teamId + ); + + expect(result).toHaveLength(1); + expect(result[0].id).toEqual(subscription.id); + expect(result[0].events).toEqual(["users"]); + }); + + it("returns an empty array when the team has no subscriptions", async () => { + const team = await buildTeam(); + + const result = await WebhookSubscription.findEnabledByTeamId(team.id); + + expect(result).toEqual([]); + }); + + it("reflects changes after a subscription is disabled", async () => { + const subscription = await buildWebhookSubscription({ + events: ["users"], + }); + + // prime the cache + const before = await WebhookSubscription.findEnabledByTeamId( + subscription.teamId + ); + expect(before).toHaveLength(1); + + await subscription.disable(); + + const after = await WebhookSubscription.findEnabledByTeamId( + subscription.teamId + ); + expect(after).toHaveLength(0); + }); + + it("reflects changes after a subscription is destroyed", async () => { + const subscription = await buildWebhookSubscription({ + events: ["users"], + }); + + const before = await WebhookSubscription.findEnabledByTeamId( + subscription.teamId + ); + expect(before).toHaveLength(1); + + await subscription.destroy(); + + const after = await WebhookSubscription.findEnabledByTeamId( + subscription.teamId + ); + expect(after).toHaveLength(0); + }); + }); +}); diff --git a/server/models/WebhookSubscription.ts b/server/models/WebhookSubscription.ts index 4c000adf22..9ec423e3ab 100644 --- a/server/models/WebhookSubscription.ts +++ b/server/models/WebhookSubscription.ts @@ -4,6 +4,7 @@ import type { InferAttributes, InferCreationAttributes, InstanceUpdateOptions, + Transaction, } from "sequelize"; import { Column, @@ -14,12 +15,19 @@ import { DataType, IsUrl, BeforeCreate, + AfterCreate, + AfterUpdate, + AfterDestroy, + AfterRestore, DefaultScope, AllowNull, } from "sequelize-typescript"; +import { Hour } from "@shared/utils/time"; import { WebhookSubscriptionValidation } from "@shared/validations"; import { ValidationError } from "@server/errors"; import type { Event } from "@server/types"; +import { CacheHelper } from "@server/utils/CacheHelper"; +import { RedisPrefixHelper } from "@server/utils/RedisPrefixHelper"; import Team from "./Team"; import User from "./User"; import ParanoidModel from "./base/ParanoidModel"; @@ -47,6 +55,60 @@ class WebhookSubscription extends ParanoidModel< > { static eventNamespace = "webhookSubscriptions"; + /** + * Returns the enabled webhook subscriptions for a team, caching the + * lightweight { id, events } projection in Redis to avoid a database query on + * every event. The cache is invalidated by model lifecycle hooks whenever a + * team's subscriptions change. + * + * @param teamId The team to load subscriptions for. + * @returns the enabled subscriptions' ids and subscribed event names. + */ + public static async findEnabledByTeamId( + teamId: string + ): Promise> { + return ( + (await CacheHelper.getDataOrSet>( + RedisPrefixHelper.getWebhookSubscriptionsKey(teamId), + async () => { + const subscriptions = await this.unscoped().findAll({ + attributes: ["id", "events"], + where: { enabled: true, teamId }, + raw: true, + }); + return subscriptions.map((subscription) => ({ + id: subscription.id, + events: subscription.events, + })); + }, + Hour.seconds + )) ?? [] + ); + } + + /** + * Determines whether a subscription configured for the given event names + * should receive an event with the given name. Pure so it can run against the + * cached projection as well as model instances. + * + * @param events The event names a subscription is configured for. + * @param eventName The name of the event being processed. + * @returns true if the event matches the subscription configuration. + */ + public static matchEvent(events: string[], eventName: string): boolean { + if (events.length === 1 && events[0] === "*") { + return true; + } + + for (const e of events) { + if (e === eventName || eventName.startsWith(e + ".")) { + return true; + } + } + + return false; + } + @NotEmpty @Length({ max: WebhookSubscriptionValidation.maxNameLength, @@ -105,6 +167,31 @@ class WebhookSubscription extends ParanoidModel< } } + @AfterCreate + @AfterUpdate + @AfterDestroy + @AfterRestore + static async invalidateCache( + model: WebhookSubscription, + options: { transaction?: Transaction | null } + ) { + const invalidate = () => + CacheHelper.removeData( + RedisPrefixHelper.getWebhookSubscriptionsKey(model.teamId) + ); + + // Defer invalidation until after the transaction commits so that a rollback + // does not leave the cache out of sync, and so a stale pre-commit read is + // not re-cached by a concurrent reader. Walk to the parent transaction when + // nested so the callback isn't lost when a savepoint releases. + if (options.transaction) { + const transaction = options.transaction.parent || options.transaction; + transaction.afterCommit(invalidate); + } else { + await invalidate(); + } + } + // instance methods /** @@ -130,22 +217,11 @@ class WebhookSubscription extends ParanoidModel< * Determines if an event should be processed for this webhook subscription * based on the event configuration. * - * @param event Event to ceck + * @param event Event to check * @returns true if event is valid */ - public validForEvent = (event: Event): boolean => { - if (this.events.length === 1 && this.events[0] === "*") { - return true; - } - - for (const e of this.events) { - if (e === event.name || event.name.startsWith(e + ".")) { - return true; - } - } - - return false; - }; + public validForEvent = (event: Event): boolean => + WebhookSubscription.matchEvent(this.events, event.name); /** * Calculates the signature for a webhook payload if the webhook subscription diff --git a/server/queues/processors/BaseProcessor.ts b/server/queues/processors/BaseProcessor.ts index 4406d26c3f..c33e405804 100644 --- a/server/queues/processors/BaseProcessor.ts +++ b/server/queues/processors/BaseProcessor.ts @@ -1,8 +1,31 @@ import type { Event } from "@server/types"; export default abstract class BaseProcessor { + /** + * The event names this processor handles. The global event queue only creates + * a job for the processor when an event's name is listed here, or when it + * contains the `"*"` wildcard to match every event. + */ static applicableEvents: (Event["name"] | "*")[] = []; + /** + * Optional hook run in the global event queue before a job is created for this + * processor. Implement it to cheaply opt out of events the processor will not + * act on and avoid the cost of an empty job. When omitted, every applicable + * event is queued. + * + * @param event The event about to be queued. + * @returns true if a job should be queued for this processor. + */ + static shouldQueue?: (event: Event) => Promise; + + /** + * Handle an applicable event. Called once per queued job, with retries on + * failure. + * + * @param event The event to process. + * @returns A promise that resolves once the event has been processed. + */ public abstract perform(event: Event): Promise; /** diff --git a/server/services/worker.ts b/server/services/worker.ts index a62e90cf02..9d58ace917 100644 --- a/server/services/worker.ts +++ b/server/services/worker.ts @@ -57,7 +57,14 @@ export default async function init() { ProcessorClass.applicableEvents.includes(event.name) || ProcessorClass.applicableEvents.includes("*") ) { - await processorEventQueue().add({ event, name }); + // A processor may optionally opt out of an event before a job is + // created, avoiding the cost of an empty job. + if ( + !ProcessorClass.shouldQueue || + (await ProcessorClass.shouldQueue(event)) + ) { + await processorEventQueue().add({ event, name }); + } } } catch (error) { Logger.error( diff --git a/server/utils/RedisPrefixHelper.ts b/server/utils/RedisPrefixHelper.ts index 9a0b2d0ce1..5510bdb26c 100644 --- a/server/utils/RedisPrefixHelper.ts +++ b/server/utils/RedisPrefixHelper.ts @@ -43,6 +43,16 @@ export class RedisPrefixHelper { return `uc:${userId}`; } + /** + * Gets key for caching a team's enabled webhook subscriptions. + * + * @param teamId The team ID to generate a key for. + * @returns the cache key string. + */ + public static getWebhookSubscriptionsKey(teamId: string) { + return `whs:${teamId}`; + } + /** * Gets key for caching the count of a relationship managed by the * `CounterCache` decorator.