Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, ObjectId, resolveBSONOptions } from '../bson';
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
type AnyError,
Expand All @@ -12,6 +12,7 @@ import {
} from '../error';
import type { Filter, OneOrMore, OptionalId, UpdateFilter, WithoutId } from '../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../operations/command';
import { maybeAddIdToDocuments } from '../operations/common_functions';
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
Expand Down Expand Up @@ -917,7 +918,7 @@ export abstract class BulkOperationBase {
* Create a new OrderedBulkOperation or UnorderedBulkOperation instance
* @internal
*/
constructor(collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
constructor(private collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
// determine whether bulkOperation is ordered or unordered
this.isOrdered = isOrdered;

Expand Down Expand Up @@ -1032,9 +1033,9 @@ export abstract class BulkOperationBase {
* ```
*/
insert(document: Document): BulkOperationBase {
if (document._id == null && !shouldForceServerObjectId(this)) {
document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, {
forceServerObjectId: this.shouldForceServerObjectId()
});

return this.addToOperationsList(BatchType.INSERT, document);
}
Expand Down Expand Up @@ -1093,21 +1094,16 @@ export abstract class BulkOperationBase {
throw new MongoInvalidArgumentError('Operation must be an object with an operation key');
}
if ('insertOne' in op) {
const forceServerObjectId = shouldForceServerObjectId(this);
if (op.insertOne && op.insertOne.document == null) {
// NOTE: provided for legacy support, but this is a malformed operation
if (forceServerObjectId !== true && (op.insertOne as Document)._id == null) {
(op.insertOne as Document)._id = new ObjectId();
}

return this.addToOperationsList(BatchType.INSERT, op.insertOne);
}
const forceServerObjectId = this.shouldForceServerObjectId();
const document =
op.insertOne && op.insertOne.document == null
? // NOTE: provided for legacy support, but this is a malformed operation
(op.insertOne as Document)
: op.insertOne.document;

if (forceServerObjectId !== true && op.insertOne.document._id == null) {
op.insertOne.document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, { forceServerObjectId });

return this.addToOperationsList(BatchType.INSERT, op.insertOne.document);
return this.addToOperationsList(BatchType.INSERT, document);
}

if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) {
Expand Down Expand Up @@ -1268,6 +1264,18 @@ export abstract class BulkOperationBase {
batchType: BatchType,
document: Document | UpdateStatement | DeleteStatement
): this;

private shouldForceServerObjectId(): boolean {
if (typeof this.s.options.forceServerObjectId === 'boolean') {
return this.s.options.forceServerObjectId;
}

if (typeof this.s.collection.s.db.options?.forceServerObjectId === 'boolean') {
return this.s.collection.s.db.options?.forceServerObjectId;
}

return false;
}
}

Object.defineProperty(BulkOperationBase.prototype, 'length', {
Expand All @@ -1277,18 +1285,6 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
}
});

function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
if (typeof bulkOperation.s.options.forceServerObjectId === 'boolean') {
return bulkOperation.s.options.forceServerObjectId;
}

if (typeof bulkOperation.s.collection.s.db.options?.forceServerObjectId === 'boolean') {
return bulkOperation.s.collection.s.db.options?.forceServerObjectId;
}

return false;
}

function isInsertBatch(batch: Batch): boolean {
return batch.batchType === BatchType.INSERT;
}
Expand Down
21 changes: 16 additions & 5 deletions src/operations/common_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,37 @@ export async function indexInformation(
return info;
}

export function prepareDocs(
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document[],
options: { forceServerObjectId?: boolean }
): Document[] {
): Document[];
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document,
options: { forceServerObjectId?: boolean }
): Document;
export function maybeAddIdToDocuments(
coll: Collection,
docOrDocs: Document[] | Document,
options: { forceServerObjectId?: boolean }
): Document[] | Document {
const forceServerObjectId =
typeof options.forceServerObjectId === 'boolean'
? options.forceServerObjectId
: coll.s.db.options?.forceServerObjectId;

// no need to modify the docs if server sets the ObjectId
if (forceServerObjectId === true) {
return docs;
return docOrDocs;
}

return docs.map(doc => {
const transform = (doc: Document): Document => {
if (doc._id == null) {
doc._id = coll.s.pkFactory.createPk();
}

return doc;
});
};
return Array.isArray(docOrDocs) ? docOrDocs.map(transform) : transform(docOrDocs);
}
8 changes: 5 additions & 3 deletions src/operations/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { BulkWriteOperation } from './bulk_write';
import { CommandOperation, type CommandOperationOptions } from './command';
import { prepareDocs } from './common_functions';
import { maybeAddIdToDocuments } from './common_functions';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
Expand Down Expand Up @@ -69,7 +69,7 @@ export interface InsertOneResult<TSchema = Document> {

export class InsertOneOperation extends InsertOperation {
constructor(collection: Collection, doc: Document, options: InsertOneOptions) {
super(collection.s.namespace, prepareDocs(collection, [doc], options), options);
super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options);
}

override async execute(
Expand Down Expand Up @@ -131,7 +131,9 @@ export class InsertManyOperation extends AbstractOperation<InsertManyResult> {
const writeConcern = WriteConcern.fromOptions(options);
const bulkWriteOperation = new BulkWriteOperation(
coll,
prepareDocs(coll, this.docs, options).map(document => ({ insertOne: { document } })),
maybeAddIdToDocuments(coll, this.docs, options).map(document => ({
insertOne: { document }
})),
options
);

Expand Down
71 changes: 70 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as crypto from 'crypto';

import {
type Collection,
Double,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
Expand Down Expand Up @@ -65,16 +66,84 @@ describe('Bulk', function () {
context('when called with a valid operation', function () {
it('should not throw a MongoInvalidArgument error', async function () {
try {
client.db('test').collection('test').initializeUnorderedBulkOp().raw({ insertOne: {} });
client
.db('test')
.collection('test')
.initializeUnorderedBulkOp()
.raw({ insertOne: { document: {} } });
} catch (error) {
expect(error).not.to.exist;
}
});
});

it('supports the legacy specification (no nested document field)', async function () {
await client
.db('test')
.collection('test')
.initializeUnorderedBulkOp()
// @ts-expect-error Not allowed in TS, but allowed for legacy compat
.raw({ insertOne: { name: 'john doe' } })
.execute();
const result = await client.db('test').collection('test').findOne({ name: 'john doe' });
expect(result).to.exist;
});
});
});

describe('Collection', function () {
describe('when a pkFactory is set on the client', function () {
let client: MongoClient;
const pkFactory = {
count: 0,
createPk: function () {
return new Double(this.count++);
}
};
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient({}, { pkFactory, promoteValues: false });
collection = client.db('integration').collection('pk_factory_tests');
await collection.deleteMany({});
});

afterEach(() => client.close());

it('insertMany() generates _ids using the pkFactory', async function () {
await collection.insertMany([{ name: 'john doe' }]);
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.be.instanceOf(Double);
});

it('bulkWrite() generates _ids using the pkFactory', async function () {
await collection.bulkWrite([{ insertOne: { document: { name: 'john doe' } } }]);
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.be.instanceOf(Double);
});

it('ordered bulk operations generate _ids using pkFactory', async function () {
await collection.initializeOrderedBulkOp().insert({ name: 'john doe' }).execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.be.instanceOf(Double);
});

it('unordered bulk operations generate _ids using pkFactory', async function () {
await collection.initializeUnorderedBulkOp().insert({ name: 'john doe' }).execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.be.instanceOf(Double);
});

it('bulkOperation.raw() with the legacy syntax (no nested document field) generates _ids using pkFactory', async function () {
await collection
.initializeOrderedBulkOp()
// @ts-expect-error Not allowed by TS, but still permitted.
.raw({ insertOne: { name: 'john doe' } })
.execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.be.instanceOf(Double);
});
});
describe('#insertMany()', function () {
context('when passed an invalid docs argument', function () {
it('should throw a MongoInvalidArgument error', async function () {
Expand Down