Skip to content

Commit a9da405

Browse files
authored
Merge pull request #205 from grainier/fix-memory-issues
Fix executor service creation and memory releasing issue
2 parents 090a8b4 + 3ad97fa commit a9da405

File tree

8 files changed

+28
-13
lines changed

8 files changed

+28
-13
lines changed

component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,12 @@
319319
type = {DataType.INT},
320320
optional = true,
321321
defaultValue = "100"),
322+
@Parameter(
323+
name = "executor.service.threads",
324+
description = "Thread count for the executor service.",
325+
type = {DataType.INT},
326+
optional = true,
327+
defaultValue = "20"),
322328
@Parameter(
323329
name = "min.evictable.idle.time",
324330
description = "Minimum time (in millis) a connection may sit idle in the " +
@@ -566,6 +572,8 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
566572
//pool configurations
567573
connectionPoolConfiguration = createPoolConfigurations(optionHolder);
568574

575+
executor = Executors.newFixedThreadPool(connectionPoolConfiguration.getExecutorServiceThreads());
576+
569577
parametersList = optionHolder.validateAndGetStaticValue(HttpConstants.SINK_PARAMETERS, EMPTY_STRING);
570578

571579
clientBootstrapConfiguration = optionHolder
@@ -1161,9 +1169,6 @@ public ClientConnector createClientConnector(DynamicOptions dynamicOptions) {
11611169
senderConfig.disableSsl();
11621170
}
11631171

1164-
executor = Executors.newFixedThreadPool(
1165-
senderConfig.getPoolConfiguration().getExecutorServiceThreads());
1166-
11671172
//overwrite default transport configuration
11681173
Map<String, Object> bootStrapProperties = HttpSinkUtil
11691174
.populateTransportConfiguration(clientBootstrapConfiguration);

component/src/main/java/io/siddhi/extension/io/http/sink/SSEWorkerThread.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ public void run() {
5858

5959
try {
6060
buf.close();
61-
carbonMessage.waitAndReleaseAllEntities();
6261
} catch (IOException e) {
6362
logger.error("Error occurred when closing the byte buffer in source " + streamID, e);
63+
} finally {
64+
carbonMessage.waitAndReleaseAllEntities();
6465
}
6566
}
6667
}

component/src/main/java/io/siddhi/extension/io/http/sink/WebSubHubSink.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,12 @@
299299
type = {DataType.INT},
300300
optional = true,
301301
defaultValue = "100"),
302+
@Parameter(
303+
name = "executor.service.threads",
304+
description = "Thread count for the executor service.",
305+
type = {DataType.INT},
306+
optional = true,
307+
defaultValue = "20"),
302308
@Parameter(
303309
name = "min.evictable.idle.time",
304310
description = "Minimum time (in millis) a connection may sit idle in the " +
@@ -490,6 +496,7 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
490496

491497
//pool configurations
492498
connectionPoolConfiguration = createPoolConfigurations(optionHolder);
499+
executor = Executors.newFixedThreadPool(connectionPoolConfiguration.getExecutorServiceThreads());
493500
parametersList = optionHolder.validateAndGetStaticValue(HttpConstants.SINK_PARAMETERS, EMPTY_STRING);
494501
clientBootstrapConfiguration = optionHolder
495502
.validateAndGetStaticValue(HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, EMPTY_STRING);
@@ -760,9 +767,6 @@ public ClientConnector createClientConnector(String publisherURL) {
760767
senderConfig.disableSsl();
761768
}
762769

763-
executor = Executors.newFixedThreadPool(
764-
senderConfig.getPoolConfiguration().getExecutorServiceThreads());
765-
766770
//overwrite default transport configuration
767771
Map<String, Object> bootStrapProperties = HttpSinkUtil
768772
.populateTransportConfiguration(clientBootstrapConfiguration);
@@ -823,7 +827,6 @@ private void setOnDemandQueryRuntimeForFindSubscription() {
823827
siddhiAppContext, tableMap, windowMap, aggregationRuntimeMap);
824828
}
825829

826-
827830
public void setWebSubSubscriptionMap(Map<String, List<WebSubSubscriptionDTO>> webSubSubscriptionMap) {
828831
this.webSubSubscriptionMap = webSubSubscriptionMap;
829832
}

component/src/main/java/io/siddhi/extension/io/http/sink/util/HttpSinkUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,8 @@ private static Map<String, TrpPropertyTypes> trpPropertyTypeMap() {
295295
}
296296

297297
public static PoolConfiguration createPoolConfigurations(OptionHolder optionHolder) {
298+
int executorServiceThreads = Integer.parseInt(optionHolder.validateAndGetStaticValue(
299+
HttpConstants.EXECUTOR_SERVICE_THREAD_COUNT, HttpConstants.DEFAULT_EXECUTOR_SERVICE_THREAD_COUNT));
298300
int maxIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
299301
HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL));
300302
int minIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
@@ -315,6 +317,7 @@ public static PoolConfiguration createPoolConfigurations(OptionHolder optionHold
315317
int maxWaitTime = Integer.parseInt(optionHolder.validateAndGetStaticValue(
316318
HttpConstants.MAX_WAIT_TIME, HttpConstants.DEFAULT_MAX_WAIT_TIME));
317319
PoolConfiguration connectionPoolConfiguration = new PoolConfiguration();
320+
connectionPoolConfiguration.setExecutorServiceThreads(executorServiceThreads);
318321
connectionPoolConfiguration.setMaxActivePerPool(maxActivePerPool);
319322
connectionPoolConfiguration.setMinIdlePerPool(minIdlePerPool);
320323
connectionPoolConfiguration.setMaxIdlePerPool(maxIdlePerPool);

component/src/main/java/io/siddhi/extension/io/http/source/HttpSyncWorkerThread.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@ public void run() {
9494
} finally {
9595
try {
9696
buf.close();
97-
carbonMessage.waitAndReleaseAllEntities();
9897
} catch (IOException e) {
9998
if (metrics != null) {
10099
metrics.getTotalHttpErrorsMetric().inc();
101100
}
102-
103101
logger.error("Error occurred when closing the byte buffer in source " + sourceID, e);
102+
} finally {
103+
carbonMessage.waitAndReleaseAllEntities();
104104
}
105105
}
106106
}

component/src/main/java/io/siddhi/extension/io/http/source/HttpWebSubResponseProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,13 @@ public void run() {
192192
} finally {
193193
try {
194194
buf.close();
195-
carbonMessage.waitAndReleaseAllEntities();
196195
} catch (IOException e) {
197196
if (metrics != null) {
198197
metrics.getTotalHttpErrorsMetric().inc();
199198
}
200199
logger.error("Error occurred when closing the byte buffer in source " + sourceID, e);
200+
} finally {
201+
carbonMessage.waitAndReleaseAllEntities();
201202
}
202203
}
203204
}

component/src/main/java/io/siddhi/extension/io/http/source/HttpWorkerThread.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,13 @@ public void run() {
8888
} finally {
8989
try {
9090
buf.close();
91-
carbonMessage.waitAndReleaseAllEntities();
9291
} catch (IOException e) {
9392
if (metrics != null) {
9493
metrics.getTotalHttpErrorsMetric().inc();
9594
}
96-
9795
logger.error("Error occurred when closing the byte buffer in source " + sourceID, e);
96+
} finally {
97+
carbonMessage.waitAndReleaseAllEntities();
9898
}
9999
}
100100
}

component/src/main/java/io/siddhi/extension/io/http/util/HttpConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,8 @@ public class HttpConstants {
231231
public static final String BLOCKING_IO = "blocking.io";
232232

233233
//pool configurations
234+
public static final String EXECUTOR_SERVICE_THREAD_COUNT = "executor.service.threads";
235+
public static final String DEFAULT_EXECUTOR_SERVICE_THREAD_COUNT = "20";
234236
public static final String MAX_ACTIVE_CONNECTIONS_PER_POOL = "max.pool.active.connections";
235237
public static final String DEFAULT_MAX_ACTIVE_CONNECTIONS_PER_POOL = "-1"; // unlimited
236238
public static final String MIN_IDLE_CONNECTIONS_PER_POOL = "min.pool.idle.connections";

0 commit comments

Comments
 (0)