Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions config/setup/common/data/agent_relay_gateway_config.dat
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ elasticsearch:
basic_auth:
username: ingest
password: password
endpoints:
- $[[SETUP_ENDPOINT]]
endpoints: $[[SETUP_ENDPOINTS]]

pipeline:
- name: bulk_request_ingest
Expand Down
4 changes: 2 additions & 2 deletions config/setup/common/data/system_ingest_config.dat
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ pipeline:
# key_file: /xxx/client.key
# skip_insecure_verify: false
schema: "$[[SETUP_SCHEME]]"
hosts: # receiver endpoint, fallback in order
- "$[[SETUP_ENDPOINT]]"
# receiver endpoint, fallback in order
hosts: $[[SETUP_HOSTS]]
valid_status_code: [200,201] #panic on other status code
1 change: 1 addition & 0 deletions docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Information about release notes of INFINI Console is provided here.

### Breaking changes
### Features
- Support configuring multiple hosts when creating a cluster
### Bug fix
### Improvements

Expand Down
11 changes: 10 additions & 1 deletion modules/elastic/api/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,18 @@ func (h *APIHandler) HandleCreateClusterAction(w http.ResponseWriter, req *http.
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
// TODO validate data format
conf.Enabled = true
if len(conf.Hosts) > 0 && conf.Host == "" {
conf.Host = conf.Hosts[0]
}
conf.Host = strings.TrimSpace(conf.Host)
if conf.Host == "" {
h.WriteError(w, "host is required", http.StatusBadRequest)
return
}
if conf.Schema == "" {
conf.Schema = "http"
}
conf.Endpoint = fmt.Sprintf("%s://%s", conf.Schema, conf.Host)
conf.ID = util.GetUUID()
ctx := &orm.Context{
Expand Down
147 changes: 90 additions & 57 deletions modules/elastic/api/test_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,29 @@ func (h TestAPI) HandleTestConnectionAction(w http.ResponseWriter, req *http.Req
} else if config.Host != "" && config.Schema != "" {
url = fmt.Sprintf("%s://%s", config.Schema, config.Host)
config.Endpoint = url
} else {
resBody["error"] = fmt.Sprintf("invalid config: %v", util.MustToJSON(config))
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}

if url == "" {
panic(errors.Error("invalid url: " + util.MustToJSON(config)))
if url != "" && !util.StringInArray(config.Endpoints, url) {
config.Endpoints = append(config.Endpoints, url)
}

if !util.SuffixStr(url, "/") {
url = fmt.Sprintf("%s/", url)
if config.Schema != "" && len(config.Hosts) > 0 {
for _, host := range config.Hosts {
host = strings.TrimSpace(host)
if host == "" {
continue
}
url = fmt.Sprintf("%s://%s", config.Schema, host)
if !util.StringInArray(config.Endpoints, url) {
config.Endpoints = append(config.Endpoints, url)
}
}
}
if len(config.Endpoints) == 0 {
panic(errors.Error(fmt.Sprintf("invalid config: %v", util.MustToJSON(config))))
}
// limit the number of endpoints to a maximum of 10 to prevent excessive processing
if len(config.Endpoints) > 10 {
config.Endpoints = config.Endpoints[0:10]
}

freq.SetRequestURI(url)
freq.Header.SetMethod("GET")

if (config.BasicAuth == nil || (config.BasicAuth != nil && config.BasicAuth.Username == "")) &&
config.CredentialID != "" && config.CredentialID != "manual" {
credential, err := common.GetCredential(config.CredentialID)
Expand All @@ -112,58 +118,85 @@ func (h TestAPI) HandleTestConnectionAction(w http.ResponseWriter, req *http.Req
config.BasicAuth = &auth
}
}
var (
i int
clusterUUID string
)
for i, url = range config.Endpoints {
if !util.SuffixStr(url, "/") {
url = fmt.Sprintf("%s/", url)
}

if config.BasicAuth != nil && strings.TrimSpace(config.BasicAuth.Username) != "" {
freq.SetBasicAuth(config.BasicAuth.Username, config.BasicAuth.Password.Get())
}
freq.SetRequestURI(url)
freq.Header.SetMethod("GET")

const testClientName = "elasticsearch_test_connection"
err = api.GetFastHttpClient(testClientName).DoTimeout(freq, fres, 10*time.Second)
if config.BasicAuth != nil && strings.TrimSpace(config.BasicAuth.Username) != "" {
freq.SetBasicAuth(config.BasicAuth.Username, config.BasicAuth.Password.Get())
}

if err != nil {
panic(err)
}
const testClientName = "elasticsearch_test_connection"
err = api.GetFastHttpClient(testClientName).DoTimeout(freq, fres, 10*time.Second)

var statusCode = fres.StatusCode()
if statusCode > 300 || statusCode == 0 {
resBody["error"] = fmt.Sprintf("invalid status code: %d", statusCode)
h.WriteJSON(w, resBody, 500)
return
}
if err != nil {
panic(err)
}

b := fres.Body()
clusterInfo := &elastic.ClusterInformation{}
err = json.Unmarshal(b, clusterInfo)
if err != nil {
panic(err)
}
var statusCode = fres.StatusCode()
if statusCode > 300 || statusCode == 0 {
resBody["error"] = fmt.Sprintf("invalid status code: %d", statusCode)
h.WriteJSON(w, resBody, 500)
return
}

resBody["version"] = clusterInfo.Version.Number
resBody["cluster_uuid"] = clusterInfo.ClusterUUID
resBody["cluster_name"] = clusterInfo.ClusterName
resBody["distribution"] = clusterInfo.Version.Distribution
b := fres.Body()
clusterInfo := &elastic.ClusterInformation{}
err = json.Unmarshal(b, clusterInfo)
if err != nil {
panic(err)
}

//fetch cluster health info
freq.SetRequestURI(fmt.Sprintf("%s/_cluster/health", config.Endpoint))
fres.Reset()
err = api.GetFastHttpClient(testClientName).Do(freq, fres)
if err != nil {
resBody["error"] = fmt.Sprintf("error on get cluster health: %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
resBody["version"] = clusterInfo.Version.Number
resBody["cluster_uuid"] = clusterInfo.ClusterUUID
resBody["cluster_name"] = clusterInfo.ClusterName
resBody["distribution"] = clusterInfo.Version.Distribution

if i == 0 {
clusterUUID = clusterInfo.ClusterUUID
} else {
//validate whether two endpoints point to the same cluster
if clusterUUID != clusterInfo.ClusterUUID {
resBody["error"] = fmt.Sprintf("invalid multiple cluster endpoints: %v", config.Endpoints)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
//skip fetch cluster health info if it's not the first endpoint
break
}
//fetch cluster health info
freq.SetRequestURI(fmt.Sprintf("%s/_cluster/health", url))
fres.Reset()
err = api.GetFastHttpClient(testClientName).Do(freq, fres)
if err != nil {
resBody["error"] = fmt.Sprintf("error on get cluster health: %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}

healthInfo := &elastic.ClusterHealth{}
err = json.Unmarshal(fres.Body(), &healthInfo)
if err != nil {
resBody["error"] = fmt.Sprintf("error on decode cluster health info : %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
healthInfo := &elastic.ClusterHealth{}
err = json.Unmarshal(fres.Body(), &healthInfo)
if err != nil {
resBody["error"] = fmt.Sprintf("error on decode cluster health info : %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
resBody["status"] = healthInfo.Status
resBody["number_of_nodes"] = healthInfo.NumberOfNodes
resBody["number_of_data_nodes"] = healthInfo.NumberOf_data_nodes
resBody["active_shards"] = healthInfo.ActiveShards

freq.Reset()
fres.Reset()
}
resBody["status"] = healthInfo.Status
resBody["number_of_nodes"] = healthInfo.NumberOfNodes
resBody["number_of_data_nodes"] = healthInfo.NumberOf_data_nodes
resBody["active_shards"] = healthInfo.ActiveShards

h.WriteJSON(w, resBody, http.StatusOK)

Expand Down
26 changes: 19 additions & 7 deletions plugin/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,12 @@ func (module *Module) Stop() error {

type SetupRequest struct {
Cluster struct {
Host string `json:"host"`
Schema string `json:"schema"`
Endpoint string `json:"endpoint"`
Username string `json:"username"`
Password string `json:"password"`
Host string `json:"host"`
Schema string `json:"schema"`
Endpoint string `json:"endpoint"`
Username string `json:"username"`
Password string `json:"password"`
Hosts []string `json:"hosts"`
} `json:"cluster"`

Skip bool `json:"skip"`
Expand Down Expand Up @@ -796,8 +797,19 @@ func (module *Module) initializeTemplate(w http.ResponseWriter, r *http.Request,
return w.Write([]byte(request.Cluster.Password))
case "SETUP_SCHEME":
return w.Write([]byte(strings.Split(request.Cluster.Endpoint, "://")[0]))
case "SETUP_ENDPOINT":
return w.Write([]byte(strings.Split(request.Cluster.Endpoint, "://")[1]))
case "SETUP_ENDPOINTS":
endpoints := []string{request.Cluster.Endpoint}
for _, host := range request.Cluster.Hosts {
endpoint := fmt.Sprintf("%s://%s", request.Cluster.Schema, host)
if !util.StringInArray(endpoints, endpoint) {
endpoints = append(endpoints, endpoint)
}
}
endpointsStr := fmt.Sprintf("[%s]", strings.Join(endpoints, ", "))
return w.Write([]byte(endpointsStr))
case "SETUP_HOSTS":
hostsStr := fmt.Sprintf("[%s]", strings.Join(request.Cluster.Hosts, ", "))
return w.Write([]byte(hostsStr))
case "SETUP_TEMPLATE_NAME":
return w.Write([]byte(cfg1.TemplateName))
case "SETUP_INDEX_PREFIX":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { useEffect, useState } from 'react';
import { Alert, Button, Form, Icon, Input, Switch } from 'antd';
import { Alert, Button, Form, Icon, Input, Switch, Select } from 'antd';
import request from '@/utils/request';
import { formatMessage } from "umi/locale";
import TrimSpaceInput from '@/components/TrimSpaceInput';
Expand Down Expand Up @@ -49,9 +49,9 @@ export default ({ onNext, form, formData, onFormDataChange }) => {
setTestLoading(true);
setTestStatus();
setTestError();
const { host, isTLS, isAuth, username, password } = values;
const { hosts, isTLS, isAuth, username, password } = values;
const body = {
host: host.trim(),
hosts: (hosts || []).map(host=>host.trim()),
schema: isTLS === true ? "https" : "http",
}
if (isAuth) {
Expand Down Expand Up @@ -104,32 +104,41 @@ export default ({ onNext, form, formData, onFormDataChange }) => {

const onFormDataSave = () => {
const values = form.getFieldsValue();
const { host, isAuth, username, password } = form.getFieldsValue();
const { hosts, isAuth, username, password } = values;
onFormDataChange({
host: host.trim(), isAuth, username, password
hosts: (hosts || []).map(host=>host.trim()),
isAuth, username, password
})
onNext();
}
const validateHostsRule = (rule, value, callback) => {
let vals = value || [];
for(let i = 0; i < vals.length; i++) {
if (!/^[\w\.\-_~%]+(\:\d+)?$/.test(vals[i])) {
return callback(formatMessage({ id: 'guide.cluster.host.validate'}));
}
}
// validation passed
callback();
};

const { getFieldDecorator } = form;

return (
<Form {...formItemLayout} onSubmit={onSubmit} colon={false}>
<Form.Item label={formatMessage({ id: 'guide.cluster.host'})}>
{getFieldDecorator("host", {
initialValue: formData.host,
{getFieldDecorator("hosts", {
initialValue: formData.hosts,
rules: [
{
required: true,
message: formatMessage({ id: 'guide.cluster.host.required'}),
},
{
type: "string",
pattern: /^[\w\.\-_~%]+(\:\d+)?$/,
message: formatMessage({ id: 'guide.cluster.host.validate'}),
},
validator: validateHostsRule,
}
],
})(<TrimSpaceInput placeholder="127.0.0.1:9200" onChange={resetTestStatus}/>)}
})(<Select placeholder="127.0.0.1:9200" mode="tags" allowClear={true} onChange={resetTestStatus}/>)}
</Form.Item>
<Form.Item label="TLS">
{getFieldDecorator("isTLS", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ export default ({ onPrev, onNext, form, formData, onFormDataChange }) => {
const onCheck = async () => {
try {
setCheckLoading(true);
const { host, isTLS, isAuth, username, password } = formData;
const { hosts, isTLS, isAuth, username, password } = formData;
const host = hosts[0];
const cluster = {
endpoint: isTLS ? `https://${host}` : `http://${host}`
endpoint: isTLS ? `https://${host}` : `http://${host}`,
hosts: hosts,
schema: isTLS ? "https": "http",
}
if (isAuth) {
cluster.username = username
Expand Down Expand Up @@ -110,14 +113,17 @@ export default ({ onPrev, onNext, form, formData, onFormDataChange }) => {
}
})
const {
host,
hosts,
isTLS,
isAuth,
username,
password,
} = formData;
const host = hosts[0];
const cluster = {
endpoint: isTLS ? `https://${host}` : `http://${host}`,
hosts: hosts,
schema: isTLS ? "https": "http"
};
if (isAuth) {
cluster.username = username;
Expand Down
Loading
Loading