Skip to content

Commit 541e1f5

Browse files
Collector: Refactor package structure and polish code style (#49)
1 parent 25a88f6 commit 541e1f5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1891
-1099
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
kl7k3ragjregzk2gtxfrzupvie

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
2626
import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest;
2727
import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest;
28-
import org.apache.iotdb.collector.service.RuntimeService;
2928

3029
import javax.ws.rs.core.Response;
3130
import javax.ws.rs.core.SecurityContext;
@@ -35,30 +34,30 @@ public class PluginApiServiceImpl extends PluginApiService {
3534
@Override
3635
public Response createPlugin(
3736
final CreatePluginRequest createPluginRequest, final SecurityContext securityContext) {
38-
return Response.ok("create plugin").entity(RuntimeService.plugin().createPlugin()).build();
37+
return Response.ok("create plugin").build();
3938
}
4039

4140
@Override
4241
public Response alterPlugin(
4342
final AlterPluginRequest alterPluginRequest, final SecurityContext securityContext) {
44-
return Response.ok("alter plugin").entity(RuntimeService.plugin().alterPlugin()).build();
43+
return Response.ok("alter plugin").build();
4544
}
4645

4746
@Override
4847
public Response startPlugin(
4948
final StartPluginRequest startPluginRequest, final SecurityContext securityContext) {
50-
return Response.ok("start plugin").entity(RuntimeService.plugin().startPlugin()).build();
49+
return Response.ok("start plugin").build();
5150
}
5251

5352
@Override
5453
public Response stopPlugin(
5554
final StopPluginRequest stopPluginRequest, final SecurityContext securityContext) {
56-
return Response.ok("stop plugin").entity(RuntimeService.plugin().stopPlugin()).build();
55+
return Response.ok("stop plugin").build();
5756
}
5857

5958
@Override
6059
public Response dropPlugin(
6160
final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) {
62-
return Response.ok("drop plugin").entity(RuntimeService.plugin().dropPlugin()).build();
61+
return Response.ok("drop plugin").build();
6362
}
6463
}

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,15 @@ public Response createTask(
3737
final CreateTaskRequest createTaskRequest, final SecurityContext securityContext) {
3838
TaskApiServiceRequestValidationHandler.validateCreateRequest(createTaskRequest);
3939

40-
return RuntimeService.task()
41-
.createTask(
42-
createTaskRequest.getTaskId(),
43-
createTaskRequest.getSourceAttribute(),
44-
createTaskRequest.getProcessorAttribute(),
45-
createTaskRequest.getSinkAttribute());
40+
return RuntimeService.task().isPresent()
41+
? RuntimeService.task()
42+
.get()
43+
.createTask(
44+
createTaskRequest.getTaskId(),
45+
createTaskRequest.getSourceAttribute(),
46+
createTaskRequest.getProcessorAttribute(),
47+
createTaskRequest.getSinkAttribute())
48+
: Response.serverError().entity("Task runtime is down").build();
4649
}
4750

4851
@Override
@@ -56,22 +59,28 @@ public Response startTask(
5659
final StartTaskRequest startTaskRequest, final SecurityContext securityContext) {
5760
TaskApiServiceRequestValidationHandler.validateStartRequest(startTaskRequest);
5861

59-
return RuntimeService.task().startTask(startTaskRequest.getTaskId());
62+
return RuntimeService.task().isPresent()
63+
? RuntimeService.task().get().startTask(startTaskRequest.getTaskId())
64+
: Response.serverError().entity("Task runtime is down").build();
6065
}
6166

6267
@Override
6368
public Response stopTask(
6469
final StopTaskRequest stopTaskRequest, final SecurityContext securityContext) {
6570
TaskApiServiceRequestValidationHandler.validateStopRequest(stopTaskRequest);
6671

67-
return RuntimeService.task().stopTask(stopTaskRequest.getTaskId());
72+
return RuntimeService.task().isPresent()
73+
? RuntimeService.task().get().stopTask(stopTaskRequest.getTaskId())
74+
: Response.serverError().entity("Task runtime is down").build();
6875
}
6976

7077
@Override
7178
public Response dropTask(
7279
final DropTaskRequest dropTaskRequest, final SecurityContext securityContext) {
7380
TaskApiServiceRequestValidationHandler.validateDropRequest(dropTaskRequest);
7481

75-
return RuntimeService.task().dropTask(dropTaskRequest.getTaskId());
82+
return RuntimeService.task().isPresent()
83+
? RuntimeService.task().get().dropTask(dropTaskRequest.getTaskId())
84+
: Response.serverError().entity("Task runtime is down").build();
7685
}
7786
}

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ public class Options {
3434

3535
static {
3636
try {
37-
Class.forName("org.apache.iotdb.collector.config.ApiServiceOptions");
38-
Class.forName("org.apache.iotdb.collector.config.TaskRuntimeOptions");
37+
Class.forName(ApiServiceOptions.class.getName());
38+
Class.forName(TaskRuntimeOptions.class.getName());
3939
} catch (final ClassNotFoundException e) {
40-
throw new RuntimeException("Failed to load ApiServiceOptions", e);
40+
throw new RuntimeException("Failed to load options", e);
4141
}
4242
}
4343

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@
2121

2222
import org.apache.iotdb.pipe.api.PipeSource;
2323

24-
public interface CollectorSource extends StoppablePlugin, PipeSource {}
24+
public abstract class PullSource implements PipeSource {}
Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919

2020
package org.apache.iotdb.collector.plugin.api;
2121

22+
import org.apache.iotdb.pipe.api.PipeSource;
2223
import org.apache.iotdb.pipe.api.collector.EventCollector;
2324
import org.apache.iotdb.pipe.api.event.Event;
2425

25-
public abstract class CollectorPushSource implements CollectorSource {
26+
public abstract class PushSource implements PipeSource {
2627

27-
protected final EventCollector collector;
28+
protected EventCollector collector;
2829

29-
public CollectorPushSource(final EventCollector collector) {
30+
public PushSource() {
31+
this.collector = null;
32+
}
33+
34+
public final void setCollector(final EventCollector collector) {
3035
this.collector = collector;
3136
}
3237

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java renamed to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorProcessorRuntimeConfiguration.java

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,47 +17,26 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.iotdb.collector.plugin.api;
20+
package org.apache.iotdb.collector.plugin.api.customizer;
2121

22-
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
22+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
2323
import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
2424

25-
public class RuntimeConfig implements PipeExtractorRuntimeConfiguration {
25+
public class CollectorProcessorRuntimeConfiguration implements PipeProcessorRuntimeConfiguration {
2626

27-
public static class RuntimeEnvironment implements PipeRuntimeEnvironment {
27+
private final CollectorRuntimeEnvironment runtimeEnvironment;
2828

29-
public int getParallelism() {
30-
return 0;
31-
}
32-
33-
public int getParallelismIndex() {
34-
return 0;
35-
}
36-
37-
@Override
38-
public String getPipeName() {
39-
return "";
40-
}
41-
42-
@Override
43-
public long getCreationTime() {
44-
return 0;
45-
}
29+
public CollectorProcessorRuntimeConfiguration(
30+
final String pipeName,
31+
final long creationTime,
32+
final int parallelism,
33+
final int instanceIndex) {
34+
runtimeEnvironment =
35+
new CollectorRuntimeEnvironment(pipeName, creationTime, parallelism, instanceIndex);
4636
}
4737

4838
@Override
4939
public PipeRuntimeEnvironment getRuntimeEnvironment() {
50-
return new PipeRuntimeEnvironment() {
51-
52-
@Override
53-
public String getPipeName() {
54-
return "";
55-
}
56-
57-
@Override
58-
public long getCreationTime() {
59-
return 0;
60-
}
61-
};
40+
return runtimeEnvironment;
6241
}
6342
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector.plugin.api.customizer;
21+
22+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
23+
24+
public class CollectorRuntimeEnvironment implements PipeRuntimeEnvironment {
25+
26+
private final String pipeName;
27+
private final long creationTime;
28+
private final int parallelism;
29+
private final int instanceIndex;
30+
31+
public CollectorRuntimeEnvironment(
32+
final String pipeName,
33+
final long creationTime,
34+
final int parallelism,
35+
final int instanceIndex) {
36+
this.pipeName = pipeName;
37+
this.creationTime = creationTime;
38+
this.parallelism = parallelism;
39+
this.instanceIndex = instanceIndex;
40+
}
41+
42+
@Override
43+
public String getPipeName() {
44+
return pipeName;
45+
}
46+
47+
@Override
48+
public long getCreationTime() {
49+
return creationTime;
50+
}
51+
52+
public int getParallelism() {
53+
return parallelism;
54+
}
55+
56+
public int getInstanceIndex() {
57+
return instanceIndex;
58+
}
59+
}

0 commit comments

Comments
 (0)