Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
132 commits
Select commit Hold shift + click to select a range
cf7c68c
wip
flashmob Feb 18, 2019
0b7ccf8
wip
flashmob Feb 20, 2019
75a1230
add streams based processing, implementing io.Writer
flashmob Feb 21, 2019
f35b5cd
remove unnecessary functions
flashmob Feb 21, 2019
1367b5d
- add a stream processor that can parse headers on-the-fly (s_headers…
flashmob Feb 22, 2019
2faf37a
- and visible line feed to debug message <LF?
flashmob Feb 25, 2019
404f0f4
new mime reader wip
flashmob Mar 15, 2019
8e6ecb6
boundary detection tests
flashmob Mar 15, 2019
0ec041d
milestone: tree structure correct
flashmob Mar 22, 2019
6db20b9
milestone - counts are correct
flashmob Mar 30, 2019
9de6e5d
make the max header bytes limit configurable
flashmob Apr 22, 2019
b715b24
end of boundary returned as error
flashmob Apr 28, 2019
ceebf82
fixed boundary ending position marker
flashmob Apr 30, 2019
3375b86
move mime parser to its own package
flashmob May 24, 2019
5ca3365
begin work on chunk saver
flashmob Jun 5, 2019
ebac03b
it gives the correct result when buffer is 256 bytes
flashmob Jun 6, 2019
e3f1d0a
bug fixes
flashmob Jun 8, 2019
03ad4ac
don't consider "\n" part of boundary
flashmob Jun 8, 2019
e57359f
stringify ContentType fully.
flashmob Jun 9, 2019
fb8dcd6
- do not expose sync.Mutex
flashmob Jun 12, 2019
61481f1
- empty content-type param parsing
flashmob Jun 12, 2019
b541fd4
- parse emails with \r\n line endings (ignore \r)
flashmob Jun 15, 2019
e779d0d
forgot to incr p.msgPos
flashmob Jun 15, 2019
e9b44d6
dont need line counts
flashmob Jun 15, 2019
2adaadd
change content type params to slice instead of map
flashmob Jun 15, 2019
e671984
new open method, cleanup, normalization of header names
flashmob Jun 18, 2019
ff26a3b
dont report notMime error unless there are 0 parts
flashmob Jun 18, 2019
9f5c14f
compatibility fixes
flashmob Jun 22, 2019
62f30a7
alternative mime parser
flashmob Jun 30, 2019
943124e
do not nest if content-boundary of child is equal to parent
flashmob Jun 30, 2019
a563b6c
almost
flashmob Jul 3, 2019
1b3c550
3rd rewrite
flashmob Jul 7, 2019
59311ca
it works
flashmob Jul 12, 2019
9ed261d
hooray, it works
flashmob Jul 12, 2019
5e3eb79
- added stream_buffer_size setting to the config
flashmob Jul 13, 2019
190c862
- default stream buffer size of 4096
flashmob Aug 13, 2019
956c962
forgot to commit
flashmob Aug 13, 2019
2d0ca8c
- improved comments
flashmob Aug 15, 2019
a7202c5
- chunksaver WIP
flashmob Aug 20, 2019
6f3c254
don't output email in test
flashmob Aug 22, 2019
1a14ab3
- chunksaver is taking shape!
flashmob Aug 28, 2019
ec97c4f
- chunksaver is now chunks emails correctly
flashmob Sep 6, 2019
92d5062
- memory based storage for chunk saver
flashmob Sep 7, 2019
79b40c7
chunk reader taking shape
flashmob Sep 7, 2019
da6289a
flush() error checking
flashmob Sep 7, 2019
72acc2b
change scope for the stream processor property
flashmob Sep 7, 2019
6424c21
wip
flashmob Sep 9, 2019
fead324
- mime parser now implements io.Writer for more flexibility
flashmob Sep 9, 2019
2fe316c
zlib compression
flashmob Sep 9, 2019
c247b29
looks like parts info is pointing to the wrong chunks
flashmob Sep 10, 2019
0266a63
- chunk storage info compression
flashmob Sep 12, 2019
7875164
- commenting
flashmob Sep 13, 2019
fca7ac5
decoder wip
flashmob Sep 15, 2019
392dbf8
decoder wip
flashmob Sep 15, 2019
ab9b189
decoder wip (now reading using the cache)
flashmob Sep 18, 2019
23c8cf5
- pool the buffer for compressing the partsinfo json
flashmob Sep 18, 2019
3200d2c
use the memory storage engine when testing
flashmob Sep 18, 2019
f39b587
refactoring
flashmob Sep 19, 2019
6535833
refactor chunksaver to its own package
flashmob Sep 19, 2019
fd6bce0
simplify names
flashmob Sep 19, 2019
1efc919
refactoring, delete workdir
flashmob Sep 19, 2019
3528fbc
- add transport type to email (8bitmime or 7bit)
flashmob Sep 23, 2019
2942aff
- fix test
flashmob Sep 23, 2019
6b95595
remove the need for a temp buffer
flashmob Sep 24, 2019
3184a6d
wip
flashmob Sep 29, 2019
614a5a4
transform working
flashmob Sep 29, 2019
723b34f
comments
flashmob Sep 29, 2019
b8ed1a3
more robust charset processing / transforming
flashmob Sep 30, 2019
c87be0e
debug decoding
flashmob Oct 20, 2019
7ff74c5
removed unused properties
flashmob Oct 30, 2019
5fe9ede
add EndingPos counters
flashmob Oct 31, 2019
276f2e4
milestone: email transforming working
flashmob Dec 2, 2019
a99f336
merged in the latest master & resolve any conflicts
flashmob Dec 3, 2019
4e2fd4a
fix broken test
flashmob Dec 3, 2019
7670f69
remove test artifact after test
flashmob Dec 3, 2019
9383198
dont need these files
flashmob Jan 5, 2020
e680793
delete
flashmob Jan 5, 2020
9a24d7a
Merge branch 'master' into stream
flashmob Jan 5, 2020
dc792e9
resolve conflicting changes from master
flashmob Jan 5, 2020
7494c08
remove unnecessary \r
flashmob Jan 5, 2020
c53a59b
Merge branch 'master' into stream
flashmob Jan 5, 2020
aceffad
fix tests (after using the smtpReader, a new one needs to be re-spawn…
flashmob Jan 10, 2020
455e0a2
mime() no longer returns io.EOF when no errors happened, but nil inst…
flashmob Jan 10, 2020
2045cb5
"log" import alias was too confusing, changed to "loglib"
flashmob May 12, 2020
219b14a
close smtpReader after finishing reading from the DATA command
flashmob May 12, 2020
6563324
don't need these files
flashmob May 13, 2020
08774d5
undo
flashmob May 13, 2020
f932390
dont need this file
flashmob May 13, 2020
b84a955
refactor and merge type declarations to one file, remove unused decla…
flashmob May 13, 2020
c0f1c32
write mime parsing errors to the log
flashmob May 13, 2020
daa2510
use named struct fields instead of map for MimeParts and MessageID, p…
flashmob May 21, 2020
9ed7a08
fix up tests
flashmob May 21, 2020
18d4a37
fix up tests
flashmob May 21, 2020
572c32e
multiple backends, needs debugging
flashmob Jun 8, 2020
091c2bf
debugging in progress
flashmob Jun 15, 2020
bd17c23
debugging in progress 2
flashmob Jun 16, 2020
da62bb9
debugging in progress 3
flashmob Jun 16, 2020
589af8f
debugging in progress 4
flashmob Jun 17, 2020
41544ff
debugging in progress 5
flashmob Jun 19, 2020
137128d
debugging in progress 6
flashmob Jun 19, 2020
a959c32
debugging in progress 7
flashmob Jun 25, 2020
bf4a5e9
change queueID() to use hash/fnv instead of md5 (also fixed test)
flashmob Jun 26, 2020
6dba672
debugging progress 9
flashmob Jun 26, 2020
9eef28f
make the queuedid fit 32 bytes, add test case
flashmob Jun 27, 2020
4a8a962
- add "named stream processors" feature (stream processor can have mu…
flashmob Jul 3, 2020
7c1dbe4
cleanup
flashmob Jul 3, 2020
e80fa7b
don't need pointer when passing maps
flashmob Jul 3, 2020
949ff5c
don't need pointer, headerconfig can be local
flashmob Jul 3, 2020
a3f6d7c
storage engines follow the configuration pattern
flashmob Jul 6, 2020
982a6c1
comment
flashmob Jul 8, 2020
89947c4
- refactor starting workers and workDispatcher so now a unique workDi…
flashmob Jul 9, 2020
304ecca
- further refactoring to workDispatcher and developing background pos…
flashmob Jul 11, 2020
5610107
envelope: use WaitGroup instead of lock
flashmob Jul 14, 2020
945bb6d
gateway: each stream processor has its own buffer
flashmob Jul 14, 2020
2ebd1b6
try using reflection
flashmob Jul 14, 2020
59bf8f7
start gateway workers: incorrectly passed reflection value instead of…
flashmob Jul 14, 2020
9e5bd99
configuration: new backend gateway config values & defaults
flashmob Jul 17, 2020
49345fc
this bug won't be stored for a 1000 years in the arctic
flashmob Jul 17, 2020
8f860d1
use serverID instead of interface string for generating queuedID
flashmob Jul 18, 2020
b103deb
queuedID packs the seeds (3 uint64) in to a single 192 bit number bef…
flashmob Jul 18, 2020
55c0609
eliminate allocations
flashmob Jul 18, 2020
40185d0
- backend config struct uses typed sections
flashmob Jul 21, 2020
7f22655
more structured logging
flashmob Jul 22, 2020
7a50712
add logging from the dashboard branch
flashmob Jul 22, 2020
9f9bf90
logging: log more details in the DATA command, including the length o…
flashmob Jul 23, 2020
210353d
refactoring: protocol types gets a formal type & other small renames
flashmob Jul 27, 2020
0333b75
sql store: reading and writing emails to an sql db
flashmob Aug 1, 2020
5cb7bcc
sql store: mysql driver basic functionality
flashmob Aug 3, 2020
a6244bc
mimeparse: refactored errors, added custom error types
flashmob Aug 4, 2020
076e0aa
add mime parse error
flashmob Aug 6, 2020
f9053d7
- prepare queries (without the map)
flashmob Aug 7, 2020
a05030a
- sql store zlib compression support
flashmob Aug 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
- refactor starting workers and workDispatcher so now a unique workDi…
…spatcher is started for each processor type

- new ValidatingProcessor type
- removed "gw" prefix form gateway config options
  • Loading branch information
flashmob committed Jul 9, 2020
commit 89947c4399178ec5695aa76989c35fc52eedf8fe
9 changes: 7 additions & 2 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ func TestStreamProcessor(t *testing.T) {
}

func TestStreamProcessorBackground(t *testing.T) {

return
if err := os.Truncate("tests/testlog", 0); err != nil {
t.Error(err)
}
Expand All @@ -1087,6 +1087,11 @@ func TestStreamProcessorBackground(t *testing.T) {
"stream_save_process": "mimeanalyzer|chunksaver",
"post_process_consumer": "Header|headersparser|compress|Decompress|debug",
"post_process_producer": "chunksaver",
"post_process_backlog": 100,
"stream_buffer_size": 1024,
"save_workers_size": 8,
"save_timeout": "1s",
"val_rcpt_timeout": "2s",
},
},
},
Expand Down Expand Up @@ -1554,7 +1559,7 @@ func TestStreamChunkSaver(t *testing.T) {
"default": {
"save_process": "HeadersParser|Debugger",
"stream_save_process": "mimeanalyzer|chunksaver",
"gw_save_timeout": "5",
"save_timeout": "5",
},
},
},
Expand Down
103 changes: 64 additions & 39 deletions backends/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ type BackendGateway struct {
// name is the name of the gateway given in the config
name string
// channel for distributing envelopes to workers
conveyor chan *workerMsg
conveyor chan *workerMsg
conveyorValidation chan *workerMsg
conveyorStream chan *workerMsg
conveyorStreamBg chan *workerMsg

// waits for backend workers to start/stop
wg sync.WaitGroup
workStoppers []chan bool
processors []Processor
validators []Processor
validators []ValidatingProcessor
streamers []streamer

// controls access to state
Expand All @@ -48,9 +51,9 @@ type GatewayConfig struct {
// ValidateProcess is like ProcessorStack, but for recipient validation tasks
ValidateProcess string `json:"validate_process,omitempty"`
// TimeoutSave is duration before timeout when saving an email, eg "29s"
TimeoutSave string `json:"gw_save_timeout,omitempty"`
TimeoutSave string `json:"save_timeout,omitempty"`
// TimeoutValidateRcpt duration before timeout when validating a recipient, eg "1s"
TimeoutValidateRcpt string `json:"gw_val_rcpt_timeout,omitempty"`
TimeoutValidateRcpt string `json:"val_rcpt_timeout,omitempty"`
// StreamSaveProcess is same as a ProcessorStack, but reads from an io.Reader to write email data
StreamSaveProcess string `json:"stream_save_process,omitempty"`
// StreamBufferLen controls the size of the output buffer, in bytes. Default is 4096
Expand Down Expand Up @@ -137,9 +140,9 @@ const (
BackendStateError
BackendStateInitialized

// default timeout for saving email, if 'gw_save_timeout' not present in config
// default timeout for saving email, if 'save_timeout' not present in config
saveTimeout = time.Second * 30
// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' not present in config
// default timeout for validating rcpt to, if 'val_rcpt_timeout' not present in config
validateRcptTimeout = time.Second * 5
defaultProcessor = "Debugger"

Expand Down Expand Up @@ -267,7 +270,7 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
workerMsg := workerMsgPool.Get().(*workerMsg)
defer workerMsgPool.Put(workerMsg)
workerMsg.reset(e, TaskValidateRcpt)
gw.conveyor <- workerMsg
gw.conveyorValidation <- workerMsg
// wait for the validation to complete
// or timeout
select {
Expand Down Expand Up @@ -306,7 +309,7 @@ func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result,
workerMsg.reset(e, TaskSaveMailStream)
workerMsg.r = r
// place on the channel so that one of the save mail workers can pick it up
gw.conveyor <- workerMsg
gw.conveyorStream <- workerMsg
// wait for the save to complete
// or timeout
select {
Expand Down Expand Up @@ -481,7 +484,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
return errors.New("must have at least 1 worker")
}
gw.processors = make([]Processor, 0)
gw.validators = make([]Processor, 0)
gw.validators = make([]ValidatingProcessor, 0)
gw.streamers = make([]streamer, 0)
for i := 0; i < workersSize; i++ {
p, err := gw.newStack(gw.gwConfig.SaveProcess)
Expand Down Expand Up @@ -514,6 +517,15 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
if gw.conveyor == nil {
gw.conveyor = make(chan *workerMsg, workersSize)
}
if gw.conveyorValidation == nil {
gw.conveyorValidation = make(chan *workerMsg, workersSize)
}
if gw.conveyorStream == nil {
gw.conveyorStream = make(chan *workerMsg, workersSize)
}
if gw.conveyorStreamBg == nil {
gw.conveyorStreamBg = make(chan *workerMsg, workersSize)
}

size := streamBufferSize
if gw.gwConfig.StreamBufferSize > 0 {
Expand All @@ -534,38 +546,41 @@ func (gw *BackendGateway) Start() error {
workersSize := gw.workersSize()
// make our slice of channels for stopping
gw.workStoppers = make([]chan bool, 0)
// set the wait group
gw.wg.Add(workersSize)

for i := 0; i < workersSize; i++ {
stop := make(chan bool)
go func(workerId int, stop chan bool) {
// blocks here until the worker exits
// for-loop used so that if workDispatcher panics, re-enter gw.workDispatcher
for {
state := gw.workDispatcher(
gw.conveyor,
gw.processors[workerId],
gw.validators[workerId],
gw.streamers[workerId],
workerId+1,
stop)
// keep running after panic
if state != dispatcherStatePanic {
break
}
}
gw.wg.Done()
}(i, stop)
gw.workStoppers = append(gw.workStoppers, stop)
}
gw.startWorkers(gw.conveyor, workersSize, gw.processors)
gw.startWorkers(gw.conveyorValidation, workersSize, gw.validators)
gw.startWorkers(gw.conveyorStream, workersSize, gw.streamers)
gw.State = BackendStateRunning
return nil
} else {
return fmt.Errorf("cannot start backend because it's in %s state", gw.State)
}
}

func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int, processors interface{}) {
// set the wait group
gw.wg.Add(workersSize)
for i := 0; i < workersSize; i++ {
stop := make(chan bool)
go func(workerId int, stop chan bool) {
// blocks here until the worker exits
// for-loop used so that if workDispatcher panics, re-enter gw.workDispatcher
for {
state := gw.workDispatcher(
conveyor,
processors,
workerId,
stop)
// keep running after panic
if state != dispatcherStatePanic {
break
}
}
gw.wg.Done()
}(i, stop)
gw.workStoppers = append(gw.workStoppers, stop)
}
}

// workersSize gets the number of workers to use for saving email by reading the save_workers_size config value
// Returns 1 if no config value was set
func (gw *BackendGateway) workersSize() int {
Expand Down Expand Up @@ -611,13 +626,23 @@ const (

func (gw *BackendGateway) workDispatcher(
workIn chan *workerMsg,
save Processor,
validate Processor,
stream streamer,
processors interface{},
workerId int,
stop chan bool) (state dispatcherState) {

var msg *workerMsg
var save Processor
var validate ValidatingProcessor
var stream streamer

switch v := processors.(type) {
case []Processor:
save = v[workerId]
case []ValidatingProcessor:
validate = v[workerId]
case []streamer:
stream = v[workerId]
}

defer func() {

Expand All @@ -637,13 +662,13 @@ func (gw *BackendGateway) workDispatcher(

}()
state = dispatcherStateIdle
Log().Fields("id", workerId, "gateway", gw.name).
Log().Fields("id", workerId+1, "gateway", gw.name).
Infof("processing worker started")
for {
select {
case <-stop:
state = dispatcherStateStopped
Log().Infof("stop signal for worker (#%d)", workerId)
Log().Infof("stop signal for worker (#%d)", workerId+1)
return
case msg = <-workIn:
state = dispatcherStateWorking // recovers from panic if in this state
Expand Down
4 changes: 4 additions & 0 deletions backends/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ func (w DefaultStreamProcessor) Write(p []byte) (n int, err error) {

// NoopStreamProcessor does nothing, it's a placeholder when no stream processors have been configured
type NoopStreamProcessor struct{ DefaultStreamProcessor }

type ValidatingProcessor interface {
Processor
}
4 changes: 2 additions & 2 deletions goguerrilla.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"save_workers_size": 1,
"save_process" : "HeadersParser|Header|Debugger",
"primary_mail_host" : "mail.example.com",
"gw_save_timeout" : "30s",
"gw_val_rcpt_timeout" : "3s"
"save_timeout" : "30s",
"val_rcpt_timeout" : "3s"
},
"servers" : [
{
Expand Down
8 changes: 4 additions & 4 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,8 +939,8 @@ func TestGatewayTimeout(t *testing.T) {
defer cleanTestArtifacts(t)

bcfg := backends.BackendConfig{}
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "gw_save_timeout", "1s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "gw_val_rcpt_timeout", "1s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "save_timeout", "1s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "val_rcpt_timeout", "1s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "save_workers_size", 1)
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "save_process", "HeadersParser|Debugger")
bcfg.SetValue(backends.ConfigProcessors, "header", "primary_mail_host", "example.com")
Expand Down Expand Up @@ -1026,8 +1026,8 @@ func TestGatewayPanic(t *testing.T) {
defer cleanTestArtifacts(t)

bcfg := backends.BackendConfig{}
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "gw_save_timeout", "2s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "gw_val_rcpt_timeout", "2s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "save_timeout", "2s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "val_rcpt_timeout", "2s")
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "save_workers_size", 1)
bcfg.SetValue(backends.ConfigGateways, backends.DefaultGateway, "save_process", "HeadersParser|Debugger")
bcfg.SetValue(backends.ConfigProcessors, "header", "primary_mail_host", "example.com")
Expand Down