-
Notifications
You must be signed in to change notification settings - Fork 54
Expand file tree
/
Copy path: main.go
More file actions
127 lines (109 loc) · 4.29 KB
/
main.go
File metadata and controls
127 lines (109 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"context"
"fmt"
"log/slog"
"os"
"strconv"
"sync"
"time"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/cryptography"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/utils"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"github.com/artie-labs/transfer/lib/webhooks"
"github.com/artie-labs/transfer/models"
"github.com/artie-labs/transfer/processes/consumer"
"github.com/artie-labs/transfer/processes/pool"
)
// version is overwritten at build time (goreleaser injects the release tag
// via ldflags); "dev" is the fallback for local builds.
var version = "dev" // this will be set by the goreleaser configuration to appropriate value for the compiled binary.

// main is the Artie Transfer entrypoint: it loads settings, initializes
// logging, metrics and the webhook client, loads the destination, then runs
// two goroutines — a flush pool and a Kafka consumer — that share an
// in-memory buffer, blocking until both return.
func main() {
	// Parse args into settings
	ctx := context.Background()
	settings, err := config.LoadSettings(os.Args, true)

	// settings is nil when LoadSettings failed; guard the dereference here
	// because the config error itself is only surfaced a few lines below.
	var webhookSettings *config.WebhookSettings
	if settings != nil {
		webhookSettings = settings.Config.WebhookSettings
	}

	// NOTE(review): the webhook client is constructed before the config error
	// is checked, so webhookSettings may be nil here — presumably
	// webhooks.NewClient tolerates a nil settings pointer; confirm.
	whClient, whErr := webhooks.NewClient(webhookSettings, webhooks.Transfer, version)
	if whErr != nil {
		logger.Fatal("Failed to initialize webhooks client", slog.Any("err", whErr))
	}

	if err != nil {
		logger.Fatal("Failed to initialize config", slog.Any("err", err))
	}

	// Initialize default logger
	_logger, cleanUpHandlers := logger.NewLogger(settings.VerboseLogging, settings.Config.Reporting.Sentry, version)
	slog.SetDefault(_logger)
	defer cleanUpHandlers()

	// This is used to prevent all the instances from starting at the same time and causing a thundering herd problem
	if value := os.Getenv("MAX_INIT_SLEEP_SECONDS"); value != "" {
		castedValue, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			logger.Fatal("Failed to parse sleep duration", slog.Any("err", err), slog.String("value", value))
		}

		// Random jitter in seconds, drawn from the project's cryptography
		// helper rather than math/rand.
		randomSeconds, err := cryptography.RandomInt64n(castedValue)
		if err != nil {
			logger.Fatal("Failed to generate random number", slog.Any("err", err))
		}

		duration := time.Duration(randomSeconds) * time.Second
		slog.Info(fmt.Sprintf("Sleeping for %s before any data processing to prevent overwhelming Kafka", duration.String()))
		time.Sleep(duration)
	}

	slog.Info("Config is loaded",
		slog.Int("flushIntervalSeconds", settings.Config.FlushIntervalSeconds),
		slog.Uint64("bufferPoolSize", uint64(settings.Config.BufferRows)),
		slog.Int("flushPoolSizeKb", settings.Config.FlushSizeKb),
	)

	metricsClient := metrics.LoadExporter(settings.Config)

	// Load the configured destination; a failure here is reported through the
	// webhook (EventConnectionFailed) before the process exits.
	dest, err := utils.Load(ctx, settings.Config)
	if err != nil {
		whClient.SendEvent(ctx, webhooks.EventConnectionFailed, webhooks.SendEventArgs{
			Error: fmt.Sprintf("Unable to load destination: %s", err),
		})
		logger.Fatal("Unable to load destination", slog.Any("err", err))
	}

	// SQL destinations get their temporary tables swept up-front, so leftovers
	// from a previous run do not accumulate.
	if sqlDest, ok := dest.(destination.SQLDestination); ok {
		if err = sqlDest.SweepTemporaryTables(ctx); err != nil {
			whClient.SendEvent(ctx, webhooks.EventConnectionFailed, webhooks.SendEventArgs{
				Error: fmt.Sprintf("Failed to clean up temporary tables: %s", err),
			})
			logger.Fatal("Failed to clean up temporary tables", slog.Any("err", err))
		}
	}

	slog.Info("Starting...", slog.String("version", version))
	whClient.SendEvent(ctx, webhooks.EventReplicationStarted, webhooks.SendEventArgs{})

	// In-memory buffer shared by the consumer goroutine and the flush pool.
	inMemDB := models.NewMemoryDB()

	// Only the franz-go Kafka client is supported. Note that ctx is
	// reassigned here, so BOTH goroutines below see the context carrying the
	// injected consumer providers.
	switch settings.Config.KafkaClient {
	case config.FranzGoClient:
		ctx, err = kafkalib.InjectFranzGoConsumerProvidersIntoContext(ctx, settings.Config.Kafka)
		if err != nil {
			logger.Fatal("Failed to inject franz-go consumer providers into context", slog.Any("err", err))
		}
	default:
		logger.Fatal(fmt.Sprintf("Kafka client: %q not supported", settings.Config.KafkaClient))
	}

	var wg sync.WaitGroup

	// Flush pool: runs on FlushIntervalSeconds, draining inMemDB to dest.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer logger.RecoverFatal()
		pool.StartPool(ctx, inMemDB, dest, metricsClient, whClient, settings.Config.Kafka.Topics(), time.Duration(settings.Config.FlushIntervalSeconds)*time.Second, settings.Config)
	}()

	// Consumer: reads from the configured queue (Kafka only) into inMemDB.
	wg.Add(1)
	go func(ctx context.Context) {
		defer wg.Done()
		defer logger.RecoverFatal()

		switch settings.Config.Queue {
		case constants.Kafka:
			consumer.StartKafkaConsumer(ctx, settings.Config, inMemDB, dest, metricsClient, whClient)
		default:
			logger.Fatal(fmt.Sprintf("Message queue: %q not supported", settings.Config.Queue))
		}
	}(ctx)

	// Block until both goroutines return (in practice, until a fatal error).
	wg.Wait()
}