@@ -235,9 +235,10 @@ class asio_connection
235235// /
236236// / During the cleanup phase, connections are removed starting with the oldest. This
237237// / ensures that if a high intensity workload is followed by a low intensity workload,
238- // / the connection pool will correctly adapt to the current workload. Specifically,
239- // / the following code will eventually result in a maximum of one pooled connection
240- // / regardless of the initial number of pooled connections:
238+ // / the connection pool will correctly adapt to the low intensity workload.
239+ // /
240+ // / Specifically, the following code will eventually result in a maximum of one pooled
241+ // / connection regardless of the initial number of pooled connections:
241242// / <code>
242243// / while(1)
243244// / {
@@ -246,18 +247,11 @@ class asio_connection
246247// / pool.release(conn);
247248// / }
248249// / </code>
249- // /
250- // / Additionally, when two cleanup phases have occurred with no calls to `release()`
251- // / between them, the internal self-reference is cleared. If there are no active
252- // / `http_client`s keeping the pool alive, this will cause the pool to expire upon
253- // / cleanup handler termination. Whenever a new call to `release()` arrives, the self
254- // / reference is re-applied to keep the pool alive.
255250// / </remarks>
256251class asio_connection_pool : public std ::enable_shared_from_this<asio_connection_pool>
257252{
258253public:
259- asio_connection_pool ()
260- : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
254+ asio_connection_pool () : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
261255 {}
262256
263257 std::shared_ptr<asio_connection> acquire ()
@@ -281,29 +275,29 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
281275 return ;
282276
283277 std::lock_guard<std::mutex> lock (m_lock);
284- if (m_self_reference == nullptr )
278+ if (!is_timer_running )
285279 {
286- auto sptr = this ->shared_from_this ();
287- m_self_reference = sptr;
288- start_epoch_interval (sptr);
280+ start_epoch_interval (shared_from_this ());
281+ is_timer_running = true ;
289282 }
290283
291284 m_epoch++;
292- m_connections.emplace_back (m_epoch, connection);
285+ m_connections.emplace_back (m_epoch, std::move ( connection) );
293286 }
294287
295288private:
296289 // Note: must be called under m_lock
297- static void start_epoch_interval (const std::shared_ptr<asio_connection_pool>& pool) {
290+ static void start_epoch_interval (const std::shared_ptr<asio_connection_pool>& pool)
291+ {
298292 _ASSERTE (pool.get () != nullptr );
299- _ASSERTE (pool->m_self_reference != nullptr );
300293
301294 auto & self = *pool;
302295 std::weak_ptr<asio_connection_pool> weak_pool = pool;
303296
304297 self.m_prev_epoch = self.m_epoch ;
305298 pool->m_pool_epoch_timer .expires_from_now (boost::posix_time::seconds (30 ));
306- pool->m_pool_epoch_timer .async_wait ([weak_pool](const boost::system::error_code& ec) {
299+ pool->m_pool_epoch_timer .async_wait ([weak_pool](const boost::system::error_code& ec)
300+ {
307301 if (ec)
308302 return ;
309303
@@ -313,11 +307,11 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
313307 auto & self = *pool;
314308
315309 std::lock_guard<std::mutex> lock (self.m_lock );
316- _ASSERTE (self.m_self_reference != nullptr );
317310 if (self.m_prev_epoch == self.m_epoch )
318311 {
319312 self.m_connections .clear ();
320- self.m_self_reference = nullptr ;
313+ self.is_timer_running = false ;
314+ return ;
321315 }
322316 else
323317 {
@@ -335,109 +329,23 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
335329 }
336330
337331 std::mutex m_lock;
338- boost::asio::deadline_timer m_pool_epoch_timer;
339332 std::deque<std::pair<uint64_t , std::shared_ptr<asio_connection>>> m_connections;
333+
340334 uint64_t m_epoch = 0 ;
341335 uint64_t m_prev_epoch = 0 ;
342-
343- std::shared_ptr<asio_connection_pool> m_self_reference;
344- };
345-
346- class asio_shared_connection_pool : public std ::enable_shared_from_this<asio_shared_connection_pool>
347- {
348- public:
349- std::shared_ptr<asio_connection_pool> obtain (const std::string &pool_key)
350- {
351- std::shared_ptr<asio_connection_pool> ret;
352-
353- std::lock_guard<std::mutex> lock (m_lock);
354- auto it = m_pools.find (pool_key);
355- if (it != m_pools.end ())
356- {
357- ret = it->second .lock ();
358- if (ret == nullptr )
359- {
360- // Previous pool expired
361- ret = std::make_shared<asio_connection_pool>();
362- it->second = ret;
363- }
364- }
365- else
366- {
367- if (m_pools.empty ())
368- {
369- // If transitioning from empty to having a single element, restart the timer.
370- start_timer (shared_from_this ());
371- }
372- ret = std::make_shared<asio_connection_pool>();
373- m_pools.emplace (pool_key, ret);
374- }
375-
376- assert (ret != nullptr );
377- return ret;
378- }
379-
380- static std::shared_ptr<asio_shared_connection_pool>& shared_instance ()
381- {
382- static std::shared_ptr<asio_shared_connection_pool> s_instance = std::make_shared<asio_shared_connection_pool>();
383-
384- return s_instance;
385- }
386-
387- asio_shared_connection_pool () : m_timer(crossplat::threadpool::shared_instance().service()) {}
388-
389- private:
390- static void start_timer (const std::shared_ptr<asio_shared_connection_pool>& self)
391- {
392- self->m_timer .expires_from_now (boost::posix_time::seconds (60 ));
393- std::weak_ptr<asio_shared_connection_pool> weak_this = self;
394- self->m_timer .async_wait ([weak_this](const boost::system::error_code& ec)
395- {
396- if (ec)
397- return ;
398- auto strong_this = weak_this.lock ();
399- if (!strong_this)
400- return ;
401-
402- std::lock_guard<std::mutex> lock (strong_this->m_lock );
403- auto b = strong_this->m_pools .begin ();
404- auto e = strong_this->m_pools .end ();
405- for (; b != e;)
406- {
407- if (b->second .expired ())
408- b = strong_this->m_pools .erase (b);
409- else
410- ++b;
411- }
412- if (!strong_this->m_pools .empty ())
413- start_timer (strong_this);
414- });
415- }
416-
417- boost::asio::deadline_timer m_timer;
418- std::mutex m_lock;
419- std::unordered_map<std::string, std::weak_ptr<asio_connection_pool>> m_pools;
336+ bool is_timer_running = false ;
337+ boost::asio::deadline_timer m_pool_epoch_timer;
420338};
421339
422340class asio_client final : public _http_client_communicator
423341{
424342public:
425- asio_client (http::uri address, http_client_config client_config)
426- : _http_client_communicator(std::move(address), std::move(client_config))
427- , m_resolver(crossplat::threadpool::shared_instance().service())
428- {
429- m_start_with_ssl = base_uri ().scheme () == " https" && !this ->client_config ().proxy ().is_specified ();
430-
431- if (this ->client_config ().get_ssl_context_callback ())
432- {
433- // We will use a private connection pool because there is no better approaches to compare callback functors.
434- m_pool = std::make_shared<asio_connection_pool>();
435- }
436- else
437- {
438- m_pool = asio_shared_connection_pool::shared_instance ()->obtain (get_pool_key ());
439- }
440- }
343+ asio_client (http::uri&& address, http_client_config&& client_config)
344+ : _http_client_communicator(std::move(address), std::move(client_config))
345+ , m_resolver(crossplat::threadpool::shared_instance().service())
346+ , m_pool(std::make_shared<asio_connection_pool>())
347+ , m_start_with_ssl(base_uri().scheme() == " https" && !this ->client_config ().proxy().is_specified())
348+ {}
441349
442350 void send_request (const std::shared_ptr<request_context> &request_ctx) override ;
443351
@@ -464,35 +372,11 @@ class asio_client final : public _http_client_communicator
464372
465373 virtual pplx::task<http_response> propagate (http_request request) override ;
466374
467- private:
468- std::string get_pool_key () const
469- {
470- auto pool_key = base_uri ().to_string ();
471-
472- auto &credentials = _http_client_communicator::client_config ().credentials ();
473- if (credentials.is_set ())
474- {
475- pool_key.append (credentials.username ());
476- }
477-
478- auto &proxy = _http_client_communicator::client_config ().proxy ();
479- if (proxy.is_specified ())
480- {
481- pool_key.append (proxy.address ().to_string ());
482- if (proxy.credentials ().is_set ())
483- {
484- pool_key.append (proxy.credentials ().username ());
485- }
486- }
487-
488- return pool_key;
489- }
490-
491- std::shared_ptr<asio_connection_pool> m_pool;
492375public:
493376 tcp::resolver m_resolver;
494377private:
495- bool m_start_with_ssl;
378+ const std::shared_ptr<asio_connection_pool> m_pool;
379+ const bool m_start_with_ssl;
496380};
497381
498382class asio_context : public request_context , public std ::enable_shared_from_this<asio_context>
@@ -1612,9 +1496,9 @@ class asio_context : public request_context, public std::enable_shared_from_this
16121496};
16131497
16141498
1615- std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage (uri base_uri, const http_client_config& client_config)
1499+ std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage (uri&& base_uri, http_client_config& & client_config)
16161500{
1617- return std::make_shared<asio_client>(base_uri, client_config);
1501+ return std::make_shared<asio_client>(std::move ( base_uri), std::move ( client_config) );
16181502}
16191503
16201504void asio_client::send_request (const std::shared_ptr<request_context> &request_ctx)
0 commit comments