Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
adjust kafka due to the difference between kafka and rabbitmq
  • Loading branch information
chenyu468 committed Apr 13, 2024
commit d97575042246b79a8dcdf749050916f3031d4b97
148 changes: 148 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,165 @@ package kafka

import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
kafkaAPI "github.com/segmentio/kafka-go"
ktesting "github.com/segmentio/kafka-go/testing"
)

var _ core.Worker = (*Worker)(nil)

// one consumer connect to kafka broker
var kafkaConsumer *KafkaConsumer

type KafkaConsumer struct {
//stopFlag int32
opts options
//conn *kafkaAPI.Conn
client *kafkaAPI.Client
shutdown func()
}
type ConnWaitGroup struct {
DialFunc func(context.Context, string, string) (net.Conn, error)
sync.WaitGroup
}

// start consumer, get message from kafka
func InitConsumer(opts ...Option) {
//var err error
kafkaConsumer = &KafkaConsumer{
opts: newOptions(opts...),
}
// // connect to broker
// // 需要解决自动重连的问题
// conn, err :=
// (&kafkaAPI.Dialer{
// Resolver: &net.Resolver{},
// }).DialLeader(context.Background(), kafkaConsumer.opts.network,
// kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, 0)
// kafkaConsumer.conn = conn

// 创建client,创建topic,创建shutdown
client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.topic)
kafkaConsumer.client = client
kafkaConsumer.shutdown = shutdown
//return err
}

func newLocalClient(address string) (*kafkaAPI.Client, func()) {
return newClient(kafkaAPI.TCP(address))
}

func newClient(addr net.Addr) (*kafkaAPI.Client, func()) {
conns := &ktesting.ConnWaitGroup{
DialFunc: (&net.Dialer{}).DialContext,
}

transport := &kafkaAPI.Transport{
Dial: conns.Dial,
Resolver: kafkaAPI.NewBrokerResolver(nil),
}

client := &kafkaAPI.Client{
Addr: addr,
Timeout: 5 * time.Second,
Transport: transport,
}

return client, func() { transport.CloseIdleConnections(); conns.Wait() }
}

func newLocalClientAndTopic(topic string) (*kafkaAPI.Client, func()) {
//topic := makeTopic()
client, shutdown := newLocalClientWithTopic(topic, 1)
return client, shutdown
}

func newLocalClientWithTopic(topic string, partitions int) (*kafkaAPI.Client, func()) {
client, shutdown := newLocalClient(topic)
if err := clientCreateTopic(client, topic, partitions); err != nil {
shutdown()
panic(err)
}
return client, func() {
client.DeleteTopics(context.Background(), &kafkaAPI.DeleteTopicsRequest{
Topics: []string{topic},
})
shutdown()
}
}

func clientCreateTopic(client *kafkaAPI.Client, topic string, partitions int) error {
_, err := client.CreateTopics(context.Background(), &kafkaAPI.CreateTopicsRequest{
Topics: []kafkaAPI.TopicConfig{{
Topic: topic,
NumPartitions: partitions,
ReplicationFactor: 1,
}},
})
if err != nil {
return err
}

// Topic creation seems to be asynchronous. Metadata for the topic partition
// layout in the cluster is available in the controller before being synced
// with the other brokers, which causes "Error:[3] Unknown Topic Or Partition"
// when sending requests to the partition leaders.
//
// This loop will wait up to 2 seconds polling the cluster until no errors
// are returned.
for i := 0; i < 20; i++ {
r, err := client.Fetch(context.Background(), &kafkaAPI.FetchRequest{
Topic: topic,
Partition: 0,
Offset: 0,
})
if err == nil && r.Error == nil {
break
}
time.Sleep(100 * time.Millisecond)
}

return nil
}

// 获取消息发送到队列中去
func GetData() {
for {
// select {
// case <-time.After(leftTime):
// return //context.DeadlineExceeded
// // case err := <-done: // job finish
// // return err
// // case p := <-panicChan:
// // panic(p)
// default:
// 接收消息
fmt.Printf("start fetch data")
res, err := kafkaConsumer.client.Fetch(context.Background(), &kafkaAPI.FetchRequest{
Topic: kafkaConsumer.opts.topic,
Partition: 0,
Offset: 0,
MinBytes: 1,
MaxBytes: 64 * 1024,
MaxWait: 100 * time.Millisecond,
})
if err != nil {
//t.Fatal(err)
fmt.Printf("%v", err)
}
// 打印出消息,后续放入队列中去
fmt.Printf("%v", res)
// }
}
}

type Worker struct {
//
//shutdown func() //
Expand Down
38 changes: 38 additions & 0 deletions kafka_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,39 @@
package kafka

import (
"fmt"
"testing"
)

// import (
// "testing"
// "time"

// "github.com/golang-queue/queue"
// "github.com/stretchr/testify/assert"
// )

func TestFetchData(t *testing.T) {
// m := mockMessage{
// Message: "foo",
// }
// w := NewWorker()
// q, err := queue.NewQueue(
// queue.WithWorker(w),
// queue.WithWorkerCount(2),
// )
// assert.NoError(t, err)
// q.Start()
// time.Sleep(50 * time.Millisecond)
// q.Shutdown()
// // can't queue task after shutdown
// err = q.Queue(m)
// assert.Error(t, err)
// assert.Equal(t, queue.ErrQueueShutdown, err)
// q.Wait()
fmt.Printf("start")
InitConsumer(WithAddr("localhost"),
WithPartition(1),
WithTopic("hello"))
fmt.Printf("end")
}
6 changes: 6 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,11 @@ func WithLogger(l queue.Logger) Option {

func newOptions(opts ...Option) options {
defaultOpts := options{}

for _, opt := range opts {
// Call the option giving the instantiated
opt(&defaultOpts)
}

return defaultOpts
}