Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
437e45b
feat(api): Add ClickHouse data seeding scripts and materialized view …
djabarovgeorge Jan 8, 2026
844f5f9
refactor(analytic-logs): Remove unused WorkflowRunOptimizeCronService…
djabarovgeorge Jan 11, 2026
038e027
refactor(analytic-logs): Replace TraceLogRepository with MessageSentC…
djabarovgeorge Jan 11, 2026
2fc70fe
feat(analytic-logs): Implement feature flag logic for message deliver…
djabarovgeorge Jan 11, 2026
6269f12
refactor(analytic-logs): Rename method for fetching message delivery …
djabarovgeorge Jan 11, 2026
8bacfe5
refactor(analytic-logs): Remove MessageSentCountsRepository as it is …
djabarovgeorge Jan 11, 2026
f35251e
refactor(analytic-logs): Replace MessageSentCountsRepository with Wor…
djabarovgeorge Jan 11, 2026
2bb1208
feat(analytic-logs): Add new method for average messages per subscrib…
djabarovgeorge Jan 11, 2026
0eb0e1f
feat(activity): Implement provider volume data retrieval
djabarovgeorge Jan 11, 2026
7dfb2d5
refactor(vscode): Update settings to exclude additional build artifac…
djabarovgeorge Jan 12, 2026
49559ae
feat(activity): Integrate feature flag logic for interaction trend an…
djabarovgeorge Jan 12, 2026
971585c
feat(activity): Enhance delivery trend chart with feature flag integr…
djabarovgeorge Jan 12, 2026
7073aaf
feat(activity): Integrate feature flag logic for workflow volume data…
djabarovgeorge Jan 12, 2026
2fb6a67
refactor(analytic-logs): Remove InteractionCountsRepository and relat…
djabarovgeorge Jan 12, 2026
6897c70
refactor(analytic-logs): Remove WorkflowVolumeCounts and related comp…
djabarovgeorge Jan 12, 2026
6325722
refactor(analytic-logs): Remove deprecated SQL migrations for workflo…
djabarovgeorge Jan 12, 2026
e04aa3b
refactor(activity): update feature flag keys for various charts
djabarovgeorge Jan 13, 2026
2f671ca
feat(analytic-logs): Add provider_id column to traces table and updat…
djabarovgeorge Jan 13, 2026
0cbd9f6
refactor(analytic-logs): Replace ProviderVolumeCounts with WorkflowAc…
djabarovgeorge Jan 13, 2026
c7a96d8
refactor(analytic-logs): Rename WorkflowActivityCounts to TraceRollup…
djabarovgeorge Jan 13, 2026
6285055
feat: update hash
djabarovgeorge Jan 13, 2026
c24394d
Merge branch 'next' into wip-improve-pref-usage-2
djabarovgeorge Jan 13, 2026
618db5c
refactor(activity): update feature flag keys for various charts to an…
djabarovgeorge Jan 13, 2026
b281e92
feat(analytics): add provider_id column to traces table and update re…
djabarovgeorge Jan 13, 2026
b83443b
fix(clickhouse-seeder): update trace event type selection logic and a…
djabarovgeorge Jan 13, 2026
86e3791
refactor(analytics): update trace rollup table and materialized view …
djabarovgeorge Jan 13, 2026
f67b588
fix(analytic-logs): change provider_id type to non-nullable string in…
djabarovgeorge Jan 13, 2026
b51bfa2
refactor(analytics): reorder columns in trace rollup table and schema…
djabarovgeorge Jan 14, 2026
80d4bce
chore: update hash to next
djabarovgeorge Jan 14, 2026
9b61f6c
chore: update hash to next
djabarovgeorge Jan 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(activity): Implement provider volume data retrieval
  • Loading branch information
djabarovgeorge committed Jan 11, 2026
commit 0eb0e1fd625d6da41b39e7c29bc96488434aa6c6
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Provider volume counts table
-- Pre-aggregates completed step runs by provider for efficient volume queries
-- Handles message volume per provider from step_runs table

CREATE TABLE IF NOT EXISTS provider_volume_counts (
date Date,
organization_id String,
environment_id String,
workflow_id String DEFAULT '',
provider_id String,
count UInt64
)
ENGINE = SummingMergeTree(count)
PARTITION BY toYYYYMM(date)
ORDER BY (organization_id, environment_id, date, workflow_id, provider_id);

-- Materialized view populates from step_runs table (completed messaging steps)
CREATE MATERIALIZED VIEW IF NOT EXISTS provider_volume_counts_mv
TO provider_volume_counts
AS SELECT
toDate(created_at) AS date,
organization_id,
environment_id,
ifNull(workflow_id, '') AS workflow_id,
ifNull(provider_id, '') AS provider_id,
1 AS count
FROM step_runs
WHERE
status = 'completed'
AND step_type IN ('in_app', 'email', 'sms', 'chat', 'push');
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { Injectable } from '@nestjs/common';
import { InstrumentUsecase, PinoLogger, StepRunRepository } from '@novu/application-generic';
import {
FeatureFlagsService,
InstrumentUsecase,
PinoLogger,
ProviderVolumeCountsRepository,
StepRunRepository,
} from '@novu/application-generic';
import { FeatureFlagsKeysEnum } from '@novu/shared';
import { ProviderVolumeDataPointDto } from '../../dtos/get-charts.response.dto';
import { BuildProviderByVolumeChartCommand } from './build-provider-by-volume-chart.command';

@Injectable()
export class BuildProviderByVolumeChart {
constructor(
private providerVolumeCountsRepository: ProviderVolumeCountsRepository,
private stepRunRepository: StepRunRepository,
private featureFlagsService: FeatureFlagsService,
private logger: PinoLogger
) {
this.logger.setContext(BuildProviderByVolumeChart.name);
Expand All @@ -16,13 +25,42 @@ export class BuildProviderByVolumeChart {
async execute(command: BuildProviderByVolumeChartCommand): Promise<ProviderVolumeDataPointDto[]> {
const { environmentId, organizationId, startDate, endDate, workflowIds } = command;

const providerData = await this.stepRunRepository.getProviderVolumeData(
environmentId,
organizationId,
startDate,
endDate,
workflowIds
);
const featureFlagContext = {
organization: { _id: organizationId },
environment: { _id: environmentId },
};

const [isGlobalEnabled, isDedicatedEnabled] = await Promise.all([
this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ANALYTIC_V2_LOGS_READ_GLOBAL_ENABLED,
defaultValue: false,
...featureFlagContext,
}),
this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_V2_PROVIDER_VOLUME_READ_ENABLED,
defaultValue: false,
...featureFlagContext,
}),
]);
console.log('@@@@isGlobalEnabled', isGlobalEnabled);

const useNewQuery = isGlobalEnabled || isDedicatedEnabled;

const providerData = useNewQuery
? await this.providerVolumeCountsRepository.getProviderVolumeData(
environmentId,
organizationId,
startDate,
endDate,
workflowIds
)
: await this.stepRunRepository.getProviderVolumeData(
environmentId,
organizationId,
startDate,
endDate,
workflowIds
);

return providerData.map((dataPoint) => ({
providerId: dataPoint.provider_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,10 @@ export class GetCharts {
const buffer = 1 * 60 * 60 * 1000; // 1 hour
const bufferedEarliestAllowedDate = new Date(earliestAllowedDate.getTime() - buffer);

if (startDate < bufferedEarliestAllowedDate || endDate < bufferedEarliestAllowedDate) {
if (
process.env.NODE_ENV !== 'local' &&
(startDate < bufferedEarliestAllowedDate || endDate < bufferedEarliestAllowedDate)
) {
Comment on lines +354 to +357
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder if to keep this in the code process.env.NODE_ENV !== 'local'
it is pretty annoying to develop with this limit locally

throw new HttpException(
`Requested date range exceeds your plan's retention period. ` +
`The earliest accessible date for your plan is ${earliestAllowedDate.toISOString().split('T')[0]}. ` +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ export class GetActivityFeed {
const buffer = 1 * 60 * 60 * 1000; // 1 hour
const bufferedEarliestAllowedDate = new Date(earliestAllowedDate.getTime() - buffer);

if (afterDate < bufferedEarliestAllowedDate || beforeDate < bufferedEarliestAllowedDate) {
if (
process.env.NODE_ENV !== 'local' &&
(afterDate < bufferedEarliestAllowedDate || beforeDate < bufferedEarliestAllowedDate)
) {
throw new HttpException(
`Requested date range exceeds your plan's retention period. ` +
`The earliest accessible date for your plan is ${earliestAllowedDate.toISOString().split('T')[0]}. ` +
Expand Down
4 changes: 3 additions & 1 deletion apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import {
GetDecryptedSecretKey,
InvalidateCacheService,
LoggerModule,
WorkflowActivityCountsRepository,
ProviderVolumeCountsRepository,
QueuesModule,
RequestLogRepository,
StepRunRepository,
storageService,
TraceLogRepository,
WorkflowActivityCountsRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import {
Expand Down Expand Up @@ -122,6 +123,7 @@ const ANALYTICS_PROVIDERS = [
StepRunRepository,
WorkflowRunRepository,
WorkflowActivityCountsRepository,
ProviderVolumeCountsRepository,

// Services
clickHouseService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export { createClient as createClickHouseClient } from '@clickhouse/client';
export * from './clickhouse.service';
export * from './log.repository';
export * from './provider-volume-counts';
export * from './request-log';
export { StepRun, StepRunFinalStatus, StepRunNonFinalStatus, StepRunRepository, StepRunStatus } from './step-run';
export { EventType, mapEventTypeToTitle, Trace, TraceLogRepository, TraceStatus, traceLogSchema } from './trace-log';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './provider-volume-counts.repository';
export * from './provider-volume-counts.schema';
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Injectable } from '@nestjs/common';
import { PinoLogger } from 'nestjs-pino';
import { FeatureFlagsService } from '../../feature-flags/feature-flags.service';
import { ClickHouseService } from '../clickhouse.service';
import { LogRepository } from '../log.repository';
import {
PROVIDER_VOLUME_COUNTS_ORDER_BY,
PROVIDER_VOLUME_COUNTS_TABLE_NAME,
ProviderVolumeCount,
providerVolumeCountsSchema,
} from './provider-volume-counts.schema';

@Injectable()
export class ProviderVolumeCountsRepository extends LogRepository<
typeof providerVolumeCountsSchema,
ProviderVolumeCount
> {
public readonly table = PROVIDER_VOLUME_COUNTS_TABLE_NAME;
public readonly identifierPrefix = 'pvc_';

constructor(
protected readonly clickhouseService: ClickHouseService,
protected readonly logger: PinoLogger,
protected readonly featureFlagsService: FeatureFlagsService
) {
super(
clickhouseService,
logger,
providerVolumeCountsSchema,
PROVIDER_VOLUME_COUNTS_ORDER_BY,
featureFlagsService
);
this.logger.setContext(this.constructor.name);
}

async getProviderVolumeData(
environmentId: string,
organizationId: string,
startDate: Date,
endDate: Date,
workflowIds?: string[]
): Promise<Array<{ provider_id: string; count: string }>> {
const workflowFilter =
workflowIds && workflowIds.length > 0 ? 'AND workflow_id IN {workflowIds:Array(String)}' : '';

const query = `
SELECT
provider_id,
sum(count) as count
FROM ${PROVIDER_VOLUME_COUNTS_TABLE_NAME}
WHERE
environment_id = {environmentId:String}
AND organization_id = {organizationId:String}
AND date >= {startDate:Date}
AND date <= {endDate:Date}
${workflowFilter}
GROUP BY provider_id
ORDER BY count DESC
LIMIT 5
`;

const params: Record<string, unknown> = {
environmentId,
organizationId,
startDate: startDate.toISOString().split('T')[0],
endDate: endDate.toISOString().split('T')[0],
};

if (workflowIds && workflowIds.length > 0) {
params.workflowIds = workflowIds;
}

const result = await this.clickhouseService.query<{
provider_id: string;
count: string;
}>({
query,
params,
});

return result.data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { CHDate, CHString, CHUInt64, ClickhouseSchema } from 'clickhouse-schema';

export const PROVIDER_VOLUME_COUNTS_TABLE_NAME = 'provider_volume_counts';

const schemaDefinition = {
date: { type: CHDate() },
organization_id: { type: CHString() },
environment_id: { type: CHString() },
workflow_id: { type: CHString() },
provider_id: { type: CHString() },
count: { type: CHUInt64() },
};

export const PROVIDER_VOLUME_COUNTS_ORDER_BY: (keyof typeof schemaDefinition)[] = [
'organization_id',
'environment_id',
'date',
'workflow_id',
'provider_id',
];

const clickhouseSchemaOptions = {
table_name: PROVIDER_VOLUME_COUNTS_TABLE_NAME,
engine: 'SummingMergeTree',
order_by: `(${PROVIDER_VOLUME_COUNTS_ORDER_BY.join(', ')})` as any,
additional_options: ['PARTITION BY toYYYYMM(date)'],
};

export const providerVolumeCountsSchema = new ClickhouseSchema(schemaDefinition, clickhouseSchemaOptions);

export type ProviderVolumeCount = {
date: string;
organization_id: string;
environment_id: string;
workflow_id: string;
provider_id: string;
count: number;
};
1 change: 1 addition & 0 deletions packages/shared/src/types/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export enum FeatureFlagsKeysEnum {
IS_V2_MESSAGE_SENT_COUNTS_READ_ENABLED = 'IS_V2_MESSAGE_SENT_COUNTS_READ_ENABLED',
IS_V2_SUBSCRIBER_ACTIVITY_READ_ENABLED = 'IS_V2_SUBSCRIBER_ACTIVITY_READ_ENABLED',
IS_V2_AVG_MESSAGES_PER_SUBSCRIBER_READ_ENABLED = 'IS_V2_AVG_MESSAGES_PER_SUBSCRIBER_READ_ENABLED',
IS_V2_PROVIDER_VOLUME_READ_ENABLED = 'IS_V2_PROVIDER_VOLUME_READ_ENABLED',

// String flags
CF_SCHEDULER_MODE = 'CF_SCHEDULER_MODE', // Values: "off" | "shadow" | "live" | "complete"
Expand Down