diff --git a/staging/operator-registry/alpha/action/generate_dockerfile.go b/staging/operator-registry/alpha/action/generate_dockerfile.go index ef2a6d8713..4d2a3ddfc7 100644 --- a/staging/operator-registry/alpha/action/generate_dockerfile.go +++ b/staging/operator-registry/alpha/action/generate_dockerfile.go @@ -45,10 +45,11 @@ FROM {{.BaseImage}} # Configure the entrypoint and command ENTRYPOINT ["/bin/opm"] -CMD ["serve", "/configs"] +CMD ["serve", "/configs", "--cache-dir=/tmp/cache"] -# Copy declarative config root into image at /configs +# Copy declarative config root into image at /configs and pre-populate serve cache ADD {{.IndexDir}} /configs +RUN ["/bin/opm", "serve", "/configs", "--cache-dir=/tmp/cache", "--cache-only"] # Set DC-specific label for the location of the DC root directory # in the image diff --git a/staging/operator-registry/alpha/action/generate_dockerfile_test.go b/staging/operator-registry/alpha/action/generate_dockerfile_test.go index 6e0a795430..d5a76516c1 100644 --- a/staging/operator-registry/alpha/action/generate_dockerfile_test.go +++ b/staging/operator-registry/alpha/action/generate_dockerfile_test.go @@ -50,10 +50,11 @@ FROM foo # Configure the entrypoint and command ENTRYPOINT ["/bin/opm"] -CMD ["serve", "/configs"] +CMD ["serve", "/configs", "--cache-dir=/tmp/cache"] -# Copy declarative config root into image at /configs +# Copy declarative config root into image at /configs and pre-populate serve cache ADD bar /configs +RUN ["/bin/opm", "serve", "/configs", "--cache-dir=/tmp/cache", "--cache-only"] # Set DC-specific label for the location of the DC root directory # in the image @@ -76,10 +77,11 @@ FROM foo # Configure the entrypoint and command ENTRYPOINT ["/bin/opm"] -CMD ["serve", "/configs"] +CMD ["serve", "/configs", "--cache-dir=/tmp/cache"] -# Copy declarative config root into image at /configs +# Copy declarative config root into image at /configs and pre-populate serve cache ADD bar /configs +RUN ["/bin/opm", "serve", "/configs", "--cache-dir=/tmp/cache", "--cache-only"] # Set DC-specific label for the location of the DC root directory # in the image diff --git a/staging/operator-registry/cmd/opm/registry/serve.go b/staging/operator-registry/cmd/opm/registry/serve.go index b6defb75ee..78aba5e52b 100644 --- a/staging/operator-registry/cmd/opm/registry/serve.go +++ b/staging/operator-registry/cmd/opm/registry/serve.go @@ -115,7 +115,7 @@ func serveFunc(cmd *cobra.Command, _ []string) error { lis, err := net.Listen("tcp", ":"+port) if err != nil { - logger.Fatalf("failed to listen: %s", err) + return fmt.Errorf("failed to listen: %s", err) } timeout, err := cmd.Flags().GetString("timeout-seconds") diff --git a/staging/operator-registry/cmd/opm/serve/serve.go b/staging/operator-registry/cmd/opm/serve/serve.go index 6853c245b9..633ad9ef3f 100644 --- a/staging/operator-registry/cmd/opm/serve/serve.go +++ b/staging/operator-registry/cmd/opm/serve/serve.go @@ -6,19 +6,17 @@ import ( "errors" "fmt" "net" - "os" - "sync" - "net/http" endpoint "net/http/pprof" + "os" "runtime/pprof" + "sync" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/reflection" - "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-registry/pkg/api" health "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1" "github.com/operator-framework/operator-registry/pkg/lib/dns" @@ -30,6 +28,8 @@ import ( type serve struct { configDir string + cacheDir string + cacheOnly bool port string terminationLog string @@ -75,12 +75,16 @@ will not be reflected in the served content. cmd.Flags().StringVarP(&s.terminationLog, "termination-log", "t", "/dev/termination-log", "path to a container termination log file") cmd.Flags().StringVarP(&s.port, "port", "p", "50051", "port number to serve on") cmd.Flags().StringVar(&s.pprofAddr, "pprof-addr", "", "address of startup profiling endpoint (addr:port format)") + cmd.Flags().StringVar(&s.cacheDir, "cache-dir", "", "if set, sync and persist server cache directory") + cmd.Flags().BoolVar(&s.cacheOnly, "cache-only", false, "sync the serve cache and exit without serving") return cmd } func (s *serve) run(ctx context.Context) error { p := newProfilerInterface(s.pprofAddr, s.logger) - p.startEndpoint() + if err := p.startEndpoint(); err != nil { + return fmt.Errorf("could not start pprof endpoint: %v", err) + } if err := p.startCpuProfileCache(); err != nil { return fmt.Errorf("could not start CPU profile: %v", err) } @@ -98,24 +102,18 @@ func (s *serve) run(ctx context.Context) error { s.logger = s.logger.WithFields(logrus.Fields{"configs": s.configDir, "port": s.port}) - cfg, err := declcfg.LoadFS(os.DirFS(s.configDir)) - if err != nil { - return fmt.Errorf("load declarative config directory: %v", err) - } - - m, err := declcfg.ConvertToModel(*cfg) - if err != nil { - return fmt.Errorf("could not build index model from declarative config: %v", err) - } - store, err := registry.NewQuerier(m) + store, err := registry.NewQuerierFromFS(os.DirFS(s.configDir), s.cacheDir) defer store.Close() if err != nil { return err } + if s.cacheOnly { + return nil + } lis, err := net.Listen("tcp", ":"+s.port) if err != nil { - s.logger.Fatalf("failed to listen: %s", err) + return fmt.Errorf("failed to listen: %s", err) } grpcServer := grpc.NewServer() @@ -129,7 +127,9 @@ func (s *serve) run(ctx context.Context) error { return grpcServer.Serve(lis) }, func() { grpcServer.GracefulStop() - p.stopEndpoint(p.logger.Context) + if err := p.stopEndpoint(ctx); err != nil { + s.logger.Warnf("error shutting down pprof server: %v", err) + } }) } @@ -147,7 +147,8 @@ type profilerInterface struct { cacheReady bool cacheLock sync.RWMutex - logger *logrus.Entry + logger *logrus.Entry + closeErr chan error } func newProfilerInterface(a string, log *logrus.Entry) *profilerInterface { @@ -162,10 +163,10 @@ func (p *profilerInterface) isEnabled() bool { return p.addr != "" } -func (p *profilerInterface) startEndpoint() { +func (p *profilerInterface) startEndpoint() error { // short-circuit if not enabled if !p.isEnabled() { - return + return nil } mux := http.NewServeMux() @@ -181,14 +182,22 @@ func (p *profilerInterface) startEndpoint() { Handler: mux, } - // goroutine exits with main - go func() { + lis, err := net.Listen("tcp", p.addr) + if err != nil { + return err + } - p.logger.Info("starting pprof endpoint") - if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - p.logger.Fatal(err) - } + p.closeErr = make(chan error) + go func() { + p.closeErr <- func() error { + p.logger.Info("starting pprof endpoint") + if err := p.server.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil + }() }() + return nil } func (p *profilerInterface) startCpuProfileCache() error { @@ -222,10 +231,14 @@ func (p *profilerInterface) httpHandler(w http.ResponseWriter, r *http.Request) w.Write(p.cache.Bytes()) } -func (p *profilerInterface) stopEndpoint(ctx context.Context) { +func (p *profilerInterface) stopEndpoint(ctx context.Context) error { + if !p.isEnabled() { + return nil + } if err := p.server.Shutdown(ctx); err != nil { - p.logger.Fatal(err) + return err } + return <-p.closeErr } func (p *profilerInterface) isCacheReady() bool { diff --git a/staging/operator-registry/go.mod b/staging/operator-registry/go.mod index bfef9504b6..69396667b8 100644 --- a/staging/operator-registry/go.mod +++ b/staging/operator-registry/go.mod @@ -35,6 +35,7 @@ require ( golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad google.golang.org/grpc v1.45.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200709232328-d8193ee9cc3e google.golang.org/protobuf v1.28.0 @@ -148,7 +149,6 @@ require ( go.opentelemetry.io/proto/otlp v0.7.0 // indirect golang.org/x/crypto v0.0.0-20220408190544-5352b0902921 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect diff --git a/staging/operator-registry/pkg/registry/query.go b/staging/operator-registry/pkg/registry/query.go index 4ccf7eba4f..4c4212217f 100644 --- a/staging/operator-registry/pkg/registry/query.go +++ b/staging/operator-registry/pkg/registry/query.go @@ -3,24 +3,31 @@ package registry import ( "context" "encoding/json" + "errors" "fmt" + "hash/fnv" + "io/fs" "os" "path/filepath" "sort" + "strings" + "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-registry/alpha/model" "github.com/operator-framework/operator-registry/pkg/api" ) -type Querier struct { - pkgs model.Model +const ( + cachePermissionDir = 0750 + cachePermissionFile = 0640 +) - tmpDir string - apiBundles map[apiBundleKey]string +type Querier struct { + *cache } func (q Querier) Close() error { - return os.RemoveAll(q.tmpDir) + return q.cache.close() } type apiBundleKey struct { @@ -39,43 +46,26 @@ func (s *SliceBundleSender) Send(b *api.Bundle) error { var _ GRPCQuery = &Querier{} -func NewQuerier(packages model.Model) (*Querier, error) { +func NewQuerierFromFS(fbcFS fs.FS, cacheDir string) (*Querier, error) { q := &Querier{} - - tmpDir, err := os.MkdirTemp("", "opm-registry-querier-") + var err error + q.cache, err = newCache(cacheDir, &fbcCacheModel{ + FBC: fbcFS, + Cache: os.DirFS(cacheDir), + }) if err != nil { - return nil, err + return q, err } - q.tmpDir = tmpDir + return q, nil +} - q.apiBundles = map[apiBundleKey]string{} - for _, pkg := range packages { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) - if err != nil { - return q, err - } - jsonBundle, err := json.Marshal(apiBundle) - if err != nil { - return q, err - } - filename := filepath.Join(tmpDir, fmt.Sprintf("%s_%s_%s.json", pkg.Name, ch.Name, b.Name)) - if err := os.WriteFile(filename, jsonBundle, 0666); err != nil { - return q, err - } - q.apiBundles[apiBundleKey{pkg.Name, ch.Name, b.Name}] = filename - packages[pkg.Name].Channels[ch.Name].Bundles[b.Name] = &model.Bundle{ - Package: pkg, - Channel: ch, - Name: b.Name, - Replaces: b.Replaces, - Skips: b.Skips, - } - } - } +func NewQuerier(m model.Model) (*Querier, error) { + q := &Querier{} + var err error + q.cache, err = newCache("", &nonDigestableModel{Model: m}) + if err != nil { + return q, err } - q.pkgs = packages return q, nil } @@ -147,19 +137,15 @@ func (q Querier) GetPackage(_ context.Context, name string) (*PackageManifest, e var channels []PackageChannel for _, ch := range pkg.Channels { - head, err := ch.Head() - if err != nil { - return nil, fmt.Errorf("package %q, channel %q has invalid head: %v", name, ch.Name, err) - } channels = append(channels, PackageChannel{ Name: ch.Name, - CurrentCSVName: head.Name, + CurrentCSVName: ch.Head, }) } return &PackageManifest{ PackageName: pkg.Name, Channels: channels, - DefaultChannelName: pkg.DefaultChannel.Name, + DefaultChannelName: pkg.DefaultChannel, }, nil } @@ -196,13 +182,9 @@ func (q Querier) GetBundleForChannel(_ context.Context, pkgName string, channelN if !ok { return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) } - head, err := ch.Head() - if err != nil { - return nil, fmt.Errorf("package %q, channel %q has invalid head: %v", pkgName, channelName, err) - } - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, head.Name}) + apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, ch.Head}) if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", head.Name, err) + return nil, fmt.Errorf("convert bundle %q: %v", ch.Head, err) } // unset Replaces and Skips (sqlite query does not populate these fields) @@ -217,7 +199,7 @@ func (q Querier) GetChannelEntriesThatReplace(_ context.Context, name string) ([ for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { for _, b := range ch.Bundles { - entries = append(entries, channelEntriesThatReplace(*b, name)...) + entries = append(entries, channelEntriesThatReplace(b, name)...) } } } @@ -242,7 +224,7 @@ func (q Querier) GetBundleThatReplaces(_ context.Context, name, pkgName, channel // is ALSO non-deterministic because it doesn't use ORDER BY, so its probably okay for this // implementation to be non-deterministic as well. for _, b := range ch.Bundles { - if bundleReplaces(*b, name) { + if bundleReplaces(b, name) { apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) if err != nil { return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) @@ -263,7 +245,7 @@ func (q Querier) GetChannelEntriesThatProvide(_ context.Context, group, version, for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { for _, b := range ch.Bundles { - provides, err := q.doesModelBundleProvide(*b, group, version, kind) + provides, err := q.doesModelBundleProvide(b, group, version, kind) if err != nil { return nil, err } @@ -274,7 +256,7 @@ func (q Querier) GetChannelEntriesThatProvide(_ context.Context, group, version, // the sqlite server and returns seemingly invalid channel entries. // Don't worry about this. Not used anymore. - entries = append(entries, channelEntriesForBundle(*b, true)...) + entries = append(entries, q.channelEntriesForBundle(b, true)...) } } } @@ -297,17 +279,13 @@ func (q Querier) GetLatestChannelEntriesThatProvide(_ context.Context, group, ve for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { - b, err := ch.Head() - if err != nil { - return nil, fmt.Errorf("package %q, channel %q has invalid head: %v", pkg.Name, ch.Name, err) - } - - provides, err := q.doesModelBundleProvide(*b, group, version, kind) + b := ch.Bundles[ch.Head] + provides, err := q.doesModelBundleProvide(b, group, version, kind) if err != nil { return nil, err } if provides { - entries = append(entries, channelEntriesForBundle(*b, false)...) + entries = append(entries, q.channelEntriesForBundle(b, false)...) } } } @@ -337,15 +315,15 @@ func (q Querier) GetBundleThatProvides(ctx context.Context, group, version, kind // collected based on iterating over the packages in q.pkgs. continue } - if entry.ChannelName == pkg.DefaultChannel.Name { + if entry.ChannelName == pkg.DefaultChannel { return q.GetBundle(ctx, entry.PackageName, entry.ChannelName, entry.BundleName) } } return nil, fmt.Errorf("no entry found that provides group:%q version:%q kind:%q", group, version, kind) } -func (q Querier) doesModelBundleProvide(b model.Bundle, group, version, kind string) (bool, error) { - apiBundle, err := q.loadAPIBundle(apiBundleKey{b.Package.Name, b.Channel.Name, b.Name}) +func (q Querier) doesModelBundleProvide(b cBundle, group, version, kind string) (bool, error) { + apiBundle, err := q.loadAPIBundle(apiBundleKey{b.Package, b.Channel, b.Name}) if err != nil { return false, fmt.Errorf("convert bundle %q: %v", b.Name, err) } @@ -357,7 +335,7 @@ func (q Querier) doesModelBundleProvide(b model.Bundle, group, version, kind str return false, nil } -func bundleReplaces(b model.Bundle, name string) bool { +func bundleReplaces(b cBundle, name string) bool { if b.Replaces == name { return true } @@ -369,12 +347,12 @@ func bundleReplaces(b model.Bundle, name string) bool { return false } -func channelEntriesThatReplace(b model.Bundle, name string) []*ChannelEntry { +func channelEntriesThatReplace(b cBundle, name string) []*ChannelEntry { var entries []*ChannelEntry if b.Replaces == name { entries = append(entries, &ChannelEntry{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: b.Replaces, }) @@ -382,8 +360,8 @@ func channelEntriesThatReplace(b model.Bundle, name string) []*ChannelEntry { for _, s := range b.Skips { if s == name && s != b.Replaces { entries = append(entries, &ChannelEntry{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: b.Replaces, }) @@ -392,20 +370,20 @@ func channelEntriesThatReplace(b model.Bundle, name string) []*ChannelEntry { return entries } -func channelEntriesForBundle(b model.Bundle, ignoreChannel bool) []*ChannelEntry { +func (q Querier) channelEntriesForBundle(b cBundle, ignoreChannel bool) []*ChannelEntry { entries := []*ChannelEntry{{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: b.Replaces, }} for _, s := range b.Skips { // Ignore skips that duplicate b.Replaces. Also, only add it if its // in the same channel as b (or we're ignoring channel presence). - if _, inChannel := b.Channel.Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { + if _, inChannel := q.pkgs[b.Package].Channels[b.Channel].Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { entries = append(entries, &ChannelEntry{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: s, }) @@ -413,3 +391,271 @@ func channelEntriesForBundle(b model.Bundle, ignoreChannel bool) []*ChannelEntry } return entries } + +type cache struct { + digest string + baseDir string + persist bool + pkgs map[string]cPkg + apiBundles map[apiBundleKey]string +} + +func newCache(baseDir string, model digestableModel) (*cache, error) { + var ( + qc *cache + err error + ) + if baseDir == "" { + qc, err = newEphemeralCache() + } else { + qc, err = newPersistentCache(baseDir) + } + if err != nil { + return nil, err + } + return qc, qc.load(model) +} + +func (qc cache) close() error { + if qc.persist { + return nil + } + return os.RemoveAll(qc.baseDir) +} + +func newEphemeralCache() (*cache, error) { + baseDir, err := os.MkdirTemp("", "opm-serve-cache-") + if err != nil { + return nil, err + } + if err := os.MkdirAll(filepath.Join(baseDir, "cache"), cachePermissionDir); err != nil { + return nil, err + } + return &cache{ + digest: "", + baseDir: baseDir, + persist: false, + }, nil +} + +func newPersistentCache(baseDir string) (*cache, error) { + if err := os.MkdirAll(baseDir, cachePermissionDir); err != nil { + return nil, err + } + qc := &cache{baseDir: baseDir, persist: true} + if digest, err := os.ReadFile(filepath.Join(baseDir, "digest")); err == nil { + qc.digest = strings.TrimSpace(string(digest)) + } + return qc, nil +} + +func (qc *cache) load(model digestableModel) error { + computedDigest, err := model.GetDigest() + if err != nil && !errors.Is(err, errNonDigestable) { + return fmt.Errorf("compute digest: %v", err) + } + if err == nil && computedDigest == qc.digest { + err = qc.loadFromCache() + if err == nil { + return nil + } + // if there _was_ an error loading from the cache, + // we'll drop down and repopulate from scratch. + } + return qc.repopulateCache(model) +} + +func (qc *cache) loadFromCache() error { + packagesData, err := os.ReadFile(filepath.Join(qc.baseDir, "cache", "packages.json")) + if err != nil { + return err + } + if err := json.Unmarshal(packagesData, &qc.pkgs); err != nil { + return err + } + qc.apiBundles = map[apiBundleKey]string{} + for _, p := range qc.pkgs { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + qc.apiBundles[apiBundleKey{pkgName: p.Name, chName: ch.Name, name: b.Name}] = filename + } + } + } + return nil +} + +func (qc *cache) repopulateCache(model digestableModel) error { + // ensure that generated cache is available to all future users + oldUmask := umask(000) + defer umask(oldUmask) + + m, err := model.GetModel() + if err != nil { + return err + } + cacheDirEntries, err := os.ReadDir(qc.baseDir) + if err != nil { + return err + } + for _, e := range cacheDirEntries { + if err := os.RemoveAll(filepath.Join(qc.baseDir, e.Name())); err != nil { + return err + } + } + if err := os.MkdirAll(filepath.Join(qc.baseDir, "cache"), cachePermissionDir); err != nil { + return err + } + + qc.pkgs, err = packagesFromModel(m) + if err != nil { + return err + } + + packageJson, err := json.Marshal(qc.pkgs) + if err != nil { + return err + } + if err := os.WriteFile(filepath.Join(qc.baseDir, "cache", "packages.json"), packageJson, cachePermissionFile); err != nil { + return err + } + + qc.apiBundles = map[apiBundleKey]string{} + for _, p := range m { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) + if err != nil { + return err + } + jsonBundle, err := json.Marshal(apiBundle) + if err != nil { + return err + } + filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + if err := os.WriteFile(filename, jsonBundle, cachePermissionFile); err != nil { + return err + } + qc.apiBundles[apiBundleKey{p.Name, ch.Name, b.Name}] = filename + } + } + } + computedHash, err := model.GetDigest() + if err == nil { + if err := os.WriteFile(filepath.Join(qc.baseDir, "digest"), []byte(computedHash), cachePermissionFile); err != nil { + return err + } + } else if !errors.Is(err, errNonDigestable) { + return fmt.Errorf("compute digest: %v", err) + } + return nil +} + +func packagesFromModel(m model.Model) (map[string]cPkg, error) { + pkgs := map[string]cPkg{} + for _, p := range m { + newP := cPkg{ + Name: p.Name, + Description: p.Description, + DefaultChannel: p.DefaultChannel.Name, + Channels: map[string]cChannel{}, + } + if p.Icon != nil { + newP.Icon = &declcfg.Icon{ + Data: p.Icon.Data, + MediaType: p.Icon.MediaType, + } + } + for _, ch := range p.Channels { + head, err := ch.Head() + if err != nil { + return nil, err + } + newCh := cChannel{ + Name: ch.Name, + Head: head.Name, + Bundles: map[string]cBundle{}, + } + for _, b := range ch.Bundles { + newB := cBundle{ + Package: b.Package.Name, + Channel: b.Channel.Name, + Name: b.Name, + Replaces: b.Replaces, + Skips: b.Skips, + } + newCh.Bundles[b.Name] = newB + } + newP.Channels[ch.Name] = newCh + } + pkgs[p.Name] = newP + } + return pkgs, nil +} + +type cPkg struct { + Name string `json:"name"` + Description string `json:"description"` + Icon *declcfg.Icon `json:"icon"` + DefaultChannel string `json:"defaultChannel"` + Channels map[string]cChannel +} + +type cChannel struct { + Name string + Head string + Bundles map[string]cBundle +} + +type cBundle struct { + Package string `json:"package"` + Channel string `json:"channel"` + Name string `json:"name"` + Replaces string `json:"replaces"` + Skips []string `json:"skips"` +} + +type digestableModel interface { + GetModel() (model.Model, error) + GetDigest() (string, error) +} + +type fbcCacheModel struct { + FBC fs.FS + Cache fs.FS +} + +func (m *fbcCacheModel) GetModel() (model.Model, error) { + fbc, err := declcfg.LoadFS(m.FBC) + if err != nil { + return nil, err + } + return declcfg.ConvertToModel(*fbc) +} + +func (m *fbcCacheModel) GetDigest() (string, error) { + computedHasher := fnv.New64a() + if err := fsToTar(computedHasher, m.FBC); err != nil { + return "", err + } + if cacheFS, err := fs.Sub(m.Cache, "cache"); err == nil { + if err := fsToTar(computedHasher, cacheFS); err != nil && !errors.Is(err, os.ErrNotExist) { + return "", fmt.Errorf("compute hash: %v", err) + } + } + return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil +} + +var errNonDigestable = errors.New("cannot generate digest") + +type nonDigestableModel struct { + model.Model +} + +func (m *nonDigestableModel) GetModel() (model.Model, error) { + return m.Model, nil +} + +func (m *nonDigestableModel) GetDigest() (string, error) { + return "", errNonDigestable +} diff --git a/staging/operator-registry/pkg/registry/query_test.go b/staging/operator-registry/pkg/registry/query_test.go index 1b1d2c00b5..a732de7413 100644 --- a/staging/operator-registry/pkg/registry/query_test.go +++ b/staging/operator-registry/pkg/registry/query_test.go @@ -2,246 +2,268 @@ package registry import ( "context" + "io/fs" "testing" "testing/fstest" - "github.com/operator-framework/operator-registry/alpha/model" - "github.com/operator-framework/operator-registry/alpha/property" "github.com/stretchr/testify/require" "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/alpha/model" + "github.com/operator-framework/operator-registry/alpha/property" ) func TestQuerier_GetBundle(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - b, err := testModelQuerier.GetBundle(context.TODO(), "etcd", "singlenamespace-alpha", "etcdoperator.v0.9.4") - require.NoError(t, err) - require.Equal(t, b.PackageName, "etcd") - require.Equal(t, b.ChannelName, "singlenamespace-alpha") - require.Equal(t, b.CsvName, "etcdoperator.v0.9.4") + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + b, err := testQuerier.GetBundle(context.TODO(), "etcd", "singlenamespace-alpha", "etcdoperator.v0.9.4") + require.NoError(t, err) + require.Equal(t, b.PackageName, "etcd") + require.Equal(t, b.ChannelName, "singlenamespace-alpha") + require.Equal(t, b.CsvName, "etcdoperator.v0.9.4") + } } func TestQuerier_GetBundleForChannel(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - b, err := testModelQuerier.GetBundleForChannel(context.TODO(), "etcd", "singlenamespace-alpha") - require.NoError(t, err) - require.NotNil(t, b) - require.Equal(t, b.PackageName, "etcd") - require.Equal(t, b.ChannelName, "singlenamespace-alpha") - require.Equal(t, b.CsvName, "etcdoperator.v0.9.4") + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + b, err := testQuerier.GetBundleForChannel(context.TODO(), "etcd", "singlenamespace-alpha") + require.NoError(t, err) + require.NotNil(t, b) + require.Equal(t, b.PackageName, "etcd") + require.Equal(t, b.ChannelName, "singlenamespace-alpha") + require.Equal(t, b.CsvName, "etcdoperator.v0.9.4") + } } func TestQuerier_GetBundleThatProvides(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - b, err := testModelQuerier.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") - require.NoError(t, err) - require.NotNil(t, b) - require.Equal(t, b.PackageName, "etcd") - require.Equal(t, b.ChannelName, "singlenamespace-alpha") - require.Equal(t, b.CsvName, "etcdoperator.v0.9.4") + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + b, err := testQuerier.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") + require.NoError(t, err) + require.NotNil(t, b) + require.Equal(t, b.PackageName, "etcd") + require.Equal(t, b.ChannelName, "singlenamespace-alpha") + require.Equal(t, b.CsvName, "etcdoperator.v0.9.4") + } } func TestQuerier_GetBundleThatReplaces(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - b, err := testModelQuerier.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "singlenamespace-alpha") - require.NoError(t, err) - require.NotNil(t, b) - require.Equal(t, b.PackageName, "etcd") - require.Equal(t, b.ChannelName, "singlenamespace-alpha") - require.Equal(t, b.CsvName, "etcdoperator.v0.9.2") + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + b, err := testQuerier.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "singlenamespace-alpha") + require.NoError(t, err) + require.NotNil(t, b) + require.Equal(t, b.PackageName, "etcd") + require.Equal(t, b.ChannelName, "singlenamespace-alpha") + require.Equal(t, b.CsvName, "etcdoperator.v0.9.2") + } } func TestQuerier_GetChannelEntriesThatProvide(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - entries, err := testModelQuerier.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") - require.NoError(t, err) - require.NotNil(t, entries) - require.ElementsMatch(t, []*ChannelEntry{ - { - PackageName: "etcd", - ChannelName: "singlenamespace-alpha", - BundleName: "etcdoperator.v0.9.0", - Replaces: "", - }, - { - PackageName: "etcd", - ChannelName: "singlenamespace-alpha", - BundleName: "etcdoperator.v0.9.4", - Replaces: "etcdoperator.v0.9.2", - }, - { - PackageName: "etcd", - ChannelName: "clusterwide-alpha", - BundleName: "etcdoperator.v0.9.0", - Replaces: "", - }, - { - PackageName: "etcd", - ChannelName: "clusterwide-alpha", - BundleName: "etcdoperator.v0.9.2-clusterwide", - Replaces: "etcdoperator.v0.9.0", - }, - { - PackageName: "etcd", - ChannelName: "clusterwide-alpha", - BundleName: "etcdoperator.v0.9.2-clusterwide", - Replaces: "etcdoperator.v0.6.1", - }, - { - PackageName: "etcd", - ChannelName: "clusterwide-alpha", - BundleName: "etcdoperator.v0.9.4-clusterwide", - Replaces: "etcdoperator.v0.9.2-clusterwide", - }, - }, entries) + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + entries, err := testQuerier.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") + require.NoError(t, err) + require.NotNil(t, entries) + require.ElementsMatch(t, []*ChannelEntry{ + { + PackageName: "etcd", + ChannelName: "singlenamespace-alpha", + BundleName: "etcdoperator.v0.9.0", + Replaces: "", + }, + { + PackageName: "etcd", + ChannelName: "singlenamespace-alpha", + BundleName: "etcdoperator.v0.9.4", + Replaces: "etcdoperator.v0.9.2", + }, + { + PackageName: "etcd", + ChannelName: "clusterwide-alpha", + BundleName: "etcdoperator.v0.9.0", + Replaces: "", + }, + { + PackageName: "etcd", + ChannelName: "clusterwide-alpha", + BundleName: "etcdoperator.v0.9.2-clusterwide", + Replaces: "etcdoperator.v0.9.0", + }, + { + PackageName: "etcd", + ChannelName: "clusterwide-alpha", + BundleName: "etcdoperator.v0.9.2-clusterwide", + Replaces: "etcdoperator.v0.6.1", + }, + { + PackageName: "etcd", + ChannelName: "clusterwide-alpha", + BundleName: "etcdoperator.v0.9.4-clusterwide", + Replaces: "etcdoperator.v0.9.2-clusterwide", + }, + }, entries) + } } func TestQuerier_GetChannelEntriesThatReplace(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - entries, err := testModelQuerier.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0") - require.NoError(t, err) - require.NotNil(t, entries) - require.ElementsMatch(t, []*ChannelEntry{ - { - PackageName: "etcd", - ChannelName: "singlenamespace-alpha", - BundleName: "etcdoperator.v0.9.2", - Replaces: "etcdoperator.v0.9.0", - }, - { - PackageName: "etcd", - ChannelName: "clusterwide-alpha", - BundleName: "etcdoperator.v0.9.2-clusterwide", - Replaces: "etcdoperator.v0.9.0", - }, - }, entries) + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + entries, err := testQuerier.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0") + require.NoError(t, err) + require.NotNil(t, entries) + require.ElementsMatch(t, []*ChannelEntry{ + { + PackageName: "etcd", + ChannelName: "singlenamespace-alpha", + BundleName: "etcdoperator.v0.9.2", + Replaces: "etcdoperator.v0.9.0", + }, + { + PackageName: "etcd", + ChannelName: "clusterwide-alpha", + BundleName: "etcdoperator.v0.9.2-clusterwide", + Replaces: "etcdoperator.v0.9.0", + }, + }, entries) + } } func TestQuerier_GetLatestChannelEntriesThatProvide(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - entries, err := testModelQuerier.GetLatestChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") - require.NoError(t, err) - require.NotNil(t, entries) - require.ElementsMatch(t, []*ChannelEntry{ - { - PackageName: "etcd", - ChannelName: "singlenamespace-alpha", - BundleName: "etcdoperator.v0.9.4", - Replaces: "etcdoperator.v0.9.2", - }, - { - PackageName: "etcd", - ChannelName: "clusterwide-alpha", - BundleName: "etcdoperator.v0.9.4-clusterwide", - Replaces: "etcdoperator.v0.9.2-clusterwide", - }, - }, entries) -} - -func TestQuerier_GetPackage(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - p, err := testModelQuerier.GetPackage(context.TODO(), "etcd") - require.NoError(t, err) - require.NotNil(t, p) - - expected := &PackageManifest{ - PackageName: "etcd", - DefaultChannelName: "singlenamespace-alpha", - Channels: []PackageChannel{ - { - Name: "singlenamespace-alpha", - CurrentCSVName: "etcdoperator.v0.9.4", - }, + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + entries, err := testQuerier.GetLatestChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") + require.NoError(t, err) + require.NotNil(t, entries) + require.ElementsMatch(t, []*ChannelEntry{ { - Name: "clusterwide-alpha", - CurrentCSVName: "etcdoperator.v0.9.4-clusterwide", + PackageName: "etcd", + ChannelName: "singlenamespace-alpha", + BundleName: "etcdoperator.v0.9.4", + Replaces: "etcdoperator.v0.9.2", }, { - Name: "alpha", - CurrentCSVName: "etcdoperator-community.v0.6.1", + PackageName: "etcd", + ChannelName: "clusterwide-alpha", + BundleName: "etcdoperator.v0.9.4-clusterwide", + Replaces: "etcdoperator.v0.9.2-clusterwide", }, - }, + }, entries) } +} + +func TestQuerier_GetPackage(t *testing.T) { + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + p, err := testQuerier.GetPackage(context.TODO(), "etcd") + require.NoError(t, err) + require.NotNil(t, p) - require.ElementsMatch(t, expected.Channels, p.Channels) - expected.Channels, p.Channels = nil, nil - require.Equal(t, expected, p) + expected := &PackageManifest{ + PackageName: "etcd", + DefaultChannelName: "singlenamespace-alpha", + Channels: []PackageChannel{ + { + Name: "singlenamespace-alpha", + CurrentCSVName: "etcdoperator.v0.9.4", + }, + { + Name: "clusterwide-alpha", + CurrentCSVName: "etcdoperator.v0.9.4-clusterwide", + }, + { + Name: "alpha", + CurrentCSVName: "etcdoperator-community.v0.6.1", + }, + }, + } + + require.ElementsMatch(t, expected.Channels, p.Channels) + expected.Channels, p.Channels = nil, nil + require.Equal(t, expected, p) + } } func TestQuerier_ListBundles(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - bundles, err := testModelQuerier.ListBundles(context.TODO()) - require.NoError(t, err) - require.NotNil(t, bundles) - require.Len(t, bundles, 12) - for _, b := range bundles { - require.Zero(t, b.CsvJson) - require.Zero(t, b.Object) + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + bundles, err := testQuerier.ListBundles(context.TODO()) + require.NoError(t, err) + require.NotNil(t, bundles) + require.Len(t, bundles, 12) + for _, b := range bundles { + require.Zero(t, b.CsvJson) + require.Zero(t, b.Object) + } } } func TestQuerier_ListPackages(t *testing.T) { - testModelQuerier := genTestModelQuerier(t) - defer testModelQuerier.Close() - packages, err := testModelQuerier.ListPackages(context.TODO()) - require.NoError(t, err) - require.NotNil(t, packages) - require.Equal(t, 2, len(packages)) + for _, testQuerier := range genTestQueriers(t, validFS) { + defer testQuerier.Close() + packages, err := testQuerier.ListPackages(context.TODO()) + require.NoError(t, err) + require.NotNil(t, packages) + require.Equal(t, 2, len(packages)) + } } func TestQuerier_BadBundleRaisesError(t *testing.T) { t.Helper() - // Convert a good FS into a model - cfg, err := declcfg.LoadFS(validFS) - require.NoError(t, err) + t.Run("InvalidModel", func(t *testing.T) { + // Convert a good FS into a model (we need the model to validate + // in the declcfg.ConvertToModel step) + cfg, err := declcfg.LoadFS(validFS) + require.NoError(t, err) - m, err := declcfg.ConvertToModel(*cfg) - require.NoError(t, err) + m, err := declcfg.ConvertToModel(*cfg) + require.NoError(t, err) - // break the model by adding another package property - bundle := func() *model.Bundle { - for _, pkg := range m { - for _, ch := range pkg.Channels { - for _, bundle := range ch.Bundles { - return bundle + // break the model by adding another package property + bundle := func() *model.Bundle { + for _, pkg := range m { + for _, ch := range pkg.Channels { + for _, bundle := range ch.Bundles { + return bundle + } } } - } - return nil - }() + return nil + }() + + bundle.Properties = append(bundle.Properties, property.Property{ + Type: PackageType, + Value: []byte("{\"packageName\": \"another-package\", \"version\": \"1.0.0\"}"), + }) - bundle.Properties = append(bundle.Properties, property.Property{ - Type: PackageType, - Value: []byte("{\"packageName\": \"another-package\", \"version\": \"1.0.0\"}"), + _, err = NewQuerier(m) + require.EqualError(t, err, `parse properties: expected exactly 1 property of type "olm.package", found 2`) }) - _, err = NewQuerier(m) - require.EqualError(t, err, "parse properties: expected exactly 1 property of type \"olm.package\", found 2") + t.Run("InvalidFS", func(t *testing.T) { + _, err := NewQuerierFromFS(badBundleFS, t.TempDir()) + require.EqualError(t, err, `package "cockroachdb" bundle "cockroachdb.v5.0.3" must have exactly 1 "olm.package" property, found 2`) + }) } -func genTestModelQuerier(t *testing.T) *Querier { +func genTestQueriers(t *testing.T, fbcFS fs.FS) []*Querier { t.Helper() - cfg, err := declcfg.LoadFS(validFS) + cfg, err := declcfg.LoadFS(fbcFS) require.NoError(t, err) m, err := declcfg.ConvertToModel(*cfg) require.NoError(t, err) - reg, err := NewQuerier(m) + fromModel, err := NewQuerier(m) + require.NoError(t, err) + + fromFS, err := NewQuerierFromFS(fbcFS, t.TempDir()) require.NoError(t, err) - return reg + return []*Querier{fromModel, fromFS} } var validFS = fstest.MapFS{ @@ -599,3 +621,45 @@ var validFS = fstest.MapFS{ }`), }, } + +var badBundleFS = fstest.MapFS{ + "cockroachdb.json": &fstest.MapFile{ + Data: []byte(`{ + "schema": "olm.package", + "name": "cockroachdb", + "defaultChannel": "stable-5.x", + "icon": { + "base64data": "PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAzMS44MiAzMiIgd2lkdGg9IjI0ODYiIGhlaWdodD0iMjUwMCI+PHRpdGxlPkNMPC90aXRsZT48cGF0aCBkPSJNMTkuNDIgOS4xN2ExNS4zOSAxNS4zOSAwIDAgMS0zLjUxLjQgMTUuNDYgMTUuNDYgMCAwIDEtMy41MS0uNCAxNS42MyAxNS42MyAwIDAgMSAzLjUxLTMuOTEgMTUuNzEgMTUuNzEgMCAwIDEgMy41MSAzLjkxek0zMCAuNTdBMTcuMjIgMTcuMjIgMCAwIDAgMjUuNTkgMGExNy40IDE3LjQgMCAwIDAtOS42OCAyLjkzQTE3LjM4IDE3LjM4IDAgMCAwIDYuMjMgMGExNy4yMiAxNy4yMiAwIDAgMC00LjQ0LjU3QTE2LjIyIDE2LjIyIDAgMCAwIDAgMS4xM2EuMDcuMDcgMCAwIDAgMCAuMDkgMTcuMzIgMTcuMzIgMCAwIDAgLjgzIDEuNTcuMDcuMDcgMCAwIDAgLjA4IDAgMTYuMzkgMTYuMzkgMCAwIDEgMS44MS0uNTQgMTUuNjUgMTUuNjUgMCAwIDEgMTEuNTkgMS44OCAxNy41MiAxNy41MiAwIDAgMC0zLjc4IDQuNDhjLS4yLjMyLS4zNy42NS0uNTUgMXMtLjIyLjQ1LS4zMy42OS0uMzEuNzItLjQ0IDEuMDhhMTcuNDYgMTcuNDYgMCAwIDAgNC4yOSAxOC43Yy4yNi4yNS41My40OS44MS43M3MuNDQuMzcuNjcuNTQuNTkuNDQuODkuNjRhLjA3LjA3IDAgMCAwIC4wOCAwYy4zLS4yMS42LS40Mi44OS0uNjRzLjQ1LS4zNS42Ny0uNTQuNTUtLjQ4LjgxLS43M2ExNy40NSAxNy40NSAwIDAgMCA1LjM4LTEyLjYxIDE3LjM5IDE3LjM5IDAgMCAwLTEuMDktNi4wOWMtLjE0LS4zNy0uMjktLjczLS40NS0xLjA5cy0uMjItLjQ3LS4zMy0uNjktLjM1LS42Ni0uNTUtMWExNy42MSAxNy42MSAwIDAgMC0zLjc4LTQuNDggMTUuNjUgMTUuNjUgMCAwIDEgMTEuNi0xLjg0IDE2LjEzIDE2LjEzIDAgMCAxIDEuODEuNTQuMDcuMDcgMCAwIDAgLjA4IDBxLjQ0LS43Ni44Mi0xLjU2YS4wNy4wNyAwIDAgMCAwLS4wOUExNi44OSAxNi44OSAwIDAgMCAzMCAuNTd6IiBmaWxsPSIjMTUxZjM0Ii8+PHBhdGggZD0iTTIxLjgyIDE3LjQ3YTE1LjUxIDE1LjUxIDAgMCAxLTQuMjUgMTAuNjkgMTUuNjYgMTUuNjYgMCAwIDEtLjcyLTQuNjggMTUuNSAxNS41IDAgMCAxIDQuMjUtMTAuNjkgMTUuNjIgMTUuNjIgMCAwIDEgLjcyIDQuNjgiIGZpbGw9IiMzNDg1NDAiLz48cGF0aCBkPSJNMTUgMjMuNDhhMTUuNTUgMTUuNTUgMCAwIDEtLjcyIDQuNjggMTUuNTQgMTUuNTQgMCAwIDEtMy41My0xNS4zN0ExNS41IDE1LjUgMCAwIDEgMTUgMjMuNDgiIGZpbGw9IiM3ZGJjNDIiLz48L3N2Zz4=", + "mediatype": "image/svg+xml" + } +} +{ + "schema": "olm.channel", + "package": "cockroachdb", + "name": "stable-5.x", + "entries": [ + {"name": "cockroachdb.v5.0.3"} + ] +} +{ + "schema": "olm.bundle", + "name": "cockroachdb.v5.0.3", + "package": "cockroachdb", + "image": "quay.io/openshift-community-operators/cockroachdb:v5.0.3", + "properties": [ + { + "type": "olm.package", + "value": { + "packageName": "cockroachdb", + "version": "5.0.3" + } + }, + { + "type": "olm.package", + "value": { + "packageName": "other-package", + "version": "5.0.3" + } + } + ] +}`)}} diff --git a/staging/operator-registry/pkg/registry/syscall_unix.go b/staging/operator-registry/pkg/registry/syscall_unix.go new file mode 100644 index 0000000000..b1edcf59fd --- /dev/null +++ b/staging/operator-registry/pkg/registry/syscall_unix.go @@ -0,0 +1,8 @@ +//go:build !windows +// +build !windows + +package registry + +import "golang.org/x/sys/unix" + +var umask = unix.Umask diff --git a/staging/operator-registry/pkg/registry/syscall_windows.go b/staging/operator-registry/pkg/registry/syscall_windows.go new file mode 100644 index 0000000000..525c656f1c --- /dev/null +++ b/staging/operator-registry/pkg/registry/syscall_windows.go @@ -0,0 +1,6 @@ +//go:build windows +// +build windows + +package registry + +var umask = func(i int) int { return 0 } diff --git a/staging/operator-registry/pkg/registry/tar.go b/staging/operator-registry/pkg/registry/tar.go new file mode 100644 index 0000000000..f62a15da85 --- /dev/null +++ b/staging/operator-registry/pkg/registry/tar.go @@ -0,0 +1,66 @@ +package registry + +import ( + "archive/tar" + "fmt" + "io" + "io/fs" + "os" + "time" +) + +// fsToTar writes the filesystem represented by fsys to w as a tar archive. +// This function unsets user and group information in the tar archive so that readers +// of archives produced by this function do not need to account for differences in +// permissions between source and destination filesystems. +func fsToTar(w io.Writer, fsys fs.FS) error { + tw := tar.NewWriter(w) + if err := fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.Type()&os.ModeSymlink != 0 { + return nil + } + info, err := d.Info() + if err != nil { + return fmt.Errorf("get file info for %q: %v", path, err) + } + + h, err := tar.FileInfoHeader(info, "") + if err != nil { + return fmt.Errorf("build tar file info header for %q: %v", path, err) + } + h.Uid = 0 + h.Gid = 0 + h.Uname = "" + h.Gname = "" + h.AccessTime = time.Time{} + h.ChangeTime = time.Time{} + h.ModTime = time.Time{} + h.Name = path + + if err := tw.WriteHeader(h); err != nil { + return fmt.Errorf("write tar header for %q: %v", path, err) + } + if d.IsDir() { + return nil + } + f, err := fsys.Open(path) + if err != nil { + return fmt.Errorf("open file %q: %v", path, err) + } + defer f.Close() + if _, err := io.Copy(tw, f); err != nil { + return fmt.Errorf("write tar data for %q: %v", path, err) + } + return nil + }); err != nil { + return fmt.Errorf("write tar: %w", err) + } + if err := tw.Close(); err != nil { + return err + } + return nil +} diff --git a/vendor/github.com/operator-framework/operator-registry/alpha/action/generate_dockerfile.go b/vendor/github.com/operator-framework/operator-registry/alpha/action/generate_dockerfile.go index ef2a6d8713..4d2a3ddfc7 100644 --- a/vendor/github.com/operator-framework/operator-registry/alpha/action/generate_dockerfile.go +++ b/vendor/github.com/operator-framework/operator-registry/alpha/action/generate_dockerfile.go @@ -45,10 +45,11 @@ FROM {{.BaseImage}} # Configure the entrypoint and command ENTRYPOINT ["/bin/opm"] -CMD ["serve", "/configs"] +CMD ["serve", "/configs", "--cache-dir=/tmp/cache"] -# Copy declarative config root into image at /configs +# Copy declarative config root into image at /configs and pre-populate serve cache ADD {{.IndexDir}} /configs +RUN ["/bin/opm", "serve", "/configs", "--cache-dir=/tmp/cache", "--cache-only"] # Set DC-specific label for the location of the DC root directory # in the image diff --git a/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go b/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go index b6defb75ee..78aba5e52b 100644 --- a/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go +++ b/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go @@ -115,7 +115,7 @@ func serveFunc(cmd *cobra.Command, _ []string) error { lis, err := net.Listen("tcp", ":"+port) if err != nil { - logger.Fatalf("failed to listen: %s", err) + return fmt.Errorf("failed to listen: %s", err) } timeout, err := cmd.Flags().GetString("timeout-seconds") diff --git a/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go b/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go index 6853c245b9..633ad9ef3f 100644 --- a/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go +++ b/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go @@ -6,19 +6,17 @@ import ( "errors" "fmt" "net" - "os" - "sync" - "net/http" endpoint "net/http/pprof" + "os" "runtime/pprof" + "sync" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/reflection" - "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-registry/pkg/api" health "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1" "github.com/operator-framework/operator-registry/pkg/lib/dns" @@ -30,6 +28,8 @@ import ( type serve struct { configDir string + cacheDir string + cacheOnly bool port string terminationLog string @@ -75,12 +75,16 @@ will not be reflected in the served content. cmd.Flags().StringVarP(&s.terminationLog, "termination-log", "t", "/dev/termination-log", "path to a container termination log file") cmd.Flags().StringVarP(&s.port, "port", "p", "50051", "port number to serve on") cmd.Flags().StringVar(&s.pprofAddr, "pprof-addr", "", "address of startup profiling endpoint (addr:port format)") + cmd.Flags().StringVar(&s.cacheDir, "cache-dir", "", "if set, sync and persist server cache directory") + cmd.Flags().BoolVar(&s.cacheOnly, "cache-only", false, "sync the serve cache and exit without serving") return cmd } func (s *serve) run(ctx context.Context) error { p := newProfilerInterface(s.pprofAddr, s.logger) - p.startEndpoint() + if err := p.startEndpoint(); err != nil { + return fmt.Errorf("could not start pprof endpoint: %v", err) + } if err := p.startCpuProfileCache(); err != nil { return fmt.Errorf("could not start CPU profile: %v", err) } @@ -98,24 +102,18 @@ func (s *serve) run(ctx context.Context) error { s.logger = s.logger.WithFields(logrus.Fields{"configs": s.configDir, "port": s.port}) - cfg, err := declcfg.LoadFS(os.DirFS(s.configDir)) - if err != nil { - return fmt.Errorf("load declarative config directory: %v", err) - } - - m, err := declcfg.ConvertToModel(*cfg) - if err != nil { - return fmt.Errorf("could not build index model from declarative config: %v", err) - } - store, err := registry.NewQuerier(m) + store, err := registry.NewQuerierFromFS(os.DirFS(s.configDir), s.cacheDir) defer store.Close() if err != nil { return err } + if s.cacheOnly { + return nil + } lis, err := net.Listen("tcp", ":"+s.port) if err != nil { - s.logger.Fatalf("failed to listen: %s", err) + return fmt.Errorf("failed to listen: %s", err) } grpcServer := grpc.NewServer() @@ -129,7 +127,9 @@ func (s *serve) run(ctx context.Context) error { return grpcServer.Serve(lis) }, func() { grpcServer.GracefulStop() - p.stopEndpoint(p.logger.Context) + if err := p.stopEndpoint(ctx); err != nil { + s.logger.Warnf("error shutting down pprof server: %v", err) + } }) } @@ -147,7 +147,8 @@ type profilerInterface struct { cacheReady bool cacheLock sync.RWMutex - logger *logrus.Entry + logger *logrus.Entry + closeErr chan error } func newProfilerInterface(a string, log *logrus.Entry) *profilerInterface { @@ -162,10 +163,10 @@ func (p *profilerInterface) isEnabled() bool { return p.addr != "" } -func (p *profilerInterface) startEndpoint() { +func (p *profilerInterface) startEndpoint() error { // short-circuit if not enabled if !p.isEnabled() { - return + return nil } mux := http.NewServeMux() @@ -181,14 +182,22 @@ func (p *profilerInterface) startEndpoint() { Handler: mux, } - // goroutine exits with main - go func() { + lis, err := net.Listen("tcp", p.addr) + if err != nil { + return err + } - p.logger.Info("starting pprof endpoint") - if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - p.logger.Fatal(err) - } + p.closeErr = make(chan error) + go func() { + p.closeErr <- func() error { + p.logger.Info("starting pprof endpoint") + if err := p.server.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil + }() }() + return nil } func (p *profilerInterface) startCpuProfileCache() error { @@ -222,10 +231,14 @@ func (p *profilerInterface) httpHandler(w http.ResponseWriter, r *http.Request) w.Write(p.cache.Bytes()) } -func (p *profilerInterface) stopEndpoint(ctx context.Context) { +func (p *profilerInterface) stopEndpoint(ctx context.Context) error { + if !p.isEnabled() { + return nil + } if err := p.server.Shutdown(ctx); err != nil { - p.logger.Fatal(err) + return err } + return <-p.closeErr } func (p *profilerInterface) isCacheReady() bool { diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go index 4ccf7eba4f..4c4212217f 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go @@ -3,24 +3,31 @@ package registry import ( "context" "encoding/json" + "errors" "fmt" + "hash/fnv" + "io/fs" "os" "path/filepath" "sort" + "strings" + "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-registry/alpha/model" "github.com/operator-framework/operator-registry/pkg/api" ) -type Querier struct { - pkgs model.Model +const ( + cachePermissionDir = 0750 + cachePermissionFile = 0640 +) - tmpDir string - apiBundles map[apiBundleKey]string +type Querier struct { + *cache } func (q Querier) Close() error { - return os.RemoveAll(q.tmpDir) + return q.cache.close() } type apiBundleKey struct { @@ -39,43 +46,26 @@ func (s *SliceBundleSender) Send(b *api.Bundle) error { var _ GRPCQuery = &Querier{} -func NewQuerier(packages model.Model) (*Querier, error) { +func NewQuerierFromFS(fbcFS fs.FS, cacheDir string) (*Querier, error) { q := &Querier{} - - tmpDir, err := os.MkdirTemp("", "opm-registry-querier-") + var err error + q.cache, err = newCache(cacheDir, &fbcCacheModel{ + FBC: fbcFS, + Cache: os.DirFS(cacheDir), + }) if err != nil { - return nil, err + return q, err } - q.tmpDir = tmpDir + return q, nil +} - q.apiBundles = map[apiBundleKey]string{} - for _, pkg := range packages { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) - if err != nil { - return q, err - } - jsonBundle, err := json.Marshal(apiBundle) - if err != nil { - return q, err - } - filename := filepath.Join(tmpDir, fmt.Sprintf("%s_%s_%s.json", pkg.Name, ch.Name, b.Name)) - if err := os.WriteFile(filename, jsonBundle, 0666); err != nil { - return q, err - } - q.apiBundles[apiBundleKey{pkg.Name, ch.Name, b.Name}] = filename - packages[pkg.Name].Channels[ch.Name].Bundles[b.Name] = &model.Bundle{ - Package: pkg, - Channel: ch, - Name: b.Name, - Replaces: b.Replaces, - Skips: b.Skips, - } - } - } +func NewQuerier(m model.Model) (*Querier, error) { + q := &Querier{} + var err error + q.cache, err = newCache("", &nonDigestableModel{Model: m}) + if err != nil { + return q, err } - q.pkgs = packages return q, nil } @@ -147,19 +137,15 @@ func (q Querier) GetPackage(_ context.Context, name string) (*PackageManifest, e var channels []PackageChannel for _, ch := range pkg.Channels { - head, err := ch.Head() - if err != nil { - return nil, fmt.Errorf("package %q, channel %q has invalid head: %v", name, ch.Name, err) - } channels = append(channels, PackageChannel{ Name: ch.Name, - CurrentCSVName: head.Name, + CurrentCSVName: ch.Head, }) } return &PackageManifest{ PackageName: pkg.Name, Channels: channels, - DefaultChannelName: pkg.DefaultChannel.Name, + DefaultChannelName: pkg.DefaultChannel, }, nil } @@ -196,13 +182,9 @@ func (q Querier) GetBundleForChannel(_ context.Context, pkgName string, channelN if !ok { return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) } - head, err := ch.Head() - if err != nil { - return nil, fmt.Errorf("package %q, channel %q has invalid head: %v", pkgName, channelName, err) - } - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, head.Name}) + apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, ch.Head}) if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", head.Name, err) + return nil, fmt.Errorf("convert bundle %q: %v", ch.Head, err) } // unset Replaces and Skips (sqlite query does not populate these fields) @@ -217,7 +199,7 @@ func (q Querier) GetChannelEntriesThatReplace(_ context.Context, name string) ([ for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { for _, b := range ch.Bundles { - entries = append(entries, channelEntriesThatReplace(*b, name)...) + entries = append(entries, channelEntriesThatReplace(b, name)...) } } } @@ -242,7 +224,7 @@ func (q Querier) GetBundleThatReplaces(_ context.Context, name, pkgName, channel // is ALSO non-deterministic because it doesn't use ORDER BY, so its probably okay for this // implementation to be non-deterministic as well. for _, b := range ch.Bundles { - if bundleReplaces(*b, name) { + if bundleReplaces(b, name) { apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) if err != nil { return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) @@ -263,7 +245,7 @@ func (q Querier) GetChannelEntriesThatProvide(_ context.Context, group, version, for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { for _, b := range ch.Bundles { - provides, err := q.doesModelBundleProvide(*b, group, version, kind) + provides, err := q.doesModelBundleProvide(b, group, version, kind) if err != nil { return nil, err } @@ -274,7 +256,7 @@ func (q Querier) GetChannelEntriesThatProvide(_ context.Context, group, version, // the sqlite server and returns seemingly invalid channel entries. // Don't worry about this. Not used anymore. - entries = append(entries, channelEntriesForBundle(*b, true)...) + entries = append(entries, q.channelEntriesForBundle(b, true)...) } } } @@ -297,17 +279,13 @@ func (q Querier) GetLatestChannelEntriesThatProvide(_ context.Context, group, ve for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { - b, err := ch.Head() - if err != nil { - return nil, fmt.Errorf("package %q, channel %q has invalid head: %v", pkg.Name, ch.Name, err) - } - - provides, err := q.doesModelBundleProvide(*b, group, version, kind) + b := ch.Bundles[ch.Head] + provides, err := q.doesModelBundleProvide(b, group, version, kind) if err != nil { return nil, err } if provides { - entries = append(entries, channelEntriesForBundle(*b, false)...) + entries = append(entries, q.channelEntriesForBundle(b, false)...) } } } @@ -337,15 +315,15 @@ func (q Querier) GetBundleThatProvides(ctx context.Context, group, version, kind // collected based on iterating over the packages in q.pkgs. continue } - if entry.ChannelName == pkg.DefaultChannel.Name { + if entry.ChannelName == pkg.DefaultChannel { return q.GetBundle(ctx, entry.PackageName, entry.ChannelName, entry.BundleName) } } return nil, fmt.Errorf("no entry found that provides group:%q version:%q kind:%q", group, version, kind) } -func (q Querier) doesModelBundleProvide(b model.Bundle, group, version, kind string) (bool, error) { - apiBundle, err := q.loadAPIBundle(apiBundleKey{b.Package.Name, b.Channel.Name, b.Name}) +func (q Querier) doesModelBundleProvide(b cBundle, group, version, kind string) (bool, error) { + apiBundle, err := q.loadAPIBundle(apiBundleKey{b.Package, b.Channel, b.Name}) if err != nil { return false, fmt.Errorf("convert bundle %q: %v", b.Name, err) } @@ -357,7 +335,7 @@ func (q Querier) doesModelBundleProvide(b model.Bundle, group, version, kind str return false, nil } -func bundleReplaces(b model.Bundle, name string) bool { +func bundleReplaces(b cBundle, name string) bool { if b.Replaces == name { return true } @@ -369,12 +347,12 @@ func bundleReplaces(b model.Bundle, name string) bool { return false } -func channelEntriesThatReplace(b model.Bundle, name string) []*ChannelEntry { +func channelEntriesThatReplace(b cBundle, name string) []*ChannelEntry { var entries []*ChannelEntry if b.Replaces == name { entries = append(entries, &ChannelEntry{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: b.Replaces, }) @@ -382,8 +360,8 @@ func channelEntriesThatReplace(b model.Bundle, name string) []*ChannelEntry { for _, s := range b.Skips { if s == name && s != b.Replaces { entries = append(entries, &ChannelEntry{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: b.Replaces, }) @@ -392,20 +370,20 @@ func channelEntriesThatReplace(b model.Bundle, name string) []*ChannelEntry { return entries } -func channelEntriesForBundle(b model.Bundle, ignoreChannel bool) []*ChannelEntry { +func (q Querier) channelEntriesForBundle(b cBundle, ignoreChannel bool) []*ChannelEntry { entries := []*ChannelEntry{{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: b.Replaces, }} for _, s := range b.Skips { // Ignore skips that duplicate b.Replaces. Also, only add it if its // in the same channel as b (or we're ignoring channel presence). - if _, inChannel := b.Channel.Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { + if _, inChannel := q.pkgs[b.Package].Channels[b.Channel].Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { entries = append(entries, &ChannelEntry{ - PackageName: b.Package.Name, - ChannelName: b.Channel.Name, + PackageName: b.Package, + ChannelName: b.Channel, BundleName: b.Name, Replaces: s, }) @@ -413,3 +391,271 @@ func channelEntriesForBundle(b model.Bundle, ignoreChannel bool) []*ChannelEntry } return entries } + +type cache struct { + digest string + baseDir string + persist bool + pkgs map[string]cPkg + apiBundles map[apiBundleKey]string +} + +func newCache(baseDir string, model digestableModel) (*cache, error) { + var ( + qc *cache + err error + ) + if baseDir == "" { + qc, err = newEphemeralCache() + } else { + qc, err = newPersistentCache(baseDir) + } + if err != nil { + return nil, err + } + return qc, qc.load(model) +} + +func (qc cache) close() error { + if qc.persist { + return nil + } + return os.RemoveAll(qc.baseDir) +} + +func newEphemeralCache() (*cache, error) { + baseDir, err := os.MkdirTemp("", "opm-serve-cache-") + if err != nil { + return nil, err + } + if err := os.MkdirAll(filepath.Join(baseDir, "cache"), cachePermissionDir); err != nil { + return nil, err + } + return &cache{ + digest: "", + baseDir: baseDir, + persist: false, + }, nil +} + +func newPersistentCache(baseDir string) (*cache, error) { + if err := os.MkdirAll(baseDir, cachePermissionDir); err != nil { + return nil, err + } + qc := &cache{baseDir: baseDir, persist: true} + if digest, err := os.ReadFile(filepath.Join(baseDir, "digest")); err == nil { + qc.digest = strings.TrimSpace(string(digest)) + } + return qc, nil +} + +func (qc *cache) load(model digestableModel) error { + computedDigest, err := model.GetDigest() + if err != nil && !errors.Is(err, errNonDigestable) { + return fmt.Errorf("compute digest: %v", err) + } + if err == nil && computedDigest == qc.digest { + err = qc.loadFromCache() + if err == nil { + return nil + } + // if there _was_ an error loading from the cache, + // we'll drop down and repopulate from scratch. + } + return qc.repopulateCache(model) +} + +func (qc *cache) loadFromCache() error { + packagesData, err := os.ReadFile(filepath.Join(qc.baseDir, "cache", "packages.json")) + if err != nil { + return err + } + if err := json.Unmarshal(packagesData, &qc.pkgs); err != nil { + return err + } + qc.apiBundles = map[apiBundleKey]string{} + for _, p := range qc.pkgs { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + qc.apiBundles[apiBundleKey{pkgName: p.Name, chName: ch.Name, name: b.Name}] = filename + } + } + } + return nil +} + +func (qc *cache) repopulateCache(model digestableModel) error { + // ensure that generated cache is available to all future users + oldUmask := umask(000) + defer umask(oldUmask) + + m, err := model.GetModel() + if err != nil { + return err + } + cacheDirEntries, err := os.ReadDir(qc.baseDir) + if err != nil { + return err + } + for _, e := range cacheDirEntries { + if err := os.RemoveAll(filepath.Join(qc.baseDir, e.Name())); err != nil { + return err + } + } + if err := os.MkdirAll(filepath.Join(qc.baseDir, "cache"), cachePermissionDir); err != nil { + return err + } + + qc.pkgs, err = packagesFromModel(m) + if err != nil { + return err + } + + packageJson, err := json.Marshal(qc.pkgs) + if err != nil { + return err + } + if err := os.WriteFile(filepath.Join(qc.baseDir, "cache", "packages.json"), packageJson, cachePermissionFile); err != nil { + return err + } + + qc.apiBundles = map[apiBundleKey]string{} + for _, p := range m { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) + if err != nil { + return err + } + jsonBundle, err := json.Marshal(apiBundle) + if err != nil { + return err + } + filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + if err := os.WriteFile(filename, jsonBundle, cachePermissionFile); err != nil { + return err + } + qc.apiBundles[apiBundleKey{p.Name, ch.Name, b.Name}] = filename + } + } + } + computedHash, err := model.GetDigest() + if err == nil { + if err := os.WriteFile(filepath.Join(qc.baseDir, "digest"), []byte(computedHash), cachePermissionFile); err != nil { + return err + } + } else if !errors.Is(err, errNonDigestable) { + return fmt.Errorf("compute digest: %v", err) + } + return nil +} + +func packagesFromModel(m model.Model) (map[string]cPkg, error) { + pkgs := map[string]cPkg{} + for _, p := range m { + newP := cPkg{ + Name: p.Name, + Description: p.Description, + DefaultChannel: p.DefaultChannel.Name, + Channels: map[string]cChannel{}, + } + if p.Icon != nil { + newP.Icon = &declcfg.Icon{ + Data: p.Icon.Data, + MediaType: p.Icon.MediaType, + } + } + for _, ch := range p.Channels { + head, err := ch.Head() + if err != nil { + return nil, err + } + newCh := cChannel{ + Name: ch.Name, + Head: head.Name, + Bundles: map[string]cBundle{}, + } + for _, b := range ch.Bundles { + newB := cBundle{ + Package: b.Package.Name, + Channel: b.Channel.Name, + Name: b.Name, + Replaces: b.Replaces, + Skips: b.Skips, + } + newCh.Bundles[b.Name] = newB + } + newP.Channels[ch.Name] = newCh + } + pkgs[p.Name] = newP + } + return pkgs, nil +} + +type cPkg struct { + Name string `json:"name"` + Description string `json:"description"` + Icon *declcfg.Icon `json:"icon"` + DefaultChannel string `json:"defaultChannel"` + Channels map[string]cChannel +} + +type cChannel struct { + Name string + Head string + Bundles map[string]cBundle +} + +type cBundle struct { + Package string `json:"package"` + Channel string `json:"channel"` + Name string `json:"name"` + Replaces string `json:"replaces"` + Skips []string `json:"skips"` +} + +type digestableModel interface { + GetModel() (model.Model, error) + GetDigest() (string, error) +} + +type fbcCacheModel struct { + FBC fs.FS + Cache fs.FS +} + +func (m *fbcCacheModel) GetModel() (model.Model, error) { + fbc, err := declcfg.LoadFS(m.FBC) + if err != nil { + return nil, err + } + return declcfg.ConvertToModel(*fbc) +} + +func (m *fbcCacheModel) GetDigest() (string, error) { + computedHasher := fnv.New64a() + if err := fsToTar(computedHasher, m.FBC); err != nil { + return "", err + } + if cacheFS, err := fs.Sub(m.Cache, "cache"); err == nil { + if err := fsToTar(computedHasher, cacheFS); err != nil && !errors.Is(err, os.ErrNotExist) { + return "", fmt.Errorf("compute hash: %v", err) + } + } + return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil +} + +var errNonDigestable = errors.New("cannot generate digest") + +type nonDigestableModel struct { + model.Model +} + +func (m *nonDigestableModel) GetModel() (model.Model, error) { + return m.Model, nil +} + +func (m *nonDigestableModel) GetDigest() (string, error) { + return "", errNonDigestable +} diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_unix.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_unix.go new file mode 100644 index 0000000000..b1edcf59fd --- /dev/null +++ b/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_unix.go @@ -0,0 +1,8 @@ +//go:build !windows +// +build !windows + +package registry + +import "golang.org/x/sys/unix" + +var umask = unix.Umask diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_windows.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_windows.go new file mode 100644 index 0000000000..525c656f1c --- /dev/null +++ b/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_windows.go @@ -0,0 +1,6 @@ +//go:build windows +// +build windows + +package registry + +var umask = func(i int) int { return 0 } diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/tar.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/tar.go new file mode 100644 index 0000000000..f62a15da85 --- /dev/null +++ b/vendor/github.com/operator-framework/operator-registry/pkg/registry/tar.go @@ -0,0 +1,66 @@ +package registry + +import ( + "archive/tar" + "fmt" + "io" + "io/fs" + "os" + "time" +) + +// fsToTar writes the filesystem represented by fsys to w as a tar archive. +// This function unsets user and group information in the tar archive so that readers +// of archives produced by this function do not need to account for differences in +// permissions between source and destination filesystems. +func fsToTar(w io.Writer, fsys fs.FS) error { + tw := tar.NewWriter(w) + if err := fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.Type()&os.ModeSymlink != 0 { + return nil + } + info, err := d.Info() + if err != nil { + return fmt.Errorf("get file info for %q: %v", path, err) + } + + h, err := tar.FileInfoHeader(info, "") + if err != nil { + return fmt.Errorf("build tar file info header for %q: %v", path, err) + } + h.Uid = 0 + h.Gid = 0 + h.Uname = "" + h.Gname = "" + h.AccessTime = time.Time{} + h.ChangeTime = time.Time{} + h.ModTime = time.Time{} + h.Name = path + + if err := tw.WriteHeader(h); err != nil { + return fmt.Errorf("write tar header for %q: %v", path, err) + } + if d.IsDir() { + return nil + } + f, err := fsys.Open(path) + if err != nil { + return fmt.Errorf("open file %q: %v", path, err) + } + defer f.Close() + if _, err := io.Copy(tw, f); err != nil { + return fmt.Errorf("write tar data for %q: %v", path, err) + } + return nil + }); err != nil { + return fmt.Errorf("write tar: %w", err) + } + if err := tw.Close(); err != nil { + return err + } + return nil +}