Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
902d6c7
start on admin dashboard
jordanschalm Jan 3, 2017
4236bac
fix imports
jordanschalm Jan 3, 2017
8906068
get websockets working reliably across reloads, add unsubscribe metho…
jordanschalm Jan 4, 2017
7ad2f0c
get logs into dashboard module
jordanschalm Jan 5, 2017
2ae4951
add log hooks to main package
jordanschalm Jan 24, 2017
1515594
giving up on plain js -- setup react
jordanschalm Jan 24, 2017
55eedaa
set up statik file system and move session management to occur over w…
jordanschalm Jan 24, 2017
f0f59f3
set up redux store and hook up to websockets
jordanschalm Jan 30, 2017
2bbfa29
time conversion on fe, and feed from be into charts
jordanschalm Feb 4, 2017
5ed2b25
styling of charts, and fix sync issue with event store
jordanschalm Feb 6, 2017
4da0bbb
add initSess stub
jordanschalm Feb 6, 2017
13191e0
glide.lock auto-update
jordanschalm Feb 6, 2017
31cb581
merge master into dashboard
jordanschalm Feb 6, 2017
dd5094c
add decorator pattern
flashmob Feb 9, 2017
20cf073
Merge branch 'master' into more-backends-refactoring
flashmob Feb 9, 2017
6476239
dummy backend uses the decorator pattern with two decorators: "debugg…
flashmob Feb 9, 2017
44ad353
dummy backend uses the decorator pattern with two decorators: "debugg…
flashmob Feb 9, 2017
b19bac2
more progress with decorators:
flashmob Feb 10, 2017
00dfc08
more progress with decorators:
flashmob Feb 10, 2017
87c442a
refactored Processor initialization and shutdown: now using initializ…
flashmob Feb 13, 2017
9127bc0
- got rid of abstract.go backend
flashmob Feb 14, 2017
e2d7375
- redis: when not compressed, delivery header was not added
flashmob Feb 14, 2017
91d4e66
- synchronise BackendService
flashmob Feb 14, 2017
0c84f5e
- backend logger refactoring
flashmob Feb 14, 2017
35f069b
forgot to set the log level
flashmob Feb 14, 2017
32c1e9e
Fixed tests
flashmob Feb 15, 2017
6c1c48b
- AddProcessor function to give more abstraction into the processor c…
flashmob Feb 16, 2017
0d1e35c
add error checking for backend processor initialization
flashmob Feb 16, 2017
7dbe397
trim whitespace from user part
flashmob Feb 16, 2017
0c160b6
rename process_line to process_stack to better reflect name
flashmob Feb 16, 2017
c4fea83
Send actions over websockets directly as redux actions
jordanschalm Feb 21, 2017
8e1e342
Refactoring
flashmob Feb 22, 2017
7aa52bf
Add ranking telemetry for top clients by helo, ip, domain and create …
jordanschalm Feb 23, 2017
b90508a
test travis
jordanschalm Feb 23, 2017
f56b3c8
test travis
jordanschalm Feb 23, 2017
85d6aef
test travis
jordanschalm Feb 23, 2017
0a4965f
test travis
jordanschalm Feb 23, 2017
462ebb2
test travis
jordanschalm Feb 23, 2017
2107c27
test travis
jordanschalm Feb 23, 2017
5f44d28
test travis
jordanschalm Feb 23, 2017
dfcd016
test travis
jordanschalm Feb 23, 2017
2db6573
test travis
jordanschalm Feb 23, 2017
1d59402
add old backend compatibility
flashmob Feb 23, 2017
5764ae4
old version of guerrilla-db-redis, to remove
flashmob Feb 23, 2017
9c09c57
ported guerrilla_db_redis to new backend system
flashmob Feb 23, 2017
60494c1
remove old deprecated backend system
flashmob Feb 23, 2017
f94e4f4
Reimplement ranking analytics to save (lots of) memory
jordanschalm Feb 23, 2017
f96ebc7
Add config options to dashboard and update sample config and readme
jordanschalm Feb 25, 2017
1da6c64
change enabled key to match rest of config
jordanschalm Feb 25, 2017
f7a2032
- renamed envelope package to mail because *envelope.Envelope didn't …
flashmob Feb 25, 2017
2f62ce7
return ErrProcessorNotFound error if procesor not found
flashmob Feb 25, 2017
98b297c
update readme to include info about the new backend system
flashmob Feb 25, 2017
0966eaa
- debugger processor name, make case insensitive
flashmob Feb 26, 2017
407dcd3
limit helo to 16 characters logged, update README
jordanschalm Feb 26, 2017
2a12a28
fix slice bound error
jordanschalm Feb 26, 2017
bc52714
- tweak debug messages
flashmob Feb 26, 2017
83f98cc
- clean up comments
flashmob Feb 27, 2017
224edf4
fix broken build
flashmob Feb 27, 2017
a0e78f7
Add dashboard deps and build to makefile
jordanschalm Feb 27, 2017
b1fff47
update clean in makefile
jordanschalm Feb 28, 2017
6e9d2ca
update build process in readme
jordanschalm Feb 28, 2017
c359a95
- Backend shutdown process re-written to use channels (workStoppers)…
flashmob Mar 3, 2017
3a31bd2
remove unneeded import from test
flashmob Mar 3, 2017
74b0540
fix format error string
flashmob Mar 3, 2017
7188363
update readme
flashmob Mar 3, 2017
28d7542
change log re-open signal to SIGUSR1 instead of SIGHUP
flashmob Mar 3, 2017
e9878f3
make sure to register to get SIGUSR1 signal
flashmob Mar 3, 2017
df08327
- moved backed config from CmdConfig to AppConfig, including backend …
flashmob Mar 5, 2017
4b9a781
- New API (with default configuration options)
flashmob Mar 7, 2017
7043262
refactor serve.go to use the new API
flashmob Mar 7, 2017
7d6d1fd
* api tests
flashmob Mar 8, 2017
565c3f9
use the makefile to run test
flashmob Mar 8, 2017
d4bcc13
make log public
flashmob Mar 9, 2017
3cb670d
- Envelopes now have their own pool. This is so that if processing ta…
flashmob Mar 10, 2017
a452c06
add testrace to makefile
flashmob Mar 10, 2017
ef7202c
update sample config
flashmob Mar 10, 2017
29db21a
separate process stack for validating recipients - it means it use a …
flashmob Mar 11, 2017
b532c91
- remove /sloonz/go-qprintable and replace with go stdlib equivalent
flashmob Mar 11, 2017
e82cf80
Readme update: edit description and add features list
flashmob Mar 11, 2017
5427265
merge backend refactor into dashboard and fix conflicts. Also fix a b…
jordanschalm Mar 12, 2017
dd8dceb
update gitignore in test
jordanschalm Mar 12, 2017
9e50efb
add dashboard glide deps
jordanschalm Mar 13, 2017
bfe58e2
Merge branch 'master' into dashboard
flashmob Mar 21, 2017
e3b8469
fix merge conflicts
flashmob Mar 21, 2017
8fcb6c2
- add Stop function
flashmob Mar 23, 2017
2ff69cf
- fix deadlock for when the http server for dashboard fails and Stop…
flashmob Mar 23, 2017
abf5280
fix statik build
flashmob Mar 23, 2017
ebf74b8
Fix dashboard target in Makefile
jordanschalm Mar 23, 2017
5c46f53
Fix bug where JS console prints error on INIT message from dashboard …
jordanschalm Mar 23, 2017
f1521b0
add a simulation test for the dashboard
flashmob Mar 24, 2017
b44b3bf
Merge branch 'dashboard' of github.com:flashmob/go-guerrilla into das…
flashmob Mar 24, 2017
ea7149d
remove line limit from dashboard simulation test
flashmob Mar 24, 2017
7e86a4e
- split hook to hook.go
flashmob Mar 25, 2017
21e9d2a
create heap structure for connection records
jordanschalm Apr 11, 2017
96340cd
Merge branch 'master' into dashboard
flashmob May 31, 2018
b1b7061
fix imports
flashmob May 31, 2018
ef7478b
fix deadlock
flashmob May 31, 2018
e1c505b
Merge branch 'master' into dashboard
flashmob Jan 30, 2019
2b4a8cd
- update readme
flashmob Jan 31, 2019
c5ca3e5
fix deadlog caused by debuf message that wasn't removed
flashmob Jan 31, 2019
b314384
fix race condition when incrementing number of clients
flashmob Feb 1, 2019
339f7df
fix tests
flashmob Feb 1, 2019
666750d
update dashboard readme
flashmob Feb 1, 2019
2e7c6db
add screenshot
flashmob Feb 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
- backend logger refactoring
  • Loading branch information
flashmob committed Feb 14, 2017
commit 0c84f5e69412a71d72dac22aa5dc6acbc99805b8
45 changes: 16 additions & 29 deletions backends/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
)

var (
mainlog log.Logger
Service *BackendService
// deprecated backends system
backends = map[string]Backend{}
Expand All @@ -37,39 +37,16 @@ type Backend interface {
Shutdown() error
}

/*
type Worker interface {
// start save mail worker(s)
saveMailWorker(chan *savePayload)
// get the number of workers that will be stared
getNumberOfWorkers() int
// test database settings, permissions, correct paths, etc, before starting workers
// parse the configuration files
loadConfig(BackendConfig) error

Shutdown() error
Process(*envelope.Envelope) BackendResult
Initialize(BackendConfig) error

SetProcessors(p ...Decorator)
}
*/
type BackendConfig map[string]interface{}

// All config structs extend from this
type baseConfig interface{}

type saveStatus struct {
err error
hash string
}

type savePayload struct {
mail *envelope.Envelope
//from *envelope.EmailAddress
//recipient *envelope.EmailAddress
savedNotify chan *saveStatus
}

// BackendResult represents a response to an SMTP client after receiving DATA.
// The String method should return an SMTP message ready to send back to the
// client, for example `250 OK: Message received`.
Expand Down Expand Up @@ -129,13 +106,23 @@ func (s Shutdown) Shutdown() error {
}

type BackendService struct {
ProcessorHandlers
Initializers []ProcessorInitializer
Shutdowners []ProcessorShutdowner
sync.Mutex
mainlog atomic.Value
}

type ProcessorHandlers struct {
Initializers []ProcessorInitializer
Shutdowners []ProcessorShutdowner
// Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
func Log() log.Logger {
if v, ok := Service.mainlog.Load().(log.Logger); ok {
return v
}
l, _ := log.GetLogger(log.OutputStderr.String())
return l
}

func (b *BackendService) StoreMainlog(l log.Logger) {
b.mainlog.Store(l)
}

// AddInitializer adds a function that impliments ProcessorShutdowner to be called when initializing
Expand Down
34 changes: 23 additions & 11 deletions backends/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@ type BackendGateway struct {
w *Worker
b Backend
// controls access to state
stateGuard sync.Mutex
State backendState
config BackendConfig
gwConfig *GatewayConfig
sync.Mutex
State backendState
config BackendConfig
gwConfig *GatewayConfig
}

type GatewayConfig struct {
WorkersSize int `json:"save_workers_size,omitempty"`
ProcessorLine string `json:"process_line,omitempty"`
}

// savePayload is what get placed on the BackendGateway.saveMailChan channel
type savePayload struct {
mail *envelope.Envelope
// savedNotify is used to notify that the save operation completed
savedNotify chan *saveStatus
}

// possible values for state
const (
BackendStateRunning = iota
Expand All @@ -51,7 +58,7 @@ func (s backendState) String() string {
// New retrieve a backend specified by the backendName, and initialize it using
// backendConfig
func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
mainlog = l
Service.StoreMainlog(l)
gateway := &BackendGateway{config: backendConfig}
if backend, found := backends[backendName]; found {
gateway.b = backend
Expand Down Expand Up @@ -83,17 +90,19 @@ func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
return NewBackendResult(response.Canned.SuccessMessageQueued + status.hash)

case <-time.After(time.Second * 30):
mainlog.Infof("Backend has timed out")
Log().Infof("Backend has timed out")
return NewBackendResult(response.Canned.FailBackendTimeout)
}

}

// Shutdown shuts down the backend and leaves it in BackendStateShuttered state
func (gw *BackendGateway) Shutdown() error {
gw.stateGuard.Lock()
defer gw.stateGuard.Unlock()
gw.Lock()
defer gw.Unlock()
if gw.State != BackendStateShuttered {
close(gw.saveMailChan) // workers will stop
// wait for workers to stop
gw.wg.Wait()
Service.Shutdown()
gw.State = BackendStateShuttered
Expand All @@ -103,6 +112,8 @@ func (gw *BackendGateway) Shutdown() error {

// Reinitialize starts up a backend gateway that was shutdown before
func (gw *BackendGateway) Reinitialize() error {
gw.Lock()
defer gw.Unlock()
if gw.State != BackendStateShuttered {
return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
}
Expand All @@ -114,7 +125,7 @@ func (gw *BackendGateway) Reinitialize() error {
return err
}

// newProcessorLine creates a new stack of decorators and returns as a single Processor
// newProcessorLine creates a new call-stack of decorators and returns as a single Processor
// Decorators are functions of Decorator type, source files prefixed with p_*
// Each decorator does a specific task during the processing stage.
// This function uses the config value process_line to figure out which Decorator to use
Expand Down Expand Up @@ -146,8 +157,10 @@ func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
return nil
}

// Initialize builds the workers and starts each worker in a thread
// Initialize builds the workers and starts each worker in a goroutine
func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
gw.Lock()
defer gw.Unlock()
err := gw.loadConfig(cfg)
if err == nil {
workersSize := gw.getNumberOfWorkers()
Expand All @@ -170,7 +183,6 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
gw.wg.Done()
}(i)
}

} else {
gw.State = BackendStateError
}
Expand Down
38 changes: 20 additions & 18 deletions backends/guerrilla_db_redis.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package backends

// This backend is presented here as an example only, please modify it to your needs.
//
// Deprecated: as of 14th Feb 2017, backends are composed via config, by chaining Processors (files prefixed with p_*)
//
// The backend stores the email data in Redis.
// Other meta-information is stored in MySQL to be joined later.
// A lot of email gets discarded without viewing on Guerrilla Mail,
Expand Down Expand Up @@ -31,7 +34,6 @@ import (
"bytes"
"compress/zlib"
"database/sql"
_ "github.com/go-sql-driver/mysql"

"github.com/flashmob/go-guerrilla/envelope"
"github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -178,7 +180,7 @@ func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *sql.DB) *s
}
stmt, sqlErr := db.Prepare(sqlstr)
if sqlErr != nil {
mainlog.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
Log().WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
}
// cache it
g.cache[rows-1] = stmt
Expand All @@ -190,22 +192,22 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
defer func() {
if r := recover(); r != nil {
//logln(1, fmt.Sprintf("Recovered in %v", r))
mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
Log().Error("Recovered form panic:", r, string(debug.Stack()))
sum := 0
for _, v := range *vals {
if str, ok := v.(string); ok {
sum = sum + len(str)
}
}
mainlog.Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
panic("query failed")
}
}()
// prepare the query used to insert when rows reaches batchMax
insertStmt = g.prepareInsertQuery(c, db)
_, execErr = insertStmt.Exec(*vals...)
if execErr != nil {
mainlog.WithError(execErr).Error("There was a problem the insert")
Log().WithError(execErr).Error("There was a problem the insert")
}
}

Expand Down Expand Up @@ -239,7 +241,7 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
}
defer func() {
if r := recover(); r != nil {
mainlog.Error("insertQueryBatcher caught a panic", r)
Log().Error("insertQueryBatcher caught a panic", r)
}
}()
// Keep getting values from feeder and add to batch.
Expand All @@ -251,14 +253,14 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
// it may panic when reading on a closed feeder channel. feederOK detects if it was closed
case row, feederOk := <-feeder:
if row == nil {
mainlog.Info("Query batchaer exiting")
Log().Info("Query batchaer exiting")
// Insert any remaining rows
insert(count)
return feederOk
}
vals = append(vals, row...)
count++
mainlog.Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
if count >= GuerrillaDBAndRedisBatchMax {
insert(GuerrillaDBAndRedisBatchMax)
}
Expand Down Expand Up @@ -299,15 +301,15 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
Params: map[string]string{"collation": "utf8_general_ci"},
}
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
mainlog.Error("cannot open mysql", err)
Log().Error("cannot open mysql", err)
return nil, err
} else {
return db, nil
}

}

func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *savePayload) {
func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
var to, body string

var redisErr error
Expand All @@ -319,31 +321,31 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
var err error
db, err = g.mysqlConnect()
if err != nil {
mainlog.Fatalf("cannot open mysql: %s", err)
Log().Fatalf("cannot open mysql: %s", err)
}

// start the query SQL batching where we will send data via the feeder channel
feeder := make(chan []interface{}, 1)
go func() {
for {
if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
mainlog.Debug("insertQueryBatcher exited")
Log().Debug("insertQueryBatcher exited")
return
}
// if insertQueryBatcher panics, it can recover and go in again
mainlog.Debug("resuming insertQueryBatcher")
Log().Debug("resuming insertQueryBatcher")
}

}()

defer func() {
if r := recover(); r != nil {
//recover form closed channel
mainlog.Error("panic recovered in saveMailWorker", r)
Log().Error("panic recovered in saveMailWorker", r)
}
db.Close()
if redisClient.conn != nil {
mainlog.Infof("closed redis")
Log().Infof("closed redis")
redisClient.conn.Close()
}
// close the feeder & wait for query batcher to exit.
Expand All @@ -358,10 +360,10 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
for {
payload := <-saveMailChan
if payload == nil {
mainlog.Debug("No more saveMailChan payload")
Log().Debug("No more saveMailChan payload")
return
}
mainlog.Debug("Got mail from chan", payload.mail.RemoteAddress)
Log().Debug("Got mail from chan", payload.mail.RemoteAddress)
to = trimToLimit(strings.TrimSpace(payload.mail.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
payload.mail.Helo = trimToLimit(payload.mail.Helo, 255)
host := trimToLimit(payload.mail.RcptTo[0].Host, 255)
Expand Down Expand Up @@ -395,7 +397,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
data.clear() // blank
}
} else {
mainlog.WithError(redisErr).Warn("Error while connecting redis")
Log().WithError(redisErr).Warn("Error while connecting redis")
}

vals = []interface{}{} // clear the vals
Expand Down
4 changes: 2 additions & 2 deletions backends/p_debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func Debugger() Decorator {
return func(c Processor) Processor {
return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
if config.LogReceivedMails {
mainlog.Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
mainlog.Info("Headers are:", e.Header)
Log().Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
Log().Info("Headers are:", e.Header)
}
// continue to the next Processor in the decorator chain
return c.Process(e)
Expand Down
Loading