Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package objects
import "encoding/json"

type batch struct {
Source string `json:"source"`
Collection string `json:"collection"`
WriteKey string `json:"write_key"`
Objects json.RawMessage `json:"objects"`
Expand Down
89 changes: 67 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,31 @@ const (

// Endpoint for Segment Objects API.
DefaultBaseEndpoint = "https://objects.segment.com"

// Default source
DefaultSource = "project"
)

var (
ErrClientClosed = errors.New("Client is closed")
)

type Client struct {
type Config struct {
BaseEndpoint string
Logger *log.Logger
Client *http.Client

Source string

MaxBatchBytes int
MaxBatchCount int
MaxBatchInterval time.Duration

PrintErrors bool
}

type Client struct {
Config
writeKey string
wg sync.WaitGroup
semaphore semaphore.Semaphore
Expand All @@ -48,19 +58,50 @@ type Client struct {
}

func New(writeKey string) *Client {
return NewWithConfig(writeKey, Config{})
}

func NewWithConfig(writeKey string, config Config) *Client {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think if Config contains the writeKey?

func New(writeKey string) *Client {
  return NewWithConfig(Config{WriteKey: writeKey})
}

func NewWithConfig(config Config) *Client {
...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configs tend to imply optionality. The writekey is not optional. I am not sure it belongs in the config

return &Client{
BaseEndpoint: DefaultBaseEndpoint,
Logger: log.New(os.Stderr, "segment ", log.LstdFlags),
writeKey: writeKey,
Client: http.DefaultClient,
cmap: newConcurrentMap(),
MaxBatchBytes: 500 << 10,
MaxBatchCount: 100,
MaxBatchInterval: 10 * time.Second,
semaphore: make(semaphore.Semaphore, 10),
Config: withDefaults(config),
writeKey: writeKey,
cmap: newConcurrentMap(),
semaphore: make(semaphore.Semaphore, 10),
}
}

func withDefaults(c Config) Config {
if c.BaseEndpoint == "" {
c.BaseEndpoint = DefaultBaseEndpoint
}

if c.Logger == nil {
c.Logger = log.New(os.Stderr, "segment ", log.LstdFlags)
}

if c.Client == nil {
c.Client = http.DefaultClient
}

if c.MaxBatchBytes <= 0 {
c.MaxBatchBytes = 500 << 10
}

if c.MaxBatchCount <= 0 {
c.MaxBatchCount = 100
}

if c.MaxBatchInterval <= 0 {
c.MaxBatchInterval = 10 * time.Second
}

if c.Source == "" {
c.Source = DefaultSource
}

return c
}

func (c *Client) fetchFunction(key string) *buffer {
b := newBuffer(key)
c.wg.Add(1)
Expand All @@ -76,12 +117,16 @@ func (c *Client) flush(b *buffer) {
rm := b.marshalArray()
c.semaphore.Run(func() {
batchRequest := &batch{
Source: c.Source,
Collection: b.collection,
WriteKey: c.writeKey,
Objects: rm,
}

c.makeRequest(batchRequest)
err := c.makeRequest(batchRequest)
if c.PrintErrors {
log.Printf("[ERROR] Batch failed making request: %v", err)
}
})
b.reset()
}
Expand All @@ -100,7 +145,9 @@ func (c *Client) buffer(b *buffer) {
})
x, err := json.Marshal(req)
if err != nil {
log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err)
if c.PrintErrors {
log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err)
}
continue
}
if b.size()+len(x) >= c.MaxBatchBytes || b.count()+1 >= c.MaxBatchCount {
Expand All @@ -116,7 +163,9 @@ func (c *Client) buffer(b *buffer) {
})
x, err := json.Marshal(req)
if err != nil {
log.Printf("[Error] Message `%s` excluded from batch: %v", req.ID, err)
if c.PrintErrors {
log.Printf("[Error] Exiting: Message `%s` excluded from batch: %v", req.ID, err)
}
continue
}
if b.size()+len(x) >= c.MaxBatchBytes || b.count()+1 >= c.MaxBatchCount {
Expand Down Expand Up @@ -161,11 +210,10 @@ func (c *Client) Set(v *Object) error {
return nil
}

func (c *Client) makeRequest(request *batch) {
func (c *Client) makeRequest(request *batch) error {
payload, err := json.Marshal(request)
if err != nil {
log.Printf("[Error] Batch failed to marshal: %v - %v", request, err)
return
return err
}

b := backoff.NewExponentialBackOff()
Expand All @@ -183,15 +231,12 @@ func (c *Client) makeRequest(request *batch) {
dec.Decode(&response)

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP Post Request Failed, Status Code %d. \nResponse: %v \nRequest payload: %v",
resp.StatusCode, response, string(payload))
return fmt.Errorf("HTTP Post Request Failed, Status Code %d. \nResponse: %v",
resp.StatusCode, response)
}

return nil
}, b)

if err != nil {
log.Printf("[Error] %v", err)
return
}
return err
}
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (c *ClientTestSuite) TestNewClient() {
c.NotNil(client.Logger)
c.NotNil(client.semaphore)
c.NotNil(client.wg)
c.NotNil(client.Source)
c.Equal("writeKey", client.writeKey)
c.Equal(0, client.cmap.Count())
}
Expand Down