Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gemfiles/standard.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
def standard_dependencies
gem 'yard', '>= 0.9.35'
gem 'ffi'
gem 'opentelemetry-sdk'

group :development, :testing do
gem 'jruby-openssl', platforms: :jruby
Expand Down
2 changes: 2 additions & 0 deletions lib/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
autoload :CGI, 'cgi'

require 'bson'
require 'opentelemetry-api'

require 'mongo/id'
require 'mongo/bson'
Expand Down Expand Up @@ -74,6 +75,7 @@
require 'mongo/socket'
require 'mongo/srv'
require 'mongo/timeout'
require 'mongo/tracing'
require 'mongo/uri'
require 'mongo/version'
require 'mongo/write_concern'
Expand Down
37 changes: 34 additions & 3 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class Client
:ssl_verify_hostname,
:ssl_verify_ocsp_endpoint,
:timeout_ms,
:tracing,
:truncate_logs,
:user,
:wait_queue_timeout,
Expand Down Expand Up @@ -437,6 +438,20 @@ def hash
# See Ruby's Zlib module for valid levels.
# @option options [ Hash ] :resolv_options For internal driver use only.
# Options to pass through to Resolv::DNS constructor for SRV lookups.
# @option options [ Hash ] :tracing OpenTelemetry tracing options.
# - :enabled => Boolean, whether to enable OpenTelemetry tracing. The default
# value is nil that means that the configuration will be taken from the
# OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED environment variable.
# - :tracer => OpenTelemetry::Trace::Tracer, the tracer to use for
# tracing. Must be an implementation of OpenTelemetry::Trace::Tracer
# interface.
# - :query_text_max_length => Integer, the maximum length of the query text
# to be included in the span attributes. If the query text exceeds this
# length, it will be truncated. Value 0 means no query text
# will be included in the span attributes. The default value is nil that
# means that the configuration will be taken from the
# OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT_MAX_LENGTH environment
# variable.
# @option options [ Hash ] :auto_encryption_options Auto-encryption related
# options.
# - :key_vault_client => Client | nil, a client connected to the MongoDB
Expand Down Expand Up @@ -574,8 +589,12 @@ def initialize(addresses_or_uri, options = nil)

@connect_lock = Mutex.new
@connect_lock.synchronize do
@cluster = Cluster.new(addresses, @monitoring,
cluster_options.merge(srv_uri: srv_uri))
@cluster = Cluster.new(
addresses,
@monitoring,
tracer,
cluster_options.merge(srv_uri: srv_uri)
)
end

begin
Expand Down Expand Up @@ -965,7 +984,10 @@ def list_databases(filter = {}, name_only = false, opts = {})
cmd[:nameOnly] = !!name_only
cmd[:filter] = filter unless filter.empty?
cmd[:authorizedDatabases] = true if opts[:authorized_databases]
use(Database::ADMIN).database.read_command(cmd, opts).first[Database::DATABASES]
use(Database::ADMIN)
.database
.read_command(cmd, opts.merge(op_name: 'listDatabases'))
.first[Database::DATABASES]
end

# Returns a list of Mongo::Database objects.
Expand Down Expand Up @@ -1195,6 +1217,15 @@ def timeout_sec
end
end

def tracer
tracing_opts = @options[:tracing] || {}
@tracer ||= Tracing.create_tracer(
enabled: tracing_opts[:enabled],
query_text_max_length: tracing_opts[:query_text_max_length],
otel_tracer: tracing_opts[:tracer],
)
end

private

# Attempts to parse the given list of addresses, using the provided options.
Expand Down
5 changes: 4 additions & 1 deletion lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Cluster
# - *:deprecation_errors* -- boolean
#
# @since 2.0.0
def initialize(seeds, monitoring, options = Options::Redacted.new)
def initialize(seeds, monitoring, tracer = nil, options = Options::Redacted.new)
if seeds.nil?
raise ArgumentError, 'Seeds cannot be nil'
end
Expand All @@ -136,6 +136,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@update_lock = Mutex.new
@servers = []
@monitoring = monitoring
@tracer = tracer
@event_listeners = Event::Listeners.new
@app_metadata = Server::AppMetadata.new(@options.merge(purpose: :application))
@monitor_app_metadata = Server::Monitor::AppMetadata.new(@options.merge(purpose: :monitor))
Expand Down Expand Up @@ -309,6 +310,8 @@ def self.create(client, monitoring: nil)
# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

attr_reader :tracer

# @return [ Object ] The cluster topology.
attr_reader :topology

Expand Down
102 changes: 56 additions & 46 deletions lib/mongo/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class Collection
# Delegate to the cluster for the next primary.
def_delegators :cluster, :next_primary

def_delegators :client, :tracer

# Options that can be updated on a new Collection instance via the #with method.
#
# @since 2.1.0
Expand Down Expand Up @@ -410,21 +412,24 @@ def create(opts = {})
client: client,
session: session
)
maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields|
Operation::Create.new(
selector: operation,
db_name: database.name,
write_concern: write_concern,
session: session,
# Note that these are collection options, collation isn't
# taken from options passed to the create method.
collation: options[:collation] || options['collation'],
encrypted_fields: encrypted_fields,
validator: options[:validator],
).execute(
next_primary(nil, session),
context: context
)
operation = Operation::Create.new(
selector: operation,
db_name: database.name,
write_concern: write_concern,
session: session,
# Note that these are collection options, collation isn't
# taken from options passed to the create method.
collation: options[:collation] || options['collation'],
validator: options[:validator],
)
tracer.trace_operation(operation, context, op_name: 'createCollection') do
maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields|
operation.encrypted_fields = encrypted_fields
operation.execute(
next_primary(nil, session),
context: context
)
end
end
end
end
Expand Down Expand Up @@ -453,25 +458,27 @@ def create(opts = {})
# @since 2.0.0
def drop(opts = {})
client.with_session(opts) do |session|
maybe_drop_emm_collections(opts[:encrypted_fields], client, session) do
temp_write_concern = write_concern
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
temp_write_concern
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
operation = Operation::Drop.new({
selector: { :drop => name },
db_name: database.name,
write_concern: write_concern,
session: session,
})
tracer.trace_operation(operation, context, op_name: 'dropCollection') do
maybe_drop_emm_collections(opts[:encrypted_fields], client, session) do
temp_write_concern = write_concern
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
temp_write_concern
end
do_drop(operation, session, context)
end
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
operation = Operation::Drop.new({
selector: { :drop => name },
db_name: database.name,
write_concern: write_concern,
session: session,
})
do_drop(operation, session, context)
end
end
end
Expand Down Expand Up @@ -865,19 +872,22 @@ def insert_one(document, opts = {})
session: session,
operation_timeouts: operation_timeouts(opts)
)
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
Operation::Insert.new(
:documents => [ document ],
:db_name => database.name,
:coll_name => name,
:write_concern => write_concern,
:bypass_document_validation => !!opts[:bypass_document_validation],
:options => opts,
:id_generator => client.options[:id_generator],
:session => session,
:txn_num => txn_num,
:comment => opts[:comment]
).execute_with_connection(connection, context: context)
operation = Operation::Insert.new(
:documents => [ document ],
:db_name => database.name,
:coll_name => name,
:write_concern => write_concern,
:bypass_document_validation => !!opts[:bypass_document_validation],
:options => opts,
:id_generator => client.options[:id_generator],
:session => session,
:comment => opts[:comment]
)
tracer.trace_operation(operation, context) do
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
operation.txn_num = txn_num
operation.execute_with_connection(connection, context: context)
end
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/mongo/collection/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class View
# Delegate to the cluster for the next primary.
def_delegators :cluster, :next_primary

def_delegators :client, :tracer

alias :selector :filter

# @return [ Integer | nil | The timeout_ms value that was passed as an
Expand Down
5 changes: 4 additions & 1 deletion lib/mongo/collection/view/aggregation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ class View
#
# @since 2.0.0
class Aggregation
extend Forwardable
include Behavior

# @return [ Array<Hash> ] pipeline The aggregation pipeline.
attr_reader :pipeline

def_delegators :view, :tracer

# Initialize the aggregation for the provided collection view, pipeline
# and options.
#
Expand Down Expand Up @@ -80,7 +83,7 @@ def new(options)
Aggregation.new(view, pipeline, options)
end

def initial_query_op(session, read_preference)
def initial_query_op(session, read_preference = nil)
Operation::Aggregate.new(aggregate_spec(session, read_preference))
end

Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/collection/view/aggregation/behavior.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def server_selector
@view.send(:server_selector)
end

def aggregate_spec(session, read_preference)
def aggregate_spec(session, read_preference = nil)
Builder::Aggregation.new(
pipeline,
view,
Expand Down
26 changes: 14 additions & 12 deletions lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,21 @@ def select_cursor(session)
operation_timeouts: operation_timeouts,
view: self
)

if respond_to?(:write?, true) && write?
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
op = initial_query_op(session)
tracer.trace_operation(op, context) do
if respond_to?(:write?, true) && write?
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
else
Cursor.new(view, result, server, session: session, context: context)
end
else
Cursor.new(view, result, server, session: session, context: context)
end
else
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
send_initial_query(server, context)
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
send_initial_query(server, context)
end
end
end
end
Expand Down
Loading
Loading