Skip to content

Commit e993b4d

Browse files
committed
add more file and reader functionality for crypto-reader project
1 parent 2c36f56 commit e993b4d

File tree

4 files changed

+334
-1
lines changed

4 files changed

+334
-1
lines changed
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,251 @@
11
package crypto
2+
3+
import (
4+
"bufio"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"math"
9+
"os"
10+
"regexp"
11+
"strings"
12+
"time"
13+
)
14+
15+
// Names of the capture groups used by the transaction log line pattern.
const (
	addressGroupName          = "address"
	operationGroupName        = "operation"
	fromCoinGroupName         = "from_coin"
	toCoinGroupName           = "to_coin"
	fromToNumberCoinGroupName = "from_to_number"
	amountCoinGroupName       = "amount_coin"
	amountNumberGroupName     = "amount_number"
	feePercentGroupName       = "fee_percent"
	feeAmountGroupName        = "fee_amount"
	feeCurrencyGroupName      = "fee_currency"
	fixedFeeGroupName         = "fixed_fee"
	datetimeGroupName         = "datetime"

	// dateTimeFormat is the time.Parse layout of the timestamp that ends every
	// log line, e.g. "03/13/2022 11:36:51 +0000" (month/day/year). It has to
	// agree with the datetime group of transactionLineRegEx below.
	dateTimeFormat = "01/02/2006 15:04:05 -0700"
)

// errInvalidLogFormat is wrapped into the errors returned for lines that do
// not have the expected shape.
var errInvalidLogFormat = errors.New("invalid log format")

// transactionLineRegEx matches one complete transaction log line, e.g.
//
//	0xeeaFf5e4B8B488303A9F1db36edbB9d73b38dFcf BUY BTC/USD:26782.60 USD:13967.95 2%(0.02 USD) 03/13/2022 11:36:51 +0000
//
// It is compiled once at package initialisation and shared by every File;
// a *regexp.Regexp is safe for concurrent use.
var transactionLineRegEx = regexp.MustCompile(`^` +
	// 0xeeaFf5e4B8B488303A9F1db36edbB9d73b38dFcf - crypto address
	`(?P<` + addressGroupName + `>\S+) ` +
	// BUY | SELL | CONVERT | WITHDRAW - operation type
	`(?P<` + operationGroupName + `>\S+) ` +
	// BTC/USD:26782.60 - COIN/COIN:price | COIN/COIN:amount
	`((?P<` + fromCoinGroupName + `>\S+)\/((?P<` + toCoinGroupName + `>\S+):(?P<` + fromToNumberCoinGroupName + `>\S+))) ` +
	// USD:13967.95 - COIN:amount
	`((?P<` + amountCoinGroupName + `>\S+):(?P<` + amountNumberGroupName + `>\S+)) ` +
	// 2%(0.02 USD) | 0% | 15USD - fee; the percentage may have several digits
	`((?P<` + feePercentGroupName + `>\d+%)*(\((?P<` + feeAmountGroupName + `>\S+)\s(?P<` + feeCurrencyGroupName + `>\S+)\))*(?P<` + fixedFeeGroupName + `>\d+\S+)*) ` +
	// 03/13/2022 11:36:51 +0000 - date time; the UTC offset may be negative
	`(?P<` + datetimeGroupName + `>\d{2}\/\d{2}\/\d{4} \d{2}:\d{2}:\d{2} [+-]\d{4})` +
	`$`)

// NewFile wraps an os.File, attaching the transaction log line pattern
// and adding helper functions such as seekLine and IndexTime for easier
// working with log files.
//
// file is stored as is; NewFile neither opens, reads nor closes it.
func NewFile(file *os.File) *File {
	return &File{
		File:  file,
		regEx: transactionLineRegEx,
	}
}

// File represents a wrapped structure around os.File
// providing additional constructs and helpers for working with log files.
type File struct {
	// File is the embedded handle; its methods (Seek, Read, Close, ...) are
	// promoted onto the wrapper.
	*os.File
	// regEx is the pattern parseLogTime applies to a single log line.
	regEx *regexp.Regexp
}
64+
65+
// IndexTime applies a binary search on a log file looking for
66+
// the offset of the log that is withing lookup time (that took place within the last T time).
67+
// offset >= 0 -> means an actual log line to begin reading logs at was found
68+
// offset == -1 -> all the logs inside the log file are older than the lookup time T
69+
func (file *File) IndexTime(lookupTime time.Time) (int64, error) {
70+
var top, bottom, pos, prevPos, offset, prevOffset int64
71+
scanLines := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
72+
advance, token, err = bufio.ScanLines(data, atEOF)
73+
prevPos = pos
74+
pos += int64(advance)
75+
return
76+
}
77+
78+
stat, err := os.Stat(file.Name())
79+
if err != nil {
80+
return -1, err
81+
}
82+
bottom = stat.Size()
83+
var prevLogTime time.Time
84+
for top <= bottom {
85+
// define the middle relative to the top and bottom positions
86+
middle := top + (bottom-top)/2
87+
// seek the file at the middle
88+
_, err := file.Seek(middle, io.SeekStart)
89+
if err != nil {
90+
return -1, err
91+
}
92+
// reposition the middle to the beginning of the current line
93+
offset, err = file.seekLine(0, io.SeekCurrent)
94+
if err != nil {
95+
return -1, err
96+
}
97+
98+
// scan 1 line and parse 1 log line
99+
scanner := bufio.NewScanner(file)
100+
scanner.Split(scanLines)
101+
scanner.Scan()
102+
line := scanner.Text()
103+
if strings.TrimSpace(line) == "" {
104+
// we'll consider empty line an EOF
105+
break
106+
}
107+
108+
logTime, err := file.parseLogTime(line)
109+
if err != nil {
110+
return -1, err
111+
}
112+
113+
if lookupTime.Sub(logTime) > 0 {
114+
// the starting log is way down (relative to the middle)
115+
// move down the top
116+
top = offset + (pos - prevPos)
117+
} else if prevLogTime.Sub(logTime) < 0 {
118+
// the starting log is way up (relative to the middle)
119+
// move up the bottom
120+
bottom = offset - (pos - prevPos)
121+
} else if lookupTime.Sub(prevLogTime) < 0 && offset != top {
122+
if lookupTime.Minute() == logTime.Minute() {
123+
return offset - (pos - prevPos), nil
124+
}
125+
return top, nil
126+
}
127+
128+
if offset == top {
129+
if lookupTime.Minute() == logTime.Minute() || top == 0 {
130+
return top, nil
131+
}
132+
return offset - (pos - prevPos), nil
133+
}
134+
if offset == bottom {
135+
if lookupTime.Minute() > logTime.Minute() {
136+
return top, nil
137+
}
138+
return bottom, nil
139+
}
140+
if top == bottom && top == stat.Size() {
141+
return -1, nil
142+
}
143+
144+
prevLogTime = logTime
145+
prevOffset = offset
146+
}
147+
148+
if lookupTime.Unix() == prevLogTime.Unix() {
149+
return prevOffset, nil
150+
}
151+
152+
return -1, nil
153+
}
154+
155+
// seekLine resets the cursor for N lines relative to whence, back to the beginning (seek back)
156+
// lines: 0 -> means seek back (till new line) for the current line
157+
// lines > 0 -> means seek back that many lines
158+
func (file *File) seekLine(lines int64, whence int) (int64, error) {
159+
const bufferSize = 32 * 1024 // 32KB
160+
buf := make([]byte, bufferSize)
161+
bufLen := 0
162+
lines = int64(math.Abs(float64(lines)))
163+
seekBack := lines < 1
164+
lineCount := int64(0)
165+
166+
// seekBack ignores the first match lines == 0
167+
// then goes to the beginning of the current line
168+
if seekBack {
169+
lineCount = -1
170+
}
171+
172+
pos, err := file.Seek(0, whence)
173+
left := pos
174+
offset := int64(bufferSize * -1)
175+
for b := 1; ; b++ {
176+
if seekBack {
177+
// on seekBack 2nd buffer onward needs to seek
178+
// past what was just read plus another buffer size
179+
if b == 2 {
180+
offset *= 2
181+
}
182+
183+
// if next seekBack will pass beginning of file
184+
// buffer is 0 to unread position
185+
if pos+offset <= 0 {
186+
buf = make([]byte, left)
187+
left = 0
188+
pos, err = file.Seek(0, io.SeekStart)
189+
} else {
190+
left = left - bufferSize
191+
pos, err = file.Seek(offset, io.SeekCurrent)
192+
}
193+
}
194+
if err != nil {
195+
break
196+
}
197+
198+
bufLen, err = file.Read(buf)
199+
if err != nil {
200+
return file.Seek(0, io.SeekEnd)
201+
}
202+
for i := 0; i < bufLen; i++ {
203+
idx := i
204+
if seekBack {
205+
idx = bufLen - i - 1
206+
}
207+
if buf[idx] == '\n' {
208+
lineCount++
209+
}
210+
if lineCount == lines {
211+
if seekBack {
212+
return file.Seek(int64(i)*-1, io.SeekCurrent)
213+
}
214+
return file.Seek(int64(bufLen*-1+i+1), io.SeekCurrent)
215+
}
216+
}
217+
if seekBack && left == 0 {
218+
return file.Seek(0, io.SeekStart)
219+
}
220+
}
221+
222+
return pos, err
223+
}
224+
225+
// parseLogTime parses a given apache common log line and attempts to convert it into time.Time
226+
// example of apache common log line:
227+
// 127.0.0.1 user-identifier frank [04/Mar/2022:05:30:00 +0000] "GET /api/endpoint HTTP/1.0" 500 123
228+
func (file *File) parseLogTime(l string) (time.Time, error) {
229+
matches := file.regEx.FindStringSubmatch(l)
230+
if len(matches) == 0 {
231+
return time.Time{}, fmt.Errorf("line '%s': %w", l, errInvalidLogFormat)
232+
}
233+
234+
var dateTime string
235+
for i, name := range file.regEx.SubexpNames() {
236+
if name == datetimeGroupName {
237+
dateTime = matches[i]
238+
break
239+
}
240+
}
241+
if dateTime == "" {
242+
return time.Time{}, fmt.Errorf("invalid date: %w", errInvalidLogFormat)
243+
}
244+
245+
t, err := time.Parse(dateTimeFormat, dateTime)
246+
if err != nil {
247+
return time.Time{}, err
248+
}
249+
250+
return t, nil
251+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package crypto

mutexes/crypto-reader/crypto/reader.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package crypto
33
import (
44
"bufio"
55
"context"
6+
"fmt"
67
"io"
78
"io/ioutil"
89
"os"
10+
"path"
911
"sort"
1012
"time"
1113
)
@@ -49,6 +51,7 @@ func NewTransactionsReader(cfg TransactionsReaderConfig) (*TransactionsReader, e
4951
type TransactionsReader struct {
	// cfg holds the reader settings; read uses cfg.Interval and cfg.Directory.
	cfg TransactionsReaderConfig
	// filesInfo lists the candidate log files that read walks in slice order.
	filesInfo []os.FileInfo
	// nowFunc supplies the current time used to compute the look-back cut-off.
	// NOTE(review): presumably injectable so tests can control the clock —
	// confirm against NewTransactionsReader.
	nowFunc func() time.Time
}
5356

5457
// Read reads and streams the crypto transactions to an io.Writer using the given config
@@ -61,8 +64,86 @@ func (r *TransactionsReader) Read(ctx context.Context, w io.Writer) error {
6164
}
6265
}
6366

67+
// if there are an infinite number of log files,
68+
// knowing the exact log rotation period may help
69+
// skip iterations up to the very close of the log file
6470
func (r *TransactionsReader) read(w io.Writer) error {
65-
return nil
71+
logFileIndex := -1
72+
for i, fi := range r.filesInfo {
73+
nowMinusT := r.nowFunc().Add(-r.cfg.Interval * time.Minute)
74+
if nowMinusT.Sub(fi.ModTime()) <= 0 {
75+
logFileIndex = i
76+
break
77+
}
78+
}
79+
if logFileIndex == -1 {
80+
return nil
81+
}
82+
83+
filePath := path.Join(r.cfg.Directory, r.filesInfo[logFileIndex].Name())
84+
f, err := os.Open(filePath)
85+
defer func() { _ = f.Close() }()
86+
if err != nil {
87+
return err
88+
}
89+
90+
nowMinusT := r.nowFunc().Add(-r.cfg.Interval * time.Minute)
91+
file := NewFile(f)
92+
offset, err := file.IndexTime(nowMinusT)
93+
if err != nil {
94+
return err
95+
}
96+
97+
others := r.filesInfo[logFileIndex+1 : len(r.filesInfo)]
98+
readTheRest := func() error {
99+
for _, fi := range others {
100+
file, err := os.Open(path.Join(r.cfg.Directory, fi.Name()))
101+
if err != nil {
102+
return err
103+
}
104+
105+
for line := range r.stream(file) {
106+
107+
_, err := fmt.Fprintln(w, line)
108+
if err != nil {
109+
return err
110+
}
111+
}
112+
}
113+
return nil
114+
}
115+
116+
if offset < 0 {
117+
if logFileIndex+1 >= len(r.filesInfo) {
118+
return nil
119+
}
120+
121+
nowMinusT := r.nowFunc().Add(-r.cfg.Interval * time.Minute)
122+
fi := r.filesInfo[logFileIndex+1]
123+
if nowMinusT.Sub(fi.ModTime()) > 0 {
124+
return nil
125+
}
126+
return readTheRest()
127+
}
128+
129+
_, err = f.Seek(offset, io.SeekStart)
130+
if err != nil {
131+
return err
132+
}
133+
writer := bufio.NewWriter(w)
134+
scanner := bufio.NewScanner(f)
135+
for scanner.Scan() {
136+
_, err := writer.WriteString(scanner.Text() + "\n")
137+
if err != nil {
138+
return err
139+
}
140+
err = writer.Flush()
141+
if err != nil {
142+
return err
143+
}
144+
}
145+
146+
return readTheRest()
66147
}
67148

68149
func (r *TransactionsReader) stream(file io.ReadCloser) chan string {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package crypto

0 commit comments

Comments
 (0)