diff --git a/lib/octopus.rb b/lib/octopus.rb index ffea3f78..95404e40 100644 --- a/lib/octopus.rb +++ b/lib/octopus.rb @@ -141,6 +141,7 @@ def self.using(shard, &block) conn = ActiveRecord::Base.connection if conn.is_a?(Octopus::Proxy) + conn.current_shard_dirty = true conn.run_queries_on_shard(shard, &block) else yield diff --git a/lib/octopus/model.rb b/lib/octopus/model.rb index ba9649e4..cc40e837 100644 --- a/lib/octopus/model.rb +++ b/lib/octopus/model.rb @@ -19,6 +19,7 @@ def using(shard) end if Octopus.enabled? + self.connection_proxy.current_shard_dirty = true Octopus::ScopeProxy.new(shard, self) else self diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index b18f5ab1..09e7c0d6 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -8,6 +8,7 @@ class Proxy delegate :current_model, :current_model=, :current_shard, :current_shard=, + :current_shard_dirty, :current_shard_dirty=, :current_group, :current_group=, :current_slave_group, :current_slave_group=, :current_load_balance_options, :current_load_balance_options=, @@ -106,6 +107,7 @@ def send_queries_to_all_shards(&block) def clean_connection_proxy self.current_shard = Octopus.master_shard + self.current_shard_dirty = false self.current_model = nil self.current_group = nil self.block = nil @@ -269,7 +271,7 @@ def should_clean_connection_proxy?(method) # Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined def should_send_queries_to_replicated_databases?(method) - replicated && method.to_s =~ /select/ && !block && !slaves_grouped? + replicated && method.to_s =~ /select/ && (!block || !current_shard_dirty) && !slaves_grouped? end def send_queries_to_selected_slave(method, *args, &block) @@ -335,6 +337,7 @@ def keeping_connection_proxy(shard, &_block) # Temporarily switch `current_shard` and run the block def using_shard(shard, &_block) older_shard = current_shard + older_dirty = current_shard_dirty older_slave_group = current_slave_group older_load_balance_options = current_load_balance_options @@ -345,6 +348,7 @@ def using_shard(shard, &_block) yield ensure self.current_shard = older_shard + self.current_shard_dirty = older_dirty self.current_slave_group = older_slave_group self.current_load_balance_options = older_load_balance_options end diff --git a/lib/octopus/proxy_config.rb b/lib/octopus/proxy_config.rb index 93a2e5bc..688970f7 100644 --- a/lib/octopus/proxy_config.rb +++ b/lib/octopus/proxy_config.rb @@ -2,6 +2,7 @@ module Octopus class ProxyConfig CURRENT_MODEL_KEY = 'octopus.current_model'.freeze CURRENT_SHARD_KEY = 'octopus.current_shard'.freeze + CURRENT_SHARD_DIRTY_KEY = 'octopus.current_shard_dirty'.freeze CURRENT_GROUP_KEY = 'octopus.current_group'.freeze CURRENT_SLAVE_GROUP_KEY = 'octopus.current_slave_group'.freeze CURRENT_LOAD_BALANCE_OPTIONS_KEY = 'octopus.current_load_balance_options'.freeze @@ -25,6 +26,14 @@ def current_model=(model) Thread.current[CURRENT_MODEL_KEY] = model.is_a?(ActiveRecord::Base) ? model.class : model end + def current_shard_dirty + Thread.current[CURRENT_SHARD_DIRTY_KEY] ||= false + end + + def current_shard_dirty=(value) + Thread.current[CURRENT_SHARD_DIRTY_KEY] = value + end + def current_shard Thread.current[CURRENT_SHARD_KEY] ||= Octopus.master_shard end diff --git a/lib/octopus/scope_proxy.rb b/lib/octopus/scope_proxy.rb index 0cf3dd5b..5e50ef67 100644 --- a/lib/octopus/scope_proxy.rb +++ b/lib/octopus/scope_proxy.rb @@ -23,6 +23,7 @@ def initialize(shard, klass) def using(shard) fail "Nonexistent Shard Name: #{shard}" if @klass.connection.shards[shard].nil? @current_shard = shard + @klass.connection_proxy.current_shard_dirty = true self end diff --git a/spec/octopus/octopus_spec.rb b/spec/octopus/octopus_spec.rb index db2077bf..09a408e3 100644 --- a/spec/octopus/octopus_spec.rb +++ b/spec/octopus/octopus_spec.rb @@ -88,34 +88,86 @@ end describe '#fully_replicated' do - before do - OctopusHelper.using_environment :production_replicated do - OctopusHelper.clean_all_shards([:slave1, :slave2, :slave3, :slave4]) - 4.times { |i| User.using(:"slave#{i + 1}").create!(:name => 'Slave User') } + describe '#without_association' do + before do + OctopusHelper.using_environment :production_replicated do + OctopusHelper.clean_all_shards([:slave1, :slave2, :slave3, :slave4]) + 4.times { |i| User.using(:"slave#{i + 1}").create!(:name => 'Slave User') } + end + end + + it 'sends queries to slaves' do + OctopusHelper.using_environment :production_replicated do + expect(User.count).to eq(0) + 4.times do |_i| + Octopus.fully_replicated do + expect(User.count).to eq(1) + end + end + end end - end - it 'sends queries to slaves' do - OctopusHelper.using_environment :production_replicated do - expect(User.count).to eq(0) - 4.times do |_i| + it 'sends queries to master when forced to use master' do + OctopusHelper.using_environment :production_replicated do Octopus.fully_replicated do + expect(User.using(:master).count).to eq(0) + end + end + end + + it 'allows nesting' do + OctopusHelper.using_environment :production_replicated do + Octopus.fully_replicated do + expect(User.count).to eq(1) + + Octopus.fully_replicated do + expect(User.count).to eq(1) + end + expect(User.count).to eq(1) end end end end - it 'allows nesting' do - OctopusHelper.using_environment :production_replicated do - Octopus.fully_replicated do - expect(User.count).to eq(1) + describe '#with_association' do + before do + OctopusHelper.using_environment :production_replicated do + master = Client.create!(:name => 'Master Client') + OctopusHelper.clean_all_shards([:slave1, :slave2, :slave3, :slave4]) + Octopus.fully_replicated do + 4.times do |i| + client = Client.using(:"slave#{i + 1}").create!(master.as_json) + client.items << Item.using(:"slave#{i + 1}").create(:name => 'Slave Item') + client.save + end + end + end + end + + it 'sends queries to slaves' do + OctopusHelper.using_environment :production_replicated do + 4.times do |_i| + Octopus.fully_replicated do + expect(Client.last.items.count).to eq(1) + end + end + end + end + it 'sends queries to master when forced to use master' do + OctopusHelper.using_environment :production_replicated do Octopus.fully_replicated do - expect(User.count).to eq(1) + expect(Client.using(:master).last.items.count).to eq(0) end + end + end - expect(User.count).to eq(1) + it 'sends queries to slave when forced to use slave' do + OctopusHelper.using_environment :production_replicated do + Octopus.fully_replicated do + expect(Client.using(:slave1).last.items.count).to eq(1) + end end end end diff --git a/spec/support/octopus_helper.rb b/spec/support/octopus_helper.rb index 01db6431..f501dc8d 100644 --- a/spec/support/octopus_helper.rb +++ b/spec/support/octopus_helper.rb @@ -20,6 +20,7 @@ def self.clean_all_shards(shards) def self.clean_connection_proxy Thread.current['octopus.current_model'] = nil Thread.current['octopus.current_shard'] = nil + Thread.current['octopus.current_shard_dirty'] = false Thread.current['octopus.current_group'] = nil Thread.current['octopus.current_slave_group'] = nil Thread.current['octopus.block'] = nil