Skip to content

Conversation

@panapol-p
Copy link
Contributor

this change makes callback function to support when client register or un-register SSE

issue : #132

Screen Recording 2565-05-08 at 03 05 54

example server

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/r3labs/sse/v2"
)

func main() {
	c := make(map[string]int32)

	onRegister := func(str *sse.Stream) {
		c[str.ID]++
		fmt.Printf("\U0001F7E2 [channel : %s] client is registered (%d registered)\n", str.ID, c[str.ID])
	}

	onUnRegister := func(str *sse.Stream) {
		c[str.ID]--
		fmt.Printf("🔴 [channel : %s] client is registered (%d registered)\n", str.ID, c[str.ID])
	}

	server := sse.NewWithCallback(onRegister, onUnRegister)
	server.AutoStream = true
	server.AutoReplay = false

	// Create a new Mux and set the handler
	mux := http.NewServeMux()
	mux.HandleFunc("/events", server.ServeHTTP)

	go func() {
		for {
			time.Sleep(2 * time.Second)
			server.Publish("ch1", &sse.Event{
				Data: []byte("this is channel 1"),
			})
			server.Publish("ch2", &sse.Event{
				Data: []byte("this is channel 2"),
			})
		}
	}()

	http.ListenAndServe(":8080", mux)
}

example client

package main

import (
	"flag"
	"fmt"
	"log"

	"github.com/r3labs/sse/v2"
)

func main() {
	channelPtr := flag.String("channel", "test", "channel id")
	flag.Parse()
	c := sse.NewClient("http://localhost:8080/events")

	err := c.Subscribe(*channelPtr, func(msg *sse.Event) {
		fmt.Println(string(msg.Data))
	})
	if err != nil {
		log.Fatalln(err.Error())
	}
}

@purehyperbole
Copy link
Member

Hey @panapol-p ,

Thanks for submitting the PR and apologies for missing issue #132 !

The changes look really good, happy to merge them once the method names have been changed 😄

@panapol-p
Copy link
Contributor Author

Hi, @purehyperbole thank for your commented i will change it.
by the way, I have some idea. good or not if OnSubscribe can access data like query parameter.
I think the developer can use this one for control client-id, security (access-token), or anything else they need.

@purehyperbole
Copy link
Member

purehyperbole commented May 10, 2022

@panapol-p thanks for making the changes.

I think that's a good suggestion. Seems like if that's information that you're after, it may be best to define the OnSubscribe and OnUnsubscribe methods as:

func onSubscribe(streamID string, sub *Subscriber) {
   ...
}

and add extend Subscriber to include URL *url.URL, which can get passed in the call to addSubscriber. What do you think?

@panapol-p
Copy link
Contributor Author

thank you for your suggestion.

I think onSubscriber should look like this

func onSubscribe(sub *Subscriber) {
   ...
}

stream-id no need to add in this method, it will be not easy if we use more than 1 stream channel in 1 url path for this problem I add stream-id in to Stream already and then I will add stream-id into Subscriber too, we can get stream-id by call sub.streamID

func onSubscribe(sub *Subscriber) {
    switch(sub.streamID){
    case ... : ...
 }
}

and good to include *url.URL into Subscriber by addSubscriber
What do you think?

@purehyperbole
Copy link
Member

@panapol-p yep, I think that approach looks good!

@panapol-p
Copy link
Contributor Author

So sorry i'm so confuse before.
I think your method is good I will cover your suggestion thank you kub :)

@panapol-p
Copy link
Contributor Author

panapol-p commented May 12, 2022

Hi @purehyperbole
Could you please review.

example server (get client name from query parameter that was include into subscriber)

c := make(map[string]int32)

onSubscribe := func(streamID string, sub *sse.Subscriber) {
    c[streamID]++
    clientName := sub.URL.Query().Get("name")
    fmt.Printf("\U0001F7E2 [channel : %s] client [%s] is subscribed (%d registered)\n", streamID, clientName, c[streamID])
}

onUnsubscribe := func(streamID string, sub *sse.Subscriber) {
    c[streamID]--
    clientName := sub.URL.Query().Get("name")
    fmt.Printf("🔴 [channel : %s] client [%s] is unsubscribed (%d registered)\n", streamID, clientName, c[streamID])
}

server := sse.NewWithCallback(onSubscribe, onUnsubscribe)
server.AutoStream = true
server.AutoReplay = false

// Create a new Mux and set the handler
mux := http.NewServeMux()
mux.HandleFunc("/events", server.ServeHTTP)

go func() {
for {
	time.Sleep(2 * time.Second)
	server.Publish("ch1", &sse.Event{
		Data: []byte("this is channel 1"),
	})
	server.Publish("ch2", &sse.Event{
		Data: []byte("this is channel 2"),
	})
}
}()

http.ListenAndServe(":8081", mux)

go run main.go

example client (we put client name into query parameter)

channelPtr := flag.String("channel", "test", "channel id")
namePtr := flag.String("name", "test", "client name")
flag.Parse()
c := sse.NewClient("http://localhost:8081/events?name=" + *namePtr)

err := c.Subscribe(*channelPtr, func(msg *sse.Event) {
    fmt.Println(string(msg.Data))
})
if err != nil {
    log.Fatalln(err.Error())
}

go run main.go -channel={{$channel_name}} -name={{$client_name}}

result
image

Copy link
Member

@purehyperbole purehyperbole left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! 🎉

@purehyperbole purehyperbole merged commit f69fb8a into r3labs:master May 12, 2022
@purehyperbole
Copy link
Member

Changes look great. I will merge and release these changes now.

Thanks again for submitting this feature 😄

@panapol-p panapol-p deleted the add-callback-function branch May 12, 2022 11:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants