Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b139bcb
Merge remote-tracking branch 'core/2.12.0_prep'
Apr 19, 2019
dd69942
Updating libuv version for *nix builds
Apr 19, 2019
e6e3c59
CPP-766 - Allow RPM packages to be built on Amazon Linux (#230)
mikefero Apr 25, 2019
53354e9
CPP-764 - Move internal components from `namespace cass` to `namespa…
mpenick Apr 29, 2019
0c8d1eb
CPP-770 Fix header file include guards (__CASS_XXX to DATASTAX_XXX) (…
mpenick Apr 30, 2019
a90986b
Manual formatting fixes before clang-format
May 1, 2019
4b8ab03
Add basic support for clang-format
May 1, 2019
4762815
Add format check to Jenkins CI
May 1, 2019
183e10f
Run clang-format
May 1, 2019
28db6f2
Fix clang-format cpp standard (Cpp03)
May 1, 2019
1a37d1f
Rerun clang-format
May 1, 2019
657e8fa
Merge pull request #235 from riptano/CPP-769
mpenick May 1, 2019
232f222
CPP-772 Remove per cpp file LOG_FILE attribute to speed up Windows build
May 1, 2019
95d6e15
Remove `/GL` and `/LTCG` from Windows builds
May 2, 2019
290f3c9
Merge pull request #236 from riptano/CPP-772
mpenick May 2, 2019
772e3a2
Run clang-format
May 7, 2019
625383a
CPP-775 - Add clang-format targets for Windows command line
May 7, 2019
e87cafb
Manually fixing code format before re-running clang-format
May 7, 2019
97ec92a
Merge pull request #240 from riptano/CPP-775
mikefero May 8, 2019
b576485
CPP-774 - Updating Windows binary publishing to Artifactory (#237)
mikefero May 8, 2019
b24fb3e
CPP-776 - test: Ensure mockssandra decodes entire frame (#247)
mikefero May 23, 2019
9b43b92
Merge remote-tracking branch 'core/master' into sync_core
May 29, 2019
a596639
Merge pull request #250 from riptano/sync_core
mikefero May 29, 2019
85a9294
Update copyright in README (#253)
mpenick Jun 6, 2019
3870627
CPP-745 - Exponential reconnection policy with jitter (#254)
mikefero Jun 21, 2019
cad4643
CPP-573 - docs: Fixing broken links and warnings (#257)
mikefero Jun 26, 2019
d7c3fd4
Merge remote-tracking branch 'core/master' into oss_merge
Jun 26, 2019
9ef1b03
Merge pull request #258 from riptano/oss_merge
mikefero Jun 26, 2019
52b42e2
Update CHANGELOG and versions
Jun 25, 2019
f9ef106
test: Updating Cassandra and DSE versions
Jun 26, 2019
09356d0
test: Fixing comparison warnings
Jun 26, 2019
cc05955
Fixing namespacing for core driver
Jun 26, 2019
a0f7023
Merge remote-tracking branch 'core/master' into 1.9.0_release_prep
Jun 27, 2019
7ec952d
test: Fixing comments and correcting values
Jun 28, 2019
b996200
doc: Fixing up formatting
Jun 28, 2019
08c85b3
Merge pull request #259 from riptano/1.9.0_release_prep
mikefero Jul 1, 2019
2670a38
Merge tag '1.9.0' into 2.13.0_release_prep
Jul 1, 2019
3b17f5c
CPP-769 - Adding format target for OSS driver
Jul 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
CPP-745 - Exponential reconnection policy with jitter (#254)
* CPP-745 - Exponential reconnection policy
  • Loading branch information
mikefero authored Jun 21, 2019
commit 387062766082ef44b12d15bf983c2ccb2733fb2a
8 changes: 4 additions & 4 deletions cpp-driver/gtests/src/integration/objects/cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,13 @@ class Cluster : public Object<CassCluster, cass_cluster_free> {
}

/**
* Sets the amount of time to wait before attempting to reconnect.
* Sets the constant reconnection policy.
*
* @param wait_time_ms Wait time in milliseconds (default: 2000)
* @param delay_ms Delay in milliseconds (default: 2000)
* @return Cluster object
*/
Cluster& with_reconnect_wait_time(unsigned int wait_time_ms) {
cass_cluster_set_reconnect_wait_time(get(), wait_time_ms);
Cluster& with_constant_reconnect(unsigned int delay_ms) {
cass_cluster_set_constant_reconnect(get(), delay_ms);
return *this;
}

Expand Down
18 changes: 18 additions & 0 deletions cpp-driver/gtests/src/integration/tests/test_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,21 @@ TEST(ClusterTest, SetLoadBalanceDcAwareNullLocalDc) {
EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS,
cass_cluster_set_load_balance_dc_aware(cluster.get(), NULL, 99, cass_false));
}

/**
* Set invalid parameters for exponential reconnection policy.
*
* @jira_ticket CPP-745
* @test_category configuration
* @expected_result CASS_ERROR_LIB_BAD_PARAMS.
*/
TEST(ClusterTest, ExponentialReconnectionPolicyBadParameters) {
test::driver::Cluster cluster;

// Base delay cannot be zero
EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS, cass_cluster_set_exponential_reconnect(cluster.get(), 0, 1));
// Max delay cannot be zero
EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS, cass_cluster_set_exponential_reconnect(cluster.get(), 1, 0));
// Base delay cannot be greater than max delay
EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS, cass_cluster_set_exponential_reconnect(cluster.get(), 2, 1));
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTwoNodeClusterTests, StatusChange)
* Create a new session connection using the round robin load balancing policy
* to ensure all nodes can be accessed during request execution
*/
Cluster cluster = default_cluster().with_load_balance_round_robin().with_reconnect_wait_time(
Cluster cluster = default_cluster().with_load_balance_round_robin().with_constant_reconnect(
10); // Ensure reconnect timeout is quick
Session session = cluster.connect();

Expand Down
14 changes: 13 additions & 1 deletion cpp-driver/gtests/src/unit/mockssandra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ ServerConnection::ServerConnection(const Address& address, const ClientConnectio
, rc_(0)
, address_(address)
, factory_(factory)
, ssl_context_(NULL) {
, ssl_context_(NULL)
, connection_attempts_(0) {
uv_mutex_init(&mutex_);
uv_cond_init(&cond_);
}
Expand Down Expand Up @@ -493,6 +494,7 @@ void ServerConnection::wait_close() {
}
}

unsigned ServerConnection::connection_attempts() const { return connection_attempts_.load(); }
void ServerConnection::run(const ServerConnectionTask::Ptr& task) {
ScopedMutex l(&mutex_);
if (state_ != STATE_LISTENING) return;
Expand Down Expand Up @@ -575,6 +577,8 @@ void ServerConnection::on_connection(uv_stream_t* server, int status) {
}

void ServerConnection::handle_connection(int status) {
connection_attempts_.fetch_add(1);

if (status != 0) {
fprintf(stderr, "Listen failure: %s\n", uv_strerror(status));
return;
Expand Down Expand Up @@ -2184,6 +2188,14 @@ Hosts Cluster::hosts() const {
return hosts;
}

unsigned Cluster::connection_attempts(size_t node) const {
if (node < 1 || node > servers_.size()) {
return 0;
}
const Server& server = servers_[node - 1];
return server.connection->connection_attempts();
}

int Cluster::create_and_add_server(AddressGenerator& generator, ClientConnectionFactory& factory,
const String& dc) {
Address address(generator.next());
Expand Down
4 changes: 4 additions & 0 deletions cpp-driver/gtests/src/unit/mockssandra.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class ServerConnection : public RefCounted<ServerConnection> {
void close();
void wait_close();

unsigned connection_attempts() const;
void run(const ServerConnectionTask::Ptr& task);

private:
Expand Down Expand Up @@ -215,6 +216,7 @@ class ServerConnection : public RefCounted<ServerConnection> {
const Address address_;
const ClientConnectionFactory& factory_;
SSL_CTX* ssl_context_;
Atomic<unsigned> connection_attempts_;
};

} // namespace internal
Expand Down Expand Up @@ -1162,6 +1164,8 @@ class Cluster {
const Host& host(const Address& address) const;
Hosts hosts() const;

unsigned connection_attempts(size_t node) const;

void event(const Event::Ptr& event);

private:
Expand Down
97 changes: 93 additions & 4 deletions cpp-driver/gtests/src/unit/tests/test_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
using namespace datastax::internal;
using namespace datastax::internal::core;

#define FIFTEEN_PERCENT(value) static_cast<double>((value * 115) / 100)

class ClusterUnitTest : public EventLoopTest {
public:
ClusterUnitTest()
Expand Down Expand Up @@ -239,6 +241,53 @@ class ClusterUnitTest : public EventLoopTest {
mockssandra::Cluster& simple_cluster_;
};

class ClusterUnitTestReconnectionPolicy : public ReconnectionPolicy {
public:
typedef SharedRefPtr<ClusterUnitTestReconnectionPolicy> Ptr;

ClusterUnitTestReconnectionPolicy()
: ReconnectionPolicy(ReconnectionPolicy::CONSTANT)
, reconnection_schedule_count_(0)
, destroyed_reconnection_schedule_count_(0)
, scheduled_delay_count_(0) {}

virtual const char* name() const { return "blah"; }
virtual ReconnectionSchedule* new_reconnection_schedule() {
++reconnection_schedule_count_;
return new ClusterUnitTestReconnectionSchedule(&scheduled_delay_count_,
&destroyed_reconnection_schedule_count_);
}

unsigned reconnection_schedule_count() const { return reconnection_schedule_count_; }
unsigned destroyed_reconnection_schedule_count() const {
return destroyed_reconnection_schedule_count_;
}
unsigned scheduled_delay_count() const { return scheduled_delay_count_; }

private:
unsigned reconnection_schedule_count_;
unsigned destroyed_reconnection_schedule_count_;
unsigned scheduled_delay_count_;

class ClusterUnitTestReconnectionSchedule : public ReconnectionSchedule {
public:
ClusterUnitTestReconnectionSchedule(unsigned* delay_count, unsigned* destroyed_count)
: delay_count_(delay_count)
, destroyed_count_(destroyed_count) {}

~ClusterUnitTestReconnectionSchedule() { ++*destroyed_count_; }

virtual uint64_t next_delay_ms() {
++*delay_count_;
return 1;
}

private:
unsigned* delay_count_;
unsigned* destroyed_count_;
};
};

static void on_connection_connected(ClusterConnector* connector, Future* future) {
if (connector->is_ok()) {
future->set();
Expand Down Expand Up @@ -469,7 +518,7 @@ TEST_F(ClusterUnitTest, ReconnectToDiscoveredHosts) {
ReconnectClusterListener::Ptr listener(new ReconnectClusterListener(close_future, &outage_plan));

ClusterSettings settings;
settings.reconnect_timeout_ms = 1; // Reconnect immediately
settings.reconnection_policy.reset(new ConstantReconnectionPolicy(1)); // Reconnect immediately
settings.control_connection_settings.connection_settings.connect_timeout_ms =
200; // Give enough time for the connection to complete

Expand Down Expand Up @@ -512,7 +561,7 @@ TEST_F(ClusterUnitTest, ReconnectUpdateHosts) {
ReconnectClusterListener::Ptr listener(new ReconnectClusterListener(close_future, &outage_plan));

ClusterSettings settings;
settings.reconnect_timeout_ms = 1; // Reconnect immediately
settings.reconnection_policy.reset(new ConstantReconnectionPolicy(1)); // Reconnect immediately
settings.control_connection_settings.connection_settings.connect_timeout_ms =
200; // Give enough time for the connection to complete

Expand Down Expand Up @@ -553,7 +602,8 @@ TEST_F(ClusterUnitTest, CloseDuringReconnect) {
Listener::Ptr listener(new Listener(close_future));

ClusterSettings settings;
settings.reconnect_timeout_ms = 100000; // Make sure we're reconnecting when we close.
settings.reconnection_policy.reset(
new ConstantReconnectionPolicy(100000)); // Make sure we're reconnecting when we close.

connector->with_settings(settings)->with_listener(listener.get())->connect(event_loop());

Expand Down Expand Up @@ -772,7 +822,7 @@ TEST_F(ClusterUnitTest, DCAwareRecoverOnRemoteHost) {
new DCAwarePolicy("dc1", 1, false)); // Allow connection to a single remote host
settings.load_balancing_policies.clear();
settings.load_balancing_policies.push_back(settings.load_balancing_policy);
settings.reconnect_timeout_ms = 1; // Reconnect immediately
settings.reconnection_policy.reset(new ConstantReconnectionPolicy(1)); // Reconnect immediately
settings.control_connection_settings.connection_settings.connect_timeout_ms =
200; // Give enough time for the connection to complete

Expand Down Expand Up @@ -868,3 +918,42 @@ TEST_F(ClusterUnitTest, DisableEventsOnStartup) {
connect_future->cluster()->close();
ASSERT_TRUE(close_future->wait_for(WAIT_FOR_TIME));
}

TEST_F(ClusterUnitTest, ReconnectionPolicy) {
mockssandra::SimpleCluster mock_cluster(simple());
ASSERT_EQ(mock_cluster.start_all(), 0);

OutagePlan outage_plan(loop(), &mock_cluster);
outage_plan.stop_node(1);
outage_plan.start_node(1);
outage_plan.stop_node(1);
outage_plan.start_node(1);

ContactPointList contact_points;
contact_points.push_back("127.0.0.1");

Future::Ptr close_future(new Future());
Future::Ptr connect_future(new Future());
ClusterConnector::Ptr connector(
new ClusterConnector(contact_points, PROTOCOL_VERSION,
bind_callback(on_connection_reconnect, connect_future.get())));
ReconnectClusterListener::Ptr listener(new ReconnectClusterListener(close_future, &outage_plan));

ClusterSettings settings;
settings.reconnection_policy.reset(new ClusterUnitTestReconnectionPolicy());
settings.control_connection_settings.connection_settings.connect_timeout_ms =
200; // Give enough time for the connection to complete
connector->with_settings(settings)->with_listener(listener.get())->connect(event_loop());

ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
EXPECT_FALSE(connect_future->error());

ASSERT_TRUE(close_future->wait_for(WAIT_FOR_TIME));

ClusterUnitTestReconnectionPolicy::Ptr policy(
static_cast<ClusterUnitTestReconnectionPolicy::Ptr>(settings.reconnection_policy));
EXPECT_EQ(2, policy->reconnection_schedule_count());
EXPECT_EQ(2, policy->destroyed_reconnection_schedule_count());
EXPECT_GE(policy->scheduled_delay_count(), 2u);
EXPECT_EQ(3, mock_cluster.connection_attempts(1)); // Includes initial connection attempt
}
Loading