Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: batch reset on retry
  • Loading branch information
durran committed Oct 8, 2024
commit 6182bd1dd2691c365e2fe8eb58b08200c5526dc1
7 changes: 6 additions & 1 deletion src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
this.ns = new MongoDBNamespace('admin', '$cmd');
}

override resetBatch(): boolean {
return this.commandBuilder.resetBatch();
}

/**
* Execute the command. Superclass will handle write concern, etc.
* @param server - The server.
Expand Down Expand Up @@ -84,5 +88,6 @@ defineAspects(ClientBulkWriteOperation, [
Aspect.WRITE_OPERATION,
Aspect.SKIP_COLLATION,
Aspect.CURSOR_CREATING,
Aspect.RETRYABLE
Aspect.RETRYABLE,
Aspect.COMMAND_BATCHING
]);
13 changes: 13 additions & 0 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class ClientBulkWriteCommandBuilder {
options: ClientBulkWriteOptions;
pkFactory: PkFactory;
currentModelIndex: number;
previousModelIndex: number;
lastOperations: Document[];

/**
Expand All @@ -55,6 +56,7 @@ export class ClientBulkWriteCommandBuilder {
this.options = options;
this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY;
this.currentModelIndex = 0;
this.previousModelIndex = 0;
this.lastOperations = [];
}

Expand All @@ -77,6 +79,15 @@ export class ClientBulkWriteCommandBuilder {
return this.currentModelIndex < this.models.length;
}

/**
* When we need to retry a command we need to set the current
* model index back to its previous value.
*/
resetBatch(): boolean {
this.currentModelIndex = this.previousModelIndex;
return true;
}

/**
* Build a single batch of a client bulk write command.
* @param maxMessageSizeBytes - The max message size in bytes.
Expand All @@ -92,6 +103,8 @@ export class ClientBulkWriteCommandBuilder {
let currentNamespaceIndex = 0;
const command: ClientBulkWriteCommand = this.baseCommand();
const namespaces = new Map<string, number>();
// In the case of retries we need to mark where we started this batch.
this.previousModelIndex = this.currentModelIndex;

while (this.currentModelIndex < this.models.length) {
const model = this.models[this.currentModelIndex];
Expand Down
1 change: 1 addition & 0 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export interface ClientBulkWriteResult {
deleteResults?: Map<number, ClientDeleteResult>;
}

/** @public */
export interface ClientBulkWriteError {
code: number;
message: string;
Expand Down
4 changes: 4 additions & 0 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ async function tryOperation<
}

try {
// If tries > 0 and we are command batching we need to reset the batch.
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
operation.resetBatch();
}
return await operation.execute(server, session);
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;
Expand Down
7 changes: 6 additions & 1 deletion src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export const Aspect = {
EXPLAINABLE: Symbol('EXPLAINABLE'),
SKIP_COLLATION: Symbol('SKIP_COLLATION'),
CURSOR_CREATING: Symbol('CURSOR_CREATING'),
MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER')
MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER'),
COMMAND_BATCHING: Symbol('COMMAND_BATCHING')
} as const;

/** @public */
Expand Down Expand Up @@ -98,6 +99,10 @@ export abstract class AbstractOperation<TResult = any> {
this[kSession] = undefined;
}

resetBatch(): boolean {
return true;
}

get canRetryRead(): boolean {
return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.READ_OPERATION);
}
Expand Down
1 change: 0 additions & 1 deletion test/unit/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const EXPECTED_EXPORTS = [
'CancellationToken',
'ChangeStream',
'ChangeStreamCursor',
'ClientBulkWriteError',
'ClientEncryption',
'ClientSession',
'Code',
Expand Down