Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
When initializing Refile::Postgres::Backend you must pass PG::Connect…
…ion or lambda that yields PG::Connection as an argument.
  • Loading branch information
krists committed Dec 1, 2015
commit d38b6a58f1a40091350d1461ca1f7fafaea0d42d
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "refile"
Refile.configure do |config|
config.store = Refile::Postgres::Backend.new(proc { ActiveRecord::Base.connection.raw_connection } )
connection = lambda { |&blk| ActiveRecord::Base.connection_pool.with_connection { |con| blk.call(con.raw_connection) } }
config.store = Refile::Postgres::Backend.new(connection)
end
111 changes: 48 additions & 63 deletions lib/refile/postgres/backend.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class Backend
DEFAULT_NAMESPACE = "default"
PG_LARGE_OBJECT_METADATA_TABLE = "pg_largeobject_metadata"
READ_CHUNK_SIZE = 3000
INIT_CONNECTION_ARG_ERROR_MSG = "When initializing new Refile::Postgres::Backend first argument should be an instance of PG::Connection or a lambda/proc that returns it. When using ActiveRecord it is available as ActiveRecord::Base.connection.raw_connection"

def initialize(connection_or_proc, max_size: nil, namespace: DEFAULT_NAMESPACE, registry_table: DEFAULT_REGISTRY_TABLE)
@connection_or_proc = connection_or_proc
Expand All @@ -22,50 +21,43 @@ def initialize(connection_or_proc, max_size: nil, namespace: DEFAULT_NAMESPACE,

def registry_table
unless @registry_table_validated
connection.exec %{
SELECT count(*) from pg_catalog.pg_tables
WHERE tablename = '#{@registry_table}';
} do |result|
unless result[0]["count"].to_i > 0
raise RegistryTableDoesNotExistError.new(%{Please create a table "#{@registry_table}" where backend could store list of attachments})
with_connection do |connection|
connection.exec_params("SELECT * FROM pg_catalog.pg_tables WHERE tablename = $1::varchar;", [@registry_table]) do |result|
if result.count != 1
raise RegistryTableDoesNotExistError.new(%{Please create a table "#{@registry_table}" where backend could store list of attachments})
end
end
end
@registry_table_validated = true
end
@registry_table
end

def connection
if has_active_connection?
@connection
else
obtain_new_connection
end
end

verify_uploadable def upload(uploadable)
oid = connection.lo_creat
ensure_in_transaction do
begin
handle = connection.lo_open(oid, PG::INV_WRITE)
connection.lo_truncate(handle, 0)
buffer = "" # reuse the same buffer
until uploadable.eof?
uploadable.read(READ_CHUNK_SIZE, buffer)
connection.lo_write(handle, buffer)
with_connection do |connection|
oid = connection.lo_creat
ensure_in_transaction(connection) do
begin
handle = connection.lo_open(oid, PG::INV_WRITE)
connection.lo_truncate(handle, 0)
buffer = "" # reuse the same buffer
until uploadable.eof?
uploadable.read(READ_CHUNK_SIZE, buffer)
connection.lo_write(handle, buffer)
end
uploadable.close
connection.exec_params("INSERT INTO #{registry_table} VALUES ($1::integer, $2::varchar);", [oid, namespace])
Refile::File.new(self, oid.to_s)
ensure
connection.lo_close(handle)
end
uploadable.close
connection.exec_params("INSERT INTO #{registry_table} VALUES ($1::integer, $2::varchar);", [oid, namespace])
Refile::File.new(self, oid.to_s)
ensure
connection.lo_close(handle)
end
end
end

verify_id def open(id)
if exists?(id)
Reader.new(connection, id)
Reader.new(@connection_or_proc, id)
else
raise ArgumentError.new("No such attachment with ID: #{id}")
end
Expand All @@ -84,14 +76,16 @@ def connection
end

verify_id def exists?(id)
connection.exec_params(%{
SELECT count(*) FROM #{registry_table}
INNER JOIN #{PG_LARGE_OBJECT_METADATA_TABLE}
ON #{registry_table}.id = #{PG_LARGE_OBJECT_METADATA_TABLE}.oid
WHERE #{registry_table}.namespace = $1::varchar
AND #{registry_table}.id = $2::integer;
}, [namespace, id.to_s.to_i]) do |result|
result[0]["count"].to_i > 0
with_connection do |connection|
connection.exec_params(%{
SELECT count(*) FROM #{registry_table}
INNER JOIN #{PG_LARGE_OBJECT_METADATA_TABLE}
ON #{registry_table}.id = #{PG_LARGE_OBJECT_METADATA_TABLE}.oid
WHERE #{registry_table}.namespace = $1::varchar
AND #{registry_table}.id = $2::integer;
}, [namespace, id.to_s.to_i]) do |result|
result[0]["count"].to_i > 0
end
end
end

Expand All @@ -105,43 +99,34 @@ def connection

verify_id def delete(id)
if exists?(id)
ensure_in_transaction do
connection.lo_unlink(id.to_s.to_i)
connection.exec_params("DELETE FROM #{registry_table} WHERE id = $1::integer;", [id])
with_connection do |connection|
ensure_in_transaction(connection) do
connection.lo_unlink(id.to_s.to_i)
connection.exec_params("DELETE FROM #{registry_table} WHERE id = $1::integer;", [id])
end
end
end
end

def clear!(confirm = nil)
raise Refile::Confirm unless confirm == :confirm
registry_table
ensure_in_transaction do
connection.exec_params(%{
SELECT * FROM #{registry_table}
INNER JOIN #{PG_LARGE_OBJECT_METADATA_TABLE} ON #{registry_table}.id = #{PG_LARGE_OBJECT_METADATA_TABLE}.oid
WHERE #{registry_table}.namespace = $1::varchar;
}, [namespace]) do |result|
result.each_row do |row|
connection.lo_unlink(row[0].to_s.to_i)
with_connection do |connection|
ensure_in_transaction(connection) do
connection.exec_params(%{
SELECT * FROM #{registry_table}
INNER JOIN #{PG_LARGE_OBJECT_METADATA_TABLE} ON #{registry_table}.id = #{PG_LARGE_OBJECT_METADATA_TABLE}.oid
WHERE #{registry_table}.namespace = $1::varchar;
}, [namespace]) do |result|
result.each_row do |row|
connection.lo_unlink(row[0].to_s.to_i)
end
end
connection.exec_params("DELETE FROM #{registry_table} WHERE namespace = $1::varchar;", [namespace])
end
connection.exec_params("DELETE FROM #{registry_table} WHERE namespace = $1::varchar;", [namespace])
end
end

private

def has_active_connection?
@connection && [email protected]?
end

def obtain_new_connection
candidate = @connection_or_proc.is_a?(Proc) ? @connection_or_proc.call : @connection_or_proc
unless candidate.is_a?(PG::Connection)
raise ArgumentError.new(INIT_CONNECTION_ARG_ERROR_MSG)
end
@connection = candidate
end
end
end
end
50 changes: 32 additions & 18 deletions lib/refile/postgres/backend/reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,66 @@ class Backend
class Reader
include SmartTransaction

def initialize(connection, oid)
@connection = connection
def initialize(connection_or_proc, oid)
@connection_or_proc = connection_or_proc
@oid = oid.to_s.to_i
@closed = false
@pos = 0
end

attr_reader :connection, :oid, :pos
attr_reader :oid, :pos

def read(length = nil, buffer = nil)
result = if length
raise "closed" if @closed
smart_transaction do |descriptor|
connection.lo_lseek(descriptor, @pos, PG::SEEK_SET)
data = connection.lo_read(descriptor, length)
@pos = connection.lo_tell(descriptor)
data
with_connection do |connection|
smart_transaction(connection) do |descriptor|
connection.lo_lseek(descriptor, @pos, PG::SEEK_SET)
data = connection.lo_read(descriptor, length)
@pos = connection.lo_tell(descriptor)
data
end
end
else
smart_transaction do |descriptor|
connection.lo_read(descriptor, size)
with_connection do |connection|
smart_transaction(connection) do |descriptor|
connection.lo_read(descriptor, size)
end
end
end
buffer.replace(result) if buffer and result
result
end

def eof?
smart_transaction do |descriptor|
@pos == size
with_connection do |connection|
smart_transaction(connection) do |descriptor|
@pos == size
end
end
end

def size
@size ||= smart_transaction do |descriptor|
current_position = connection.lo_tell(descriptor)
end_position = connection.lo_lseek(descriptor, 0, PG::SEEK_END)
connection.lo_lseek(descriptor, current_position, PG::SEEK_SET)
end_position
end
@size ||= fetch_size
end

def close
@closed = true
end

private

def fetch_size
with_connection do |connection|
smart_transaction(connection) do |descriptor|
current_position = connection.lo_tell(descriptor)
end_position = connection.lo_lseek(descriptor, 0, PG::SEEK_END)
connection.lo_lseek(descriptor, current_position, PG::SEEK_SET)
end_position
end
end
end

end
end
end
Expand Down
28 changes: 24 additions & 4 deletions lib/refile/postgres/smart_transaction.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module Refile
module Postgres
module SmartTransaction

INIT_CONNECTION_ARG_ERROR_MSG = "When initializing new Refile::Postgres::Backend first argument should be an instance of PG::Connection or a lambda/proc that yields it."
PQTRANS_INTRANS = 2 # (idle, within transaction block)

def smart_transaction
def smart_transaction(connection)
result = nil
ensure_in_transaction do
ensure_in_transaction(connection) do
begin
handle = connection.lo_open(oid)
result = yield handle
Expand All @@ -16,7 +16,7 @@ def smart_transaction
result
end

def ensure_in_transaction
def ensure_in_transaction(connection)
if connection.transaction_status == PQTRANS_INTRANS
yield
else
Expand All @@ -26,6 +26,26 @@ def ensure_in_transaction
end
end

def with_connection
if @connection_or_proc.is_a?(PG::Connection)
yield @connection_or_proc
else
if @connection_or_proc.is_a?(Proc)
block_has_been_executed = false
value = nil
@connection_or_proc.call do |connection|
block_has_been_executed = true
raise ArgumentError.new(INIT_CONNECTION_ARG_ERROR_MSG) unless connection.is_a?(PG::Connection)
value = yield connection
end
raise ArgumentError.new(INIT_CONNECTION_ARG_ERROR_MSG) unless block_has_been_executed
value
else
raise ArgumentError.new(INIT_CONNECTION_ARG_ERROR_MSG)
end
end
end

end
end
end
4 changes: 2 additions & 2 deletions refile-postgres.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]

spec.add_dependency "refile", "~> 0.6.1"
spec.add_dependency "refile", "~> 0.6.2"
spec.add_dependency "pg"

spec.add_development_dependency "bundler"
spec.add_development_dependency "rspec"
spec.add_development_dependency "webmock"
spec.add_development_dependency "pry"
spec.add_development_dependency "pry-stack_explorer"
spec.add_development_dependency "rails", "~> 4.2.1"
spec.add_development_dependency "rails", "~> 4.2.5"
spec.add_development_dependency "rake"
spec.add_development_dependency "codeclimate-test-reporter"
end
56 changes: 23 additions & 33 deletions spec/refile/postgres/backend_spec.rb
Original file line number Diff line number Diff line change
@@ -1,47 +1,37 @@
require "spec_helper"

RSpec.describe Refile::Postgres::Backend do
let(:connection_or_proc) { PG.connect(dbname: 'refile_test') }
describe Refile::Postgres::Backend do
let(:connection) { PG.connect(dbname: 'refile_test') }
let(:backend) { Refile::Postgres::Backend.new(connection_or_proc, max_size: 100) }
it_behaves_like :backend

context "Connection tests" do
def connection
PG.connect(dbname: 'refile_test')
context "when not using procs and providing PG::Connection directly" do
let(:connection_or_proc) { connection }
it "reuses the same PG::Connection" do
expect(backend.with_connection { |c| c.db }).to eq("refile_test")
end
end

context "when using proc" do
def connection_or_proc
proc { connection }
end

it "reuses the same PG::Connection if connection is ok" do
expect(backend.connection).to eq(backend.connection)
context "when lambda does not yield a block but returns connection" do
let(:connection_or_proc) { lambda { connection } }
it "raises argument error" do
expect {
backend.with_connection { |c| c.db }
}.to raise_error(ArgumentError, "When initializing new Refile::Postgres::Backend first argument should be an instance of PG::Connection or a lambda/proc that yields it.")
end
end

it "executes proc and obtains new connection if old one is closed" do
old = backend.connection
old.close
expect(backend.connection).not_to eq(old)
expect(backend.connection.finished?).to be_falsey
context "when lambda does yield a PG::Connection" do
let(:connection_or_proc) { lambda { |&blk| blk.call(connection) } }
it "is usable in queries" do
expect(backend.with_connection { |c| c.db }).to eq("refile_test")
end
end
end
end

context "when not using procs and providing PG::Connection directly" do
def connection_or_proc
connection
end

it "reuses the same PG::Connection" do
expect(backend.connection).to eq(backend.connection)
end

it "continues to use old connection if the old one is closed" do
old = backend.connection
old.close
expect(backend.connection).to eq(old)
expect(backend.connection.finished?).to be_truthy
end
end
context "Refile Provided tests" do
let(:connection_or_proc) { connection }
it_behaves_like :backend
end
end