Compare commits

...

1 Commits

Author SHA1 Message Date
Tom Moor 431937aa3a Add task to cleanup old events, change strategies to hourly 2025-02-15 20:57:16 -05:00
15 changed files with 135 additions and 20 deletions
@@ -10,7 +10,7 @@ import BaseTask, {
type Props = Record<string, never>;
export default class CleanupWebhookDeliveriesTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Day;
public async perform() {
Logger.info("task", `Deleting WebhookDeliveries older than one week…`);
+22
View File
@@ -74,5 +74,27 @@ describe("Model", () => {
expect(usersBatch[0].length).toEqual(100);
expect(usersBatch[1].length).toEqual(5);
});
it("should return data in batches with total limit", async () => {
const team = await buildTeam();
await User.bulkCreate(
[...Array(10)].map(() => ({
email: faker.internet.email().toLowerCase(),
name: faker.person.fullName(),
teamId: team.id,
}))
);
const usersBatch: User[][] = [];
await User.findAllInBatches<User>(
{ where: { teamId: team.id }, batchLimit: 2, totalLimit: 4 },
async (foundUsers) => void usersBatch.push(foundUsers)
);
expect(usersBatch.length).toEqual(2);
expect(usersBatch[0].length).toEqual(2);
expect(usersBatch[1].length).toEqual(2);
});
});
});
+8 -2
View File
@@ -282,7 +282,10 @@ class Model<
* @param callback The function to call for each batch of results
*/
static async findAllInBatches<T extends Model>(
query: Replace<FindOptions<T>, "limit", "batchLimit">,
query: Replace<FindOptions<T>, "limit", "batchLimit"> & {
/** The maximum number of results to return, after which the query will stop. */
totalLimit?: number;
},
callback: (results: Array<T>, query: FindOptions<T>) => Promise<void>
) {
const mappedQuery = {
@@ -298,7 +301,10 @@ class Model<
results = await this.findAll<T>(mappedQuery);
await callback(results, mappedQuery);
mappedQuery.offset += mappedQuery.limit;
} while (results.length >= mappedQuery.limit);
} while (
results.length >= mappedQuery.limit &&
(mappedQuery.totalLimit ?? Infinity) > mappedQuery.offset
);
}
/**
+3 -2
View File
@@ -9,8 +9,9 @@ export enum TaskPriority {
}
export enum TaskSchedule {
Daily = "daily",
Hourly = "hourly",
Day = "daily",
Hour = "hourly",
Minute = "minute",
}
export default abstract class BaseTask<T extends Record<string, any>> {
@@ -10,7 +10,7 @@ type Props = {
};
export default class CleanupDeletedDocumentsTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Hour;
public async perform({ limit }: Props) {
Logger.info(
@@ -10,7 +10,7 @@ type Props = {
};
export default class CleanupDeletedTeamsTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Hour;
public async perform({ limit }: Props) {
Logger.info(
@@ -8,7 +8,7 @@ type Props = {
};
export default class CleanupExpiredAttachmentsTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Hour;
public async perform({ limit }: Props) {
Logger.info("task", `Deleting expired attachments…`);
@@ -10,7 +10,7 @@ type Props = {
};
export default class CleanupExpiredFileOperationsTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Hour;
public async perform({ limit }: Props) {
Logger.info("task", `Expiring file operations older than 15 days…`);
@@ -0,0 +1,60 @@
import { subDays } from "date-fns";
import { Op } from "sequelize";
import Logger from "@server/logging/Logger";
import { Event } from "@server/models";
import BaseTask, {
TaskPriority,
TaskSchedule,
} from "@server/queues/tasks/BaseTask";
type Props = Record<string, never>;
export default class CleanupOldEventsTask extends BaseTask<Props> {
static cron = TaskSchedule.Hour;
public async perform() {
// TODO: Hardcoded right now, configurable later
const retentionDays = 365;
const cutoffDate = subDays(new Date(), retentionDays);
const maxEventsPerTask = 100000;
let totalEventsDeleted = 0;
try {
await Event.findAllInBatches(
{
attributes: ["id"],
where: {
createdAt: {
[Op.lt]: cutoffDate,
},
},
batchLimit: 1000,
totalLimit: maxEventsPerTask,
order: [["createdAt", "ASC"]],
},
async (events) => {
totalEventsDeleted += await Event.destroy({
where: {
id: {
[Op.in]: events.map((event) => event.id),
},
},
});
}
);
} finally {
if (totalEventsDeleted > 0) {
Logger.info("task", `Deleted old events`, {
totalEventsDeleted,
});
}
}
}
public get options() {
return {
attempts: 1,
priority: TaskPriority.Background,
};
}
}
@@ -7,7 +7,7 @@ import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = Record<string, never>;
export default class CleanupOldNotificationsTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Hour;
public async perform() {
Logger.info("task", `Permanently destroying old notifications…`);
@@ -10,7 +10,7 @@ type Props = {
};
export default class ErrorTimedOutFileOperationsTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Hour;
public async perform({ limit }: Props) {
Logger.info("task", `Error file operations running longer than 12 hours…`);
+1 -1
View File
@@ -9,7 +9,7 @@ import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
type Props = Record<string, never>;
export default class InviteReminderTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Day;
public async perform() {
const users = await User.scope("invited").findAll({
@@ -11,7 +11,7 @@ type Props = {
};
export default class UpdateTeamsAttachmentsSizeTask extends BaseTask<Props> {
static cron = TaskSchedule.Daily;
static cron = TaskSchedule.Day;
public async perform({ limit }: Props) {
Logger.info(
+25 -1
View File
@@ -3,13 +3,20 @@ import env from "@server/env";
import { AuthenticationError } from "@server/errors";
import validate from "@server/middlewares/validate";
import tasks from "@server/queues/tasks";
import { TaskSchedule } from "@server/queues/tasks/BaseTask";
import { APIContext } from "@server/types";
import { safeEqual } from "@server/utils/crypto";
import * as T from "./schema";
const router = new Router();
/** Whether the minutely cron job has been received */
const receivedPeriods = new Set<TaskSchedule>();
const cronHandler = async (ctx: APIContext<T.CronSchemaReq>) => {
const period = Object.keys(TaskSchedule).includes(ctx.params.period)
? (ctx.params.period as TaskSchedule)
: TaskSchedule.Day;
const token = (ctx.input.body.token ?? ctx.input.query.token) as string;
const limit = ctx.input.body.limit ?? ctx.input.query.limit;
@@ -17,9 +24,26 @@ const cronHandler = async (ctx: APIContext<T.CronSchemaReq>) => {
throw AuthenticationError("Invalid secret token");
}
receivedPeriods.add(period);
for (const name in tasks) {
const TaskClass = tasks[name];
if (TaskClass.cron) {
if (TaskClass.cron === period) {
await TaskClass.schedule({ limit });
// Backwards compatibility for installations that have not set up
// cron jobs periods other than daily.
} else if (
TaskClass.cron === TaskSchedule.Minute &&
!receivedPeriods.has(TaskSchedule.Minute) &&
(period === TaskSchedule.Hour || period === TaskSchedule.Day)
) {
await TaskClass.schedule({ limit });
} else if (
TaskClass.cron === TaskSchedule.Hour &&
!receivedPeriods.has(TaskSchedule.Hour) &&
period === TaskSchedule.Day
) {
await TaskClass.schedule({ limit });
}
}
+8 -6
View File
@@ -1,4 +1,4 @@
import { Day, Hour, Second } from "@shared/utils/time";
import { Day, Hour, Minute, Second } from "@shared/utils/time";
import tasks from "@server/queues/tasks";
import { TaskSchedule } from "@server/queues/tasks/BaseTask";
@@ -12,13 +12,15 @@ export default function init() {
}
}
setInterval(() => void run(TaskSchedule.Daily), Day.ms);
setInterval(() => void run(TaskSchedule.Hourly), Hour.ms);
setInterval(() => void run(TaskSchedule.Day), Day.ms);
setInterval(() => void run(TaskSchedule.Hour), Hour.ms);
setInterval(() => void run(TaskSchedule.Minute), Minute.ms);
// Just give everything time to startup before running the first time. Not
// _technically_ required to function.
setTimeout(() => {
void run(TaskSchedule.Daily);
void run(TaskSchedule.Hourly);
}, 30 * Second.ms);
void run(TaskSchedule.Day);
void run(TaskSchedule.Hour);
void run(TaskSchedule.Minute);
}, 5 * Second.ms);
}