From bbe28d76d2109ec342c3366bd2f57aa8ef4fb0fe Mon Sep 17 00:00:00 2001 From: Y Nguyen Date: Thu, 9 May 2019 22:30:40 -0700 Subject: [PATCH 1/6] Add client configurations --- client.go | 54 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/client.go b/client.go index d0d0997..a9c23af 100644 --- a/client.go +++ b/client.go @@ -31,7 +31,7 @@ var ( ErrClientClosed = errors.New("Client is closed") ) -type Client struct { +type Config struct { BaseEndpoint string Logger *log.Logger Client *http.Client @@ -39,7 +39,10 @@ type Client struct { MaxBatchBytes int MaxBatchCount int MaxBatchInterval time.Duration +} +type Client struct { + Config writeKey string wg sync.WaitGroup semaphore semaphore.Semaphore @@ -48,17 +51,45 @@ type Client struct { } func New(writeKey string) *Client { + return NewWithConfig(writeKey, Config{}) +} + +func NewWithConfig(writeKey string, config Config) *Client { + conf := getFinalConfig(config) return &Client{ - BaseEndpoint: DefaultBaseEndpoint, - Logger: log.New(os.Stderr, "segment ", log.LstdFlags), - writeKey: writeKey, - Client: http.DefaultClient, - cmap: newConcurrentMap(), - MaxBatchBytes: 500 << 10, - MaxBatchCount: 100, - MaxBatchInterval: 10 * time.Second, - semaphore: make(semaphore.Semaphore, 10), + Config: conf, + writeKey: writeKey, + cmap: newConcurrentMap(), + semaphore: make(semaphore.Semaphore, 10), + } +} + +func getFinalConfig(c Config) Config { + if c.BaseEndpoint == "" { + c.BaseEndpoint = DefaultBaseEndpoint + } + + if c.Logger == nil { + c.Logger = log.New(os.Stderr, "segment ", log.LstdFlags) } + + if c.Client == nil { + c.Client = http.DefaultClient + } + + if c.MaxBatchBytes <= 0 { + c.MaxBatchBytes = 500 << 10 + } + + if c.MaxBatchCount <= 0 { + c.MaxBatchCount = 100 + } + + if c.MaxBatchInterval <= 0 { + c.MaxBatchInterval = 10 * time.Second + } + + return c } func (c *Client) fetchFunction(key string) *buffer { @@ -167,7 +198,6 @@ func (c *Client) makeRequest(request *batch) { return } - b := backoff.NewExponentialBackOff() b.MaxElapsedTime = 10 * time.Second err = backoff.Retry(func() error { @@ -184,7 +214,7 @@ func (c *Client) makeRequest(request *batch) { if resp.StatusCode != http.StatusOK { return fmt.Errorf("HTTP Post Request Failed, Status Code %d. \nResponse: %v \nRequest payload: %v", - resp.StatusCode, response, string(payload)) + resp.StatusCode, response, string(payload)) } return nil From 67871b0a7a562a0de2ec7065d2939732a7894df7 Mon Sep 17 00:00:00 2001 From: Prayansh Srivastava Date: Mon, 26 Aug 2019 17:39:19 -0700 Subject: [PATCH 2/6] support Source config --- batch.go | 1 + client.go | 10 ++++++++++ client_test.go | 1 + 3 files changed, 12 insertions(+) diff --git a/batch.go b/batch.go index 1e12e32..e493efb 100644 --- a/batch.go +++ b/batch.go @@ -3,6 +3,7 @@ package objects import "encoding/json" type batch struct { + Source string `json:"source"` Collection string `json:"collection"` WriteKey string `json:"write_key"` Objects json.RawMessage `json:"objects"` diff --git a/client.go b/client.go index a9c23af..4368f5c 100644 --- a/client.go +++ b/client.go @@ -25,6 +25,9 @@ const ( // Endpoint for Segment Objects API. DefaultBaseEndpoint = "https://objects.segment.com" + + // Default source + DefaultSource = "project" ) var ( @@ -36,6 +39,8 @@ type Config struct { Logger *log.Logger Client *http.Client + Source string + MaxBatchBytes int MaxBatchCount int MaxBatchInterval time.Duration @@ -89,6 +94,10 @@ func getFinalConfig(c Config) Config { c.MaxBatchInterval = 10 * time.Second } + if c.Source == "" { + c.Source = DefaultSource + } + return c } @@ -107,6 +116,7 @@ func (c *Client) flush(b *buffer) { rm := b.marshalArray() c.semaphore.Run(func() { batchRequest := &batch{ + Source: c.Source, Collection: b.collection, WriteKey: c.writeKey, Objects: rm, diff --git a/client_test.go b/client_test.go index 9319684..5bbd63a 100644 --- a/client_test.go +++ b/client_test.go @@ -58,6 +58,7 @@ func (c *ClientTestSuite) TestNewClient() { c.NotNil(client.Logger) c.NotNil(client.semaphore) c.NotNil(client.wg) + c.NotNil(client.Source) c.Equal("writeKey", client.writeKey) c.Equal(0, client.cmap.Count()) } From d2152144b7cfb6927c695fc92ace19fb06673424 Mon Sep 17 00:00:00 2001 From: Y Nguyen Date: Fri, 30 Aug 2019 11:56:41 -0700 Subject: [PATCH 3/6] Release v0.0.1-webhookfn --- History.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 History.md diff --git a/History.md b/History.md new file mode 100644 index 0000000..a237524 --- /dev/null +++ b/History.md @@ -0,0 +1,19 @@ + +v0.0.1-webhookfn / 2019-08-30 +============================= + + * Add client configurations + * Move reader to within retry loop, means it will work after first request (#7) + * Log actual request object when Set call fails. + * Updating tableize method call. + * Make close return an error, remove call to reset() + * Use `atomic.CompareAndSwapInt64` in Close + * Using unbuffered exit channel to fix race condition. + * Close channel after sending signal to exit + * Make NewConcurrentMap private + * Add validation, Client Tests, Reset buffers on close + * Update cmap.go + * Change check if closed + * Wait for completition + * First commit + * Initial commit From 879501799a3032f262cf0b95bd1f145e5163ad75 Mon Sep 17 00:00:00 2001 From: Prayansh Srivastava Date: Tue, 1 Oct 2019 18:08:02 -0700 Subject: [PATCH 4/6] Patch objects-go (remove sensitive logs) (#11) * Release v0.0.2-webhookfn * hide sensitive payload request. support PrintErrors using Config * update message * Release v1.0.0-webhookfn --- History.md | 11 +++++++++++ client.go | 29 +++++++++++++++++------------ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/History.md b/History.md index a237524..e0622db 100644 --- a/History.md +++ b/History.md @@ -1,4 +1,15 @@ +v1.0.0-webhookfn / 2019-10-01 +============================= + + * update message + * hide sensitive payload request. support PrintErrors using Config + +v0.0.2-webhookfn / 2019-08-30 +============================= + + * support Source config + v0.0.1-webhookfn / 2019-08-30 ============================= diff --git a/client.go b/client.go index 4368f5c..834732e 100644 --- a/client.go +++ b/client.go @@ -44,6 +44,8 @@ type Config struct { MaxBatchBytes int MaxBatchCount int MaxBatchInterval time.Duration + + PrintErrors bool } type Client struct { @@ -122,7 +124,10 @@ func (c *Client) flush(b *buffer) { Objects: rm, } - c.makeRequest(batchRequest) + err := c.makeRequest(batchRequest) + if c.PrintErrors { + log.Printf("[ERROR] Batch failed making request: %v", err) + } }) b.reset() } @@ -140,7 +145,9 @@ func (c *Client) buffer(b *buffer) { }) x, err := json.Marshal(req) if err != nil { - log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err) + if c.PrintErrors { + log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err) + } continue } if b.size()+len(x) >= c.MaxBatchBytes || b.count()+1 >= c.MaxBatchCount { @@ -156,7 +163,9 @@ func (c *Client) buffer(b *buffer) { }) x, err := json.Marshal(req) if err != nil { - log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err) + if c.PrintErrors { + log.Printf("[Error] Exiting: Message `%s` excluded from batch: %v", req.ID, err) + } continue } if b.size()+len(x) >= c.MaxBatchBytes || b.count()+1 >= c.MaxBatchCount { @@ -201,11 +210,10 @@ func (c *Client) Set(v *Object) error { return nil } -func (c *Client) makeRequest(request *batch) { +func (c *Client) makeRequest(request *batch) error { payload, err := json.Marshal(request) if err != nil { - log.Printf("[Error] Batch failed to marshal: %v - %v", request, err) - return + return err } b := backoff.NewExponentialBackOff() @@ -223,15 +231,12 @@ func (c *Client) makeRequest(request *batch) { dec.Decode(&response) if resp.StatusCode != http.StatusOK { - return fmt.Errorf("HTTP Post Request Failed, Status Code %d. \nResponse: %v \nRequest payload: %v", - resp.StatusCode, response, string(payload)) + return fmt.Errorf("HTTP Post Request Failed, Status Code %d. \nResponse: %v", + resp.StatusCode, response) } return nil }, b) - if err != nil { - log.Printf("[Error] %v", err) - return - } + return err } From 7d5a6239fbeebc762301c742b35f954100de438f Mon Sep 17 00:00:00 2001 From: Tyson Mote Date: Mon, 12 Jul 2021 10:10:52 -0700 Subject: [PATCH 5/6] History.md --- History.md | 30 ------------------------------ 1 file changed, 30 deletions(-) delete mode 100644 History.md diff --git a/History.md b/History.md deleted file mode 100644 index e0622db..0000000 --- a/History.md +++ /dev/null @@ -1,30 +0,0 @@ - -v1.0.0-webhookfn / 2019-10-01 -============================= - - * update message - * hide sensitive payload request. support PrintErrors using Config - -v0.0.2-webhookfn / 2019-08-30 -============================= - - * support Source config - -v0.0.1-webhookfn / 2019-08-30 -============================= - - * Add client configurations - * Move reader to within retry loop, means it will work after first request (#7) - * Log actual request object when Set call fails. - * Updating tableize method call. - * Make close return an error, remove call to reset() - * Use `atomic.CompareAndSwapInt64` in Close - * Using unbuffered exit channel to fix race condition. - * Close channel after sending signal to exit - * Make NewConcurrentMap private - * Add validation, Client Tests, Reset buffers on close - * Update cmap.go - * Change check if closed - * Wait for completition - * First commit - * Initial commit From 28ae6688998801e36e75296018ba2e245e5605c0 Mon Sep 17 00:00:00 2001 From: Tyson Mote Date: Mon, 12 Jul 2021 10:12:31 -0700 Subject: [PATCH 6/6] Rename function for clarity --- client.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index a72642a..3cf16ad 100644 --- a/client.go +++ b/client.go @@ -62,16 +62,15 @@ func New(writeKey string) *Client { } func NewWithConfig(writeKey string, config Config) *Client { - conf := getFinalConfig(config) return &Client{ - Config: conf, + Config: withDefaults(config), writeKey: writeKey, cmap: newConcurrentMap(), semaphore: make(semaphore.Semaphore, 10), } } -func getFinalConfig(c Config) Config { +func withDefaults(c Config) Config { if c.BaseEndpoint == "" { c.BaseEndpoint = DefaultBaseEndpoint }