Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handle
return err
case msg := <-eventChan:
handler(msg)
case <- ctx.Done():
case <-ctx.Done():
return ctx.Err()
}
}
Expand Down Expand Up @@ -231,8 +231,11 @@ func (c *Client) readLoop(reader *EventStreamReader, outCh chan *Event, erChan c
} else {
msg.ID = []byte(c.EventID)
}
// Send downstream
outCh <- msg

// Send downstream if the event has something useful
if msg.hasContent() {
outCh <- msg
}
}
}
}
Expand Down Expand Up @@ -310,7 +313,6 @@ func (c *Client) processEvent(msg []byte) (event *Event, err error) {
}

// Normalize the crlf to lf to make it easier to split the lines.
bytes.Replace(msg, []byte("\n\r"), []byte("\n"), -1)
// Split the line by "\n" or "\r", per the spec.
for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' || r == '\r' }) {
switch {
Expand Down
20 changes: 20 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,26 @@ func TestClientLargeData(t *testing.T) {
require.Equal(t, data, d)
}

func TestClientComment(t *testing.T) {
srv = newServer()
defer cleanup()

c := NewClient(urlPath)

events := make(chan *Event)
err := c.SubscribeChan("test", events)
require.Nil(t, err)

srv.Publish("test", &Event{Comment: []byte("comment")})
srv.Publish("test", &Event{Data: []byte("test")})

ev, err := waitEvent(events, time.Second*1)
assert.Nil(t, err)
assert.Equal(t, []byte("test"), ev.Data)

c.Unsubscribe(events)
}

func TestTrimHeader(t *testing.T) {
tests := []struct {
input []byte
Expand Down
5 changes: 5 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type Event struct {
Data []byte
Event []byte
Retry []byte
Comment []byte
}

func (e *Event) hasContent() bool {
return len(e.ID) > 0 || len(e.Data) > 0 || len(e.Event) > 0 || len(e.Retry) > 0
}

// EventStreamReader scans an io.Reader looking for EventStream messages.
Expand Down
4 changes: 4 additions & 0 deletions event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type EventLog []*Event

// Add event to eventlog
func (e *EventLog) Add(ev *Event) {
if !ev.hasContent() {
return
}

ev.ID = []byte(e.currentindex())
ev.timestamp = time.Now()
*e = append(*e, ev)
Expand Down
38 changes: 22 additions & 16 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Push events to client
for ev := range sub.connection {
// If the data buffer is an empty string abort.
if len(ev.Data) == 0 {
if len(ev.Data) == 0 && len(ev.Comment) == 0 {
break
}

Expand All @@ -84,27 +84,33 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
continue
}

fmt.Fprintf(w, "id: %s\n", ev.ID)
if len(ev.Data) > 0 {
fmt.Fprintf(w, "id: %s\n", ev.ID)

if s.SplitData {
sd := bytes.Split(ev.Data, []byte("\n"))
for i := range sd {
fmt.Fprintf(w, "data: %s\n", sd[i])
}
} else {
if bytes.HasPrefix(ev.Data, []byte(":")) {
fmt.Fprintf(w, "%s\n", ev.Data)
if s.SplitData {
sd := bytes.Split(ev.Data, []byte("\n"))
for i := range sd {
fmt.Fprintf(w, "data: %s\n", sd[i])
}
} else {
fmt.Fprintf(w, "data: %s\n", ev.Data)
if bytes.HasPrefix(ev.Data, []byte(":")) {
fmt.Fprintf(w, "%s\n", ev.Data)
} else {
fmt.Fprintf(w, "data: %s\n", ev.Data)
}
}
}

if len(ev.Event) > 0 {
fmt.Fprintf(w, "event: %s\n", ev.Event)
if len(ev.Event) > 0 {
fmt.Fprintf(w, "event: %s\n", ev.Event)
}

if len(ev.Retry) > 0 {
fmt.Fprintf(w, "retry: %s\n", ev.Retry)
}
}

if len(ev.Retry) > 0 {
fmt.Fprintf(w, "retry: %s\n", ev.Retry)
if len(ev.Comment) > 0 {
fmt.Fprintf(w, ": %s\n", ev.Comment)
}

fmt.Fprint(w, "\n")
Expand Down