Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/galexie/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (a *App) init(ctx context.Context, runtimeSettings RuntimeSettings) error {
a.config.CoreVersion); err != nil {
return err
}
a.uploader = NewUploader(a.dataStore, queue, registry)
a.uploader = NewUploader(a.dataStore, queue, registry, a.config.Mode == Replace)

if a.config.AdminPort != 0 {
a.adminServer = newAdminServer(a.config.AdminPort, registry)
Expand Down
5 changes: 4 additions & 1 deletion services/galexie/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
_ Mode = iota
ScanFill Mode = iota
Append
Replace
LoadTest
)

Expand All @@ -42,6 +43,8 @@ func (mode Mode) Name() string {
return "Scan and Fill"
case Append:
return "Append"
case Replace:
return "Replace"
case LoadTest:
return "Load Test"
}
Expand Down Expand Up @@ -164,7 +167,7 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his
return errors.New("invalid start value, must be greater than one.")
}

if config.Mode == ScanFill && config.EndLedger == 0 {
if (config.Mode == ScanFill || config.Mode == Replace) && config.EndLedger == 0 {
return errors.New("invalid end value, unbounded mode not supported, end must be greater than start.")
}

Expand Down
70 changes: 61 additions & 9 deletions services/galexie/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,52 @@ func TestValidateStartAndEndLedger(t *testing.T) {
errMsg: fmt.Sprintf("start %d exceeds latest network ledger %d",
latestNetworkLedger+latestNetworkLedgerPadding+1, latestNetworkLedger+latestNetworkLedgerPadding),
},
{
name: "Replace: End ledger same as start ledger (error)",
startLedger: 512,
endLedger: 512,
mode: Replace,
errMsg: "invalid end value, must be greater than start",
mockHas: false,
},
{
name: "Replace: End ledger greater than start ledger (pass)",
startLedger: 512,
endLedger: 600,
mode: Replace,
errMsg: "",
mockHas: true,
},
{
name: "Replace: No end ledger provided (error)",
startLedger: 512,
endLedger: 0,
mode: Replace,
errMsg: "invalid end value, unbounded mode not supported, end must be greater than start.",
},
{
name: "Replace: End ledger before start ledger (error)",
startLedger: 512,
endLedger: 2,
mode: Replace,
errMsg: "invalid end value, must be greater than start",
},
{
name: "Replace: Start ledger 0 (error)",
startLedger: 0,
endLedger: 2,
mode: Replace,
errMsg: "invalid start value, must be greater than one.",
},
{
name: "Replace: Start ledger exceeds latest ledger (error)",
startLedger: latestNetworkLedger + latestNetworkLedgerPadding + 1,
endLedger: latestNetworkLedger + latestNetworkLedgerPadding + 2,
mode: Replace,
mockHas: true,
errMsg: fmt.Sprintf("start %d exceeds latest network ledger %d",
latestNetworkLedger+latestNetworkLedgerPadding+1, latestNetworkLedger+latestNetworkLedgerPadding),
},
}

ctx := context.Background()
Expand Down Expand Up @@ -367,17 +413,23 @@ func TestAdjustedLedgerRangeBoundedMode(t *testing.T) {
Return(historyarchive.NewCheckpointManager(
historyarchive.DefaultCheckpointFrequency))

modes := []Mode{ScanFill, Replace}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, err := NewConfig(
RuntimeSettings{StartLedger: tt.start, EndLedger: tt.end, ConfigFilePath: tt.configFile, Mode: ScanFill}, nil)
for _, mode := range modes {
testName := fmt.Sprintf("%s_%s", tt.name, mode.Name())
t.Run(testName, func(t *testing.T) {
config, err := NewConfig(
// Use the current mode in RuntimeSettings
RuntimeSettings{StartLedger: tt.start, EndLedger: tt.end, ConfigFilePath: tt.configFile, Mode: mode}, nil)

require.NoError(t, err)
err = config.ValidateAndSetLedgerRange(ctx, mockArchive)
require.NoError(t, err)
require.EqualValues(t, tt.expectedStart, config.StartLedger)
require.EqualValues(t, tt.expectedEnd, config.EndLedger)
})
require.NoError(t, err)
err = config.ValidateAndSetLedgerRange(ctx, mockArchive)
require.NoError(t, err)
require.EqualValues(t, tt.expectedStart, config.StartLedger)
require.EqualValues(t, tt.expectedEnd, config.EndLedger)
})
}
}
mockArchive.AssertExpectations(t)
}
Expand Down
51 changes: 51 additions & 0 deletions services/galexie/internal/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,57 @@ func (s *GalexieTestSuite) TestScanAndFill() {
require.NoError(err)
}

func (s *GalexieTestSuite) TesReplace() {
require := s.Require()

rootCmd := defineCommands()

rootCmd.SetArgs([]string{"scan-and-fill", "--start", "4", "--end", "5", "--config-file", s.tempConfigFile})
var errWriter bytes.Buffer
var outWriter bytes.Buffer
rootCmd.SetErr(&errWriter)
rootCmd.SetOut(&outWriter)
err := rootCmd.ExecuteContext(s.ctx)
require.NoError(err)

output := outWriter.String()
errOutput := errWriter.String()
s.T().Log(output)
s.T().Log(errOutput)

datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig)
require.NoError(err)

_, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr."+compressxdr.DefaultCompressor.Name())
require.NoError(err)

lastModified, err := datastore.GetFileLastModified(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr."+compressxdr.DefaultCompressor.Name())
require.NoError(err)

// S3 timestamps have second level precision. Sleep for 1 second to ensure the new timestamp is different.
time.Sleep(1 * time.Second)

// now run replace on an overlapping range, it will overwrite existing ledgers
rootCmd = defineCommands()
rootCmd.SetArgs([]string{"replace", "--start", "4", "--end", "9", "--config-file", s.tempConfigFile})
errWriter.Reset()
rootCmd.SetErr(&errWriter)
outWriter.Reset()
rootCmd.SetOut(&outWriter)
err = rootCmd.ExecuteContext(s.ctx)
require.NoError(err)

s.T().Log(outWriter.String())
s.T().Log(errWriter.String())

newLastModified, err := datastore.GetFileLastModified(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr."+compressxdr.DefaultCompressor.Name())
require.NoError(err)
require.NotEqual(lastModified, newLastModified)

_, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF6--9.xdr."+compressxdr.DefaultCompressor.Name())
require.NoError(err)
}

func (s *GalexieTestSuite) TestAppend() {
require := s.Require()

Expand Down
31 changes: 28 additions & 3 deletions services/galexie/internal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@
return galexieCmdRunner(settings)
},
}
var ReplaceCmd = &cobra.Command{
Use: "replace",
Short: "Re-exports all ledgers, replacing existing files in the data lake.",
Long: "Performs a full re-export of all ledgers within the bounded range (defined by 'start' and 'end' flags)." +
" This command will overwrite any existing files at the destination path within the data lake",
RunE: func(cmd *cobra.Command, args []string) error {
settings := bindCliParameters(cmd.PersistentFlags().Lookup("start"),
cmd.PersistentFlags().Lookup("end"),
cmd.PersistentFlags().Lookup("config-file"),
)
settings.Mode = Replace
settings.Ctx = cmd.Context()
if settings.Ctx == nil {
settings.Ctx = context.Background()
}
return galexieCmdRunner(settings)
},
}

var loadTestCmd = &cobra.Command{
Use: "load-test",
Expand All @@ -93,13 +111,20 @@

rootCmd.AddCommand(scanAndFillCmd)
rootCmd.AddCommand(appendCmd)
rootCmd.AddCommand(ReplaceCmd)
rootCmd.AddCommand(loadTestCmd)

scanAndFillCmd.PersistentFlags().Uint32P("start", "s", 0, "Starting ledger (inclusive), must be set to a value greater than 1")
scanAndFillCmd.PersistentFlags().Uint32P("end", "e", 0, "Ending ledger (inclusive), must be set to value greater than 'start' and less than the network's current ledger")
scanAndFillCmd.PersistentFlags().String("config-file", "config.toml", "Path to the TOML config file. Defaults to 'config.toml' on runtime working directory path.")
commonFlags := pflag.NewFlagSet("common_flags", pflag.ExitOnError)
commonFlags.Uint32P("start", "s", 0, "Starting ledger (inclusive), must be set to a value greater than 1")
commonFlags.Uint32P("end", "e", 0, "Ending ledger (inclusive), must be set to value greater than 'start' and less than the network's current ledger")

Check failure on line 119 in services/galexie/internal/main.go

View workflow job for this annotation

GitHub Actions / golangci

The line is 150 characters long, which exceeds the maximum of 140 characters. (lll)
commonFlags.String("config-file", "config.toml", "Path to the TOML config file. Defaults to 'config.toml' on runtime working directory path.")

Check failure on line 120 in services/galexie/internal/main.go

View workflow job for this annotation

GitHub Actions / golangci

The line is 143 characters long, which exceeds the maximum of 140 characters. (lll)

scanAndFillCmd.PersistentFlags().AddFlagSet(commonFlags)
viper.BindPFlags(scanAndFillCmd.PersistentFlags())

ReplaceCmd.PersistentFlags().AddFlagSet(commonFlags)
viper.BindPFlags(ReplaceCmd.PersistentFlags())

Check failure on line 126 in services/galexie/internal/main.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `viper.BindPFlags` is not checked (errcheck)

appendCmd.PersistentFlags().Uint32P("start", "s", 0, "Starting ledger (inclusive), must be set to a value greater than 1")
appendCmd.PersistentFlags().Uint32P("end", "e", 0, "Ending ledger (inclusive), optional, setting to non-zero means bounded mode, "+
"only export ledgers from 'start' up to 'end' value which must be greater than 'start' and less than the network's current ledger. "+
Expand Down
32 changes: 32 additions & 0 deletions services/galexie/internal/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,38 @@ func TestFlagsOutput(t *testing.T) {
expectedErrOutput: "test error",
appRunner: appRunnerError,
},
{
name: "replace sub-command with start and end present",
commandArgs: []string{"replace", "--start", "10", "--end", "20", "--config-file", "myfile"},
expectedErrOutput: "",
appRunner: appRunnerSuccess,
expectedSettings: RuntimeSettings{
StartLedger: 10,
EndLedger: 20,
ConfigFilePath: "myfile",
Mode: Replace,
Ctx: ctx,
},
},
{
name: "replace sub-command with start and end absent",
commandArgs: []string{"replace", "--config-file", "myfile"},
expectedErrOutput: "",
appRunner: appRunnerSuccess,
expectedSettings: RuntimeSettings{
StartLedger: 0,
EndLedger: 0,
ConfigFilePath: "myfile",
Mode: Replace,
Ctx: ctx,
},
},
{
name: "replace sub-command prints app error",
commandArgs: []string{"replace", "--start", "10", "--end", "20", "--config-file", "myfile"},
expectedErrOutput: "test error",
appRunner: appRunnerError,
},
{
name: "load-test sub-command with all parameters",
commandArgs: []string{"load-test", "--start", "4", "--end", "5", "--merge", "--ledgers-path", "ledgers.xdr", "--close-duration", "3.5", "--config-file", "myfile"},
Expand Down
35 changes: 27 additions & 8 deletions services/galexie/internal/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package galexie

import (
"context"
"fmt"
"io"
"strconv"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/support/compressxdr"
Expand All @@ -20,13 +20,15 @@ type Uploader struct {
uploadDurationMetric *prometheus.SummaryVec
objectSizeMetrics *prometheus.SummaryVec
latestLedgerMetric prometheus.Gauge
overwriteExisting bool
}

// NewUploader constructs a new Uploader instance
func NewUploader(
destination datastore.DataStore,
queue UploadQueue,
prometheusRegistry *prometheus.Registry,
overwriteExisting bool,
) Uploader {
uploadDurationMetric := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Expand Down Expand Up @@ -55,6 +57,7 @@ func NewUploader(
uploadDurationMetric: uploadDurationMetric,
objectSizeMetrics: objectSizeMetrics,
latestLedgerMetric: latestLedgerMetric,
overwriteExisting: overwriteExisting,
}
}

Expand Down Expand Up @@ -86,7 +89,7 @@ func (r *writerToRecorder) WriteTo(w io.Writer) (int64, error) {

// Upload uploads the serialized binary data of ledger TxMeta to the specified destination.
func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error {
logger.Infof("Uploading: %s", metaArchive.ObjectKey)
logger.Infof("Uploading %s, overwrite=%t", metaArchive.ObjectKey, u.overwriteExisting)
startTime := time.Now()
numLedgers := strconv.FormatUint(uint64(len(metaArchive.Data.LedgerCloseMetas)), 10)

Expand All @@ -95,13 +98,29 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) er
writerTo := &writerToRecorder{
WriterTo: xdrEncoder,
}
ok, err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.ObjectKey, writerTo, metaArchive.metaData.ToMap())
if err != nil {
return errors.Wrapf(err, "error uploading %s", metaArchive.ObjectKey)
}

logger.Infof("Uploaded %s successfully", metaArchive.ObjectKey)
alreadyExists := strconv.FormatBool(!ok)
var uploaded bool
var err error
var alreadyExists string
if u.overwriteExisting {
// Overwrite unconditionally.
if err = u.dataStore.PutFile(ctx, metaArchive.ObjectKey, writerTo, metaArchive.metaData.ToMap()); err != nil {
return fmt.Errorf("error uploading %s (overwrite): %w", metaArchive.ObjectKey, err)
}
logger.Infof("Uploaded %s successfully", metaArchive.ObjectKey)
} else {
// Create only if it doesn't already exist.
uploaded, err = u.dataStore.PutFileIfNotExists(ctx, metaArchive.ObjectKey, writerTo, metaArchive.metaData.ToMap())
if err != nil {
return fmt.Errorf("error uploading %s: %w", metaArchive.ObjectKey, err)
}
if uploaded {
logger.Infof("Uploaded %s successfully", metaArchive.ObjectKey)
} else {
logger.Infof("Skipped %s (already exists)", metaArchive.ObjectKey)
}
alreadyExists = strconv.FormatBool(!uploaded)
}

u.uploadDurationMetric.With(prometheus.Labels{
"ledgers": numLedgers,
Expand Down
Loading
Loading