Skip to content

Commit a812c4c

Browse files
authored
Complete the migration of the collector node project (#43)
* Complete the migration of the collector node project * fix compile-check fail
1 parent 55640cc commit a812c4c

37 files changed

+2754
-3
lines changed
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one
5+
or more contributor license agreements. See the NOTICE file
6+
distributed with this work for additional information
7+
regarding copyright ownership. The ASF licenses this file
8+
to you under the Apache License, Version 2.0 (the
9+
"License"); you may not use this file except in compliance
10+
with the License. You may obtain a copy of the License at
11+
12+
http://www.apache.org/licenses/LICENSE-2.0
13+
14+
Unless required by applicable law or agreed to in writing,
15+
software distributed under the License is distributed on an
16+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
KIND, either express or implied. See the License for the
18+
specific language governing permissions and limitations
19+
under the License.
20+
21+
-->
22+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<modelVersion>4.0.0</modelVersion>
24+
<parent>
25+
<groupId>org.apache.iotdb</groupId>
26+
<artifactId>iotdb-collector</artifactId>
27+
<version>1.3.2</version>
28+
</parent>
29+
<artifactId>collector-core</artifactId>
30+
<name>IoTDB: Collector: Core</name>
31+
<dependencies>
32+
<dependency>
33+
<groupId>org.apache.iotdb</groupId>
34+
<artifactId>collector-openapi</artifactId>
35+
<version>1.3.2</version>
36+
<exclusions>
37+
<exclusion>
38+
<groupId>org.eclipse.jetty</groupId>
39+
<artifactId>jetty-http</artifactId>
40+
</exclusion>
41+
<exclusion>
42+
<groupId>org.eclipse.jetty</groupId>
43+
<artifactId>jetty-util</artifactId>
44+
</exclusion>
45+
<exclusion>
46+
<groupId>org.glassfish.jersey.inject</groupId>
47+
<artifactId>jersey-hk2</artifactId>
48+
</exclusion>
49+
</exclusions>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.apache.iotdb</groupId>
53+
<artifactId>node-commons</artifactId>
54+
<version>1.3.2</version>
55+
</dependency>
56+
<dependency>
57+
<groupId>org.apache.iotdb</groupId>
58+
<artifactId>service-rpc</artifactId>
59+
<version>1.3.2</version>
60+
</dependency>
61+
<dependency>
62+
<groupId>org.apache.iotdb</groupId>
63+
<artifactId>pipe-api</artifactId>
64+
<version>1.3.2</version>
65+
</dependency>
66+
<dependency>
67+
<groupId>jakarta.servlet</groupId>
68+
<artifactId>jakarta.servlet-api</artifactId>
69+
</dependency>
70+
<dependency>
71+
<groupId>jakarta.ws.rs</groupId>
72+
<artifactId>jakarta.ws.rs-api</artifactId>
73+
</dependency>
74+
<dependency>
75+
<groupId>org.slf4j</groupId>
76+
<artifactId>slf4j-api</artifactId>
77+
</dependency>
78+
<dependency>
79+
<groupId>org.eclipse.jetty</groupId>
80+
<artifactId>jetty-http</artifactId>
81+
</dependency>
82+
<dependency>
83+
<groupId>org.eclipse.jetty</groupId>
84+
<artifactId>jetty-util</artifactId>
85+
</dependency>
86+
<dependency>
87+
<groupId>org.eclipse.jetty</groupId>
88+
<artifactId>jetty-server</artifactId>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.eclipse.jetty</groupId>
92+
<artifactId>jetty-servlet</artifactId>
93+
</dependency>
94+
<dependency>
95+
<groupId>org.glassfish.jersey.containers</groupId>
96+
<artifactId>jersey-container-servlet-core</artifactId>
97+
</dependency>
98+
<dependency>
99+
<groupId>org.glassfish.jersey.inject</groupId>
100+
<artifactId>jersey-hk2</artifactId>
101+
</dependency>
102+
</dependencies>
103+
<build>
104+
<plugins>
105+
<plugin>
106+
<groupId>org.apache.maven.plugins</groupId>
107+
<artifactId>maven-dependency-plugin</artifactId>
108+
<configuration>
109+
<usedDependencies>
110+
<!-- For some reason the plugin complains if this artifact is included -->
111+
<usedDependency>org.eclipse.jetty:jetty-http</usedDependency>
112+
<usedDependency>org.eclipse.jetty:jetty-util</usedDependency>
113+
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
114+
</usedDependencies>
115+
</configuration>
116+
</plugin>
117+
</plugins>
118+
</build>
119+
</project>
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector;
21+
22+
import org.apache.iotdb.collector.config.Configuration;
23+
import org.apache.iotdb.collector.service.ApiService;
24+
import org.apache.iotdb.collector.service.IService;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.util.LinkedList;
30+
31+
public class Application {
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
34+
35+
private final Configuration configuration = new Configuration();
36+
private final LinkedList<IService> services = new LinkedList<>();
37+
38+
private Application() {
39+
services.add(new ApiService());
40+
}
41+
42+
public static void main(String[] args) {
43+
LOGGER.info("[Application] Starting ...");
44+
final long startTime = System.currentTimeMillis();
45+
46+
final Application application = new Application();
47+
48+
application.logAllOptions();
49+
application.registerShutdownHook();
50+
application.startServices();
51+
52+
LOGGER.info(
53+
"[Application] Successfully started in {}ms", System.currentTimeMillis() - startTime);
54+
}
55+
56+
private void logAllOptions() {
57+
configuration.logAllOptions();
58+
}
59+
60+
private void registerShutdownHook() {
61+
Runtime.getRuntime()
62+
.addShutdownHook(
63+
new Thread(
64+
() -> {
65+
LOGGER.warn("[Application] Exiting ...");
66+
67+
for (final IService service : services) {
68+
try {
69+
service.stop();
70+
} catch (final Exception e) {
71+
LOGGER.warn(
72+
"[{}] Unexpected exception occurred when stopping: {}",
73+
service.name(),
74+
e.getMessage(),
75+
e);
76+
}
77+
}
78+
79+
LOGGER.warn(
80+
"[Application] JVM report: total memory {}, free memory {}, used memory {}",
81+
Runtime.getRuntime().totalMemory(),
82+
Runtime.getRuntime().freeMemory(),
83+
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
84+
LOGGER.warn("[Application] Exited.");
85+
}));
86+
}
87+
88+
private void startServices() {
89+
for (final IService service : services) {
90+
service.start();
91+
}
92+
}
93+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector.agent;
21+
22+
import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
23+
import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
24+
import org.apache.iotdb.collector.agent.task.CollectorTaskAgent;
25+
26+
public class CollectorAgent {
27+
28+
private final CollectorTaskAgent collectorTaskAgent = CollectorTaskAgent.instance();
29+
private final CollectorTaskExecutorAgent collectorTaskExecutorAgent =
30+
CollectorTaskExecutorAgent.instance();
31+
private final CollectorPluginAgent collectorPluginAgent = CollectorPluginAgent.instance();
32+
33+
private CollectorAgent() {}
34+
35+
public static CollectorTaskAgent task() {
36+
return CollectorAgentHolder.INSTANCE.collectorTaskAgent;
37+
}
38+
39+
public static CollectorTaskExecutorAgent executor() {
40+
return CollectorAgentHolder.INSTANCE.collectorTaskExecutorAgent;
41+
}
42+
43+
public static CollectorPluginAgent plugin() {
44+
return CollectorAgentHolder.INSTANCE.collectorPluginAgent;
45+
}
46+
47+
private static class CollectorAgentHolder {
48+
private static final CollectorAgent INSTANCE = new CollectorAgent();
49+
}
50+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector.agent.collect;
21+
22+
import org.apache.iotdb.pipe.api.collector.EventCollector;
23+
import org.apache.iotdb.pipe.api.event.Event;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.util.concurrent.BlockingQueue;
29+
30+
public class CollectorEventCollector implements EventCollector {
31+
32+
private static final Logger LOGGER = LoggerFactory.getLogger(CollectorEventCollector.class);
33+
34+
private final BlockingQueue<Event> pendingQueue;
35+
36+
public CollectorEventCollector(final BlockingQueue<Event> pendingQueue) {
37+
this.pendingQueue = pendingQueue;
38+
}
39+
40+
@Override
41+
public void collect(final Event event) {
42+
try {
43+
pendingQueue.put(event);
44+
} catch (final InterruptedException e) {
45+
LOGGER.warn("collect event failed because {}", e.getMessage(), e);
46+
}
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector.agent.executor;
21+
22+
import org.apache.iotdb.collector.agent.task.CollectorTask;
23+
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.util.Map;
29+
import java.util.Optional;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.ExecutorService;
32+
33+
public class CollectorProcessorTaskExecutor extends CollectorTaskExecutor {
34+
35+
private static final Logger LOGGER =
36+
LoggerFactory.getLogger(CollectorProcessorTaskExecutor.class);
37+
38+
private static final Map<String, ExecutorService> PROCESSOR_EXECUTOR = new ConcurrentHashMap<>();
39+
private static final Map<String, CollectorTask> PROCESSOR_TASK_MAP = new ConcurrentHashMap<>();
40+
41+
public boolean validateIfAbsent(final String taskId) {
42+
return !PROCESSOR_EXECUTOR.containsKey(taskId) && !PROCESSOR_TASK_MAP.containsKey(taskId);
43+
}
44+
45+
@Override
46+
public Optional<ExecutorService> getExecutor(final String taskId) {
47+
return Optional.of(
48+
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-processor-executor-" + taskId));
49+
}
50+
51+
@Override
52+
public void recordExecution(
53+
final CollectorTask collectorTask, final ExecutorService executorService) {
54+
final String taskId = collectorTask.getTaskId();
55+
PROCESSOR_EXECUTOR.putIfAbsent(taskId, executorService);
56+
PROCESSOR_TASK_MAP.putIfAbsent(taskId, collectorTask);
57+
58+
LOGGER.info("register collector processor task {}", taskId);
59+
}
60+
61+
@Override
62+
public void eraseExecution(final String taskId) {
63+
PROCESSOR_TASK_MAP.remove(taskId).stop();
64+
PROCESSOR_EXECUTOR.remove(taskId).shutdownNow();
65+
66+
LOGGER.info("deregister collector processor task {}", taskId);
67+
}
68+
}

0 commit comments

Comments
 (0)