diff --git a/server/models/UserAuthentication.ts b/server/models/UserAuthentication.ts index e6a093671a..3ce1721092 100644 --- a/server/models/UserAuthentication.ts +++ b/server/models/UserAuthentication.ts @@ -88,7 +88,7 @@ class UserAuthentication extends IdModel< * @returns true if the accessToken or refreshToken is still valid */ public async validateAccess( - options: SaveOptions, + options: SaveOptions = {}, force = false ): Promise { // Check a maximum of once every 5 minutes diff --git a/server/queues/tasks/ValidateSSOAccessTask.ts b/server/queues/tasks/ValidateSSOAccessTask.ts index 3ad4c7309e..86a365c851 100644 --- a/server/queues/tasks/ValidateSSOAccessTask.ts +++ b/server/queues/tasks/ValidateSSOAccessTask.ts @@ -1,6 +1,7 @@ import Logger from "@server/logging/Logger"; import { User, UserAuthentication } from "@server/models"; -import { sequelize } from "@server/storage/database"; +import { MutexLock } from "@server/utils/MutexLock"; +import { Minute } from "@shared/utils/time"; import { BaseTask, TaskPriority } from "./base/BaseTask"; type Props = { @@ -9,51 +10,54 @@ type Props = { export default class ValidateSSOAccessTask extends BaseTask { public async perform({ userId }: Props) { - await sequelize.transaction(async (transaction) => { - const userAuthentications = await UserAuthentication.findAll({ - where: { userId }, - transaction, - lock: transaction.LOCK.UPDATE, - }); - if (userAuthentications.length === 0) { - return; + await MutexLock.using( + `validateSSO:${userId}`, + Minute.ms, + async (signal) => { + const userAuthentications = await UserAuthentication.findAll({ + where: { userId }, + }); + if (userAuthentications.length === 0) { + return; + } + + // Check the validity of the user's authentications. + let error; + const validity = await Promise.all( + userAuthentications.map(async (authentication) => { + try { + return await authentication.validateAccess(); + } catch (err) { + error = err; + return false; + } + }) + ); + + if (signal.aborted) { + throw signal.error; + } + + if (validity.some((isValid) => isValid)) { + return; + } + + // If an unexpected error occurred, throw it to trigger a retry. + if (error) { + throw error; + } + + // If all are invalid then we need to revoke the users Outline sessions. + const user = await User.findByPk(userId); + + Logger.info( + "task", + `Authentication token no longer valid for ${user?.id}` + ); + + await user?.rotateJwtSecret({}); } - - // Check the validity of the user's authentications. - let error; - const validity = await Promise.all( - userAuthentications.map(async (authentication) => { - try { - return await authentication.validateAccess({ transaction }); - } catch (err) { - error = err; - return false; - } - }) - ); - - if (validity.some((isValid) => isValid)) { - return; - } - - // If an unexpected error occurred, throw it to trigger a retry. - if (error) { - throw error; - } - - // If all are invalid then we need to revoke the users Outline sessions. - const user = await User.findByPk(userId, { - transaction, - lock: transaction.LOCK.UPDATE, - }); - - Logger.info( - "task", - `Authentication token no longer valid for ${user?.id}` - ); - - await user?.rotateJwtSecret({ transaction }); - }); + ); } public get options() { diff --git a/server/routes/api/auth/auth.ts b/server/routes/api/auth/auth.ts index b13d47a8c6..5ffbff4093 100644 --- a/server/routes/api/auth/auth.ts +++ b/server/routes/api/auth/auth.ts @@ -145,7 +145,18 @@ router.post("auth.info", auth(), async (ctx: APIContext) => { user.lastSignedInAt && user.lastSignedInAt < subHours(new Date(), 1) ) { - await new ValidateSSOAccessTask().schedule({ userId: user.id }); + await new ValidateSSOAccessTask() + .schedule( + { + userId: user.id, + }, + { + jobId: `validate-sso:${user.id}`, + } + ) + .catch(() => { + // Ignore errors from duplicate jobId when a validation is already queued + }); } ctx.body = { diff --git a/server/utils/MutexLock.ts b/server/utils/MutexLock.ts index 09ca631086..53e0f8b800 100644 --- a/server/utils/MutexLock.ts +++ b/server/utils/MutexLock.ts @@ -1,4 +1,4 @@ -import Redlock, { type Lock } from "redlock"; +import Redlock, { type Lock, type RedlockAbortSignal } from "redlock"; import Redis from "@server/storage/redis"; import ShutdownHelper, { ShutdownOrder } from "./ShutdownHelper"; @@ -46,6 +46,25 @@ export class MutexLock { return lock; } + /** + * Execute a routine in the context of an auto-extending lock. The lock is + * automatically acquired before the routine runs and released when it + * completes. If the lock cannot be extended, the provided AbortSignal will + * be triggered so the routine can bail out. + * + * @param resource The resource to lock. + * @param timeout The initial lock duration in milliseconds (auto-extended while running). + * @param routine The async routine to execute while holding the lock. + * @returns A promise that resolves with the routine's return value. + */ + public static async using( + resource: string, + timeout: number, + routine: (signal: RedlockAbortSignal) => Promise + ): Promise { + return this.lock.using([resource], timeout, routine); + } + /** * Safely release a lock *