mirror of
https://github.com/outline/outline.git
synced 2026-06-13 11:25:03 +03:00
Compare commits
1 Commits
tom/settings
...
tom/cron
| Author | SHA1 | Date | |
|---|---|---|---|
| 431937aa3a |
@@ -10,7 +10,7 @@ import BaseTask, {
|
||||
type Props = Record<string, never>;
|
||||
|
||||
export default class CleanupWebhookDeliveriesTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Day;
|
||||
|
||||
public async perform() {
|
||||
Logger.info("task", `Deleting WebhookDeliveries older than one week…`);
|
||||
|
||||
@@ -74,5 +74,27 @@ describe("Model", () => {
|
||||
expect(usersBatch[0].length).toEqual(100);
|
||||
expect(usersBatch[1].length).toEqual(5);
|
||||
});
|
||||
|
||||
it("should return data in batches with total limit", async () => {
|
||||
const team = await buildTeam();
|
||||
await User.bulkCreate(
|
||||
[...Array(10)].map(() => ({
|
||||
email: faker.internet.email().toLowerCase(),
|
||||
name: faker.person.fullName(),
|
||||
teamId: team.id,
|
||||
}))
|
||||
);
|
||||
|
||||
const usersBatch: User[][] = [];
|
||||
|
||||
await User.findAllInBatches<User>(
|
||||
{ where: { teamId: team.id }, batchLimit: 2, totalLimit: 4 },
|
||||
async (foundUsers) => void usersBatch.push(foundUsers)
|
||||
);
|
||||
|
||||
expect(usersBatch.length).toEqual(2);
|
||||
expect(usersBatch[0].length).toEqual(2);
|
||||
expect(usersBatch[1].length).toEqual(2);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -282,7 +282,10 @@ class Model<
|
||||
* @param callback The function to call for each batch of results
|
||||
*/
|
||||
static async findAllInBatches<T extends Model>(
|
||||
query: Replace<FindOptions<T>, "limit", "batchLimit">,
|
||||
query: Replace<FindOptions<T>, "limit", "batchLimit"> & {
|
||||
/** The maximum number of results to return, after which the query will stop. */
|
||||
totalLimit?: number;
|
||||
},
|
||||
callback: (results: Array<T>, query: FindOptions<T>) => Promise<void>
|
||||
) {
|
||||
const mappedQuery = {
|
||||
@@ -298,7 +301,10 @@ class Model<
|
||||
results = await this.findAll<T>(mappedQuery);
|
||||
await callback(results, mappedQuery);
|
||||
mappedQuery.offset += mappedQuery.limit;
|
||||
} while (results.length >= mappedQuery.limit);
|
||||
} while (
|
||||
results.length >= mappedQuery.limit &&
|
||||
(mappedQuery.totalLimit ?? Infinity) > mappedQuery.offset
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -9,8 +9,9 @@ export enum TaskPriority {
|
||||
}
|
||||
|
||||
export enum TaskSchedule {
|
||||
Daily = "daily",
|
||||
Hourly = "hourly",
|
||||
Day = "daily",
|
||||
Hour = "hourly",
|
||||
Minute = "minute",
|
||||
}
|
||||
|
||||
export default abstract class BaseTask<T extends Record<string, any>> {
|
||||
|
||||
@@ -10,7 +10,7 @@ type Props = {
|
||||
};
|
||||
|
||||
export default class CleanupDeletedDocumentsTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Hour;
|
||||
|
||||
public async perform({ limit }: Props) {
|
||||
Logger.info(
|
||||
|
||||
@@ -10,7 +10,7 @@ type Props = {
|
||||
};
|
||||
|
||||
export default class CleanupDeletedTeamsTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Hour;
|
||||
|
||||
public async perform({ limit }: Props) {
|
||||
Logger.info(
|
||||
|
||||
@@ -8,7 +8,7 @@ type Props = {
|
||||
};
|
||||
|
||||
export default class CleanupExpiredAttachmentsTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Hour;
|
||||
|
||||
public async perform({ limit }: Props) {
|
||||
Logger.info("task", `Deleting expired attachments…`);
|
||||
|
||||
@@ -10,7 +10,7 @@ type Props = {
|
||||
};
|
||||
|
||||
export default class CleanupExpiredFileOperationsTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Hour;
|
||||
|
||||
public async perform({ limit }: Props) {
|
||||
Logger.info("task", `Expiring file operations older than 15 days…`);
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
import { subDays } from "date-fns";
|
||||
import { Op } from "sequelize";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import { Event } from "@server/models";
|
||||
import BaseTask, {
|
||||
TaskPriority,
|
||||
TaskSchedule,
|
||||
} from "@server/queues/tasks/BaseTask";
|
||||
|
||||
type Props = Record<string, never>;
|
||||
|
||||
export default class CleanupOldEventsTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Hour;
|
||||
|
||||
public async perform() {
|
||||
// TODO: Hardcoded right now, configurable later
|
||||
const retentionDays = 365;
|
||||
const cutoffDate = subDays(new Date(), retentionDays);
|
||||
const maxEventsPerTask = 100000;
|
||||
let totalEventsDeleted = 0;
|
||||
|
||||
try {
|
||||
await Event.findAllInBatches(
|
||||
{
|
||||
attributes: ["id"],
|
||||
where: {
|
||||
createdAt: {
|
||||
[Op.lt]: cutoffDate,
|
||||
},
|
||||
},
|
||||
batchLimit: 1000,
|
||||
totalLimit: maxEventsPerTask,
|
||||
order: [["createdAt", "ASC"]],
|
||||
},
|
||||
async (events) => {
|
||||
totalEventsDeleted += await Event.destroy({
|
||||
where: {
|
||||
id: {
|
||||
[Op.in]: events.map((event) => event.id),
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
if (totalEventsDeleted > 0) {
|
||||
Logger.info("task", `Deleted old events`, {
|
||||
totalEventsDeleted,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public get options() {
|
||||
return {
|
||||
attempts: 1,
|
||||
priority: TaskPriority.Background,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,7 @@ import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
|
||||
type Props = Record<string, never>;
|
||||
|
||||
export default class CleanupOldNotificationsTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Hour;
|
||||
|
||||
public async perform() {
|
||||
Logger.info("task", `Permanently destroying old notifications…`);
|
||||
|
||||
@@ -10,7 +10,7 @@ type Props = {
|
||||
};
|
||||
|
||||
export default class ErrorTimedOutFileOperationsTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Hour;
|
||||
|
||||
public async perform({ limit }: Props) {
|
||||
Logger.info("task", `Error file operations running longer than 12 hours…`);
|
||||
|
||||
@@ -9,7 +9,7 @@ import BaseTask, { TaskPriority, TaskSchedule } from "./BaseTask";
|
||||
type Props = Record<string, never>;
|
||||
|
||||
export default class InviteReminderTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Day;
|
||||
|
||||
public async perform() {
|
||||
const users = await User.scope("invited").findAll({
|
||||
|
||||
@@ -11,7 +11,7 @@ type Props = {
|
||||
};
|
||||
|
||||
export default class UpdateTeamsAttachmentsSizeTask extends BaseTask<Props> {
|
||||
static cron = TaskSchedule.Daily;
|
||||
static cron = TaskSchedule.Day;
|
||||
|
||||
public async perform({ limit }: Props) {
|
||||
Logger.info(
|
||||
|
||||
@@ -3,13 +3,20 @@ import env from "@server/env";
|
||||
import { AuthenticationError } from "@server/errors";
|
||||
import validate from "@server/middlewares/validate";
|
||||
import tasks from "@server/queues/tasks";
|
||||
import { TaskSchedule } from "@server/queues/tasks/BaseTask";
|
||||
import { APIContext } from "@server/types";
|
||||
import { safeEqual } from "@server/utils/crypto";
|
||||
import * as T from "./schema";
|
||||
|
||||
const router = new Router();
|
||||
|
||||
/** Whether the minutely cron job has been received */
|
||||
const receivedPeriods = new Set<TaskSchedule>();
|
||||
|
||||
const cronHandler = async (ctx: APIContext<T.CronSchemaReq>) => {
|
||||
const period = Object.keys(TaskSchedule).includes(ctx.params.period)
|
||||
? (ctx.params.period as TaskSchedule)
|
||||
: TaskSchedule.Day;
|
||||
const token = (ctx.input.body.token ?? ctx.input.query.token) as string;
|
||||
const limit = ctx.input.body.limit ?? ctx.input.query.limit;
|
||||
|
||||
@@ -17,9 +24,26 @@ const cronHandler = async (ctx: APIContext<T.CronSchemaReq>) => {
|
||||
throw AuthenticationError("Invalid secret token");
|
||||
}
|
||||
|
||||
receivedPeriods.add(period);
|
||||
|
||||
for (const name in tasks) {
|
||||
const TaskClass = tasks[name];
|
||||
if (TaskClass.cron) {
|
||||
if (TaskClass.cron === period) {
|
||||
await TaskClass.schedule({ limit });
|
||||
|
||||
// Backwards compatibility for installations that have not set up
|
||||
// cron jobs periods other than daily.
|
||||
} else if (
|
||||
TaskClass.cron === TaskSchedule.Minute &&
|
||||
!receivedPeriods.has(TaskSchedule.Minute) &&
|
||||
(period === TaskSchedule.Hour || period === TaskSchedule.Day)
|
||||
) {
|
||||
await TaskClass.schedule({ limit });
|
||||
} else if (
|
||||
TaskClass.cron === TaskSchedule.Hour &&
|
||||
!receivedPeriods.has(TaskSchedule.Hour) &&
|
||||
period === TaskSchedule.Day
|
||||
) {
|
||||
await TaskClass.schedule({ limit });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Day, Hour, Second } from "@shared/utils/time";
|
||||
import { Day, Hour, Minute, Second } from "@shared/utils/time";
|
||||
import tasks from "@server/queues/tasks";
|
||||
import { TaskSchedule } from "@server/queues/tasks/BaseTask";
|
||||
|
||||
@@ -12,13 +12,15 @@ export default function init() {
|
||||
}
|
||||
}
|
||||
|
||||
setInterval(() => void run(TaskSchedule.Daily), Day.ms);
|
||||
setInterval(() => void run(TaskSchedule.Hourly), Hour.ms);
|
||||
setInterval(() => void run(TaskSchedule.Day), Day.ms);
|
||||
setInterval(() => void run(TaskSchedule.Hour), Hour.ms);
|
||||
setInterval(() => void run(TaskSchedule.Minute), Minute.ms);
|
||||
|
||||
// Just give everything time to startup before running the first time. Not
|
||||
// _technically_ required to function.
|
||||
setTimeout(() => {
|
||||
void run(TaskSchedule.Daily);
|
||||
void run(TaskSchedule.Hourly);
|
||||
}, 30 * Second.ms);
|
||||
void run(TaskSchedule.Day);
|
||||
void run(TaskSchedule.Hour);
|
||||
void run(TaskSchedule.Minute);
|
||||
}, 5 * Second.ms);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user