Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@
"Keymap",
"keyrings",
"keysize",
"KILLSWITCH",
"kitterma",
"Kitterman",
"klass",
Expand Down
24 changes: 22 additions & 2 deletions apps/api/src/app/auth/services/passport/apikey.strategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, ServiceUnavailableException } from '@nestjs/common';
import { PassportStrategy } from '@nestjs/passport';
import { FeatureFlagsService, HttpRequestHeaderKeysEnum } from '@novu/application-generic';
import { ApiAuthSchemeEnum, FeatureFlagsKeysEnum, UserSessionData } from '@novu/shared';
Expand Down Expand Up @@ -52,6 +52,8 @@ export class ApiKeyStrategy extends PassportStrategy(HeaderAPIKeyStrategy) {
if (isLruCacheEnabled) {
const cached = apiKeyUserCache.get(hashedApiKey);
if (cached) {
await this.checkKillSwitch(cached);

return cached;
}

Expand All @@ -63,11 +65,15 @@ export class ApiKeyStrategy extends PassportStrategy(HeaderAPIKeyStrategy) {

const fetchPromise = this.authService
.getUserByApiKey(apiKey)
.then((user) => {
.then(async (user) => {
if (user && isLruCacheEnabled) {
apiKeyUserCache.set(hashedApiKey, user);
}

if (user) {
await this.checkKillSwitch(user);
}

return user;
})
.finally(() => {
Expand All @@ -82,4 +88,18 @@ export class ApiKeyStrategy extends PassportStrategy(HeaderAPIKeyStrategy) {

return fetchPromise;
}

private async checkKillSwitch(user: UserSessionData): Promise<void> {
const isKillSwitchEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
defaultValue: false,
organization: { _id: user.organizationId },
environment: { _id: user.environmentId },
component: 'api',
});

if (isKillSwitchEnabled) {
throw new ServiceUnavailableException('Service temporarily unavailable for this organization');
}
}
}
28 changes: 25 additions & 3 deletions apps/api/src/app/events/events.controller.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Body, Controller, Delete, Param, Post, Req, Scope } from '@nestjs/common';
import { Body, Controller, Delete, Param, Post, Req, Scope, ServiceUnavailableException } from '@nestjs/common';
import { ApiExcludeEndpoint, ApiOperation, ApiTags } from '@nestjs/swagger';
import { RequirePermissions, ResourceCategory } from '@novu/application-generic';
import { FeatureFlagsService, RequirePermissions, ResourceCategory } from '@novu/application-generic';
import {
AddressingTypeEnum,
ApiRateLimitCategoryEnum,
ApiRateLimitCostEnum,
FeatureFlagsKeysEnum,
PermissionsEnum,
ResourceEnum,
TriggerRequestCategoryEnum,
Expand Down Expand Up @@ -64,9 +65,24 @@ export class EventsController {
private triggerEventToAll: TriggerEventToAll,
private sendTestEmail: SendTestEmail,
private parseEventRequest: ParseEventRequest,
private processBulkTriggerUsecase: ProcessBulkTrigger
private processBulkTriggerUsecase: ProcessBulkTrigger,
private featureFlagsService: FeatureFlagsService
) {}

private async checkKillSwitch(user: UserSessionData): Promise<void> {
const isKillSwitchEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
defaultValue: false,
organization: { _id: user.organizationId },
environment: { _id: user.environmentId },
component: 'trigger',
});

if (isKillSwitchEnabled) {
throw new ServiceUnavailableException('Service temporarily unavailable for this organization');
}
}

@KeylessAccessible()
@ExternalApiAccessible()
@Post('/trigger')
Expand All @@ -91,6 +107,8 @@ export class EventsController {
@Req() req: RequestWithReqId,
@Body() body: TriggerEventRequestDto
): Promise<TriggerEventResponseDto> {
await this.checkKillSwitch(user);

const result = await this.parseEventRequest.execute(
ParseEventRequestMulticastCommand.create({
userId: user._id,
Expand Down Expand Up @@ -140,6 +158,8 @@ export class EventsController {
@Body() body: BulkTriggerEventDto,
@Req() req: RequestWithReqId
): Promise<TriggerEventResponseDto[]> {
await this.checkKillSwitch(user);

return this.processBulkTriggerUsecase.execute(
ProcessBulkTriggerCommand.create({
userId: user._id,
Expand Down Expand Up @@ -178,6 +198,8 @@ export class EventsController {
@Body() body: TriggerEventToAllRequestDto,
@Req() req: RequestWithReqId
): Promise<TriggerEventResponseDto> {
await this.checkKillSwitch(user);

const transactionId = body.transactionId || uuidv4();

return this.triggerEventToAll.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ describe('Standard Worker', () => {
WorkflowInMemoryProviderService
);
const organizationRepository = moduleRef.get<CommunityOrganizationRepository>(CommunityOrganizationRepository);
const featureFlagsService = moduleRef.get<FeatureFlagsService>(FeatureFlagsService);

standardWorker = new StandardWorker(
handleLastFailedJob,
Expand All @@ -150,7 +151,8 @@ describe('Standard Worker', () => {
webhookFilterBackoffStrategy,
workflowInMemoryProviderService,
organizationRepository,
jobRepository
jobRepository,
featureFlagsService
);
});

Expand Down
25 changes: 23 additions & 2 deletions apps/worker/src/app/workflow/services/standard.worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common';
import {
BullMqService,
FeatureFlagsService,
getStandardWorkerOptions,
IStandardDataDto,
Job,
Expand All @@ -12,7 +13,7 @@ import {
WorkflowInMemoryProviderService,
} from '@novu/application-generic';
import { CommunityOrganizationRepository, JobRepository } from '@novu/dal';
import { JobStatusEnum, ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import { FeatureFlagsKeysEnum, JobStatusEnum, ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import {
HandleLastFailedJob,
HandleLastFailedJobCommand,
Expand All @@ -38,7 +39,8 @@ export class StandardWorker extends StandardWorkerService {
@Inject(forwardRef(() => WorkflowInMemoryProviderService))
public workflowInMemoryProviderService: WorkflowInMemoryProviderService,
private organizationRepository: CommunityOrganizationRepository,
private jobRepository: JobRepository
private jobRepository: JobRepository,
private featureFlagsService: FeatureFlagsService
) {
super(new BullMqService(workflowInMemoryProviderService));

Expand Down Expand Up @@ -93,10 +95,29 @@ export class StandardWorker extends StandardWorkerService {
};
}

private async isKillSwitchEnabled(data: IStandardDataDto): Promise<boolean> {
return this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
defaultValue: false,
organization: { _id: data._organizationId },
environment: { _id: data._environmentId },
component: 'worker',
});
}

private getWorkerProcessor() {
return async ({ data }: { data: IStandardDataDto }) => {
const isKillSwitchEnabled = await this.isKillSwitchEnabled(data);

if (isKillSwitchEnabled) {
Logger.log(`Kill switch enabled for organizationId ${data._organizationId}. Skipping job.`, LOG_CONTEXT);

return;
}
Comment on lines +98 to +116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Kill-switch lookup can miss org/env for legacy job payloads.
isKillSwitchEnabled uses _organizationId/_environmentId directly, but extractMinimalJobData already handles missing IDs via payload.message. Since the kill-switch check runs before that extraction, jobs lacking those fields may bypass the kill switch or error unexpectedly.

🔧 Minimal fix to reuse the fallback extraction for kill-switch IDs
-  private async isKillSwitchEnabled(data: IStandardDataDto): Promise<boolean> {
-    return this.featureFlagsService.getFlag({
-      key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
-      defaultValue: false,
-      organization: { _id: data._organizationId },
-      environment: { _id: data._environmentId },
-      component: 'worker',
-    });
-  }
+  private async isKillSwitchEnabled(data: IStandardDataDto): Promise<boolean> {
+    const minimalJobData = this.extractMinimalJobData(data);
+
+    return this.featureFlagsService.getFlag({
+      key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
+      defaultValue: false,
+      organization: { _id: minimalJobData.organizationId },
+      environment: { _id: minimalJobData.environmentId },
+      component: 'worker',
+    });
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private async isKillSwitchEnabled(data: IStandardDataDto): Promise<boolean> {
return this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
defaultValue: false,
organization: { _id: data._organizationId },
environment: { _id: data._environmentId },
component: 'worker',
});
}
private getWorkerProcessor() {
return async ({ data }: { data: IStandardDataDto }) => {
const isKillSwitchEnabled = await this.isKillSwitchEnabled(data);
if (isKillSwitchEnabled) {
Logger.log(`Kill switch enabled for organizationId ${data._organizationId}. Skipping job.`, LOG_CONTEXT);
return;
}
private async isKillSwitchEnabled(data: IStandardDataDto): Promise<boolean> {
const minimalJobData = this.extractMinimalJobData(data);
return this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
defaultValue: false,
organization: { _id: minimalJobData.organizationId },
environment: { _id: minimalJobData.environmentId },
component: 'worker',
});
}
private getWorkerProcessor() {
return async ({ data }: { data: IStandardDataDto }) => {
const isKillSwitchEnabled = await this.isKillSwitchEnabled(data);
if (isKillSwitchEnabled) {
Logger.log(`Kill switch enabled for organizationId ${data._organizationId}. Skipping job.`, LOG_CONTEXT);
return;
}
🤖 Prompt for AI Agents
In `@apps/worker/src/app/workflow/services/standard.worker.ts` around lines 98 -
116, The kill-switch lookup in getWorkerProcessor currently calls
isKillSwitchEnabled with data._organizationId/_environmentId which can be
missing for legacy jobs; modify getWorkerProcessor to first call
extractMinimalJobData (or reuse its fallback logic) to derive organizationId and
environmentId from payload.message when needed, then pass those resolved IDs
into isKillSwitchEnabled (or alter isKillSwitchEnabled to accept resolved IDs)
so the kill switch always checks the correct org/env for legacy payloads; refer
to the getWorkerProcessor, isKillSwitchEnabled, and extractMinimalJobData
symbols when making the change.


if (data.skipProcessing) {
Logger.log(`Skipping job ${data._id} - skipProcessing flag is set,`, LOG_CONTEXT);

return;
}

Expand Down
24 changes: 22 additions & 2 deletions apps/worker/src/app/workflow/services/subscriber-process.worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Injectable, Logger } from '@nestjs/common';
import {
BullMqService,
FeatureFlagsService,
getSubscriberProcessWorkerOptions,
IProcessSubscriberDataDto,
PinoLogger,
Expand All @@ -11,7 +12,7 @@ import {
WorkflowInMemoryProviderService,
} from '@novu/application-generic';
import { CommunityOrganizationRepository } from '@novu/dal';
import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import { FeatureFlagsKeysEnum, ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import { SubscriberJobBound } from '../usecases/subscriber-job-bound/subscriber-job-bound.usecase';

const nr = require('newrelic');
Expand All @@ -23,15 +24,34 @@ export class SubscriberProcessWorker extends SubscriberProcessWorkerService {
constructor(
private subscriberJobBoundUsecase: SubscriberJobBound,
public workflowInMemoryProviderService: WorkflowInMemoryProviderService,
private organizationRepository: CommunityOrganizationRepository
private organizationRepository: CommunityOrganizationRepository,
private featureFlagsService: FeatureFlagsService
) {
super(new BullMqService(workflowInMemoryProviderService));

this.initWorker(this.getWorkerProcessor(), this.getWorkerOpts());
}

private async isKillSwitchEnabled(data: IProcessSubscriberDataDto): Promise<boolean> {
return this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
defaultValue: false,
organization: { _id: data.organizationId },
environment: { _id: data.environmentId },
component: 'worker',
});
}

public getWorkerProcessor() {
return async ({ data }: { data: IProcessSubscriberDataDto }) => {
const isKillSwitchEnabled = await this.isKillSwitchEnabled(data);

if (isKillSwitchEnabled) {
Logger.log(`Kill switch enabled for organizationId ${data.organizationId}. Skipping job.`, LOG_CONTEXT);

return;
}

const organizationExists = await this.organizationExist(data);

if (!organizationExists) {
Expand Down
24 changes: 22 additions & 2 deletions apps/worker/src/app/workflow/services/workflow.worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import {
BullMqService,
FeatureFlagsService,
getWorkflowWorkerOptions,
IWorkflowDataDto,
PinoLogger,
Expand All @@ -13,7 +14,7 @@ import {
WorkflowWorkerService,
} from '@novu/application-generic';
import { CommunityOrganizationRepository } from '@novu/dal';
import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import { FeatureFlagsKeysEnum, ObservabilityBackgroundTransactionEnum } from '@novu/shared';

const nr = require('newrelic');

Expand All @@ -23,7 +24,8 @@ export class WorkflowWorker extends WorkflowWorkerService {
private triggerEventUsecase: TriggerEvent,
public workflowInMemoryProviderService: WorkflowInMemoryProviderService,
private organizationRepository: CommunityOrganizationRepository,
private logger: PinoLogger
private logger: PinoLogger,
private featureFlagsService: FeatureFlagsService
) {
super(new BullMqService(workflowInMemoryProviderService));
this.logger.setContext(this.constructor.name);
Expand All @@ -34,8 +36,26 @@ export class WorkflowWorker extends WorkflowWorkerService {
return getWorkflowWorkerOptions();
}

private async isKillSwitchEnabled(data: IWorkflowDataDto): Promise<boolean> {
return this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_ORG_KILLSWITCH_FLAG_ENABLED,
defaultValue: false,
organization: { _id: data.organizationId },
environment: { _id: data.environmentId },
component: 'worker',
});
}

private getWorkerProcessor(): WorkerProcessor {
return async ({ data }: { data: IWorkflowDataDto }) => {
const isKillSwitchEnabled = await this.isKillSwitchEnabled(data);

if (isKillSwitchEnabled) {
this.logger.warn(`Kill switch enabled for organizationId ${data.organizationId}. Skipping job.`);

return;
}

const organizationExists = await this.organizationExist(data);

if (!organizationExists) {
Expand Down
1 change: 1 addition & 0 deletions packages/shared/src/types/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export enum FeatureFlagsKeysEnum {
IS_BILLING_USAGE_CLICKHOUSE_ENABLED = 'IS_BILLING_USAGE_CLICKHOUSE_ENABLED',
IS_BILLING_USAGE_CLICKHOUSE_SHADOW_ENABLED = 'IS_BILLING_USAGE_CLICKHOUSE_SHADOW_ENABLED',
IS_BILLING_USAGE_DETAILED_DIAGNOSTICS_ENABLED = 'IS_BILLING_USAGE_DETAILED_DIAGNOSTICS_ENABLED',
IS_ORG_KILLSWITCH_FLAG_ENABLED = 'IS_ORG_KILLSWITCH_FLAG_ENABLED',

// String flags
CF_SCHEDULER_MODE = 'CF_SCHEDULER_MODE', // Values: "off" | "shadow" | "live" | "complete"
Expand Down
Loading