Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 41 additions & 25 deletions src/backend/cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"os"
"os/signal"
"sync"
"time"

"github.com/astaxie/beego"
"github.com/spf13/cobra"
"github.com/streadway/amqp"

"github.com/Qihoo360/wayne/src/backend/bus"
"github.com/Qihoo360/wayne/src/backend/initial"
Expand Down Expand Up @@ -67,27 +69,50 @@ func run(cmd *cobra.Command, args []string) {
signalChan := make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt)
signal.Notify(signalChan, os.Kill)
go func(ch chan os.Signal) {
go func(ch chan os.Signal, workerSet map[*workers.Worker]workers.Worker) {
select {
case <-ch:
lock.Lock()
for _, w := range workerSet {
w.Stop()
}
}
}(signalChan)
}(signalChan, workerSet)

for {
logs.Info("Start worker.......")
var err error
bus.DefaultBus, err = bus.NewBus(beego.AppConfig.String("BusRabbitMQURL"))
if err != nil {
logs.Critical("Connection bus error. Will retry connection after 5 second.", err)
time.Sleep(5 * time.Second)
continue
}
workerSet = make(map[*workers.Worker]workers.Worker)
wg := &sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
wg.Add(1)
recoverableWorker(workerSet, workerType, wg)
wg.Done()

wg := sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
recoverableWorker(workerSet, workerType, &wg)
}
wg.Wait()
// Waits here for the channel to be closed,Let Handle know it's not time to reconnect
logs.Warning("Receive closing error, will stop all working worker: ",
<-bus.DefaultBus.Conn.NotifyClose(make(chan *amqp.Error)))
for _, w := range workerSet {
err := w.Stop()
if err != nil {
logs.Error("Stop worker (%v) error. %v", w, err)
}
}
}
wg.Wait()

}

func recoverableWorker(workerSet map[*workers.Worker]workers.Worker, workerType string, wg *sync.WaitGroup) {
lock.Lock()
defer lock.Unlock()

var worker workers.Worker
var err error
switch workerType {
Expand All @@ -102,24 +127,15 @@ func recoverableWorker(workerSet map[*workers.Worker]workers.Worker, workerType
logs.Critical(err)
return
}
go func() {
// TODO run retry specified numbers, then exit
err := worker.Run()
if err != nil {
logs.Critical("Run worker error.Will try rerun after 5 second.", err)
wg.Add(1)
}

}()
workerSet[&worker] = worker

wg.Add(1)
go func(w workers.Worker) {
defer func() {
if r := recover(); r != nil {
logs.Critical(r)
if availableRecovery > 0 {
availableRecovery--
recoverableWorker(workerSet, workerType, wg)
} else {
panic(r)
}
}
w.Stop()
delete(workerSet, &w)
wg.Done()
}()
worker.Run()
}(worker)
}
18 changes: 5 additions & 13 deletions src/backend/workers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,25 @@ func NewBaseMessageWorker(b *bus.Bus, queue string) *BaseMessageWorker {
}

func (w *BaseMessageWorker) Run() error {
ch, err := w.Bus.Channel.Consume(w.queue, w.consumer, false, false, false, false, nil)
deliveryChan, err := w.Bus.Channel.Consume(w.queue, w.consumer, false, false, false, false, nil)
if nil != err {
return err
}

for {
select {
case delivery := <-ch:
case delivery := <-deliveryChan:
logs.Debug("Delivery received: %p", &delivery)
processMessage(&delivery, w)
case <-w.stopChan:
logs.Info("WebhookWorker gracefully stopped")
logs.Info("Worker (%s) gracefully stopped", w.queue)
return nil
}
}

}

func processMessage(d *amqp.Delivery, w *BaseMessageWorker) {
defer func() {
if r := recover(); r != nil {
logs.Critical(r)
d.Reject(false)
}
}()

var m message.Message
err := json.Unmarshal(d.Body, &m)
if err != nil {
Expand All @@ -84,14 +77,13 @@ func processMessage(d *amqp.Delivery, w *BaseMessageWorker) {

func ackOrDie(d *amqp.Delivery, multiple bool) {
if err := d.Ack(multiple); err != nil { // Client heartbeats failed? Lost connection. Cannot recover.
logs.Critical(err)
panic(err)
logs.Error(err)
}
}

func (w *BaseMessageWorker) Stop() error {
close(w.stopChan)
w.Bus.Channel.Cancel(w.consumer, true)
logs.Debug("worker(consumer %s) stopped.", w.consumer)
logs.Info("worker(consumer %s) stopped.", w.consumer)
return nil
}