@@ -77,25 +77,25 @@ class asio_connection
7777 friend class asio_connection_pool ;
7878 friend class asio_client ;
7979public:
80- asio_connection (boost::asio::io_service& io_service, bool start_with_ssl, const std::function<void (boost::asio::ssl::context&)>& ssl_context_callback) :
81- m_socket (io_service),
82- m_ssl_context_callback (ssl_context_callback),
83- m_pool_timer (io_service),
84- m_is_reused (false ),
85- m_keep_alive (true )
80+ asio_connection (
81+ boost::asio::io_service& io_service,
82+ const std::string &pool_key,
83+ bool start_with_ssl,
84+ const std::function<void (boost::asio::ssl::context&)>& ssl_context_callback)
85+ : m_socket(io_service),
86+ m_ssl_context_callback (ssl_context_callback),
87+ m_pool_timer(io_service),
88+ m_is_reused(false ),
89+ m_keep_alive(true ),
90+ m_pool_key(pool_key),
91+ m_epoch(0 )
8692 {
8793 if (start_with_ssl)
8894 {
8995 upgrade_to_ssl ();
9096 }
9197 }
9298
93- asio_connection (boost::asio::io_service& io_service, const std::string &pool_key, bool start_with_ssl, const std::function<void (boost::asio::ssl::context&)>& ssl_context_callback) :
94- asio_connection (io_service, start_with_ssl, ssl_context_callback)
95- {
96- m_pool_key = pool_key;
97- }
98-
9999 ~asio_connection ()
100100 {
101101 close ();
@@ -146,8 +146,7 @@ class asio_connection
146146 bool keep_alive () const { return m_keep_alive; }
147147 bool is_ssl () const { return m_ssl_stream ? true : false ; }
148148 const std::string &pool_key () const { return m_pool_key; }
149- const std::string &nonce () const { return m_nonce; }
150- void generate_nonce () { m_nonce = m_nonce_generator.generate (); }
149+ uint32_t epoch () const { return m_epoch; }
151150
152151 template <typename Iterator, typename Handler>
153152 void async_connect (const Iterator &begin, const Handler &handler)
@@ -240,7 +239,7 @@ class asio_connection
240239 {
241240 cancel_pool_timer ();
242241 m_is_reused = true ;
243- generate_nonce () ;
242+ m_epoch++ ;
244243 }
245244
246245 // Guards concurrent access to socket/ssl::stream. This is necessary
@@ -256,18 +255,71 @@ class asio_connection
256255 bool m_is_reused;
257256 bool m_keep_alive;
258257 std::string m_pool_key;
259- std::string m_nonce;
260- utility::nonce_generator m_nonce_generator;
258+ uint32_t m_epoch;
259+ };
260+
261+ class asio_shared_connection_pool
262+ {
263+ public:
264+ asio_shared_connection_pool (boost::asio::io_service& io_service) :
265+ m_io_service (io_service)
266+ {}
267+
268+ ~asio_shared_connection_pool ()
269+ {
270+ std::lock_guard<std::mutex> lock (m_connections_mutex);
271+ // Cancel the pool timer for all connections.
272+ for (auto & connection : m_connections)
273+ {
274+ connection.second ->cancel_pool_timer ();
275+ }
276+ }
277+
278+ void release (const std::shared_ptr<asio_connection>& connection)
279+ {
280+ if (connection->keep_alive () && (m_timeout_secs > 0 ))
281+ {
282+ connection->cancel ();
283+
284+ std::lock_guard<std::mutex> lock (m_connections_mutex);
285+ auto it = m_connections.insert (std::make_pair (connection->pool_key (), connection));
286+
287+ // This will destroy and remove the connection from pool after the set timeout.
288+ // We use 'this' because async calls to timer handler only occur while the pool exists.
289+ auto connection_weak = std::weak_ptr<asio_connection>(connection);
290+ auto epoch = connection->epoch ();
291+ connection->start_pool_timer (m_timeout_secs, [this , connection_weak, epoch](const boost::error_code& ec) {
292+ this ->free_shared_connection (ec, connection_weak, epoch);
293+ });
294+ }
295+ }
296+
297+ private:
298+ void free_connection (const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection, unsigned int epoch)
299+ {
300+ auto connection_shared = connection.lock ();
301+ if (!connection_shared)
302+ return ;
303+
304+ std::lock_guard<std::mutex> lock (m_connections_mutex);
305+ auto it = m_connections.find (connection_shared);
306+ if (it == m_connections.end ())
307+ // The connection was acquired while this callback was firing
308+ return ;
309+
310+ // The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing.
311+ // Every acquisition increments the epoch.
312+ if (epoch != (*it)->epoch ())
313+ m_connections.erase (it);
314+ }
261315};
262316
263317class asio_connection_pool
264318{
265319public:
266320
267- asio_connection_pool (boost::asio::io_service& io_service, const std::chrono::seconds &idle_timeout, bool is_shared) :
268- m_io_service (io_service),
269- m_timeout_secs (static_cast <int >(idle_timeout.count())),
270- m_is_shared (is_shared)
321+ asio_connection_pool (boost::asio::io_service& io_service) :
322+ m_io_service (io_service)
271323 {}
272324
273325 ~asio_connection_pool ()
@@ -282,29 +334,15 @@ class asio_connection_pool
282334
283335 void release (const std::shared_ptr<asio_connection> &connection)
284336 {
285- if (connection->keep_alive () && (m_timeout_secs > 0 ))
286- {
287- connection->cancel ();
337+ connection->cancel ();
288338
289- if (m_is_shared)
290- {
291- std::lock_guard<std::mutex> lock (m_connections_mutex);
292- auto it = m_shared_connections.insert (std::make_pair (connection->pool_key (), connection));
293- // This will destroy and remove the connection from pool after the set timeout.
294- // We use 'this' because async calls to timer handler only occur while the pool exists.
295- connection->start_pool_timer (m_timeout_secs, boost::bind (&asio_connection_pool::free_shared_connection, this , boost::asio::placeholders::error, it, std::weak_ptr<asio_connection>(connection), connection->nonce ()));
296- }
297- else
298- {
299- std::lock_guard<std::mutex> lock (m_connections_mutex);
300- auto pair = m_connections.insert (connection);
301- if (pair.second )
302- {
303- // This will destroy and remove the connection from pool after the set timeout.
304- // We use 'this' because async calls to timer handler only occur while the pool exists.
305- connection->start_pool_timer (m_timeout_secs, boost::bind (&asio_connection_pool::free_connection, this , boost::asio::placeholders::error, pair.first , std::weak_ptr<asio_connection>(connection), connection->nonce ()));
306- }
307- }
339+ if (connection->keep_alive ())
340+ {
341+ std::lock_guard<std::mutex> lock (m_connections_mutex);
342+ // This will destroy and remove the connection from pool after the set timeout.
343+ // We use 'this' because async calls to timer handler only occur while the pool exists.
344+ connection->start_pool_timer (s_timeout_secs.count (), boost::bind (&asio_connection_pool::free_connection, this , boost::asio::placeholders::error, pair.first , std::weak_ptr<asio_connection>(connection), connection->nonce ()));
345+ m_connections.push_back (connection);
308346 }
309347 // Otherwise connection is not put to the pool and it will go out of scope.
310348 }
@@ -377,26 +415,33 @@ class asio_connection_pool
377415 }
378416
379417 // Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope.
380- void free_connection (const boost::system::error_code& ec, std::set<std::shared_ptr<asio_connection>>::iterator it, const std::weak_ptr<asio_connection> &connection, const std::string &nonce )
418+ void free_connection (const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection, uint32_t epoch )
381419 {
382420 if (!ec)
383421 {
384422 auto connection_shared = connection.lock ();
385- // Compare nonce here to ensure the iterator is valid, the connection not been reused.
386- if (connection_shared && (connection_shared->nonce () == nonce))
387- {
388- std::lock_guard<std::mutex> lock (m_connections_mutex);
423+ if (!connection_shared)
424+ return ;
425+
426+ std::lock_guard<std::mutex> lock (m_connections_mutex);
427+ auto it = m_connections.find (connection_shared);
428+ if (it == m_connections.end ())
429+ // The connection was acquired while this callback was firing
430+ return ;
431+
432+ // The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing.
433+ // Every acquisition increments the epoch.
434+ if (epoch != (*it)->epoch ())
389435 m_connections.erase (it);
390- }
391436 }
392437 }
393438
394439 boost::asio::io_service& m_io_service;
395- const int m_timeout_secs;
396- bool m_is_shared;
397- std::multimap<std::string, std::shared_ptr<asio_connection>> m_shared_connections;
398- std::set<std::shared_ptr<asio_connection>> m_connections;
440+
399441 std::mutex m_connections_mutex;
442+ std::set<std::shared_ptr<asio_connection>> m_connections;
443+
444+ static const std::chrono::seconds s_timeout_secs = 30 ;
400445};
401446
402447std::shared_ptr<asio_connection_pool> asio_connection_pool::shared_instance ()
0 commit comments