Skip to content

Commit b2c2d27

Browse files
ynguyensegmentprayanshtysonmote
authored
Add client configurations (#9)
* Add client configurations * support Source config * Release v0.0.1-webhookfn * Patch objects-go (remove sensitive logs) (#11) * Release v0.0.2-webhookfn * hide sensitive payload request. support PrintErrors using Config * update message * Release v1.0.0-webhookfn * History.md * Rename function for clarity Co-authored-by: Prayansh Srivastava <[email protected]> Co-authored-by: Prayansh Srivastava <[email protected]> Co-authored-by: Tyson Mote <[email protected]>
1 parent ea7a247 commit b2c2d27

File tree

3 files changed

+69
-22
lines changed

3 files changed

+69
-22
lines changed

batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package objects
33
import "encoding/json"
44

55
type batch struct {
6+
Source string `json:"source"`
67
Collection string `json:"collection"`
78
WriteKey string `json:"write_key"`
89
Objects json.RawMessage `json:"objects"`

client.go

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,31 @@ const (
2525

2626
// Endpoint for Segment Objects API.
2727
DefaultBaseEndpoint = "https://objects.segment.com"
28+
29+
// Default source
30+
DefaultSource = "project"
2831
)
2932

3033
var (
3134
ErrClientClosed = errors.New("Client is closed")
3235
)
3336

34-
type Client struct {
37+
type Config struct {
3538
BaseEndpoint string
3639
Logger *log.Logger
3740
Client *http.Client
3841

42+
Source string
43+
3944
MaxBatchBytes int
4045
MaxBatchCount int
4146
MaxBatchInterval time.Duration
4247

48+
PrintErrors bool
49+
}
50+
51+
type Client struct {
52+
Config
4353
writeKey string
4454
wg sync.WaitGroup
4555
semaphore semaphore.Semaphore
@@ -48,19 +58,50 @@ type Client struct {
4858
}
4959

5060
func New(writeKey string) *Client {
61+
return NewWithConfig(writeKey, Config{})
62+
}
63+
64+
func NewWithConfig(writeKey string, config Config) *Client {
5165
return &Client{
52-
BaseEndpoint: DefaultBaseEndpoint,
53-
Logger: log.New(os.Stderr, "segment ", log.LstdFlags),
54-
writeKey: writeKey,
55-
Client: http.DefaultClient,
56-
cmap: newConcurrentMap(),
57-
MaxBatchBytes: 500 << 10,
58-
MaxBatchCount: 100,
59-
MaxBatchInterval: 10 * time.Second,
60-
semaphore: make(semaphore.Semaphore, 10),
66+
Config: withDefaults(config),
67+
writeKey: writeKey,
68+
cmap: newConcurrentMap(),
69+
semaphore: make(semaphore.Semaphore, 10),
6170
}
6271
}
6372

73+
func withDefaults(c Config) Config {
74+
if c.BaseEndpoint == "" {
75+
c.BaseEndpoint = DefaultBaseEndpoint
76+
}
77+
78+
if c.Logger == nil {
79+
c.Logger = log.New(os.Stderr, "segment ", log.LstdFlags)
80+
}
81+
82+
if c.Client == nil {
83+
c.Client = http.DefaultClient
84+
}
85+
86+
if c.MaxBatchBytes <= 0 {
87+
c.MaxBatchBytes = 500 << 10
88+
}
89+
90+
if c.MaxBatchCount <= 0 {
91+
c.MaxBatchCount = 100
92+
}
93+
94+
if c.MaxBatchInterval <= 0 {
95+
c.MaxBatchInterval = 10 * time.Second
96+
}
97+
98+
if c.Source == "" {
99+
c.Source = DefaultSource
100+
}
101+
102+
return c
103+
}
104+
64105
func (c *Client) fetchFunction(key string) *buffer {
65106
b := newBuffer(key)
66107
c.wg.Add(1)
@@ -76,12 +117,16 @@ func (c *Client) flush(b *buffer) {
76117
rm := b.marshalArray()
77118
c.semaphore.Run(func() {
78119
batchRequest := &batch{
120+
Source: c.Source,
79121
Collection: b.collection,
80122
WriteKey: c.writeKey,
81123
Objects: rm,
82124
}
83125

84-
c.makeRequest(batchRequest)
126+
err := c.makeRequest(batchRequest)
127+
if c.PrintErrors {
128+
log.Printf("[ERROR] Batch failed making request: %v", err)
129+
}
85130
})
86131
b.reset()
87132
}
@@ -100,7 +145,9 @@ func (c *Client) buffer(b *buffer) {
100145
})
101146
x, err := json.Marshal(req)
102147
if err != nil {
103-
log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err)
148+
if c.PrintErrors {
149+
log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err)
150+
}
104151
continue
105152
}
106153
if b.size()+len(x) >= c.MaxBatchBytes || b.count()+1 >= c.MaxBatchCount {
@@ -116,7 +163,9 @@ func (c *Client) buffer(b *buffer) {
116163
})
117164
x, err := json.Marshal(req)
118165
if err != nil {
119-
log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err)
166+
if c.PrintErrors {
167+
log.Printf("[Error] Exiting: Message `%s` excluded from batch: %v", req.ID, err)
168+
}
120169
continue
121170
}
122171
if b.size()+len(x) >= c.MaxBatchBytes || b.count()+1 >= c.MaxBatchCount {
@@ -161,11 +210,10 @@ func (c *Client) Set(v *Object) error {
161210
return nil
162211
}
163212

164-
func (c *Client) makeRequest(request *batch) {
213+
func (c *Client) makeRequest(request *batch) error {
165214
payload, err := json.Marshal(request)
166215
if err != nil {
167-
log.Printf("[Error] Batch failed to marshal: %v - %v", request, err)
168-
return
216+
return err
169217
}
170218

171219
b := backoff.NewExponentialBackOff()
@@ -183,15 +231,12 @@ func (c *Client) makeRequest(request *batch) {
183231
dec.Decode(&response)
184232

185233
if resp.StatusCode != http.StatusOK {
186-
return fmt.Errorf("HTTP Post Request Failed, Status Code %d. \nResponse: %v \nRequest payload: %v",
187-
resp.StatusCode, response, string(payload))
234+
return fmt.Errorf("HTTP Post Request Failed, Status Code %d. \nResponse: %v",
235+
resp.StatusCode, response)
188236
}
189237

190238
return nil
191239
}, b)
192240

193-
if err != nil {
194-
log.Printf("[Error] %v", err)
195-
return
196-
}
241+
return err
197242
}

client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func (c *ClientTestSuite) TestNewClient() {
5858
c.NotNil(client.Logger)
5959
c.NotNil(client.semaphore)
6060
c.NotNil(client.wg)
61+
c.NotNil(client.Source)
6162
c.Equal("writeKey", client.writeKey)
6263
c.Equal(0, client.cmap.Count())
6364
}

0 commit comments

Comments
 (0)