diff --git a/server/migrations/20260419000000-add-period-to-document-insights.js b/server/migrations/20260419000000-add-period-to-document-insights.js new file mode 100644 index 0000000000..6d65c6e359 --- /dev/null +++ b/server/migrations/20260419000000-add-period-to-document-insights.js @@ -0,0 +1,71 @@ +"use strict"; + +module.exports = { + /** + * Adds a "period" column (default "day") to document_insights and replaces + * the unique (documentId, date) index with (documentId, date, period). + */ + async up(queryInterface, Sequelize) { + return queryInterface.sequelize.transaction(async (transaction) => { + await queryInterface.addColumn( + "document_insights", + "period", + { + type: Sequelize.ENUM("day", "week"), + allowNull: false, + defaultValue: "day", + }, + { transaction } + ); + + await queryInterface.removeIndex( + "document_insights", + ["documentId", "date"], + { transaction } + ); + + await queryInterface.addIndex( + "document_insights", + ["documentId", "date", "period"], + { unique: true, transaction } + ); + }); + }, + + /** + * Reverts up(): deletes weekly rows, restores the unique + * (documentId, date) index, then drops the "period" column and enum type. + */ + async down(queryInterface) { + return queryInterface.sequelize.transaction(async (transaction) => { + // Weekly rows have no meaning once "period" is dropped, and one that + // shares its date with a daily row would violate the restored unique + // (documentId, date) index, so delete them before rebuilding it. + await queryInterface.sequelize.query( + "DELETE FROM document_insights WHERE period = 'week'", + { transaction } + ); + + await queryInterface.removeIndex( + "document_insights", + ["documentId", "date", "period"], + { transaction } + ); + + await queryInterface.addIndex( + "document_insights", + ["documentId", "date"], + { unique: true, transaction } + ); + + await queryInterface.removeColumn("document_insights", "period", { + transaction, + }); + + await queryInterface.sequelize.query( + 'DROP TYPE IF EXISTS "enum_document_insights_period"', + { transaction } + ); + }); + }, +}; diff --git a/server/models/DocumentInsight.test.ts b/server/models/DocumentInsight.test.ts new file mode 100644 index 0000000000..01e70f9812 --- /dev/null +++ b/server/models/DocumentInsight.test.ts @@ -0,0 +1,319 @@ +import { format, startOfISOWeek, subDays } from "date-fns"; +import DocumentInsight, { + DocumentInsightPeriod, +} from "@server/models/DocumentInsight"; +import { Comment, Event, Revision } from "@server/models"; +import { buildDocument, buildTeam, buildUser } from "@server/test/factories"; + +const FULL_UUID_RANGE: 
[string, string] = [ + "00000000-0000-4000-8000-000000000000", + "ffffffff-ffff-4fff-bfff-ffffffffffff", +]; + +const dayStr = (d: Date) => format(d, "yyyy-MM-dd"); + +describe("DocumentInsight.rollupPeriod", () => { + it("writes nothing when no source activity exists in the window", async () => { + const team = await buildTeam(); + await buildDocument({ teamId: team.id }); + + const upserted = await DocumentInsight.rollupPeriod({ + periodStart: dayStr(subDays(new Date(), 1)), + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + + expect(upserted).toBe(0); + expect(await DocumentInsight.count({ where: { teamId: team.id } })).toBe(0); + }); + + it("respects the window boundaries and excludes events outside it", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + const target = subDays(new Date(), 10); + const before = subDays(target, 1); + const after = subDays(target, -1); + + await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: before, + }); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: target, + }); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: after, + }); + + await DocumentInsight.rollupPeriod({ + periodStart: dayStr(target), + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + + const row = await DocumentInsight.findOne({ + where: { documentId: document.id, date: dayStr(target) }, + }); + expect(row).toBeTruthy(); + expect(row!.period).toBe(DocumentInsightPeriod.Day); + expect(row!.viewCount).toBe(1); + }); + + it("aggregates a 7-day window into 
a single weekly row", async () => { + const team = await buildTeam(); + const userA = await buildUser({ teamId: team.id }); + const userB = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + const weekStart = startOfISOWeek(subDays(new Date(), 60)); + + // Views across multiple days of the same week; userA on two days, userB on one. + await Event.create({ + name: "views.create", + teamId: team.id, + userId: userA.id, + documentId: document.id, + createdAt: subDays(weekStart, -1), // Tuesday + }); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: userA.id, + documentId: document.id, + createdAt: subDays(weekStart, -3), // Thursday + }); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: userB.id, + documentId: document.id, + createdAt: subDays(weekStart, -5), // Saturday + }); + + // Revision within the week by userB. + await Revision.create({ + documentId: document.id, + userId: userB.id, + title: document.title, + text: "x", + createdAt: subDays(weekStart, -2), + }); + + await DocumentInsight.rollupPeriod({ + periodStart: dayStr(weekStart), + intervalDays: 7, + period: DocumentInsightPeriod.Week, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + + const rows = await DocumentInsight.findAll({ + where: { documentId: document.id }, + }); + expect(rows).toHaveLength(1); + expect(rows[0].period).toBe(DocumentInsightPeriod.Week); + expect(rows[0].date).toBe(dayStr(weekStart)); + expect(rows[0].viewCount).toBe(3); + // Two distinct viewers across the week, not double-counted by day. 
+ expect(rows[0].viewerCount).toBe(2); + expect(rows[0].revisionCount).toBe(1); + expect(rows[0].editorCount).toBe(1); + }); + + it("counts reactions on older comments when the reaction is in-window", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + const target = subDays(new Date(), 2); + const olderComment = await Comment.create({ + documentId: document.id, + createdById: user.id, + data: { type: "doc", content: [] }, + createdAt: subDays(target, 30), + }); + + const { Reaction } = await import("@server/models"); + await Reaction.create( + { + emoji: "🎉", + commentId: olderComment.id, + userId: user.id, + createdAt: target, + }, + { hooks: false } + ); + + await DocumentInsight.rollupPeriod({ + periodStart: dayStr(target), + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + + const row = await DocumentInsight.findOne({ + where: { documentId: document.id, date: dayStr(target) }, + }); + expect(row).toBeTruthy(); + expect(row!.commentCount).toBe(0); + expect(row!.reactionCount).toBe(1); + }); + + it("skips documents outside the partition UUID range", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + const target = subDays(new Date(), 1); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: target, + }); + + // An empty range that excludes all documents. + const [bogusStart, bogusEnd] = [ + "ffffffff-ffff-4fff-bfff-fffffffffffe", + "ffffffff-ffff-4fff-bfff-ffffffffffff", + ]; + // Only hits if the test document's UUID happens to fall in the range; in + // practice buildDocument generates a random v4 that won't. 
Assert that + // the rollup writes nothing for this document. + const upserted = await DocumentInsight.rollupPeriod({ + periodStart: dayStr(target), + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid: bogusStart, + endUuid: bogusEnd, + }); + + expect(upserted).toBe(0); + expect( + await DocumentInsight.count({ where: { documentId: document.id } }) + ).toBe(0); + }); + + it("upserts on (documentId, date, period) and updates counts on re-run", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + const target = subDays(new Date(), 1); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: target, + }); + + await DocumentInsight.rollupPeriod({ + periodStart: dayStr(target), + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + + // Add another event and re-run. 
+ await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: target, + }); + + await DocumentInsight.rollupPeriod({ + periodStart: dayStr(target), + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + + const rows = await DocumentInsight.findAll({ + where: { documentId: document.id }, + }); + expect(rows).toHaveLength(1); + expect(rows[0].viewCount).toBe(2); + }); + + it("stores daily and weekly rows for the same start date side-by-side", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + const weekStart = startOfISOWeek(subDays(new Date(), 60)); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: weekStart, + }); + + await DocumentInsight.rollupPeriod({ + periodStart: dayStr(weekStart), + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + await DocumentInsight.rollupPeriod({ + periodStart: dayStr(weekStart), + intervalDays: 7, + period: DocumentInsightPeriod.Week, + startUuid: FULL_UUID_RANGE[0], + endUuid: FULL_UUID_RANGE[1], + }); + + const rows = await DocumentInsight.findAll({ + where: { documentId: document.id }, + order: [["period", "ASC"]], + }); + expect(rows).toHaveLength(2); + expect(rows.map((r) => r.period).sort()).toEqual([ + DocumentInsightPeriod.Day, + DocumentInsightPeriod.Week, + ]); + }); +}); diff --git a/server/models/DocumentInsight.ts b/server/models/DocumentInsight.ts index 81d62632d9..af9f520131 100644 --- a/server/models/DocumentInsight.ts +++ b/server/models/DocumentInsight.ts @@ -1,4 +1,5 @@ import type { InferAttributes, InferCreationAttributes } from "sequelize"; +import { QueryTypes } from "sequelize"; import 
{ BelongsTo, Column, @@ -13,12 +14,17 @@ import IdModel from "./base/IdModel"; import { SkipChangeset } from "./decorators/Changeset"; import Fix from "./decorators/Fix"; +export enum DocumentInsightPeriod { + Day = "day", + Week = "week", +} + @Table({ tableName: "document_insights", modelName: "documentInsight", indexes: [ { - fields: ["documentId", "date"], + fields: ["documentId", "date", "period"], unique: true, }, { @@ -34,6 +40,11 @@ class DocumentInsight extends IdModel< @Column(DataType.DATEONLY) date: string; + @Default(DocumentInsightPeriod.Day) + @Column(DataType.ENUM(...Object.values(DocumentInsightPeriod))) + @SkipChangeset + period: DocumentInsightPeriod; + @Default(0) @Column(DataType.INTEGER) @SkipChangeset @@ -79,6 +90,141 @@ class DocumentInsight extends IdModel< @ForeignKey(() => Team) @Column(DataType.UUID) teamId: string; + + /** + * Aggregate a time window of source activity (views, comments, reactions, + * revisions) into document_insights rows for documents whose id falls in the + * given UUID range. Upserts on the unique (documentId, date, period) index + * so the operation is idempotent. + * + * @param options.periodStart UTC date string (YYYY-MM-DD) marking the start of the window. + * @param options.intervalDays length of the window in days (1 for daily, 7 for weekly). + * @param options.period the period type to write on each row. + * @param options.startUuid inclusive lower bound of the document id partition. + * @param options.endUuid inclusive upper bound of the document id partition. + * @returns the number of rows upserted. 
+ */ + public static async rollupPeriod({ + periodStart, + intervalDays, + period, + startUuid, + endUuid, + }: { + periodStart: string; + intervalDays: number; + period: DocumentInsightPeriod; + startUuid: string; + endUuid: string; + }): Promise<number> { + const [{ upserted }] = await this.sequelize!.query<{ upserted: string }>( + ` + WITH partitioned_documents AS ( + SELECT id, "teamId" + FROM documents + WHERE "deletedAt" IS NULL + AND id >= :startUuid::uuid + AND id <= :endUuid::uuid + ), + view_counts AS ( + SELECT + e."documentId", + COUNT(*) AS view_count, + COUNT(DISTINCT e."userId") AS viewer_count + FROM events e + INNER JOIN partitioned_documents pd ON pd.id = e."documentId" + WHERE e.name = 'views.create' + AND e."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC' + AND e."createdAt" < (:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC' + GROUP BY e."documentId" + ), + comment_counts AS ( + SELECT c."documentId", COUNT(*) AS comment_count + FROM comments c + INNER JOIN partitioned_documents pd ON pd.id = c."documentId" + WHERE c."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC' + AND c."createdAt" < (:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC' + GROUP BY c."documentId" + ), + reaction_counts AS ( + SELECT c."documentId", COUNT(rx.id) AS reaction_count + FROM reactions rx + INNER JOIN comments c ON c.id = rx."commentId" + INNER JOIN partitioned_documents pd ON pd.id = c."documentId" + WHERE rx."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC' + AND rx."createdAt" < (:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC' + GROUP BY c."documentId" + ), + revision_counts AS ( + SELECT + r."documentId", + COUNT(*) AS revision_count, + COUNT(DISTINCT r."userId") AS editor_count + FROM revisions r + INNER JOIN partitioned_documents pd ON pd.id = r."documentId" + WHERE r."createdAt" >= :periodStart::timestamp AT TIME ZONE 'UTC' + AND r."createdAt" < 
(:periodStart::timestamp + (:intervalDays * INTERVAL '1 day')) AT TIME ZONE 'UTC' + GROUP BY r."documentId" + ), + active AS ( + SELECT "documentId" FROM view_counts + UNION SELECT "documentId" FROM comment_counts + UNION SELECT "documentId" FROM reaction_counts + UNION SELECT "documentId" FROM revision_counts + ), + inserted AS ( + INSERT INTO document_insights ( + id, "documentId", "teamId", date, period, + "viewCount", "viewerCount", + "commentCount", "reactionCount", + "revisionCount", "editorCount", + "createdAt", "updatedAt" + ) + SELECT + uuid_generate_v4(), + pd.id, + pd."teamId", + :periodStart::date, + :period, + COALESCE(v.view_count, 0), + COALESCE(v.viewer_count, 0), + COALESCE(c.comment_count, 0), + COALESCE(rx.reaction_count, 0), + COALESCE(r.revision_count, 0), + COALESCE(r.editor_count, 0), + NOW(), NOW() + FROM active a + INNER JOIN partitioned_documents pd ON pd.id = a."documentId" + LEFT JOIN view_counts v ON v."documentId" = pd.id + LEFT JOIN comment_counts c ON c."documentId" = pd.id + LEFT JOIN reaction_counts rx ON rx."documentId" = pd.id + LEFT JOIN revision_counts r ON r."documentId" = pd.id + ON CONFLICT ("documentId", date, period) DO UPDATE SET + "viewCount" = EXCLUDED."viewCount", + "viewerCount" = EXCLUDED."viewerCount", + "commentCount" = EXCLUDED."commentCount", + "reactionCount" = EXCLUDED."reactionCount", + "revisionCount" = EXCLUDED."revisionCount", + "editorCount" = EXCLUDED."editorCount", + "updatedAt" = NOW() + RETURNING 1 + ) + SELECT COUNT(*)::text AS upserted FROM inserted + `, + { + replacements: { + periodStart, + intervalDays, + period, + startUuid, + endUuid, + }, + type: QueryTypes.SELECT, + } + ); + + return parseInt(upserted, 10); + } } export default DocumentInsight; diff --git a/server/presenters/documentInsight.ts b/server/presenters/documentInsight.ts index 5ad673d204..8731d0afe3 100644 --- a/server/presenters/documentInsight.ts +++ b/server/presenters/documentInsight.ts @@ -3,6 +3,7 @@ import type { 
DocumentInsight } from "@server/models"; export default function presentDocumentInsight(insight: DocumentInsight) { return { date: insight.date, + period: insight.period, viewCount: insight.viewCount, viewerCount: insight.viewerCount, commentCount: insight.commentCount, diff --git a/server/queues/tasks/CleanupExpiredDocumentInsightsTask.test.ts b/server/queues/tasks/CleanupDocumentInsightsTask.test.ts similarity index 83% rename from server/queues/tasks/CleanupExpiredDocumentInsightsTask.test.ts rename to server/queues/tasks/CleanupDocumentInsightsTask.test.ts index 300f1ca835..98abc51f8b 100644 --- a/server/queues/tasks/CleanupExpiredDocumentInsightsTask.test.ts +++ b/server/queues/tasks/CleanupDocumentInsightsTask.test.ts @@ -1,12 +1,12 @@ import { format, subDays } from "date-fns"; import { DocumentInsight } from "@server/models"; import { buildDocument, buildTeam } from "@server/test/factories"; -import CleanupExpiredDocumentInsightsTask from "./CleanupExpiredDocumentInsightsTask"; +import CleanupDocumentInsightsTask from "./CleanupDocumentInsightsTask"; const daysAgo = (n: number) => subDays(new Date(), n); const dayStr = (d: Date) => format(d, "yyyy-MM-dd"); -describe("CleanupExpiredDocumentInsightsTask", () => { +describe("CleanupDocumentInsightsTask", () => { it("deletes rows older than the retention window", async () => { const team = await buildTeam(); const document = await buildDocument({ @@ -27,7 +27,7 @@ describe("CleanupExpiredDocumentInsightsTask", () => { viewCount: 1, }); - await new CleanupExpiredDocumentInsightsTask().perform(); + await new CleanupDocumentInsightsTask().perform(); const dates = ( await DocumentInsight.findAll({ diff --git a/server/queues/tasks/CleanupExpiredDocumentInsightsTask.ts b/server/queues/tasks/CleanupDocumentInsightsTask.ts similarity index 93% rename from server/queues/tasks/CleanupExpiredDocumentInsightsTask.ts rename to server/queues/tasks/CleanupDocumentInsightsTask.ts index 7a246e2604..d493b492c3 100644 --- 
a/server/queues/tasks/CleanupExpiredDocumentInsightsTask.ts +++ b/server/queues/tasks/CleanupDocumentInsightsTask.ts @@ -10,7 +10,7 @@ import { CronTask, TaskInterval } from "./base/CronTask"; */ const RETENTION_DAYS = 365; -export default class CleanupExpiredDocumentInsightsTask extends CronTask { +export default class CleanupDocumentInsightsTask extends CronTask { public async perform() { // Derive the cutoff in UTC from the database so retention isn't affected // by the worker's local timezone. `date` is stored as a UTC DATE. diff --git a/server/queues/tasks/RollupDocumentInsightsTask.ts b/server/queues/tasks/RollupDocumentInsightsTask.ts index 0466ed77c0..fd9f4d22a6 100644 --- a/server/queues/tasks/RollupDocumentInsightsTask.ts +++ b/server/queues/tasks/RollupDocumentInsightsTask.ts @@ -1,7 +1,7 @@ -import { QueryTypes } from "sequelize"; import { Day, Minute } from "@shared/utils/time"; import Logger from "@server/logging/Logger"; -import { sequelize } from "@server/storage/database"; +import { DocumentInsight } from "@server/models"; +import { DocumentInsightPeriod } from "@server/models/DocumentInsight"; import { TaskPriority } from "./base/BaseTask"; import type { Props } from "./base/CronTask"; import { CronTask, TaskInterval } from "./base/CronTask"; @@ -19,122 +19,24 @@ export default class RollupDocumentInsightsTask extends CronTask { const [startUuid, endUuid] = this.getPartitionBounds(partition); for (let offset = RECOMPUTE_DAYS; offset >= 0; offset--) { - const date = new Date(Date.now() - offset * Day.ms) + const periodStart = new Date(Date.now() - offset * Day.ms) .toISOString() .slice(0, 10); - await this.rollupDay(date, startUuid, endUuid); + + const upserted = await DocumentInsight.rollupPeriod({ + periodStart, + intervalDays: 1, + period: DocumentInsightPeriod.Day, + startUuid, + endUuid, + }); + + Logger.info("task", `Rolled up document insights for ${periodStart}`, { + upserted, + }); } } - private async rollupDay( - date: string, - 
startUuid: string, - endUuid: string - ): Promise<void> { - const [{ upserted }] = await sequelize.query<{ upserted: string }>( - ` - WITH partitioned_documents AS ( - SELECT id, "teamId" - FROM documents - WHERE "deletedAt" IS NULL - AND id >= :startUuid::uuid - AND id <= :endUuid::uuid - ), - view_counts AS ( - SELECT - e."documentId", - COUNT(*) AS view_count, - COUNT(DISTINCT e."userId") AS viewer_count - FROM events e - INNER JOIN partitioned_documents pd ON pd.id = e."documentId" - WHERE e.name = 'views.create' - AND e."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC' - AND e."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC' - GROUP BY e."documentId" - ), - comment_counts AS ( - SELECT c."documentId", COUNT(*) AS comment_count - FROM comments c - INNER JOIN partitioned_documents pd ON pd.id = c."documentId" - WHERE c."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC' - AND c."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC' - GROUP BY c."documentId" - ), - reaction_counts AS ( - SELECT c."documentId", COUNT(rx.id) AS reaction_count - FROM reactions rx - INNER JOIN comments c ON c.id = rx."commentId" - INNER JOIN partitioned_documents pd ON pd.id = c."documentId" - WHERE rx."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC' - AND rx."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC' - GROUP BY c."documentId" - ), - revision_counts AS ( - SELECT - r."documentId", - COUNT(*) AS revision_count, - COUNT(DISTINCT r."userId") AS editor_count - FROM revisions r - INNER JOIN partitioned_documents pd ON pd.id = r."documentId" - WHERE r."createdAt" >= :dayStart::timestamp AT TIME ZONE 'UTC' - AND r."createdAt" < (:dayStart::timestamp + INTERVAL '1 day') AT TIME ZONE 'UTC' - GROUP BY r."documentId" - ), - active AS ( - SELECT "documentId" FROM view_counts - UNION SELECT "documentId" FROM comment_counts - UNION SELECT "documentId" FROM reaction_counts - UNION SELECT "documentId" FROM 
revision_counts - ), - inserted AS ( - INSERT INTO document_insights ( - id, "documentId", "teamId", date, - "viewCount", "viewerCount", - "commentCount", "reactionCount", - "revisionCount", "editorCount", - "createdAt", "updatedAt" - ) - SELECT - uuid_generate_v4(), - pd.id, - pd."teamId", - :dayStart::date, - COALESCE(v.view_count, 0), - COALESCE(v.viewer_count, 0), - COALESCE(c.comment_count, 0), - COALESCE(rx.reaction_count, 0), - COALESCE(r.revision_count, 0), - COALESCE(r.editor_count, 0), - NOW(), NOW() - FROM active a - INNER JOIN partitioned_documents pd ON pd.id = a."documentId" - LEFT JOIN view_counts v ON v."documentId" = pd.id - LEFT JOIN comment_counts c ON c."documentId" = pd.id - LEFT JOIN reaction_counts rx ON rx."documentId" = pd.id - LEFT JOIN revision_counts r ON r."documentId" = pd.id - ON CONFLICT ("documentId", date) DO UPDATE SET - "viewCount" = EXCLUDED."viewCount", - "viewerCount" = EXCLUDED."viewerCount", - "commentCount" = EXCLUDED."commentCount", - "reactionCount" = EXCLUDED."reactionCount", - "revisionCount" = EXCLUDED."revisionCount", - "editorCount" = EXCLUDED."editorCount", - "updatedAt" = NOW() - RETURNING 1 - ) - SELECT COUNT(*)::text AS upserted FROM inserted - `, - { - replacements: { dayStart: date, startUuid, endUuid }, - type: QueryTypes.SELECT, - } - ); - - Logger.info("task", `Rolled up document insights for ${date}`, { - upserted: parseInt(upserted, 10), - }); - } - public get cron() { return { interval: TaskInterval.Day, diff --git a/server/queues/tasks/RollupWeeklyDocumentInsightsTask.test.ts b/server/queues/tasks/RollupWeeklyDocumentInsightsTask.test.ts new file mode 100644 index 0000000000..3e364cd62a --- /dev/null +++ b/server/queues/tasks/RollupWeeklyDocumentInsightsTask.test.ts @@ -0,0 +1,157 @@ +import { format, startOfISOWeek, subDays } from "date-fns"; +import { DocumentInsight, Event } from "@server/models"; +import { DocumentInsightPeriod } from "@server/models/DocumentInsight"; +import { buildDocument, 
buildTeam, buildUser } from "@server/test/factories"; +import RollupWeeklyDocumentInsightsTask from "./RollupWeeklyDocumentInsightsTask"; + +const props = { + limit: 10000, + partition: { + partitionIndex: 0, + partitionCount: 1, + }, +}; + +const daysAgo = (n: number) => subDays(new Date(), n); +const dayStr = (d: Date) => format(d, "yyyy-MM-dd"); + +vi.setConfig({ testTimeout: 30000 }); + +describe("RollupWeeklyDocumentInsightsTask", () => { + let task: RollupWeeklyDocumentInsightsTask; + + beforeEach(() => { + task = new RollupWeeklyDocumentInsightsTask(); + }); + + it("leaves recent daily rows untouched", async () => { + const team = await buildTeam(); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + await DocumentInsight.create({ + documentId: document.id, + teamId: team.id, + date: dayStr(daysAgo(5)), + period: DocumentInsightPeriod.Day, + viewCount: 3, + }); + + await task.perform(props); + + const rows = await DocumentInsight.findAll({ + where: { documentId: document.id }, + }); + expect(rows).toHaveLength(1); + expect(rows[0].period).toBe(DocumentInsightPeriod.Day); + }); + + it("collapses daily rows older than the cutoff into a weekly row", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + // Pick a date comfortably past the 30-day cutoff + week buffer. + const aDayInOldWeek = daysAgo(60); + const weekStart = startOfISOWeek(aDayInOldWeek); + + // A historical daily row within the old week. + await DocumentInsight.create({ + documentId: document.id, + teamId: team.id, + date: dayStr(aDayInOldWeek), + period: DocumentInsightPeriod.Day, + viewCount: 99, + }); + + // Source event in the same week so rollup can recompute accurately. 
+ await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: aDayInOldWeek, + }); + + await task.perform(props); + + const rows = await DocumentInsight.findAll({ + where: { documentId: document.id }, + }); + expect(rows).toHaveLength(1); + expect(rows[0].period).toBe(DocumentInsightPeriod.Week); + expect(rows[0].date).toBe(dayStr(weekStart)); + expect(rows[0].viewCount).toBe(1); + expect(rows[0].viewerCount).toBe(1); + }); + + it("preserves daily rows for soft-deleted documents", async () => { + const team = await buildTeam(); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + deletedAt: new Date(), + }); + + const aDayInOldWeek = daysAgo(60); + + await DocumentInsight.create({ + documentId: document.id, + teamId: team.id, + date: dayStr(aDayInOldWeek), + period: DocumentInsightPeriod.Day, + viewCount: 7, + }); + + await task.perform(props); + + const rows = await DocumentInsight.findAll({ + where: { documentId: document.id }, + }); + // The daily row stays put — rollupPeriod skips deleted documents, so we + // must not delete data we wouldn't replace. 
+ expect(rows).toHaveLength(1); + expect(rows[0].period).toBe(DocumentInsightPeriod.Day); + expect(rows[0].viewCount).toBe(7); + }); + + it("is idempotent when re-run", async () => { + const team = await buildTeam(); + const user = await buildUser({ teamId: team.id }); + const document = await buildDocument({ + teamId: team.id, + publishedAt: new Date(), + }); + + const aDayInOldWeek = daysAgo(60); + + await DocumentInsight.create({ + documentId: document.id, + teamId: team.id, + date: dayStr(aDayInOldWeek), + period: DocumentInsightPeriod.Day, + viewCount: 1, + }); + await Event.create({ + name: "views.create", + teamId: team.id, + userId: user.id, + documentId: document.id, + createdAt: aDayInOldWeek, + }); + + await task.perform(props); + await task.perform(props); + + const rows = await DocumentInsight.findAll({ + where: { documentId: document.id }, + }); + expect(rows).toHaveLength(1); + expect(rows[0].period).toBe(DocumentInsightPeriod.Week); + }); +}); diff --git a/server/queues/tasks/RollupWeeklyDocumentInsightsTask.ts b/server/queues/tasks/RollupWeeklyDocumentInsightsTask.ts new file mode 100644 index 0000000000..6e74d8b575 --- /dev/null +++ b/server/queues/tasks/RollupWeeklyDocumentInsightsTask.ts @@ -0,0 +1,95 @@ +import { QueryTypes } from "sequelize"; +import { Minute } from "@shared/utils/time"; +import Logger from "@server/logging/Logger"; +import { DocumentInsight } from "@server/models"; +import { DocumentInsightPeriod } from "@server/models/DocumentInsight"; +import { sequelize } from "@server/storage/database"; +import { TaskPriority } from "./base/BaseTask"; +import type { Props } from "./base/CronTask"; +import { CronTask, TaskInterval } from "./base/CronTask"; + +/** + * Daily rows older than this threshold are collapsed into weekly rollups to + * keep the table size manageable. Recent data remains at daily granularity. 
+ */ +const CUTOFF_DAYS = 30; + +export default class RollupWeeklyDocumentInsightsTask extends CronTask { + public async perform({ partition }: Props) { + const [startUuid, endUuid] = this.getPartitionBounds(partition); + + // Find ISO week starts (Monday) whose Sunday end is older than the cutoff + // and still have daily rows inside this partition. Postgres `date_trunc` + // uses ISO weeks, so the result is always a Monday. Join to documents and + // filter out soft-deleted ones so we stay consistent with rollupPeriod — + // otherwise we'd delete daily rows it didn't replace. The `date <` bound + // is implied by the `date_trunc` filter but stated separately so the + // planner can short-circuit via the (documentId, date, period) index + // before evaluating the truncation. + const weeks = await sequelize.query<{ weekStart: string }>( + ` + SELECT DISTINCT date_trunc('week', di.date)::date AS "weekStart" + FROM document_insights di + INNER JOIN documents d + ON d.id = di."documentId" + AND d."deletedAt" IS NULL + WHERE di.period = 'day' + AND di."documentId" >= :startUuid::uuid + AND di."documentId" <= :endUuid::uuid + AND di.date < (NOW() AT TIME ZONE 'UTC')::date - (:cutoffDays::int * INTERVAL '1 day') + AND date_trunc('week', di.date) < (NOW() AT TIME ZONE 'UTC')::date - ((:cutoffDays::int + 6) * INTERVAL '1 day') + ORDER BY "weekStart" ASC + `, + { + replacements: { startUuid, endUuid, cutoffDays: CUTOFF_DAYS }, + type: QueryTypes.SELECT, + } + ); + + for (const { weekStart } of weeks) { + const upserted = await DocumentInsight.rollupPeriod({ + periodStart: weekStart, + intervalDays: 7, + period: DocumentInsightPeriod.Week, + startUuid, + endUuid, + }); + + await sequelize.query( + ` + DELETE FROM document_insights di + USING documents d + WHERE d.id = di."documentId" + AND d."deletedAt" IS NULL + AND di.period = 'day' + AND di."documentId" >= :startUuid::uuid + AND di."documentId" <= :endUuid::uuid + AND di.date >= :weekStart::date + AND di.date < 
:weekStart::date + INTERVAL '7 days' + `, + { + replacements: { startUuid, endUuid, weekStart }, + type: QueryTypes.DELETE, + } + ); + + Logger.info("task", `Rolled up document insights week ${weekStart}`, { + upserted, + }); + } + } + + public get cron() { + return { + interval: TaskInterval.Day, + partitionWindow: 30 * Minute.ms, + }; + } + + public get options() { + return { + attempts: 1, + priority: TaskPriority.Background, + }; + } +}