Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: comments
  • Loading branch information
durran committed Oct 8, 2024
commit 4ecbf5ffdc22548b342c06d65af5738492b38055
24 changes: 9 additions & 15 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
}

override get canRetryWrite(): boolean {
return this.commandBuilder.isRetryable && super.canRetryWrite;
return this.commandBuilder.isBatchRetryable;
}

/**
Expand All @@ -49,12 +49,15 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu

if (server.description.type === ServerType.LoadBalancer) {
if (session) {
// Checkout a connection to build the command.
const connection = await server.pool.checkOut();
// Pin the connection to the session so it get used to execute the command and we do not
// perform a double check-in/check-out.
let connection;
if (!session.pinnedConnection) {
// Checkout a connection to build the command.
connection = await server.pool.checkOut();
// Pin the connection to the session so it get used to execute the command and we do not
// perform a double check-in/check-out.
session.pin(connection);
} else {
connection = session.pinnedConnection;
}
command = this.commandBuilder.buildBatch(
connection.hello?.maxMessageSizeBytes,
Expand Down Expand Up @@ -86,16 +89,7 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
);
}

try {
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
} finally {
if (server.description.type === ServerType.LoadBalancer) {
// Unpin the connection if there are no more batches.
if (session?.pinnedConnection && !this.commandBuilder.hasNextBatch()) {
session?.unpin({ force: true });
}
}
}
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
}
}

Expand Down
30 changes: 15 additions & 15 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ export class ClientBulkWriteCommandBuilder {
models: AnyClientBulkWriteModel[];
options: ClientBulkWriteOptions;
pkFactory: PkFactory;
/** The current index in the models array that is being processed. */
currentModelIndex: number;
/** The model index that the builder was on when it finished the previous batch. Used for resets when retrying. */
previousModelIndex: number;
/** The last array of operations that were created. Used by the results merger for indexing results. */
lastOperations: Document[];
isRetryable: boolean;
/** Returns true if the current batch being created has no multi-updates. */
isBatchRetryable: boolean;

/**
* Create the command builder.
Expand All @@ -59,10 +63,7 @@ export class ClientBulkWriteCommandBuilder {
this.currentModelIndex = 0;
this.previousModelIndex = 0;
this.lastOperations = [];
// Multi updates are not retryable.
this.isRetryable = !models.some(model => {
return model.name === 'deleteMany' || model.name === 'updateMany';
});
this.isBatchRetryable = true;
}

/**
Expand Down Expand Up @@ -104,6 +105,9 @@ export class ClientBulkWriteCommandBuilder {
maxWriteBatchSize: number,
maxBsonObjectSize: number
): ClientBulkWriteCommand {
// We start by assuming the batch has no multi-updates, so it is retryable
// until we find them.
this.isBatchRetryable = true;
let commandLength = 0;
let currentNamespaceIndex = 0;
const command: ClientBulkWriteCommand = this.baseCommand();
Expand All @@ -116,6 +120,11 @@ export class ClientBulkWriteCommandBuilder {
const ns = model.namespace;
const nsIndex = namespaces.get(ns);

// Multi updates are not retryable.
if (model.name === 'deleteMany' || model.name === 'updateMany') {
this.isBatchRetryable = false;
}

if (nsIndex != null) {
// Build the operation and serialize it to get the bytes buffer.
const operation = buildOperation(model, nsIndex, this.pkFactory);
Expand Down Expand Up @@ -379,16 +388,7 @@ function createUpdateOperation(
// required only to contain atomic modifiers (i.e. keys that start with "$").
// Drivers MUST throw an error if an update document is empty or if the
// document's first key does not start with "$".
if (Array.isArray(model.update)) {
if (model.update.length === 0) {
throw new MongoAPIError('Client bulk write update model pipelines may not be empty.');
}
for (const update of model.update) {
validateUpdate(update);
}
} else {
validateUpdate(model.update);
}
validateUpdate(model.update);
const document: ClientUpdateOperation = {
update: index,
multi: multi,
Expand Down