Compare commits

...

2 Commits

Author SHA1 Message Date
Tom Moor b8cb10248e lint 2026-03-05 20:28:10 -05:00
Tom Moor 762a0c78c7 perf: Introduce max task timeout on queue 2026-03-05 20:23:53 -05:00
11 changed files with 29 additions and 3 deletions
+2
View File
@@ -13,6 +13,7 @@ import type {
ProsemirrorDoc,
} from "@shared/types";
import { AttachmentPreset, ImportState, ImportTaskState } from "@shared/types";
import { Hour } from "@shared/utils/time";
import { ProsemirrorHelper as SharedProseMirrorHelper } from "@shared/utils/ProsemirrorHelper";
import { createContext } from "@server/context";
import { schema } from "@server/editor";
@@ -399,6 +400,7 @@ export default abstract class APIImportTask<
type: "exponential",
delay: 60 * 1000,
},
timeout: 24 * Hour.ms,
};
}
}
@@ -33,6 +33,7 @@ export default class CleanupDeletedDocumentsTask extends CronTask {
return {
attempts: 1,
priority: TaskPriority.Background,
timeout: 30 * Minute.ms,
};
}
@@ -1,3 +1,4 @@
import { Hour } from "@shared/utils/time";
import teamPermanentDeleter from "@server/commands/teamPermanentDeleter";
import { Team } from "@server/models";
import { BaseTask, TaskPriority } from "./base/BaseTask";
@@ -20,6 +21,7 @@ export default class CleanupDeletedTeamTask extends BaseTask<Props> {
return {
attempts: 1,
priority: TaskPriority.Background,
timeout: Hour.ms,
};
}
}
+2 -1
View File
@@ -5,7 +5,7 @@ import { Event } from "@server/models";
import { TaskPriority } from "./base/BaseTask";
import type { Props } from "./base/CronTask";
import { CronTask, TaskInterval } from "./base/CronTask";
import { Minute } from "@shared/utils/time";
import { Minute, Hour } from "@shared/utils/time";
export default class CleanupOldEventsTask extends CronTask {
public async perform({ partition }: Props) {
@@ -59,6 +59,7 @@ export default class CleanupOldEventsTask extends CronTask {
return {
attempts: 1,
priority: TaskPriority.Background,
timeout: Hour.ms,
};
}
}
@@ -1,6 +1,7 @@
import { subDays } from "date-fns";
import { Op } from "sequelize";
import { ImportState } from "@shared/types";
import { Hour } from "@shared/utils/time";
import Logger from "@server/logging/Logger";
import { Import, ImportTask } from "@server/models";
import { TaskPriority } from "./base/BaseTask";
@@ -83,6 +84,7 @@ export default class CleanupOldImportsTask extends CronTask {
return {
attempts: 1,
priority: TaskPriority.Background,
timeout: Hour.ms,
};
}
}
+2
View File
@@ -2,6 +2,7 @@ import fs from "fs-extra";
import truncate from "lodash/truncate";
import type { NavigationNode } from "@shared/types";
import { FileOperationState, NotificationEventType } from "@shared/types";
import { Hour } from "@shared/utils/time";
import { bytesToHumanReadable } from "@shared/utils/files";
import ExportFailureEmail from "@server/emails/templates/ExportFailureEmail";
import ExportSuccessEmail from "@server/emails/templates/ExportSuccessEmail";
@@ -251,6 +252,7 @@ export default abstract class ExportTask extends BaseTask<Props> {
return {
priority: TaskPriority.Background,
attempts: 1,
timeout: 24 * Hour.ms,
};
}
}
+2
View File
@@ -10,6 +10,7 @@ import {
CollectionPermission,
FileOperationState,
} from "@shared/types";
import { Hour } from "@shared/utils/time";
import { CollectionValidation } from "@shared/validations";
import attachmentCreator from "@server/commands/attachmentCreator";
import documentCreator from "@server/commands/documentCreator";
@@ -532,6 +533,7 @@ export default abstract class ImportTask extends BaseTask<Props> {
return {
priority: TaskPriority.Low,
attempts: 1,
timeout: 24 * Hour.ms,
};
}
@@ -2,7 +2,7 @@ import crypto from "node:crypto";
import { setTimeout } from "node:timers/promises";
import { subWeeks } from "date-fns";
import { QueryTypes } from "sequelize";
import { Minute } from "@shared/utils/time";
import { Hour, Minute } from "@shared/utils/time";
import env from "@server/env";
import Logger from "@server/logging/Logger";
import { TaskPriority } from "./base/BaseTask";
@@ -467,6 +467,7 @@ export default class UpdateDocumentsPopularityScoreTask extends CronTask {
return {
attempts: 1,
priority: TaskPriority.Background,
timeout: Hour.ms,
};
}
}
@@ -6,6 +6,7 @@ import { sequelize } from "@server/storage/database";
import { TaskPriority } from "./base/BaseTask";
import { CronTask, TaskInterval } from "./base/CronTask";
import UpdateTeamAttachmentsSizeTask from "./UpdateTeamAttachmentsSizeTask";
import { Hour } from "@shared/utils/time";
type Props = {
limit: number;
@@ -53,6 +54,7 @@ export default class UpdateTeamsAttachmentsSizeTask extends CronTask {
return {
attempts: 1,
priority: TaskPriority.Background,
timeout: Hour.ms,
};
}
}
@@ -1,4 +1,5 @@
import { Sema } from "async-sema";
import { Hour } from "@shared/utils/time";
import Logger from "@server/logging/Logger";
import { Attachment } from "@server/models";
import FileStorage from "@server/storage/files";
@@ -68,6 +69,7 @@ export default class UploadAttachmentsForImportTask extends BaseTask<Item[]> {
return {
attempts: 3,
priority: TaskPriority.Normal,
timeout: 24 * Hour.ms,
};
}
}
+10 -1
View File
@@ -1,4 +1,5 @@
import type { Job, JobOptions } from "bull";
import { Minute } from "@shared/utils/time";
import { taskQueue } from "../../";
export enum TaskPriority {
@@ -8,6 +9,13 @@ export enum TaskPriority {
High = 10,
}
/**
* Default timeout for tasks. Tasks that do not explicitly set a timeout will
* use this value. This prevents hung tasks (e.g. waiting on a downed external
* service) from blocking the worker indefinitely.
*/
const DEFAULT_TASK_TIMEOUT = 5 * Minute.ms;
export abstract class BaseTask<T extends Record<string, any>> {
/**
* Schedule this task type to be processed asynchronously by a worker.
@@ -22,7 +30,7 @@ export abstract class BaseTask<T extends Record<string, any>> {
name: this.constructor.name,
props,
},
{ ...options, ...this.options }
{ timeout: DEFAULT_TASK_TIMEOUT, ...options, ...this.options }
);
}
@@ -56,6 +64,7 @@ export abstract class BaseTask<T extends Record<string, any>> {
type: "exponential",
delay: 60 * 1000,
},
timeout: DEFAULT_TASK_TIMEOUT,
};
}
}