Skip to content

Commit 027256e

Browse files
committed
feat:配置中心支持配置变更事件的上报
1 parent e1e186f commit 027256e

File tree

19 files changed

+557
-14
lines changed

19 files changed

+557
-14
lines changed

pkg/config/api.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type GlobalConfig interface {
4242
GetServerConnector() ServerConnectorConfig
4343
// GetStatReporter global.statReporter前缀开头的所有配置项
4444
GetStatReporter() StatReporterConfig
45+
// GetEventReporter global.eventReporter前缀开头的所有配置项
46+
GetEventReporter() EventReporterConfig
4547
// GetLocation global.location前缀开头的所有配置项
4648
GetLocation() LocationConfig
4749
// GetClient global.client前缀开头的所有配置项
@@ -212,6 +214,19 @@ type StatReporterConfig interface {
212214
SetChain([]string)
213215
}
214216

217+
type EventReporterConfig interface {
218+
BaseConfig
219+
PluginConfig
220+
// IsEnable 是否启用
221+
IsEnable() bool
222+
// SetEnable 设置是否启用上报
223+
SetEnable(bool)
224+
// GetChain 统计上报器插件链
225+
GetChain() []string
226+
// SetChain 设置统计上报器插件链
227+
SetChain([]string)
228+
}
229+
215230
// LocationConfig SDK获取自身当前地理位置配置.
216231
type LocationConfig interface {
217232
BaseConfig

pkg/config/default.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ const (
252252
DefaultStatReportEnabled = true
253253
// DefaultMetricsChain .
254254
DefaultMetricsChain = "prometheus"
255+
// DefaultEventReporterEnabled 事件上报默认不开启
256+
DefaultEventReporterEnabled = false
255257
)
256258

257259
const (
@@ -438,6 +440,9 @@ func (g *GlobalConfigImpl) Verify() error {
438440
if err = g.Location.Verify(); err != nil {
439441
errs = multierror.Append(errs, err)
440442
}
443+
if err = g.EventReporter.Verify(); err != nil {
444+
errs = multierror.Append(errs, err)
445+
}
441446
return errs
442447
}
443448

@@ -447,6 +452,7 @@ func (g *GlobalConfigImpl) SetDefault() {
447452
g.ServerConnector.SetDefault()
448453
g.System.SetDefault()
449454
g.StatReporter.SetDefault()
455+
g.EventReporter.SetDefault()
450456
g.Location.SetDefault()
451457
}
452458

@@ -459,6 +465,8 @@ func (g *GlobalConfigImpl) Init() {
459465
g.ServerConnector.Init()
460466
g.StatReporter = &StatReporterConfigImpl{}
461467
g.StatReporter.Init()
468+
g.EventReporter = &EventReporterConfigImpl{}
469+
g.EventReporter.Init()
462470
g.Location = &LocationConfigImpl{}
463471
g.Location.Init()
464472
g.Client = &ClientConfigImpl{}

pkg/config/eventreporter.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package config
2+
3+
import (
4+
"github.com/polarismesh/polaris-go/pkg/plugin/common"
5+
)
6+
7+
type EventReporterConfigImpl struct {
8+
// 是否启动上报
9+
Enable *bool `yaml:"enable" json:"enable"`
10+
// 上报插件链
11+
Chain []string `yaml:"chain" json:"chain"`
12+
// 插件相关配置
13+
Plugin PluginConfigs `yaml:"plugin" json:"plugin"`
14+
}
15+
16+
func (e *EventReporterConfigImpl) IsEnable() bool {
17+
return *e.Enable
18+
}
19+
20+
func (e *EventReporterConfigImpl) SetEnable(enable bool) {
21+
e.Enable = &enable
22+
}
23+
24+
func (e *EventReporterConfigImpl) GetChain() []string {
25+
return e.Chain
26+
}
27+
28+
func (e *EventReporterConfigImpl) SetChain(chain []string) {
29+
e.Chain = chain
30+
}
31+
32+
func (e *EventReporterConfigImpl) GetPluginConfig(name string) BaseConfig {
33+
value, ok := e.Plugin[name]
34+
if !ok {
35+
return nil
36+
}
37+
return value.(BaseConfig)
38+
}
39+
40+
func (e *EventReporterConfigImpl) Verify() error {
41+
return e.Plugin.Verify()
42+
}
43+
44+
func (e *EventReporterConfigImpl) SetDefault() {
45+
if nil == e.Enable {
46+
enable := DefaultEventReporterEnabled
47+
e.Enable = &enable
48+
return
49+
}
50+
51+
e.Plugin.SetDefault(common.TypeEventReporter)
52+
}
53+
54+
func (e *EventReporterConfigImpl) SetPluginConfig(plugName string, value BaseConfig) error {
55+
return e.Plugin.SetPluginConfig(common.TypeEventReporter, plugName, value)
56+
}
57+
58+
// Init 配置初始化.
59+
func (e *EventReporterConfigImpl) Init() {
60+
e.Plugin = PluginConfigs{}
61+
e.Plugin.Init(common.TypeEventReporter)
62+
}

pkg/config/impl.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type GlobalConfigImpl struct {
6363
API *APIConfigImpl `yaml:"api" json:"api"`
6464
ServerConnector *ServerConnectorConfigImpl `yaml:"serverConnector" json:"serverConnector"`
6565
StatReporter *StatReporterConfigImpl `yaml:"statReporter" json:"statReporter"`
66+
EventReporter *EventReporterConfigImpl `yaml:"eventReporter" json:"eventReporter"`
6667
Location *LocationConfigImpl `yaml:"location" json:"location"`
6768
Client *ClientConfigImpl `yaml:"client" json:"client"`
6869
}
@@ -87,6 +88,11 @@ func (g *GlobalConfigImpl) GetStatReporter() StatReporterConfig {
8788
return g.StatReporter
8889
}
8990

91+
// GetEventReporter global.eventReporter前缀开头的所有配置项
92+
func (g *GlobalConfigImpl) GetEventReporter() EventReporterConfig {
93+
return g.EventReporter
94+
}
95+
9096
// GetLocation cl5.global.location前缀开头的所有配置项.
9197
func (g *GlobalConfigImpl) GetLocation() LocationConfig {
9298
return g.Location

pkg/flow/configuration/config_flow.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/polarismesh/polaris-go/pkg/model"
3232
"github.com/polarismesh/polaris-go/pkg/plugin/configconnector"
3333
"github.com/polarismesh/polaris-go/pkg/plugin/configfilter"
34+
"github.com/polarismesh/polaris-go/pkg/plugin/event"
3435
)
3536

3637
// ConfigFileFlow 配置中心核心服务门面类
@@ -50,11 +51,13 @@ type ConfigFileFlow struct {
5051
persistHandler *CachePersistHandler
5152

5253
startLongPollingTaskOnce sync.Once
54+
55+
eventReporterChain []event.EventReporter
5356
}
5457

5558
// NewConfigFileFlow 创建配置中心服务
5659
func NewConfigFileFlow(connector configconnector.ConfigConnector, chain configfilter.Chain,
57-
conf config.Configuration) (*ConfigFileFlow, error) {
60+
conf config.Configuration, eventReporterChain []event.EventReporter) (*ConfigFileFlow, error) {
5861
persistHandler, err := NewCachePersistHandler(
5962
conf.GetConfigFile().GetLocalCache().GetPersistDir(),
6063
conf.GetConfigFile().GetLocalCache().GetPersistMaxWriteRetry(),
@@ -66,14 +69,15 @@ func NewConfigFileFlow(connector configconnector.ConfigConnector, chain configfi
6669
}
6770

6871
configFileService := &ConfigFileFlow{
69-
connector: connector,
70-
chain: chain,
71-
conf: conf,
72-
repos: make([]*ConfigFileRepo, 0, 8),
73-
configFileCache: map[string]model.ConfigFile{},
74-
configFilePool: map[string]*ConfigFileRepo{},
75-
notifiedVersion: map[string]uint64{},
76-
persistHandler: persistHandler,
72+
connector: connector,
73+
chain: chain,
74+
conf: conf,
75+
repos: make([]*ConfigFileRepo, 0, 8),
76+
configFileCache: map[string]model.ConfigFile{},
77+
configFilePool: map[string]*ConfigFileRepo{},
78+
notifiedVersion: map[string]uint64{},
79+
persistHandler: persistHandler,
80+
eventReporterChain: eventReporterChain,
7781
}
7882

7983
return configFileService, nil
@@ -113,7 +117,7 @@ func (c *ConfigFileFlow) GetConfigFile(req *model.GetConfigFileRequest) (model.C
113117
return configFile, nil
114118
}
115119

116-
fileRepo, err := newConfigFileRepo(configFileMetadata, c.connector, c.chain, c.conf, c.persistHandler)
120+
fileRepo, err := newConfigFileRepo(configFileMetadata, c.connector, c.chain, c.conf, c.persistHandler, c.eventReporterChain)
117121
if err != nil {
118122
return nil, err
119123
}

pkg/flow/configuration/file_repo.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/polarismesh/polaris-go/pkg/model"
3232
"github.com/polarismesh/polaris-go/pkg/plugin/configconnector"
3333
"github.com/polarismesh/polaris-go/pkg/plugin/configfilter"
34+
"github.com/polarismesh/polaris-go/pkg/plugin/event"
3435
)
3536

3637
const (
@@ -62,6 +63,8 @@ type ConfigFileRepo struct {
6263
persistHandler *CachePersistHandler
6364

6465
fallbackToLocalCache bool
66+
67+
eventReporterChain []event.EventReporter
6568
}
6669

6770
// ConfigFileRepoChangeListener 远程配置文件发布监听器
@@ -72,7 +75,8 @@ func newConfigFileRepo(metadata model.ConfigFileMetadata,
7275
connector configconnector.ConfigConnector,
7376
chain configfilter.Chain,
7477
conf config.Configuration,
75-
persistHandler *CachePersistHandler) (*ConfigFileRepo, error) {
78+
persistHandler *CachePersistHandler,
79+
eventChain []event.EventReporter) (*ConfigFileRepo, error) {
7680
repo := &ConfigFileRepo{
7781
connector: connector,
7882
chain: chain,
@@ -86,6 +90,7 @@ func newConfigFileRepo(metadata model.ConfigFileMetadata,
8690
remoteConfigFileRef: &atomic.Value{},
8791
persistHandler: persistHandler,
8892
fallbackToLocalCache: conf.GetConfigFile().GetLocalCache().IsFallbackToLocalCache(),
93+
eventReporterChain: eventChain,
8994
}
9095
repo.remoteConfigFileRef.Store(&configconnector.ConfigFile{
9196
Namespace: metadata.GetNamespace(),
@@ -285,6 +290,27 @@ func (r *ConfigFileRepo) removeCacheConfigFile(file *configconnector.ConfigFile)
285290
r.persistHandler.DeleteCacheFromFile(fileName)
286291
}
287292

293+
func (r *ConfigFileRepo) handleEventReporterChain(f *configconnector.ConfigFile) {
294+
e := &model.BaseEventImpl{
295+
BaseType: model.ConfigBaseEvent,
296+
ConfigEvent: &model.ConfigEventImpl{
297+
EventName: model.ConfigUpdated,
298+
EventTime: time.Now().Format("2006-01-02 15:04:05"),
299+
Namespace: r.configFileMetadata.GetNamespace(),
300+
ConfigGroup: r.configFileMetadata.GetFileGroup(),
301+
ConfigFileName: r.configFileMetadata.GetFileName(),
302+
ConfigFileVersion: f.GetVersionName(),
303+
ClientType: model.ConfigFileRequestMode2Str[r.configFileMetadata.GetFileMode()],
304+
},
305+
}
306+
for _, chain := range r.eventReporterChain {
307+
if err := chain.ReportEvent(e); err != nil {
308+
log.GetBaseLogger().Errorf("[Config] report event(%+v) err: %+v", e, err)
309+
continue
310+
}
311+
}
312+
}
313+
288314
func deepCloneConfigFile(sourceConfigFile *configconnector.ConfigFile) *configconnector.ConfigFile {
289315
tags := make([]*configconnector.ConfigFileTag, 0, len(sourceConfigFile.Tags))
290316
for _, tag := range sourceConfigFile.Tags {
@@ -338,4 +364,7 @@ func (r *ConfigFileRepo) fireChangeEvent(f *configconnector.ConfigFile) {
338364
zap.Any("file", r.configFileMetadata), zap.Error(err))
339365
}
340366
}
367+
368+
// 处理文件配置变更事件上报
369+
r.handleEventReporterChain(f)
341370
}

pkg/flow/configuration/flow.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/polarismesh/polaris-go/pkg/config"
2222
"github.com/polarismesh/polaris-go/pkg/plugin/configconnector"
2323
"github.com/polarismesh/polaris-go/pkg/plugin/configfilter"
24+
"github.com/polarismesh/polaris-go/pkg/plugin/event"
2425
)
2526

2627
type ConfigFlow struct {
@@ -30,8 +31,8 @@ type ConfigFlow struct {
3031

3132
// NewConfigFlow 创建配置中心服务
3233
func NewConfigFlow(connector configconnector.ConfigConnector, chain configfilter.Chain,
33-
configuration config.Configuration) (*ConfigFlow, error) {
34-
fileFlow, err := NewConfigFileFlow(connector, chain, configuration)
34+
configuration config.Configuration, eventChain []event.EventReporter) (*ConfigFlow, error) {
35+
fileFlow, err := NewConfigFileFlow(connector, chain, configuration, eventChain)
3536
if err != nil {
3637
return nil, err
3738
}

pkg/flow/data/util.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/polarismesh/polaris-go/pkg/plugin/common"
3131
"github.com/polarismesh/polaris-go/pkg/plugin/configconnector"
3232
"github.com/polarismesh/polaris-go/pkg/plugin/configfilter"
33+
"github.com/polarismesh/polaris-go/pkg/plugin/event"
3334
"github.com/polarismesh/polaris-go/pkg/plugin/healthcheck"
3435
"github.com/polarismesh/polaris-go/pkg/plugin/loadbalancer"
3536
"github.com/polarismesh/polaris-go/pkg/plugin/localregistry"
@@ -157,6 +158,26 @@ func GetStatReporterChain(cfg config.Configuration, supplier plugin.Supplier) ([
157158
return reporterChain, nil
158159
}
159160

161+
// GetEventReporterChain 获取事件上报插件
162+
func GetEventReporterChain(cfg config.Configuration, supplier plugin.Supplier) ([]event.EventReporter, error) {
163+
if !cfg.GetGlobal().GetEventReporter().IsEnable() {
164+
return make([]event.EventReporter, 0), nil
165+
}
166+
167+
reporterNames := cfg.GetGlobal().GetEventReporter().GetChain()
168+
reporterChain := make([]event.EventReporter, 0, len(reporterNames))
169+
if len(reporterNames) > 0 {
170+
for _, reporter := range reporterNames {
171+
targetPlugin, err := supplier.GetPlugin(common.TypeEventReporter, reporter)
172+
if err != nil {
173+
return nil, err
174+
}
175+
reporterChain = append(reporterChain, targetPlugin.(event.EventReporter))
176+
}
177+
}
178+
return reporterChain, nil
179+
}
180+
160181
// GetLoadBalancer 获取负载均衡插件
161182
func GetLoadBalancer(cfg config.Configuration, supplier plugin.Supplier) (loadbalancer.LoadBalancer, error) {
162183
lbType := cfg.GetConsumer().GetLoadbalancer().GetType()

pkg/flow/impl.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/polarismesh/polaris-go/pkg/plugin/common"
3939
"github.com/polarismesh/polaris-go/pkg/plugin/configconnector"
4040
"github.com/polarismesh/polaris-go/pkg/plugin/configfilter"
41+
"github.com/polarismesh/polaris-go/pkg/plugin/event"
4142
"github.com/polarismesh/polaris-go/pkg/plugin/loadbalancer"
4243
"github.com/polarismesh/polaris-go/pkg/plugin/localregistry"
4344
"github.com/polarismesh/polaris-go/pkg/plugin/location"
@@ -62,6 +63,8 @@ type Engine struct {
6263
routerChain *servicerouter.RouterChain
6364
// 上报插件链
6465
reporterChain []statreporter.StatReporter
66+
// 事件插件链
67+
eventChain []event.EventReporter
6568
// 负载均衡器
6669
loadbalancer loadbalancer.LoadBalancer
6770
// 限流处理协助辅助类
@@ -129,6 +132,11 @@ func InitFlowEngine(flowEngine *Engine, initContext plugin.InitContext) error {
129132
}
130133
}
131134

135+
flowEngine.eventChain, err = data.GetEventReporterChain(cfg, plugins)
136+
if err != nil {
137+
return err
138+
}
139+
132140
// 加载服务路由链插件
133141
err = flowEngine.LoadFlowRouteChain()
134142
if err != nil {
@@ -166,7 +174,7 @@ func InitFlowEngine(flowEngine *Engine, initContext plugin.InitContext) error {
166174

167175
// 初始化配置中心服务
168176
if cfg.GetConfigFile().IsEnable() {
169-
configFlow, err := configuration.NewConfigFlow(flowEngine.configConnector, flowEngine.configFilterChain, flowEngine.configuration)
177+
configFlow, err := configuration.NewConfigFlow(flowEngine.configConnector, flowEngine.configFilterChain, flowEngine.configuration, flowEngine.eventChain)
170178
if err != nil {
171179
return err
172180
}

pkg/model/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ const (
161161
AgentMode GetConfigFileRequestMode = 1
162162
)
163163

164+
// mode to str的映射
165+
var ConfigFileRequestMode2Str = map[GetConfigFileRequestMode]string{
166+
SDKMode: "sdk",
167+
AgentMode: "agent",
168+
}
169+
164170
type GetConfigFileRequest struct {
165171
Namespace string
166172
FileGroup string

0 commit comments

Comments
 (0)