mirror of
https://github.com/outline/outline.git
synced 2026-06-13 03:14:59 +03:00
Weekly insights rollup (#12113)
* Weekly insights rollup

* fix: Avoid eager db instance creation in DocumentInsight model

  Importing sequelize at the top level triggered createDatabaseInstance during module load, which caused unrelated test suites that transitively require the model to fail. Use the instance-bound this.sequelize in the static method instead.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: Skip soft-deleted documents in weekly insights rollup

  The weekly task was deleting daily rows for soft-deleted documents without creating a weekly replacement, since rollupPeriod filters them out. Join to documents in both the week-discovery query and the DELETE to keep behavior consistent — historical daily rows for deleted docs are left for the cleanup task to remove at the retention boundary.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor: Bind cutoff days param and add date predicate in weekly rollup

  Moves CUTOFF_DAYS from string interpolation to a bound parameter and adds a plain `date <` predicate so the planner can use the (documentId, date, period) index before evaluating date_trunc.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,55 @@
|
||||
"use strict";
|
||||
|
||||
module.exports = {
|
||||
async up(queryInterface, Sequelize) {
|
||||
return queryInterface.sequelize.transaction(async (transaction) => {
|
||||
await queryInterface.addColumn(
|
||||
"document_insights",
|
||||
"period",
|
||||
{
|
||||
type: Sequelize.ENUM("day", "week"),
|
||||
allowNull: false,
|
||||
defaultValue: "day",
|
||||
},
|
||||
{ transaction }
|
||||
);
|
||||
|
||||
await queryInterface.removeIndex(
|
||||
"document_insights",
|
||||
["documentId", "date"],
|
||||
{ transaction }
|
||||
);
|
||||
|
||||
await queryInterface.addIndex(
|
||||
"document_insights",
|
||||
["documentId", "date", "period"],
|
||||
{ unique: true, transaction }
|
||||
);
|
||||
});
|
||||
},
|
||||
|
||||
async down(queryInterface) {
|
||||
return queryInterface.sequelize.transaction(async (transaction) => {
|
||||
await queryInterface.removeIndex(
|
||||
"document_insights",
|
||||
["documentId", "date", "period"],
|
||||
{ transaction }
|
||||
);
|
||||
|
||||
await queryInterface.addIndex(
|
||||
"document_insights",
|
||||
["documentId", "date"],
|
||||
{ unique: true, transaction }
|
||||
);
|
||||
|
||||
await queryInterface.removeColumn("document_insights", "period", {
|
||||
transaction,
|
||||
});
|
||||
|
||||
await queryInterface.sequelize.query(
|
||||
'DROP TYPE IF EXISTS "enum_document_insights_period"',
|
||||
{ transaction }
|
||||
);
|
||||
});
|
||||
},
|
||||
};
|
||||
@@ -0,0 +1,319 @@
|
||||
import { addDays, format, startOfISOWeek, subDays } from "date-fns";
import DocumentInsight, {
  DocumentInsightPeriod,
} from "@server/models/DocumentInsight";
import { Comment, Event, Reaction, Revision } from "@server/models";
import { buildDocument, buildTeam, buildUser } from "@server/test/factories";
|
||||
|
||||
/** UUID bounds wide enough to include every randomly generated v4 document id. */
const FULL_UUID_RANGE: [string, string] = [
  "00000000-0000-4000-8000-000000000000",
  "ffffffff-ffff-4fff-bfff-ffffffffffff",
];

/** Formats a date as the YYYY-MM-DD string that rollupPeriod and `date` use. */
const dayStr = (d: Date) => format(d, "yyyy-MM-dd");

describe("DocumentInsight.rollupPeriod", () => {
  /**
   * Creates a team, one of its users and a published document in that team.
   *
   * @returns the created team, user and document.
   */
  const buildFixture = async () => {
    const team = await buildTeam();
    const user = await buildUser({ teamId: team.id });
    const document = await buildDocument({
      teamId: team.id,
      publishedAt: new Date(),
    });
    return { team, user, document };
  };

  /**
   * Records a single `views.create` event.
   *
   * @param view ids identifying the viewer and document, plus the event time.
   * @returns the created event.
   */
  const recordView = (view: {
    teamId: string;
    userId: string;
    documentId: string;
    createdAt: Date;
  }) => Event.create({ name: "views.create", ...view });

  /**
   * Runs rollupPeriod for the window starting on `start`. The window is one
   * day long for daily rows and seven days long for weekly rows.
   *
   * @param start first day of the window.
   * @param period period type to write, daily by default.
   * @param range inclusive document id bounds, the full range by default.
   * @returns the number of rows upserted.
   */
  const rollup = (
    start: Date,
    period: DocumentInsightPeriod = DocumentInsightPeriod.Day,
    [startUuid, endUuid]: [string, string] = FULL_UUID_RANGE
  ) =>
    DocumentInsight.rollupPeriod({
      periodStart: dayStr(start),
      intervalDays: period === DocumentInsightPeriod.Week ? 7 : 1,
      period,
      startUuid,
      endUuid,
    });

  it("writes nothing when no source activity exists in the window", async () => {
    const team = await buildTeam();
    await buildDocument({ teamId: team.id });

    const upserted = await rollup(subDays(new Date(), 1));

    expect(upserted).toBe(0);
    expect(await DocumentInsight.count({ where: { teamId: team.id } })).toBe(0);
  });

  it("respects the window boundaries and excludes events outside it", async () => {
    const { team, user, document } = await buildFixture();

    const target = subDays(new Date(), 10);
    const before = subDays(target, 1);
    const after = addDays(target, 1);

    // One view on each side of the window and one inside it.
    for (const createdAt of [before, target, after]) {
      await recordView({
        teamId: team.id,
        userId: user.id,
        documentId: document.id,
        createdAt,
      });
    }

    await rollup(target);

    const row = await DocumentInsight.findOne({
      where: { documentId: document.id, date: dayStr(target) },
    });
    expect(row).toBeTruthy();
    expect(row?.period).toBe(DocumentInsightPeriod.Day);
    expect(row?.viewCount).toBe(1);
  });

  it("aggregates a 7-day window into a single weekly row", async () => {
    const { team, user: userA, document } = await buildFixture();
    const userB = await buildUser({ teamId: team.id });

    const weekStart = startOfISOWeek(subDays(new Date(), 60));

    // Views across multiple days of the same week; userA on two days, userB on one.
    const views: Array<[string, number]> = [
      [userA.id, 1], // Tuesday
      [userA.id, 3], // Thursday
      [userB.id, 5], // Saturday
    ];
    for (const [userId, dayOffset] of views) {
      await recordView({
        teamId: team.id,
        userId,
        documentId: document.id,
        createdAt: addDays(weekStart, dayOffset),
      });
    }

    // Revision within the week by userB.
    await Revision.create({
      documentId: document.id,
      userId: userB.id,
      title: document.title,
      text: "x",
      createdAt: addDays(weekStart, 2),
    });

    await rollup(weekStart, DocumentInsightPeriod.Week);

    const rows = await DocumentInsight.findAll({
      where: { documentId: document.id },
    });
    expect(rows).toHaveLength(1);
    expect(rows[0].period).toBe(DocumentInsightPeriod.Week);
    expect(rows[0].date).toBe(dayStr(weekStart));
    expect(rows[0].viewCount).toBe(3);
    // Two distinct viewers across the week, not double-counted by day.
    expect(rows[0].viewerCount).toBe(2);
    expect(rows[0].revisionCount).toBe(1);
    expect(rows[0].editorCount).toBe(1);
  });

  it("counts reactions on older comments when the reaction is in-window", async () => {
    const { user, document } = await buildFixture();

    const target = subDays(new Date(), 2);
    // The comment predates the window; only the reaction falls inside it.
    const olderComment = await Comment.create({
      documentId: document.id,
      createdById: user.id,
      data: { type: "doc", content: [] },
      createdAt: subDays(target, 30),
    });

    await Reaction.create(
      {
        emoji: "🎉",
        commentId: olderComment.id,
        userId: user.id,
        createdAt: target,
      },
      { hooks: false }
    );

    await rollup(target);

    const row = await DocumentInsight.findOne({
      where: { documentId: document.id, date: dayStr(target) },
    });
    expect(row).toBeTruthy();
    expect(row?.commentCount).toBe(0);
    expect(row?.reactionCount).toBe(1);
  });

  it("skips documents outside the partition UUID range", async () => {
    const { team, user, document } = await buildFixture();

    const target = subDays(new Date(), 1);
    await recordView({
      teamId: team.id,
      userId: user.id,
      documentId: document.id,
      createdAt: target,
    });

    // A range holding only the two highest v4 UUIDs. The document id is a
    // random v4, so in practice it never falls inside and the rollup should
    // write nothing for this document.
    const narrowRange: [string, string] = [
      "ffffffff-ffff-4fff-bfff-fffffffffffe",
      "ffffffff-ffff-4fff-bfff-ffffffffffff",
    ];

    const upserted = await rollup(
      target,
      DocumentInsightPeriod.Day,
      narrowRange
    );

    expect(upserted).toBe(0);
    expect(
      await DocumentInsight.count({ where: { documentId: document.id } })
    ).toBe(0);
  });

  it("upserts on (documentId, date, period) and updates counts on re-run", async () => {
    const { team, user, document } = await buildFixture();

    const target = subDays(new Date(), 1);
    const view = {
      teamId: team.id,
      userId: user.id,
      documentId: document.id,
      createdAt: target,
    };

    await recordView(view);
    await rollup(target);

    // Add another event and re-run.
    await recordView(view);
    await rollup(target);

    const rows = await DocumentInsight.findAll({
      where: { documentId: document.id },
    });
    expect(rows).toHaveLength(1);
    expect(rows[0].viewCount).toBe(2);
  });

  it("stores daily and weekly rows for the same start date side-by-side", async () => {
    const { team, user, document } = await buildFixture();

    const weekStart = startOfISOWeek(subDays(new Date(), 60));
    await recordView({
      teamId: team.id,
      userId: user.id,
      documentId: document.id,
      createdAt: weekStart,
    });

    await rollup(weekStart, DocumentInsightPeriod.Day);
    await rollup(weekStart, DocumentInsightPeriod.Week);

    const rows = await DocumentInsight.findAll({
      where: { documentId: document.id },
    });
    expect(rows).toHaveLength(2);
    // Sorted here so the assertion does not depend on database row order.
    expect(rows.map((r) => r.period).sort()).toEqual([
      DocumentInsightPeriod.Day,
      DocumentInsightPeriod.Week,
    ]);
  });
});
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { InferAttributes, InferCreationAttributes } from "sequelize";
|
||||
import { QueryTypes } from "sequelize";
|
||||
import {
|
||||
BelongsTo,
|
||||
Column,
|
||||
@@ -13,12 +14,17 @@ import IdModel from "./base/IdModel";
|
||||
import { SkipChangeset } from "./decorators/Changeset";
|
||||
import Fix from "./decorators/Fix";
|
||||
|
||||
/**
 * Granularity of a document_insights row, stored in its `period` column.
 */
export enum DocumentInsightPeriod {
  /** Row written by a daily rollup. */
  Day = "day",
  /** Row written by a weekly rollup. */
  Week = "week",
}
|
||||
|
||||
@Table({
|
||||
tableName: "document_insights",
|
||||
modelName: "documentInsight",
|
||||
indexes: [
|
||||
{
|
||||
fields: ["documentId", "date"],
|
||||
fields: ["documentId", "date", "period"],
|
||||
unique: true,
|
||||
},
|
||||
{
|
||||
@@ -34,6 +40,11 @@ class DocumentInsight extends IdModel<
|
||||
@Column(DataType.DATEONLY)
|
||||
date: string;
|
||||
|
||||
@Default(DocumentInsightPeriod.Day)
|
||||
@Column(DataType.ENUM(...Object.values(DocumentInsightPeriod)))
|
||||
@SkipChangeset
|
||||
period: DocumentInsightPeriod;
|
||||
|
||||
@Default(0)
|
||||
@Column(DataType.INTEGER)
|
||||
@SkipChangeset
|
||||
@@ -79,6 +90,141 @@ class DocumentInsight extends IdModel<
|
||||
@ForeignKey(() => Team)
|
||||
@Column(DataType.UUID)
|
||||
teamId: string;
|
||||
|
||||
/**
 * Rolls source activity (view events, comments, reactions on comments and
 * revisions) from one time window up into document_insights rows. Only
 * non-deleted documents whose id lies inside the given UUID range are
 * considered, and a row is written only for documents with at least one
 * piece of activity in the window. Writes go through
 * ON CONFLICT ("documentId", date, period) DO UPDATE, so re-running the same
 * window overwrites the counts rather than adding rows.
 *
 * @param options.periodStart UTC date string (YYYY-MM-DD) marking the start of the window.
 * @param options.intervalDays length of the window in days (1 for daily, 7 for weekly).
 * @param options.period the period type to write on each row.
 * @param options.startUuid inclusive lower bound of the document id partition.
 * @param options.endUuid inclusive upper bound of the document id partition.
 * @returns the number of rows upserted.
 */
public static async rollupPeriod(options: {
  periodStart: string;
  intervalDays: number;
  period: DocumentInsightPeriod;
  startUuid: string;
  endUuid: string;
}): Promise<number> {
  // Each *_counts CTE aggregates one activity source over the half-open
  // window [periodStart, periodStart + intervalDays days) evaluated in UTC.
  // `active` is the union of documents seen by any source, and `inserted`
  // upserts exactly one row per active document.
  const sql = `
    WITH partitioned_documents AS (
      SELECT id, "teamId"
      FROM documents
      WHERE "deletedAt" IS NULL
      AND id >= :startUuid::uuid
      AND id <= :endUuid::uuid
    ),
    view_counts AS (
      SELECT
        e."documentId",
        COUNT(*) AS view_count,
        COUNT(DISTINCT e."userId") AS viewer_count
      FROM events e
      INNER JOIN partitioned_documents pd ON pd.id = e."documentId"
      WHERE e.name = 'views.create'
      AND e."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC'
      AND e."createdAt" < (:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC'
      GROUP BY e."documentId"
    ),
    comment_counts AS (
      SELECT c."documentId", COUNT(*) AS comment_count
      FROM comments c
      INNER JOIN partitioned_documents pd ON pd.id = c."documentId"
      WHERE c."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC'
      AND c."createdAt" < (:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC'
      GROUP BY c."documentId"
    ),
    reaction_counts AS (
      SELECT c."documentId", COUNT(rx.id) AS reaction_count
      FROM reactions rx
      INNER JOIN comments c ON c.id = rx."commentId"
      INNER JOIN partitioned_documents pd ON pd.id = c."documentId"
      WHERE rx."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC'
      AND rx."createdAt" < (:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC'
      GROUP BY c."documentId"
    ),
    revision_counts AS (
      SELECT
        r."documentId",
        COUNT(*) AS revision_count,
        COUNT(DISTINCT r."userId") AS editor_count
      FROM revisions r
      INNER JOIN partitioned_documents pd ON pd.id = r."documentId"
      WHERE r."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC'
      AND r."createdAt" < (:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC'
      GROUP BY r."documentId"
    ),
    active AS (
      SELECT "documentId" FROM view_counts
      UNION SELECT "documentId" FROM comment_counts
      UNION SELECT "documentId" FROM reaction_counts
      UNION SELECT "documentId" FROM revision_counts
    ),
    inserted AS (
      INSERT INTO document_insights (
        id, "documentId", "teamId", date, period,
        "viewCount", "viewerCount",
        "commentCount", "reactionCount",
        "revisionCount", "editorCount",
        "createdAt", "updatedAt"
      )
      SELECT
        uuid_generate_v4(),
        pd.id,
        pd."teamId",
        :periodStart::date,
        :period,
        COALESCE(v.view_count, 0),
        COALESCE(v.viewer_count, 0),
        COALESCE(c.comment_count, 0),
        COALESCE(rx.reaction_count, 0),
        COALESCE(r.revision_count, 0),
        COALESCE(r.editor_count, 0),
        NOW(), NOW()
      FROM active a
      INNER JOIN partitioned_documents pd ON pd.id = a."documentId"
      LEFT JOIN view_counts v ON v."documentId" = pd.id
      LEFT JOIN comment_counts c ON c."documentId" = pd.id
      LEFT JOIN reaction_counts rx ON rx."documentId" = pd.id
      LEFT JOIN revision_counts r ON r."documentId" = pd.id
      ON CONFLICT ("documentId", date, period) DO UPDATE SET
        "viewCount" = EXCLUDED."viewCount",
        "viewerCount" = EXCLUDED."viewerCount",
        "commentCount" = EXCLUDED."commentCount",
        "reactionCount" = EXCLUDED."reactionCount",
        "revisionCount" = EXCLUDED."revisionCount",
        "editorCount" = EXCLUDED."editorCount",
        "updatedAt" = NOW()
      RETURNING 1
    )
    SELECT COUNT(*)::text AS upserted FROM inserted
  `;

  // Uses the model-bound connection rather than a module-level import.
  const [summary] = await this.sequelize!.query<{ upserted: string }>(sql, {
    replacements: {
      periodStart: options.periodStart,
      intervalDays: options.intervalDays,
      period: options.period,
      startUuid: options.startUuid,
      endUuid: options.endUuid,
    },
    type: QueryTypes.SELECT,
  });

  // The count is selected as text, so convert it back to a number here.
  return parseInt(summary.upserted, 10);
}
|
||||
}
|
||||
|
||||
export default DocumentInsight;
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { DocumentInsight } from "@server/models";
|
||||
export default function presentDocumentInsight(insight: DocumentInsight) {
|
||||
return {
|
||||
date: insight.date,
|
||||
period: insight.period,
|
||||
viewCount: insight.viewCount,
|
||||
viewerCount: insight.viewerCount,
|
||||
commentCount: insight.commentCount,
|
||||
|
||||
+3
-3
@@ -1,12 +1,12 @@
|
||||
import { format, subDays } from "date-fns";
|
||||
import { DocumentInsight } from "@server/models";
|
||||
import { buildDocument, buildTeam } from "@server/test/factories";
|
||||
import CleanupExpiredDocumentInsightsTask from "./CleanupExpiredDocumentInsightsTask";
|
||||
import CleanupDocumentInsightsTask from "./CleanupDocumentInsightsTask";
|
||||
|
||||
const daysAgo = (n: number) => subDays(new Date(), n);
|
||||
const dayStr = (d: Date) => format(d, "yyyy-MM-dd");
|
||||
|
||||
describe("CleanupExpiredDocumentInsightsTask", () => {
|
||||
describe("CleanupDocumentInsightsTask", () => {
|
||||
it("deletes rows older than the retention window", async () => {
|
||||
const team = await buildTeam();
|
||||
const document = await buildDocument({
|
||||
@@ -27,7 +27,7 @@ describe("CleanupExpiredDocumentInsightsTask", () => {
|
||||
viewCount: 1,
|
||||
});
|
||||
|
||||
await new CleanupExpiredDocumentInsightsTask().perform();
|
||||
await new CleanupDocumentInsightsTask().perform();
|
||||
|
||||
const dates = (
|
||||
await DocumentInsight.findAll({
|
||||
+1
-1
@@ -10,7 +10,7 @@ import { CronTask, TaskInterval } from "./base/CronTask";
|
||||
*/
|
||||
const RETENTION_DAYS = 365;
|
||||
|
||||
export default class CleanupExpiredDocumentInsightsTask extends CronTask {
|
||||
export default class CleanupDocumentInsightsTask extends CronTask {
|
||||
public async perform() {
|
||||
// Derive the cutoff in UTC from the database so retention isn't affected
|
||||
// by the worker's local timezone. `date` is stored as a UTC DATE.
|
||||
@@ -1,7 +1,7 @@
|
||||
import { QueryTypes } from "sequelize";
|
||||
import { Day, Minute } from "@shared/utils/time";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import { sequelize } from "@server/storage/database";
|
||||
import { DocumentInsight } from "@server/models";
|
||||
import { DocumentInsightPeriod } from "@server/models/DocumentInsight";
|
||||
import { TaskPriority } from "./base/BaseTask";
|
||||
import type { Props } from "./base/CronTask";
|
||||
import { CronTask, TaskInterval } from "./base/CronTask";
|
||||
@@ -19,122 +19,24 @@ export default class RollupDocumentInsightsTask extends CronTask {
|
||||
const [startUuid, endUuid] = this.getPartitionBounds(partition);
|
||||
|
||||
for (let offset = RECOMPUTE_DAYS; offset >= 0; offset--) {
|
||||
const date = new Date(Date.now() - offset * Day.ms)
|
||||
const periodStart = new Date(Date.now() - offset * Day.ms)
|
||||
.toISOString()
|
||||
.slice(0, 10);
|
||||
await this.rollupDay(date, startUuid, endUuid);
|
||||
|
||||
const upserted = await DocumentInsight.rollupPeriod({
|
||||
periodStart,
|
||||
intervalDays: 1,
|
||||
period: DocumentInsightPeriod.Day,
|
||||
startUuid,
|
||||
endUuid,
|
||||
});
|
||||
|
||||
Logger.info("task", `Rolled up document insights for ${periodStart}`, {
|
||||
upserted,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async rollupDay(
|
||||
date: string,
|
||||
startUuid: string,
|
||||
endUuid: string
|
||||
): Promise<void> {
|
||||
const [{ upserted }] = await sequelize.query<{ upserted: string }>(
|
||||
`
|
||||
WITH partitioned_documents AS (
|
||||
SELECT id, "teamId"
|
||||
FROM documents
|
||||
WHERE "deletedAt" IS NULL
|
||||
AND id >= :startUuid::uuid
|
||||
AND id <= :endUuid::uuid
|
||||
),
|
||||
view_counts AS (
|
||||
SELECT
|
||||
e."documentId",
|
||||
COUNT(*) AS view_count,
|
||||
COUNT(DISTINCT e."userId") AS viewer_count
|
||||
FROM events e
|
||||
INNER JOIN partitioned_documents pd ON pd.id = e."documentId"
|
||||
WHERE e.name = 'views.create'
|
||||
AND e."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC'
|
||||
AND e."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC'
|
||||
GROUP BY e."documentId"
|
||||
),
|
||||
comment_counts AS (
|
||||
SELECT c."documentId", COUNT(*) AS comment_count
|
||||
FROM comments c
|
||||
INNER JOIN partitioned_documents pd ON pd.id = c."documentId"
|
||||
WHERE c."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC'
|
||||
AND c."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC'
|
||||
GROUP BY c."documentId"
|
||||
),
|
||||
reaction_counts AS (
|
||||
SELECT c."documentId", COUNT(rx.id) AS reaction_count
|
||||
FROM reactions rx
|
||||
INNER JOIN comments c ON c.id = rx."commentId"
|
||||
INNER JOIN partitioned_documents pd ON pd.id = c."documentId"
|
||||
WHERE rx."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC'
|
||||
AND rx."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC'
|
||||
GROUP BY c."documentId"
|
||||
),
|
||||
revision_counts AS (
|
||||
SELECT
|
||||
r."documentId",
|
||||
COUNT(*) AS revision_count,
|
||||
COUNT(DISTINCT r."userId") AS editor_count
|
||||
FROM revisions r
|
||||
INNER JOIN partitioned_documents pd ON pd.id = r."documentId"
|
||||
WHERE r."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC'
|
||||
AND r."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC'
|
||||
GROUP BY r."documentId"
|
||||
),
|
||||
active AS (
|
||||
SELECT "documentId" FROM view_counts
|
||||
UNION SELECT "documentId" FROM comment_counts
|
||||
UNION SELECT "documentId" FROM reaction_counts
|
||||
UNION SELECT "documentId" FROM revision_counts
|
||||
),
|
||||
inserted AS (
|
||||
INSERT INTO document_insights (
|
||||
id, "documentId", "teamId", date,
|
||||
"viewCount", "viewerCount",
|
||||
"commentCount", "reactionCount",
|
||||
"revisionCount", "editorCount",
|
||||
"createdAt", "updatedAt"
|
||||
)
|
||||
SELECT
|
||||
uuid_generate_v4(),
|
||||
pd.id,
|
||||
pd."teamId",
|
||||
:dayStart::date,
|
||||
COALESCE(v.view_count, 0),
|
||||
COALESCE(v.viewer_count, 0),
|
||||
COALESCE(c.comment_count, 0),
|
||||
COALESCE(rx.reaction_count, 0),
|
||||
COALESCE(r.revision_count, 0),
|
||||
COALESCE(r.editor_count, 0),
|
||||
NOW(), NOW()
|
||||
FROM active a
|
||||
INNER JOIN partitioned_documents pd ON pd.id = a."documentId"
|
||||
LEFT JOIN view_counts v ON v."documentId" = pd.id
|
||||
LEFT JOIN comment_counts c ON c."documentId" = pd.id
|
||||
LEFT JOIN reaction_counts rx ON rx."documentId" = pd.id
|
||||
LEFT JOIN revision_counts r ON r."documentId" = pd.id
|
||||
ON CONFLICT ("documentId", date) DO UPDATE SET
|
||||
"viewCount" = EXCLUDED."viewCount",
|
||||
"viewerCount" = EXCLUDED."viewerCount",
|
||||
"commentCount" = EXCLUDED."commentCount",
|
||||
"reactionCount" = EXCLUDED."reactionCount",
|
||||
"revisionCount" = EXCLUDED."revisionCount",
|
||||
"editorCount" = EXCLUDED."editorCount",
|
||||
"updatedAt" = NOW()
|
||||
RETURNING 1
|
||||
)
|
||||
SELECT COUNT(*)::text AS upserted FROM inserted
|
||||
`,
|
||||
{
|
||||
replacements: { dayStart: date, startUuid, endUuid },
|
||||
type: QueryTypes.SELECT,
|
||||
}
|
||||
);
|
||||
|
||||
Logger.info("task", `Rolled up document insights for ${date}`, {
|
||||
upserted: parseInt(upserted, 10),
|
||||
});
|
||||
}
|
||||
|
||||
public get cron() {
|
||||
return {
|
||||
interval: TaskInterval.Day,
|
||||
|
||||
@@ -0,0 +1,157 @@
|
||||
import { format, startOfISOWeek, subDays } from "date-fns";
|
||||
import { DocumentInsight, Event } from "@server/models";
|
||||
import { DocumentInsightPeriod } from "@server/models/DocumentInsight";
|
||||
import { buildDocument, buildTeam, buildUser } from "@server/test/factories";
|
||||
import RollupWeeklyDocumentInsightsTask from "./RollupWeeklyDocumentInsightsTask";
|
||||
|
||||
/** Task props describing a single partition that spans every document. */
const props = {
  limit: 10000,
  partition: {
    partitionIndex: 0,
    partitionCount: 1,
  },
};

/** Returns the current time shifted back by `n` days. */
const daysAgo = (n: number) => subDays(new Date(), n);

/** Formats a date as the YYYY-MM-DD string stored in `date`. */
const dayStr = (d: Date) => format(d, "yyyy-MM-dd");

vi.setConfig({ testTimeout: 30000 });

describe("RollupWeeklyDocumentInsightsTask", () => {
  let task: RollupWeeklyDocumentInsightsTask;

  beforeEach(() => {
    task = new RollupWeeklyDocumentInsightsTask();
  });

  /**
   * Inserts a daily document_insights row directly, bypassing the rollup.
   *
   * @param row ids of the document and team, the day and its view count.
   * @returns the created row.
   */
  const seedDailyRow = (row: {
    documentId: string;
    teamId: string;
    day: Date;
    viewCount: number;
  }) =>
    DocumentInsight.create({
      documentId: row.documentId,
      teamId: row.teamId,
      date: dayStr(row.day),
      period: DocumentInsightPeriod.Day,
      viewCount: row.viewCount,
    });

  /**
   * Loads every document_insights row stored for a document.
   *
   * @param documentId id of the document to look up.
   * @returns the matching rows.
   */
  const insightsFor = (documentId: string) =>
    DocumentInsight.findAll({ where: { documentId } });

  it("leaves recent daily rows untouched", async () => {
    const team = await buildTeam();
    const document = await buildDocument({
      teamId: team.id,
      publishedAt: new Date(),
    });

    await seedDailyRow({
      documentId: document.id,
      teamId: team.id,
      day: daysAgo(5),
      viewCount: 3,
    });

    await task.perform(props);

    const rows = await insightsFor(document.id);
    expect(rows).toHaveLength(1);
    expect(rows[0].period).toBe(DocumentInsightPeriod.Day);
  });

  it("collapses daily rows older than the cutoff into a weekly row", async () => {
    const team = await buildTeam();
    const user = await buildUser({ teamId: team.id });
    const document = await buildDocument({
      teamId: team.id,
      publishedAt: new Date(),
    });

    // Pick a date comfortably past the 30-day cutoff + week buffer.
    const aDayInOldWeek = daysAgo(60);
    const weekStart = startOfISOWeek(aDayInOldWeek);

    // A historical daily row within the old week. Its count differs from the
    // source data on purpose, so the assertions show the weekly row was
    // recomputed from events rather than copied from the daily row.
    await seedDailyRow({
      documentId: document.id,
      teamId: team.id,
      day: aDayInOldWeek,
      viewCount: 99,
    });

    // Source event in the same week so rollup can recompute accurately.
    await Event.create({
      name: "views.create",
      teamId: team.id,
      userId: user.id,
      documentId: document.id,
      createdAt: aDayInOldWeek,
    });

    await task.perform(props);

    const rows = await insightsFor(document.id);
    expect(rows).toHaveLength(1);

    const [weekly] = rows;
    expect(weekly.period).toBe(DocumentInsightPeriod.Week);
    expect(weekly.date).toBe(dayStr(weekStart));
    expect(weekly.viewCount).toBe(1);
    expect(weekly.viewerCount).toBe(1);
  });

  it("preserves daily rows for soft-deleted documents", async () => {
    const team = await buildTeam();
    const document = await buildDocument({
      teamId: team.id,
      publishedAt: new Date(),
      deletedAt: new Date(),
    });

    const aDayInOldWeek = daysAgo(60);

    await seedDailyRow({
      documentId: document.id,
      teamId: team.id,
      day: aDayInOldWeek,
      viewCount: 7,
    });

    await task.perform(props);

    const rows = await insightsFor(document.id);
    // The daily row stays put — rollupPeriod skips deleted documents, so we
    // must not delete data we wouldn't replace.
    expect(rows).toHaveLength(1);

    const [daily] = rows;
    expect(daily.period).toBe(DocumentInsightPeriod.Day);
    expect(daily.viewCount).toBe(7);
  });

  it("is idempotent when re-run", async () => {
    const team = await buildTeam();
    const user = await buildUser({ teamId: team.id });
    const document = await buildDocument({
      teamId: team.id,
      publishedAt: new Date(),
    });

    const aDayInOldWeek = daysAgo(60);

    await seedDailyRow({
      documentId: document.id,
      teamId: team.id,
      day: aDayInOldWeek,
      viewCount: 1,
    });
    await Event.create({
      name: "views.create",
      teamId: team.id,
      userId: user.id,
      documentId: document.id,
      createdAt: aDayInOldWeek,
    });

    // Running twice must still leave exactly one weekly row.
    await task.perform(props);
    await task.perform(props);

    const rows = await insightsFor(document.id);
    expect(rows).toHaveLength(1);
    expect(rows[0].period).toBe(DocumentInsightPeriod.Week);
  });
});
|
||||
@@ -0,0 +1,95 @@
|
||||
import { QueryTypes } from "sequelize";
|
||||
import { Minute } from "@shared/utils/time";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import { DocumentInsight } from "@server/models";
|
||||
import { DocumentInsightPeriod } from "@server/models/DocumentInsight";
|
||||
import { sequelize } from "@server/storage/database";
|
||||
import { TaskPriority } from "./base/BaseTask";
|
||||
import type { Props } from "./base/CronTask";
|
||||
import { CronTask, TaskInterval } from "./base/CronTask";
|
||||
|
||||
/**
 * Age threshold, in days, past which daily rows are collapsed into weekly
 * rollups to keep the table size manageable. Newer rows stay at daily
 * granularity.
 */
const CUTOFF_DAYS = 30;

/**
 * Cron task that replaces old daily document_insights rows with one weekly
 * row per document, one partition of document ids at a time.
 */
export default class RollupWeeklyDocumentInsightsTask extends CronTask {
  /**
   * For each week that lies entirely past the cutoff and still has daily
   * rows in this partition, writes the weekly rollup and then removes that
   * week's daily rows.
   *
   * NOTE(review): the DELETE is not conditioned on a weekly row having been
   * written for the same document. Confirm that rollupPeriod always produces
   * a replacement for every document whose daily rows are removed here.
   *
   * @param props.partition partition descriptor used to derive the document id bounds.
   */
  public async perform({ partition }: Props) {
    const [startUuid, endUuid] = this.getPartitionBounds(partition);
    const weekStarts = await this.findCollapsibleWeeks(startUuid, endUuid);

    for (const weekStart of weekStarts) {
      // Write the weekly row first so daily data is only dropped afterwards.
      const upserted = await DocumentInsight.rollupPeriod({
        periodStart: weekStart,
        intervalDays: 7,
        period: DocumentInsightPeriod.Week,
        startUuid,
        endUuid,
      });

      await this.deleteDailyRows(weekStart, startUuid, endUuid);

      Logger.info("task", `Rolled up document insights week ${weekStart}`, {
        upserted,
      });
    }
  }

  /**
   * Finds ISO week starts (Monday) whose Sunday end is older than the cutoff
   * and that still have daily rows inside the partition, oldest first.
   * Postgres `date_trunc` uses ISO weeks, so each result is a Monday.
   *
   * @param startUuid inclusive lower bound of the document id partition.
   * @param endUuid inclusive upper bound of the document id partition.
   * @returns the week start dates, ascending.
   */
  private async findCollapsibleWeeks(
    startUuid: string,
    endUuid: string
  ): Promise<string[]> {
    // Join to documents and filter out soft-deleted ones so this stays
    // consistent with rollupPeriod — otherwise we'd delete daily rows it
    // didn't replace. The plain `date <` bound is implied by the `date_trunc`
    // filter but stated separately so the planner can short-circuit via the
    // (documentId, date, period) index before evaluating the truncation.
    const found = await sequelize.query<{ weekStart: string }>(
      `
      SELECT DISTINCT date_trunc('week', di.date)::date AS "weekStart"
      FROM document_insights di
      INNER JOIN documents d
        ON d.id = di."documentId"
        AND d."deletedAt" IS NULL
      WHERE di.period = 'day'
        AND di."documentId" >= :startUuid::uuid
        AND di."documentId" <= :endUuid::uuid
        AND di.date < (NOW() AT TIME ZONE 'UTC')::date - (:cutoffDays::int * INTERVAL '1 day')
        AND date_trunc('week', di.date) < (NOW() AT TIME ZONE 'UTC')::date - ((:cutoffDays::int + 6) * INTERVAL '1 day')
      ORDER BY "weekStart" ASC
      `,
      {
        replacements: { startUuid, endUuid, cutoffDays: CUTOFF_DAYS },
        type: QueryTypes.SELECT,
      }
    );

    return found.map((entry) => entry.weekStart);
  }

  /**
   * Deletes the daily rows of one week for non-deleted documents in the
   * partition. Rows of soft-deleted documents are left in place.
   *
   * @param weekStart first day of the week (YYYY-MM-DD).
   * @param startUuid inclusive lower bound of the document id partition.
   * @param endUuid inclusive upper bound of the document id partition.
   */
  private async deleteDailyRows(
    weekStart: string,
    startUuid: string,
    endUuid: string
  ): Promise<void> {
    await sequelize.query(
      `
      DELETE FROM document_insights di
      USING documents d
      WHERE d.id = di."documentId"
        AND d."deletedAt" IS NULL
        AND di.period = 'day'
        AND di."documentId" >= :startUuid::uuid
        AND di."documentId" <= :endUuid::uuid
        AND di.date >= :weekStart::date
        AND di.date < :weekStart::date + INTERVAL '7 days'
      `,
      {
        replacements: { startUuid, endUuid, weekStart },
        type: QueryTypes.DELETE,
      }
    );
  }

  /** Schedule: daily interval with a 30-minute partition window. */
  public get cron() {
    return {
      interval: TaskInterval.Day,
      partitionWindow: 30 * Minute.ms,
    };
  }

  /** Queue options: a single attempt at background priority. */
  public get options() {
    return {
      attempts: 1,
      priority: TaskPriority.Background,
    };
  }
}
|
||||
Reference in New Issue
Block a user