diff --git a/CHANGELOG.md b/CHANGELOG.md
index 76c933a9308..f4cbe3ae07c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,7 +25,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- S3 provider:
- Added `put_user_metadata` option to config.
-
+ - Added `insecure_skip_verify` option to config.
+
## [v0.2.1](https://github.com/improbable-eng/thanos/releases/tag/v0.2.1) - 2018.12.27
### Added
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 91adf93874c..d8118e696aa 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -6,6 +6,21 @@ method with the owners of this repository before making a change.
Please follow the [code of conduct](CODE_OF_CONDUCT.md) in all your interactions with the project.
+## Thanos Philosophy
+
+The philosophy of Thanos and our community borrows heavily from the UNIX philosophy and the Go programming language.
+
+* Each sub command should do one thing and do it well
+  * e.g. thanos query proxies incoming calls to known store API endpoints, merging the results
+* Write components that work together
+  * e.g. blocks should be stored in the native Prometheus format
+* Make it easy to read, write, and run components
+  * e.g. reduce complexity in system design and implementation
+
+## Adding New Features / Components
+
+Adding large new features and components to Thanos should be done by first creating a [proposal](docs/proposals) document outlining the design decisions of the change, motivations for the change, and any alternatives that might have been considered.
+
## Pull Request Process
1. Read [getting started docs](docs/getting_started.md) and prepare Thanos.
diff --git a/Gopkg.lock b/Gopkg.lock
index 94d9dfb1be6..1441443d693 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -34,7 +34,7 @@
version = "0.1.7"
[[projects]]
- digest = "1:5da4d3b3b9949b9043d2fd36c4ff9b208f72ad5260a3dcb6f94267a769ee1899"
+ digest = "0:"
name = "github.com/Azure/azure-storage-blob-go"
packages = ["azblob"]
pruneopts = ""
@@ -199,12 +199,12 @@
revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a"
[[projects]]
- branch = "master"
- digest = "1:9abc49f39e3e23e262594bb4fb70abf74c0c99e94f99153f43b143805e850719"
+ digest = "1:cea4aa2038169ee558bf507d5ea02c94ca85bcca28a4c7bb99fd59b31e43a686"
name = "github.com/google/go-querystring"
packages = ["query"]
pruneopts = ""
- revision = "53e6ce116135b80d037921a7fdd5138cf32d7a8a"
+ revision = "44c6ddd0a2342c386950e880b658017258da92fc"
+ version = "v1.0.0"
[[projects]]
digest = "1:e097a364f4e8d8d91b9b9eeafb992d3796a41fde3eb548c1a87eb9d9f60725cf"
@@ -335,12 +335,12 @@
version = "v0.3.0"
[[projects]]
- digest = "1:82b912465c1da0668582a7d1117339c278e786c2536b3c3623029a0c7141c2d0"
+ digest = "1:84c28d9899cc4e00c38042d345cea8819275a5a62403a58530cac67022894776"
name = "github.com/mattn/go-runewidth"
packages = ["."]
pruneopts = ""
- revision = "ce7b0b5c7b45a81508558cd1dba6bb1e4ddb51bb"
- version = "v0.0.3"
+ revision = "3ee7d812e62a0804a7d0a324e0249ca2db3476d3"
+ version = "v0.0.4"
[[projects]]
digest = "1:49a8b01a6cd6558d504b65608214ca40a78000e1b343ed0da5c6a9ccd83d6d30"
@@ -390,20 +390,12 @@
version = "v0.11.0"
[[projects]]
- digest = "1:912349f5cf927bf96dca709623631ace7db723f07c70c4d56cfc22d9a667ed16"
+ digest = "1:b09858acd58e0873236c7b96903e3ec4e238d5de644c08bd8e712fa2d3d51ad2"
name = "github.com/mozillazg/go-httpheader"
packages = ["."]
pruneopts = ""
- revision = "4e5d6424981844faafc4b0649036b2e0395bdf99"
- version = "v0.2.0"
-
-[[projects]]
- branch = "master"
- digest = "1:3adc46876d4d0e4d5bbcfcc44c2116b95d7a5c966e2ee92a219488547fd453f2"
- name = "github.com/nightlyone/lockfile"
- packages = ["."]
- pruneopts = ""
- revision = "0ad87eef1443f64d3d8c50da647e2b1552851124"
+ revision = "61f2392c3317b60616c9dcb10d0a4cfef131fe62"
+ version = "v0.2.1"
[[projects]]
digest = "1:94e9081cc450d2cdf4e6886fc2c06c07272f86477df2d74ee5931951fa3d2577"
@@ -508,11 +500,12 @@
revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92"
[[projects]]
- digest = "1:b5ff9852eabe841003da4b0a4b742a2878c722dda6481003432344f633a814fc"
+ digest = "1:43ad0a170d6f826f8dd63244960384eb205e75423a444f7afbf145425f287227"
name = "github.com/prometheus/prometheus"
packages = [
"discovery/file",
"discovery/targetgroup",
+ "pkg/gate",
"pkg/labels",
"pkg/rulefmt",
"pkg/textparse",
@@ -528,11 +521,10 @@
"util/testutil",
]
pruneopts = ""
- revision = "71af5e29e815795e9dd14742ee7725682fa14b7b"
- version = "v2.3.2"
+ revision = "3bd41cc92c7800cc6072171bd4237406126fa169"
[[projects]]
- digest = "1:216dcf26fbfb3f36f286ca3306882a157c51648e4b5d4f3a9e9c719faea6ea58"
+ digest = "1:00780e2d7a870f4de3da0d854cc419170c81cf82e6f2e802a3543fcf54c1867d"
name = "github.com/prometheus/tsdb"
packages = [
".",
@@ -541,9 +533,11 @@
"fileutil",
"index",
"labels",
+ "wal",
]
pruneopts = ""
- revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
+ revision = "10ba228e6baa4811818e04b1ab9b48110bb43d7b"
+ version = "v0.4.0"
[[projects]]
branch = "master"
diff --git a/Gopkg.toml b/Gopkg.toml
index 98bbc9494dc..057fa27ac4a 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
name = "github.com/prometheus/common"
[[constraint]]
- version = "v2.3.2"
+  # TODO(bwplotka): Move to a released version once our recent fixes are released (v2.7.0)
+ revision = "3bd41cc92c7800cc6072171bd4237406126fa169"
name = "github.com/prometheus/prometheus"
[[override]]
@@ -46,7 +47,7 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
[[constraint]]
name = "github.com/prometheus/tsdb"
- revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
+ version = "v0.4.0"
[[constraint]]
branch = "master"
diff --git a/Makefile b/Makefile
index 56ae87c6352..3c8b8341a6e 100644
--- a/Makefile
+++ b/Makefile
@@ -33,7 +33,8 @@ PROMU ?= $(BIN_DIR)/promu-$(PROMU_VERSION)
PROMU_VERSION ?= 264dc36af9ea3103255063497636bd5713e3e9c1
# E2e test deps.
-SUPPORTED_PROM_VERSIONS ?=v2.0.0 v2.2.1 v2.3.2 v2.4.3 v2.5.0
+# Referenced by github.com/improbable-eng/thanos/blob/master/docs/getting_started.md#prometheus
+SUPPORTED_PROM_VERSIONS ?=v2.2.1 v2.3.2 v2.4.3 v2.5.0
ALERTMANAGER_VERSION ?=v0.15.2
MINIO_SERVER_VERSION ?=RELEASE.2018-10-06T00-15-16Z
diff --git a/README.md b/README.md
index af0fb426746..871400b878e 100644
--- a/README.md
+++ b/README.md
@@ -2,17 +2,31 @@
[](https://circleci.com/gh/improbable-eng/thanos)
[](https://goreportcard.com/report/github.com/improbable-eng/thanos)
+[](https://godoc.org/github.com/improbable-eng/thanos)
[](https://join.slack.com/t/improbable-eng/shared_invite/enQtMzQ1ODcyMzQ5MjM4LWY5ZWZmNGM2ODc5MmViNmQ3ZTA3ZTY3NzQwOTBlMTkzZmIxZTIxODk0OWU3YjZhNWVlNDU3MDlkZGViZjhkMjc)
## Overview
-Thanos is a set of components that can be composed into a highly available
-metric system with unlimited storage capacity. It can be added seamlessly on
-top of existing Prometheus deployments and leverages the Prometheus 2.0
-storage format to cost-efficiently store historical metric data in any object
-storage while retaining fast query latencies. Additionally, it provides
-a global query view across all Prometheus installations and can merge
-data from Prometheus HA pairs on the fly.
+Thanos is a set of components that can be composed into a highly available metric
+system with unlimited storage capacity, which can be added seamlessly on top of existing
+Prometheus deployments.
+
+Thanos leverages the Prometheus 2.0 storage format to cost-efficiently store historical metric
+data in any object storage while retaining fast query latencies. Additionally, it provides
+a global query view across all Prometheus installations and can merge data from Prometheus
+HA pairs on the fly.
+
+Concretely, the aims of the project are:
+
+1. Global query view of metrics.
+1. Unlimited retention of metrics.
+1. High availability of components, including Prometheus.
+
+## Architecture Overview
+
+
+
+## Getting Started
* **[Getting Started](docs/getting_started.md)**
* [Design](docs/design.md)
@@ -33,9 +47,20 @@ data from Prometheus HA pairs on the fly.
* Simple gRPC "Store API" for unified data access across all metric data
* Easy integration points for custom metric providers
+## Thanos Philosophy
+
+The philosophy of Thanos and our community borrows heavily from the UNIX philosophy and the Go programming language.
+
+* Each sub command should do one thing and do it well
+  * e.g. thanos query proxies incoming calls to known store API endpoints, merging the results
+* Write components that work together
+  * e.g. blocks should be stored in the native Prometheus format
+* Make it easy to read, write, and run components
+  * e.g. reduce complexity in system design and implementation
+
## Contributing
-Contributions are very welcome!
+Contributions are very welcome! See our [CONTRIBUTING.md](CONTRIBUTING.md) for more information.
## Community
diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go
index 01b9b54bf68..ff68feee2a8 100644
--- a/cmd/thanos/bucket.go
+++ b/cmd/thanos/bucket.go
@@ -10,10 +10,9 @@ import (
"text/template"
"time"
- "github.com/prometheus/tsdb/labels"
-
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/verifier"
@@ -23,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/alecthomas/kingpin.v2"
@@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
defer cancel()
// Getting Metas.
- var blockMetas []*block.Meta
+ var blockMetas []*metadata.Meta
if err = bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
@@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
}
}
-func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error {
+func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error {
header := inspectColumns
var lines [][]string
@@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string {
// matchesSelector checks if blockMeta contains every label from
// the selector with the correct value
-func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool {
+func matchesSelector(blockMeta *metadata.Meta, selectorLabels labels.Labels) bool {
for _, l := range selectorLabels {
if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value {
return false
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index c00aadd43db..5f3846ff50a 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -8,11 +8,10 @@ import (
"path/filepath"
"time"
- "github.com/prometheus/tsdb/chunkenc"
-
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
@@ -23,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
+ "github.com/prometheus/tsdb/chunkenc"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -105,7 +105,7 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}
- var metas []*block.Meta
+ var metas []*metadata.Meta
err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
@@ -119,7 +119,7 @@ func downsampleBucket(
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")
- var m block.Meta
+ var m metadata.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return errors.Wrap(err, "decode meta")
}
@@ -201,7 +201,7 @@ func downsampleBucket(
return nil
}
-func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error {
+func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())
@@ -224,7 +224,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu
pool = downsample.NewPool()
}
- b, err := tsdb.OpenBlock(bdir, pool)
+ b, err := tsdb.OpenBlock(logger, bdir, pool)
if err != nil {
return errors.Wrapf(err, "open block %s", m.ULID)
}
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index bf6cfa64fe8..9e6c5ac19ef 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -296,7 +296,16 @@ func runQuery(
return stores.Get(), nil
}, selectorLset)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
- engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout)
+ engine = promql.NewEngine(
+ promql.EngineOpts{
+ Logger: logger,
+ Reg: reg,
+ MaxConcurrent: maxConcurrentQueries,
+ // TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703
+ MaxSamples: math.MaxInt32,
+ Timeout: queryTimeout,
+ },
+ )
)
// Periodically update the store set with the addresses we see in our cluster.
{
diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go
index 03fed102d2e..56ad52f438b 100644
--- a/cmd/thanos/rule.go
+++ b/cmd/thanos/rule.go
@@ -19,15 +19,14 @@ import (
"syscall"
"time"
- "github.com/improbable-eng/thanos/pkg/extprom"
-
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
+ "github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
@@ -117,7 +116,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
MaxBlockDuration: *tsdbBlockDuration,
Retention: *tsdbRetention,
NoLockfile: true,
- WALFlushInterval: 30 * time.Second,
}
lookupQueries := map[string]struct{}{}
@@ -290,7 +288,7 @@ func runRule(
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)
- notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
+ notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
@@ -309,8 +307,6 @@ func runRule(
res = append(res, a)
}
alertQ.Push(res)
-
- return nil
}
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
@@ -579,7 +575,7 @@ func runRule(
}
}()
- s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource)
+ s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource)
ctx, cancel := context.WithCancel(context.Background())
diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go
index 61f493f93a8..bd462b2f650 100644
--- a/cmd/thanos/sidecar.go
+++ b/cmd/thanos/sidecar.go
@@ -14,7 +14,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/reloader"
@@ -102,7 +102,7 @@ func runSidecar(
reloader *reloader.Reloader,
component string,
) error {
- var metadata = &metadata{
+ var m = &promMetadata{
promURL: promURL,
// Start out with the full time range. The shipper will constrain it later.
@@ -128,7 +128,7 @@ func runSidecar(
// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
- if err := metadata.UpdateLabels(ctx, logger); err != nil {
+ if err := m.UpdateLabels(ctx, logger); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
@@ -145,14 +145,14 @@ func runSidecar(
return errors.Wrap(err, "initial external labels query")
}
- if len(metadata.Labels()) == 0 {
+ if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured")
}
// New gossip cluster.
- mint, maxt := metadata.Timestamps()
+ mint, maxt := m.Timestamps()
if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{
- Labels: metadata.LabelsPB(),
+ Labels: m.LabelsPB(),
MinTime: mint,
MaxTime: maxt,
}); err != nil {
@@ -165,12 +165,12 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer iterCancel()
- if err := metadata.UpdateLabels(iterCtx, logger); err != nil {
+ if err := m.UpdateLabels(iterCtx, logger); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
promUp.Set(0)
} else {
// Update gossip.
- peer.SetLabels(metadata.LabelsPB())
+ peer.SetLabels(m.LabelsPB())
promUp.Set(1)
lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9)
@@ -204,7 +204,7 @@ func runSidecar(
var client http.Client
promStore, err := store.NewPrometheusStore(
- logger, &client, promURL, metadata.Labels, metadata.Timestamps)
+ logger, &client, promURL, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
@@ -252,7 +252,7 @@ func runSidecar(
}
}()
- s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource)
+ s := shipper.New(logger, nil, dataDir, bkt, m.Labels, metadata.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
@@ -265,9 +265,9 @@ func runSidecar(
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
- metadata.UpdateTimestamps(minTime, math.MaxInt64)
+ m.UpdateTimestamps(minTime, math.MaxInt64)
- mint, maxt := metadata.Timestamps()
+ mint, maxt := m.Timestamps()
peer.SetTimestamps(mint, maxt)
}
return nil
@@ -281,7 +281,7 @@ func runSidecar(
return nil
}
-type metadata struct {
+type promMetadata struct {
promURL *url.URL
mtx sync.Mutex
@@ -290,7 +290,7 @@ type metadata struct {
labels labels.Labels
}
-func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
+func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
elset, err := queryExternalLabels(ctx, logger, s.promURL)
if err != nil {
return err
@@ -303,7 +303,7 @@ func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
return nil
}
-func (s *metadata) UpdateTimestamps(mint int64, maxt int64) {
+func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -311,14 +311,14 @@ func (s *metadata) UpdateTimestamps(mint int64, maxt int64) {
s.maxt = maxt
}
-func (s *metadata) Labels() labels.Labels {
+func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
return s.labels
}
-func (s *metadata) LabelsPB() []storepb.Label {
+func (s *promMetadata) LabelsPB() []storepb.Label {
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -332,7 +332,7 @@ func (s *metadata) LabelsPB() []storepb.Label {
return lset
}
-func (s *metadata) Timestamps() (mint int64, maxt int64) {
+func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()
diff --git a/doc.go b/doc.go
new file mode 100644
index 00000000000..60db929d882
--- /dev/null
+++ b/doc.go
@@ -0,0 +1,6 @@
+// Package thanos is a set of components that
+// can provide a highly available Prometheus
+// setup with long term storage capabilities.
+//
+// See https://github.com/improbable-eng/thanos/blob/master/docs/getting_started.md for first steps.
+package thanos // import "github.com/improbable-eng/thanos"
diff --git a/docs/getting_started.md b/docs/getting_started.md
index 1c28f2c3953..eb17387802a 100644
--- a/docs/getting_started.md
+++ b/docs/getting_started.md
@@ -30,7 +30,7 @@ The `thanos` binary should now be in your `$PATH` and is the only thing required
Thanos is based on vanilla Prometheus (v2.2.1+).
-For exact Prometheus version list Thanos was tested against you can find [here](../Makefile#L25)
+The exact list of Prometheus versions Thanos was tested against can be found [here](../Makefile#L36)
## [Sidecar](components/sidecar.md)
diff --git a/docs/img/arch.jpg b/docs/img/arch.jpg
new file mode 100644
index 00000000000..04d05547afb
Binary files /dev/null and b/docs/img/arch.jpg differ
diff --git a/docs/storage.md b/docs/storage.md
index da195560049..58165cd1836 100644
--- a/docs/storage.md
+++ b/docs/storage.md
@@ -50,6 +50,7 @@ config:
put_user_metadata: {}
http_config:
idle_conn_timeout: 0s
+ insecure_skip_verify: false
```
AWS region to endpoint mapping can be found in this [link](https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region)
@@ -57,7 +58,10 @@ AWS region to endpoint mapping can be found in this [link](https://docs.aws.amaz
Make sure you use a correct signature version.
Currently AWS requires signature v4, so it needs `signature-version2: false`; otherwise you will get an Access Denied error. Several other S3-compatible providers use `signature-version2: true`.
-For debug purposes you can set `insecure: true` to switch to plain insecure HTTP instead of HTTPS
+For debug and testing purposes you can set
+
+* `insecure: true` to switch to plain insecure HTTP instead of HTTPS
+* `http_config.insecure_skip_verify: true` to disable TLS certificate verification (if your S3 based storage is using a self-signed certificate, for example)
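+
+For example, a hypothetical minimal configuration for an S3-compatible endpoint serving a self-signed certificate might look as follows (bucket name and endpoint are illustrative only):
+
+```yaml
+type: S3
+config:
+  bucket: test-bucket
+  endpoint: minio.example.com:9000
+  insecure: false
+  http_config:
+    # Testing only: accept the endpoint's self-signed certificate.
+    insecure_skip_verify: true
+```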
### Credentials
By default Thanos will try to retrieve credentials from the following sources:
diff --git a/pkg/block/block.go b/pkg/block/block.go
index 118b7ea96c8..cdc52a3e081 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -5,11 +5,12 @@ package block
import (
"context"
"encoding/json"
- "io/ioutil"
"os"
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"fmt"
"github.com/go-kit/kit/log"
@@ -17,8 +18,6 @@ import (
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
- "github.com/prometheus/tsdb"
- "github.com/prometheus/tsdb/fileutil"
)
const (
@@ -33,103 +32,6 @@ const (
DebugMetas = "debug/metas"
)
-type SourceType string
-
-const (
- UnknownSource SourceType = ""
- SidecarSource SourceType = "sidecar"
- CompactorSource SourceType = "compactor"
- CompactorRepairSource SourceType = "compactor.repair"
- RulerSource SourceType = "ruler"
- BucketRepairSource SourceType = "bucket.repair"
- TestSource SourceType = "test"
-)
-
-// Meta describes the a block's meta. It wraps the known TSDB meta structure and
-// extends it by Thanos-specific fields.
-type Meta struct {
- Version int `json:"version"`
-
- tsdb.BlockMeta
-
- Thanos ThanosMeta `json:"thanos"`
-}
-
-// ThanosMeta holds block meta information specific to Thanos.
-type ThanosMeta struct {
- Labels map[string]string `json:"labels"`
- Downsample ThanosDownsampleMeta `json:"downsample"`
-
- // Source is a real upload source of the block.
- Source SourceType `json:"source"`
-}
-
-type ThanosDownsampleMeta struct {
- Resolution int64 `json:"resolution"`
-}
-
-// WriteMetaFile writes the given meta into <dir>/meta.json.
-func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
- // Make any changes to the file appear atomic.
- path := filepath.Join(dir, MetaFilename)
- tmp := path + ".tmp"
-
- f, err := os.Create(tmp)
- if err != nil {
- return err
- }
-
- enc := json.NewEncoder(f)
- enc.SetIndent("", "\t")
-
- if err := enc.Encode(meta); err != nil {
- runutil.CloseWithLogOnErr(logger, f, "close meta")
- return err
- }
- if err := f.Close(); err != nil {
- return err
- }
- return renameFile(logger, tmp, path)
-}
-
-// ReadMetaFile reads the given meta from <dir>/meta.json.
-func ReadMetaFile(dir string) (*Meta, error) {
- b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
- if err != nil {
- return nil, err
- }
- var m Meta
-
- if err := json.Unmarshal(b, &m); err != nil {
- return nil, err
- }
- if m.Version != 1 {
- return nil, errors.Errorf("unexpected meta file version %d", m.Version)
- }
- return &m, nil
-}
-
-func renameFile(logger log.Logger, from, to string) error {
- if err := os.RemoveAll(to); err != nil {
- return err
- }
- if err := os.Rename(from, to); err != nil {
- return err
- }
-
- // Directory was renamed; sync parent dir to persist rename.
- pdir, err := fileutil.OpenDir(filepath.Dir(to))
- if err != nil {
- return err
- }
-
- if err = fileutil.Fsync(pdir); err != nil {
- runutil.CloseWithLogOnErr(logger, pdir, "close dir")
- return err
- }
- return pdir.Close()
-}
-
// Download downloads a directory that is meant to be a block directory.
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error {
if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil {
@@ -169,7 +71,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "not a block dir")
}
- meta, err := ReadMetaFile(bdir)
+ meta, err := metadata.Read(bdir)
if err != nil {
// No meta or broken meta file.
return errors.Wrap(err, "read meta")
@@ -216,16 +118,16 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
}
// DownloadMeta downloads only meta file from bucket by block ID.
-func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) {
+func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
if err != nil {
- return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
+ return metadata.Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
}
defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client")
- var m Meta
+ var m metadata.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
- return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
+ return metadata.Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
}
return m, nil
}
@@ -234,24 +136,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}
-
-// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk.
-// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
-func InjectThanosMeta(logger log.Logger, bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
- newMeta, err := ReadMetaFile(bdir)
- if err != nil {
- return nil, errors.Wrap(err, "read new meta")
- }
- newMeta.Thanos = meta
-
- // While downsampling we need to copy original compaction.
- if downsampledMeta != nil {
- newMeta.Compaction = downsampledMeta.Compaction
- }
-
- if err := WriteMetaFile(logger, bdir, newMeta); err != nil {
- return nil, errors.Wrap(err, "write new meta")
- }
-
- return newMeta, nil
-}
diff --git a/pkg/block/index.go b/pkg/block/index.go
index 2249863b2d2..7669df223e6 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -11,6 +11,10 @@ import (
"strings"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
+ "github.com/prometheus/tsdb/fileutil"
+
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
@@ -36,23 +40,84 @@ type indexCache struct {
Postings []postingsRange
}
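+// realByteSlice implements the index.ByteSlice interface on top of an in-memory byte slice.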
+type realByteSlice []byte
+
+func (b realByteSlice) Len() int {
+ return len(b)
+}
+
+func (b realByteSlice) Range(start, end int) []byte {
+ return b[start:end]
+}
+
+func (b realByteSlice) Sub(start, end int) index.ByteSlice {
+ return b[start:end]
+}
+
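+// getSymbolTable reads the index file's symbol table, merging the v1 and v2
+// symbol formats into a single offset-to-string map.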
+func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) {
+ version := int(b.Range(4, 5)[0])
+
+ if version != 1 && version != 2 {
+ return nil, errors.Errorf("unknown index file version %d", version)
+ }
+
+ toc, err := index.NewTOCFromByteSlice(b)
+ if err != nil {
+ return nil, errors.Wrap(err, "read TOC")
+ }
+
+ symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols))
+ if err != nil {
+ return nil, errors.Wrap(err, "read symbols")
+ }
+
+ symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2))
+ for o, s := range symbolsV1 {
+ symbolsTable[o] = s
+ }
+ for o, s := range symbolsV2 {
+ symbolsTable[uint32(o)] = s
+ }
+
+ return symbolsTable, nil
+}
+
// WriteIndexCache writes a cache file containing the first lookup stages
// for an index file.
-func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
+func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
+ indexFile, err := fileutil.OpenMmapFile(indexFn)
+ if err != nil {
+ return errors.Wrapf(err, "open mmap index file %s", indexFn)
+ }
+ defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn)
+
+ b := realByteSlice(indexFile.Bytes())
+ indexr, err := index.NewReader(b)
+ if err != nil {
+ return errors.Wrap(err, "open index reader")
+ }
+ defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader")
+
+	// We assume the reader has already verified the index.
+ symbols, err := getSymbolTable(b)
+ if err != nil {
+ return err
+ }
+
f, err := os.Create(fn)
if err != nil {
- return errors.Wrap(err, "create file")
+ return errors.Wrap(err, "create index cache file")
}
defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")
v := indexCache{
- Version: r.Version(),
- Symbols: r.SymbolTable(),
+ Version: indexr.Version(),
+ Symbols: symbols,
LabelValues: map[string][]string{},
}
// Extract label value indices.
- lnames, err := r.LabelIndices()
+ lnames, err := indexr.LabelIndices()
if err != nil {
return errors.Wrap(err, "read label indices")
}
@@ -62,7 +127,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
}
ln := lns[0]
- tpls, err := r.LabelValues(ln)
+ tpls, err := indexr.LabelValues(ln)
if err != nil {
return errors.Wrap(err, "get label values")
}
@@ -82,7 +147,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
}
// Extract postings ranges.
- pranges, err := r.PostingsRanges()
+ pranges, err := indexr.PostingsRanges()
if err != nil {
return errors.Wrap(err, "read postings ranges")
}
@@ -164,7 +229,7 @@ func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) err
}
type Stats struct {
- // TotalSeries represents total number of series in block.
+	// TotalSeries represents the total number of series in the block.
TotalSeries int
// OutOfOrderSeries represents number of series that have out of order chunks.
OutOfOrderSeries int
@@ -346,7 +411,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (
// - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347.
// Fixable inconsistencies are resolved in the new block.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378
-func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
+func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
if len(ignoreChkFns) == 0 {
return resid, errors.New("no ignore chunk function specified")
}
@@ -355,7 +420,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
resid = ulid.MustNew(ulid.Now(), entropy)
- meta, err := ReadMetaFile(bdir)
+ meta, err := metadata.Read(bdir)
if err != nil {
return resid, errors.Wrap(err, "read meta file")
}
@@ -363,7 +428,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
return resid, errors.New("cannot repair downsampled block")
}
- b, err := tsdb.OpenBlock(bdir, nil)
+ b, err := tsdb.OpenBlock(logger, bdir, nil)
if err != nil {
return resid, errors.Wrap(err, "open block")
}
@@ -405,7 +470,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
return resid, errors.Wrap(err, "rewrite block")
}
- if err := WriteMetaFile(logger, resdir, &resmeta); err != nil {
+ if err := metadata.Write(logger, resdir, &resmeta); err != nil {
return resid, err
}
return resid, nil
@@ -494,7 +559,7 @@ OUTER:
func rewrite(
indexr tsdb.IndexReader, chunkr tsdb.ChunkReader,
indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter,
- meta *Meta,
+ meta *metadata.Meta,
ignoreChkFns []ignoreFnType,
) error {
symbols, err := indexr.Symbols()
diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go
new file mode 100644
index 00000000000..80c10e8e6ed
--- /dev/null
+++ b/pkg/block/index_test.go
@@ -0,0 +1,46 @@
+package block
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/prometheus/tsdb/labels"
+)
+
+func TestWriteReadIndexCache(t *testing.T) {
+ tmpDir, err := ioutil.TempDir("", "test-compact-prepare")
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
+
+ b, err := testutil.CreateBlock(tmpDir, []labels.Labels{
+ {{Name: "a", Value: "1"}},
+ {{Name: "a", Value: "2"}},
+ {{Name: "a", Value: "3"}},
+ {{Name: "a", Value: "4"}},
+ {{Name: "b", Value: "1"}},
+ }, 100, 0, 1000, nil, 124)
+ testutil.Ok(t, err)
+
+ fn := filepath.Join(tmpDir, "index.cache.json")
+ testutil.Ok(t, WriteIndexCache(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn))
+
+ version, symbols, lvals, postings, err := ReadIndexCache(log.NewNopLogger(), fn)
+ testutil.Ok(t, err)
+
+ testutil.Equals(t, 2, version)
+ testutil.Equals(t, 6, len(symbols))
+ testutil.Equals(t, 2, len(lvals))
+
+ vals, ok := lvals["a"]
+ testutil.Assert(t, ok, "")
+ testutil.Equals(t, []string{"1", "2", "3", "4"}, vals)
+
+ vals, ok = lvals["b"]
+ testutil.Assert(t, ok, "")
+ testutil.Equals(t, []string{"1"}, vals)
+ testutil.Equals(t, 6, len(postings))
+}
diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go
new file mode 100644
index 00000000000..e4e87122c2d
--- /dev/null
+++ b/pkg/block/metadata/meta.go
@@ -0,0 +1,142 @@
+package metadata
+
+// The metadata package implements writing and reading of wrapped meta.json, where Thanos puts its metadata.
+// This metadata contains external labels, downsampling resolution and source type.
+// The package is minimal and separate because it is used by testutil, which limits the test helpers we can use in
+// this package.
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/runutil"
+ "github.com/pkg/errors"
+ "github.com/prometheus/tsdb"
+ "github.com/prometheus/tsdb/fileutil"
+)
+
+type SourceType string
+
+const (
+ UnknownSource SourceType = ""
+ SidecarSource SourceType = "sidecar"
+ CompactorSource SourceType = "compactor"
+ CompactorRepairSource SourceType = "compactor.repair"
+ RulerSource SourceType = "ruler"
+ BucketRepairSource SourceType = "bucket.repair"
+ TestSource SourceType = "test"
+)
+
+const (
+ // MetaFilename is the known JSON filename for meta information.
+ MetaFilename = "meta.json"
+)
+
+// Meta describes a block's meta. It wraps the known TSDB meta structure and
+// extends it by Thanos-specific fields.
+type Meta struct {
+ Version int `json:"version"`
+
+ tsdb.BlockMeta
+
+ Thanos Thanos `json:"thanos"`
+}
+
+// Thanos holds block meta information specific to Thanos.
+type Thanos struct {
+ Labels map[string]string `json:"labels"`
+ Downsample ThanosDownsample `json:"downsample"`
+
+ // Source is a real upload source of the block.
+ Source SourceType `json:"source"`
+}
+
+type ThanosDownsample struct {
+ Resolution int64 `json:"resolution"`
+}
+
+// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk.
+// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
+func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
+ newMeta, err := Read(bdir)
+ if err != nil {
+ return nil, errors.Wrap(err, "read new meta")
+ }
+ newMeta.Thanos = meta
+
+ // While downsampling we need to copy original compaction.
+ if downsampledMeta != nil {
+ newMeta.Compaction = downsampledMeta.Compaction
+ }
+
+ if err := Write(logger, bdir, newMeta); err != nil {
+ return nil, errors.Wrap(err, "write new meta")
+ }
+
+ return newMeta, nil
+}
+
+// Write writes the given meta into <dir>/meta.json.
+func Write(logger log.Logger, dir string, meta *Meta) error {
+ // Make any changes to the file appear atomic.
+ path := filepath.Join(dir, MetaFilename)
+ tmp := path + ".tmp"
+
+ f, err := os.Create(tmp)
+ if err != nil {
+ return err
+ }
+
+ enc := json.NewEncoder(f)
+ enc.SetIndent("", "\t")
+
+ if err := enc.Encode(meta); err != nil {
+ runutil.CloseWithLogOnErr(logger, f, "close meta")
+ return err
+ }
+ if err := f.Close(); err != nil {
+ return err
+ }
+ return renameFile(logger, tmp, path)
+}
+
+func renameFile(logger log.Logger, from, to string) error {
+ if err := os.RemoveAll(to); err != nil {
+ return err
+ }
+ if err := os.Rename(from, to); err != nil {
+ return err
+ }
+
+ // Directory was renamed; sync parent dir to persist rename.
+ pdir, err := fileutil.OpenDir(filepath.Dir(to))
+ if err != nil {
+ return err
+ }
+
+ if err = fileutil.Fsync(pdir); err != nil {
+ runutil.CloseWithLogOnErr(logger, pdir, "close dir")
+ return err
+ }
+ return pdir.Close()
+}
+
+// Read reads the given meta from <dir>/meta.json.
+func Read(dir string) (*Meta, error) {
+ b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
+ if err != nil {
+ return nil, err
+ }
+ var m Meta
+
+ if err := json.Unmarshal(b, &m); err != nil {
+ return nil, err
+ }
+ if m.Version != 1 {
+ return nil, errors.Errorf("unexpected meta file version %d", m.Version)
+ }
+ return &m, nil
+}
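
For orientation, a minimal sketch of how a Thanos component is expected to use this new package after producing a block; the block directory and label values below are illustrative only:

```go
package main

import (
	"log"

	kitlog "github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/block/metadata"
	"github.com/prometheus/tsdb"
)

func main() {
	logger := kitlog.NewNopLogger()

	// Hypothetical directory of a freshly written TSDB block.
	bdir := "/tmp/data/01ARZ3NDEKTSV4RRFFQ69G5FAV"

	// Wrap the vanilla meta.json with Thanos-specific fields.
	var downsampledMeta *tsdb.BlockMeta // nil: not a downsampled block
	meta, err := metadata.InjectThanos(logger, bdir, metadata.Thanos{
		Labels:     map[string]string{"cluster": "eu1"},
		Downsample: metadata.ThanosDownsample{Resolution: 0},
		Source:     metadata.TestSource,
	}, downsampledMeta)
	if err != nil {
		log.Fatal(err)
	}

	// Read validates the meta file version on the way back in.
	readBack, err := metadata.Read(bdir)
	if err != nil {
		log.Fatal(err)
	}
	_, _ = meta, readBack
}
```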
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 544de920eac..29302f2a1c5 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -9,6 +9,8 @@ import (
"sync"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"io/ioutil"
"github.com/go-kit/kit/log"
@@ -39,7 +41,7 @@ type Syncer struct {
bkt objstore.Bucket
syncDelay time.Duration
mtx sync.Mutex
- blocks map[ulid.ULID]*block.Meta
+ blocks map[ulid.ULID]*metadata.Meta
metrics *syncerMetrics
}
@@ -130,7 +132,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
logger: logger,
reg: reg,
syncDelay: syncDelay,
- blocks: map[ulid.ULID]*block.Meta{},
+ blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
}, nil
@@ -185,9 +187,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
// NOTE: It is not safe to miss an "old" block (even if it is newly created) in the sync step. The compactor needs to be aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
- meta.Thanos.Source != block.BucketRepairSource &&
- meta.Thanos.Source != block.CompactorSource &&
- meta.Thanos.Source != block.CompactorRepairSource {
+ meta.Thanos.Source != metadata.BucketRepairSource &&
+ meta.Thanos.Source != metadata.CompactorSource &&
+ meta.Thanos.Source != metadata.CompactorRepairSource {
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil
@@ -214,7 +216,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
-func GroupKey(meta block.Meta) string {
+func GroupKey(meta metadata.Meta) string {
return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels))
}
@@ -381,7 +383,7 @@ type Group struct {
labels labels.Labels
resolution int64
mtx sync.Mutex
- blocks map[ulid.ULID]*block.Meta
+ blocks map[ulid.ULID]*metadata.Meta
compactions prometheus.Counter
compactionFailures prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
@@ -405,7 +407,7 @@ func newGroup(
bkt: bkt,
labels: lset,
resolution: resolution,
- blocks: map[ulid.ULID]*block.Meta{},
+ blocks: map[ulid.ULID]*metadata.Meta{},
compactions: compactions,
compactionFailures: compactionFailures,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
@@ -419,7 +421,7 @@ func (cg *Group) Key() string {
}
// Add the block with the given meta to the group.
-func (cg *Group) Add(meta *block.Meta) error {
+func (cg *Group) Add(meta *metadata.Meta) error {
cg.mtx.Lock()
defer cg.mtx.Unlock()
@@ -541,7 +543,7 @@ func IsRetryError(err error) bool {
return ok
}
-func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string) error {
+func (cg *Group) areBlocksOverlapping(include *metadata.Meta, excludeDirs ...string) error {
var (
metas []tsdb.BlockMeta
exclude = map[ulid.ULID]struct{}{}
@@ -566,6 +568,9 @@ func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string
metas = append(metas, include.BlockMeta)
}
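+	// tsdb.OverlappingBlocks expects the metas sorted by MinTime, hence the explicit sort.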
+ sort.Slice(metas, func(i, j int) bool {
+ return metas[i].MinTime < metas[j].MinTime
+ })
if overlaps := tsdb.OverlappingBlocks(metas); len(overlaps) > 0 {
return errors.Errorf("overlaps found while gathering blocks. %s", overlaps)
}
@@ -597,12 +602,12 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return retry(errors.Wrapf(err, "download block %s", ie.id))
}
- meta, err := block.ReadMetaFile(bdir)
+ meta, err := metadata.Read(bdir)
if err != nil {
return errors.Wrapf(err, "read meta from %s", bdir)
}
- resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
+ resid, err := block.Repair(logger, tmpdir, ie.id, metadata.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", ie.id)
}
@@ -647,7 +652,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := os.MkdirAll(bdir, 0777); err != nil {
return compID, errors.Wrap(err, "create planning block dir")
}
- if err := block.WriteMetaFile(cg.logger, bdir, meta); err != nil {
+ if err := metadata.Write(cg.logger, bdir, meta); err != nil {
return compID, errors.Wrap(err, "write planning meta file")
}
}
@@ -670,7 +675,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin := time.Now()
for _, pdir := range plan {
- meta, err := block.ReadMetaFile(pdir)
+ meta, err := metadata.Read(pdir)
if err != nil {
return compID, errors.Wrapf(err, "read meta from %s", pdir)
}
@@ -718,7 +723,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin = time.Now()
- compID, err = comp.Compact(dir, plan...)
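+	// Compactor.Compact now takes the plan as a slice, plus optional already-open blocks (nil here).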
+ compID, err = comp.Compact(dir, plan, nil)
if err != nil {
return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
}
@@ -727,10 +732,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
bdir := filepath.Join(dir, compID.String())
- newMeta, err := block.InjectThanosMeta(cg.logger, bdir, block.ThanosMeta{
+ newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
- Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution},
- Source: block.CompactorSource,
+ Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
+ Source: metadata.CompactorSource,
}, nil)
if err != nil {
return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir)
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 74df73cc6f5..90d9866c908 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -13,6 +13,8 @@ import (
"testing"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/objstore"
@@ -37,13 +39,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
// After the first synchronization the first 5 should be dropped and the
// last 5 be loaded from the bucket.
var ids []ulid.ULID
- var metas []*block.Meta
+ var metas []*metadata.Meta
for i := 0; i < 15; i++ {
id, err := ulid.New(uint64(i), nil)
testutil.Ok(t, err)
- var meta block.Meta
+ var meta metadata.Meta
meta.Version = 1
meta.ULID = id
@@ -56,7 +58,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
for _, m := range metas[5:] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
- testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+ testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}
groups, err := sy.Groups()
@@ -79,11 +81,11 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
// Generate 10 source block metas and construct higher level blocks
// that are higher compactions of them.
- var metas []*block.Meta
+ var metas []*metadata.Meta
var ids []ulid.ULID
for i := 0; i < 10; i++ {
- var m block.Meta
+ var m metadata.Meta
m.Version = 1
m.ULID = ulid.MustNew(uint64(i), nil)
@@ -94,28 +96,28 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
metas = append(metas, &m)
}
- var m1 block.Meta
+ var m1 metadata.Meta
m1.Version = 1
m1.ULID = ulid.MustNew(100, nil)
m1.Compaction.Level = 2
m1.Compaction.Sources = ids[:4]
m1.Thanos.Downsample.Resolution = 0
- var m2 block.Meta
+ var m2 metadata.Meta
m2.Version = 1
m2.ULID = ulid.MustNew(200, nil)
m2.Compaction.Level = 2
m2.Compaction.Sources = ids[4:8] // last two source IDs is not part of a level 2 block.
m2.Thanos.Downsample.Resolution = 0
- var m3 block.Meta
+ var m3 metadata.Meta
m3.Version = 1
m3.ULID = ulid.MustNew(300, nil)
m3.Compaction.Level = 3
m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block.
m3.Thanos.Downsample.Resolution = 0
- var m4 block.Meta
+ var m4 metadata.Meta
m4.Version = 14
m4.ULID = ulid.MustNew(400, nil)
m4.Compaction.Level = 2
@@ -127,7 +129,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
fmt.Println("create", m.ULID)
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
- testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+ testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}
// Do one initial synchronization with the bucket.
@@ -173,7 +175,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
- var metas []*block.Meta
+ var metas []*metadata.Meta
extLset := labels.Labels{{Name: "e1", Value: "1"}}
b1, err := testutil.CreateBlock(prepareDir, []labels.Labels{
{{Name: "a", Value: "1"}},
@@ -183,7 +185,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
}, 100, 0, 1000, extLset, 124)
testutil.Ok(t, err)
- meta, err := block.ReadMetaFile(filepath.Join(prepareDir, b1.String()))
+ meta, err := metadata.Read(filepath.Join(prepareDir, b1.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -196,15 +198,16 @@ func TestGroup_Compact_e2e(t *testing.T) {
testutil.Ok(t, err)
// Mix order to make sure compact is able to deduce min time / max time.
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b3.String()))
+ meta, err = metadata.Read(filepath.Join(prepareDir, b3.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
// Empty block. This can happen when TSDB does not have any samples for min-block-size time.
+	// NOTE: In the new TSDB those are not produced anymore.
b2, err := testutil.CreateBlock(prepareDir, []labels.Labels{}, 100, 1001, 2000, extLset, 124)
testutil.Ok(t, err)
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b2.String()))
+ meta, err = metadata.Read(filepath.Join(prepareDir, b2.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -217,7 +220,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
}, 100, 3001, 4000, extLset, 124)
testutil.Ok(t, err)
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, freshB.String()))
+ meta, err = metadata.Read(filepath.Join(prepareDir, freshB.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -263,7 +266,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
resDir := filepath.Join(dir, id.String())
testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))
- meta, err = block.ReadMetaFile(resDir)
+ meta, err = metadata.Read(resDir)
testutil.Ok(t, err)
testutil.Equals(t, int64(0), meta.MinTime)
diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 305f72021c0..f5afecdcd07 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -5,7 +5,8 @@ import (
"path/filepath"
"sort"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunkenc"
@@ -31,7 +32,7 @@ const (
// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
func Downsample(
logger log.Logger,
- origMeta *block.Meta,
+ origMeta *metadata.Meta,
b tsdb.BlockReader,
dir string,
resolution int64,
@@ -113,6 +114,7 @@ func Downsample(
origMeta.Thanos.Downsample.Resolution,
resolution,
)
+
if err != nil {
return id, errors.Wrap(err, "downsample aggregate block")
}
@@ -125,18 +127,18 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "create compactor")
}
- id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime)
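+	// The new Write signature additionally takes the parent block's meta.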
+ id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrap(err, "compact head")
}
bdir := filepath.Join(dir, id.String())
- var tmeta block.ThanosMeta
+ var tmeta metadata.Thanos
tmeta = origMeta.Thanos
- tmeta.Source = block.CompactorSource
+ tmeta.Source = metadata.CompactorSource
tmeta.Downsample.Resolution = resolution
- _, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta)
+ _, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}
@@ -228,13 +230,20 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
}
func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
- return tsdb.EmptyTombstoneReader(), nil
+ return emptyTombstoneReader{}, nil
}
func (b *memBlock) Close() error {
return nil
}
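+// emptyTombstoneReader is a no-op TombstoneReader, replacing the
+// tsdb.EmptyTombstoneReader helper used previously.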
+type emptyTombstoneReader struct{}
+
+func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil }
+func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil }
+func (emptyTombstoneReader) Total() uint64 { return 0 }
+func (emptyTombstoneReader) Close() error { return nil }
+
// currentWindow returns the end timestamp of the window that t falls into.
func currentWindow(t, r int64) int64 {
// The next timestamp is the next number after s.t that's aligned with window.
@@ -412,6 +421,7 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
chks = append(chks, ab.encode())
}
+
return chks
}
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index d3844784162..bb2c38b17a9 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -1,12 +1,15 @@
package downsample
import (
+ "github.com/prometheus/tsdb"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunks"
@@ -59,7 +62,7 @@ func TestDownsampleRaw(t *testing.T) {
},
},
}
- testDownsample(t, input, &block.Meta{}, 100)
+ testDownsample(t, input, &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 250}}, 100)
}
func TestDownsampleAggr(t *testing.T) {
@@ -96,8 +99,9 @@ func TestDownsampleAggr(t *testing.T) {
},
},
}
- var meta block.Meta
+ var meta metadata.Meta
meta.Thanos.Downsample.Resolution = 10
+ meta.BlockMeta = tsdb.BlockMeta{MinTime: 99, MaxTime: 1300}
testDownsample(t, input, &meta, 500)
}
@@ -123,7 +127,7 @@ type downsampleTestSet struct {
// testDownsample inserts the input into a block and invokes the downsampler with the given resolution.
// The chunk ranges within the input block are aligned at 500 time units.
-func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, resolution int64) {
+func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta, resolution int64) {
t.Helper()
dir, err := ioutil.TempDir("", "downsample-raw")
diff --git a/pkg/compact/downsample/pool.go b/pkg/compact/downsample/pool.go
index 9b199e7ab9d..17094cd0c6f 100644
--- a/pkg/compact/downsample/pool.go
+++ b/pkg/compact/downsample/pool.go
@@ -6,14 +6,14 @@ import (
"github.com/prometheus/tsdb/chunkenc"
)
-// Pool is a memory pool of chunk objects, supporting Thanos aggr chunk encoding.
+// Pool is a memory pool of chunk objects, supporting Thanos aggregated chunk encoding.
// It maintains separate pools for xor and aggr chunks.
type pool struct {
wrapped chunkenc.Pool
aggr sync.Pool
}
-// TODO(bplotka): Add reasonable limits to our sync pools them to detect OOMs early.
+// TODO(bwplotka): Add reasonable limits to our sync pools to detect OOMs early.
func NewPool() chunkenc.Pool {
return &pool{
wrapped: chunkenc.NewPool(),
@@ -51,6 +51,7 @@ func (p *pool) Put(c chunkenc.Chunk) error {
// Clear []byte.
*ac = AggrChunk(nil)
p.aggr.Put(ac)
+ return nil
}
return p.wrapped.Put(c)
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index 0aef91c697b..1a26b1e5db2 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/inmem"
@@ -253,15 +253,15 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
t.Helper()
- meta1 := block.Meta{
+ meta1 := metadata.Meta{
Version: 1,
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse(id),
MinTime: minTime.Unix() * 1000,
MaxTime: maxTime.Unix() * 1000,
},
- Thanos: block.ThanosMeta{
- Downsample: block.ThanosDownsampleMeta{
+ Thanos: metadata.Thanos{
+ Downsample: metadata.ThanosDownsample{
Resolution: resolutionLevel,
},
},
diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go
index 51b70742d6b..2bfc1041ee8 100644
--- a/pkg/objstore/s3/s3.go
+++ b/pkg/objstore/s3/s3.go
@@ -3,6 +3,7 @@ package s3
import (
"context"
+ "crypto/tls"
"fmt"
"io"
"math/rand"
@@ -47,7 +48,8 @@ type Config struct {
// HTTPConfig stores the http.Transport configuration for the s3 minio client.
type HTTPConfig struct {
- IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"`
+ IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"`
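+ // InsecureSkipVerify disables TLS certificate verification for the S3 endpoint
+ // (wired into tls.Config.InsecureSkipVerify below); meant for self-signed or test setups.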
+ InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
}
// Bucket implements the store.Bucket interface against s3-compatible APIs.
@@ -142,6 +144,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
// Refer:
// https://golang.org/src/net/http/transport.go?h=roundTrip#L1843
DisableCompression: true,
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify},
})
var sse encrypt.ServerSide
@@ -320,6 +323,7 @@ func configFromEnv() Config {
}
c.Insecure, _ = strconv.ParseBool(os.Getenv("S3_INSECURE"))
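+ // S3_INSECURE_SKIP_VERIFY mirrors the http_config.insecure_skip_verify option.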
+ c.HTTPConfig.InsecureSkipVerify, _ = strconv.ParseBool(os.Getenv("S3_INSECURE_SKIP_VERIFY"))
c.SignatureV2, _ = strconv.ParseBool(os.Getenv("S3_SIGNATURE_VERSION2"))
return c
}
diff --git a/pkg/objstore/s3/s3_test.go b/pkg/objstore/s3/s3_test.go
index 98f357ce149..c8d8204a469 100644
--- a/pkg/objstore/s3/s3_test.go
+++ b/pkg/objstore/s3/s3_test.go
@@ -11,6 +11,7 @@ func TestParseConfig_DefaultHTTPOpts(t *testing.T) {
input := []byte(`bucket: abcd
insecure: false
http_config:
+ insecure_skip_verify: true
idle_conn_timeout: 50s`)
cfg, err := parseConfig(input)
testutil.Ok(t, err)
@@ -23,10 +24,12 @@ http_config:
if cfg.Bucket != "abcd" {
t.Errorf("parsing of bucket failed: got %v, expected %v", cfg.Bucket, "abcd")
}
- if cfg.Insecure != false {
+ if cfg.Insecure {
t.Errorf("parsing of insecure failed: got %v, expected %v", cfg.Insecure, false)
}
-
+ if !cfg.HTTPConfig.InsecureSkipVerify {
+ t.Errorf("parsing of insecure_skip_verify failed: got %v, expected %v", cfg.HTTPConfig.InsecureSkipVerify, false)
+ }
}
func TestValidate_OK(t *testing.T) {
@@ -38,7 +41,8 @@ signature_version2: false
encrypt_sse: false
secret_key: "secret_key"
http_config:
- idle_conn_timeout: 0s`)
+ insecure_skip_verify: false
+ idle_conn_timeout: 50s`)
cfg, err := parseConfig(input)
testutil.Ok(t, err)
testutil.Ok(t, validate(cfg))
@@ -51,7 +55,7 @@ insecure: false
signature_version2: false
encrypt_sse: false
secret_key: "secret_key"
-put_user_metadata:
+put_user_metadata:
"X-Amz-Acl": "bucket-owner-full-control"
http_config:
idle_conn_timeout: 0s`)
diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go
index 23e5b0a801b..cf693dd2fbe 100644
--- a/pkg/query/api/v1.go
+++ b/pkg/query/api/v1.go
@@ -527,15 +527,16 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) {
var sets []storage.SeriesSet
for _, mset := range matcherSets {
- s, err := q.Select(&storage.SelectParams{}, mset...)
+ s, _, err := q.Select(&storage.SelectParams{}, mset...)
if err != nil {
return nil, nil, &apiError{errorExec, err}
}
sets = append(sets, s)
}
- set := storage.NewMergeSeriesSet(sets)
- metrics := []labels.Labels{}
+ set := storage.NewMergeSeriesSet(sets, nil)
+
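+ // Declared without initialization so that a no-match query yields a nil slice, matching the updated tests.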
+ var metrics []labels.Labels
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go
index 4afef1d0548..bcf17907b06 100644
--- a/pkg/query/api/v1_test.go
+++ b/pkg/query/api/v1_test.go
@@ -333,7 +333,7 @@ func TestEndpoints(t *testing.T) {
"start": []string{"-2"},
"end": []string{"-1"},
},
- response: []labels.Labels{},
+ response: []labels.Labels(nil),
},
// Start and end after series ends.
{
@@ -343,7 +343,7 @@ func TestEndpoints(t *testing.T) {
"start": []string{"100000"},
"end": []string{"100001"},
},
- response: []labels.Labels{},
+ response: []labels.Labels(nil),
},
// Start before series starts, end after series ends.
{
@@ -409,33 +409,38 @@ func TestEndpoints(t *testing.T) {
}
for _, test := range tests {
- // Build a context with the correct request params.
- ctx := context.Background()
- for p, v := range test.params {
- ctx = route.WithParam(ctx, p, v)
- }
- t.Logf("run query %q", test.query.Encode())
+ if ok := t.Run(test.query.Encode(), func(t *testing.T) {
+ // Build a context with the correct request params.
+ ctx := context.Background()
+ for p, v := range test.params {
+ ctx = route.WithParam(ctx, p, v)
+ }
- req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil)
- if err != nil {
- t.Fatal(err)
- }
- resp, _, apiErr := test.endpoint(req.WithContext(ctx))
- if apiErr != nil {
- if test.errType == errorNone {
- t.Fatalf("Unexpected error: %s", apiErr)
+ req, err := http.NewRequest("ANY", fmt.Sprintf("http://example.com?%s", test.query.Encode()), nil)
+ if err != nil {
+ t.Fatal(err)
}
- if test.errType != apiErr.typ {
- t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ)
+ resp, _, apiErr := test.endpoint(req.WithContext(ctx))
+ if apiErr != nil {
+ if test.errType == errorNone {
+ t.Fatalf("Unexpected error: %s", apiErr)
+ }
+ if test.errType != apiErr.typ {
+ t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ)
+ }
+ return
}
- continue
- }
- if apiErr == nil && test.errType != errorNone {
- t.Fatalf("Expected error of type %q but got none", test.errType)
- }
- if !reflect.DeepEqual(resp, test.response) {
- t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp)
+ if apiErr == nil && test.errType != errorNone {
+ t.Fatalf("Expected error of type %q but got none", test.errType)
+ }
+
+ if !reflect.DeepEqual(resp, test.response) {
+ t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp)
+ }
+ }); !ok {
+ return
}
+
}
}
diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index 6e962f64727..819ff3ac2a1 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -169,13 +169,13 @@ func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg
}
-func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error) {
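+// Select returns a set of series that matches the given label matchers, now also
+// returning storage.Warnings to satisfy the updated storage.Querier interface.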
+func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_select")
defer span.Finish()
sms, err := translateMatchers(ms...)
if err != nil {
- return nil, errors.Wrap(err, "convert matchers")
+ return nil, nil, errors.Wrap(err, "convert matchers")
}
queryAggrs, resAggr := aggrsFromFunc(params.Func)
@@ -189,10 +189,12 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
}, resp); err != nil {
- return nil, errors.Wrap(err, "proxy Series()")
+ return nil, nil, errors.Wrap(err, "proxy Series()")
}
for _, w := range resp.warnings {
+ // NOTE(bwplotka): We could use the warnings return argument here, but we need the reporter anyway
+ // for the LabelValues and LabelNames methods, so we keep the reporter for consistency.
q.warningReporter(errors.New(w))
}
@@ -203,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
- }, nil
+ }, nil, nil
}
// TODO(fabxc): this could potentially pushed further down into the store API
@@ -220,7 +222,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
// The merged series set assembles all potentially-overlapping time ranges
// of the same series into a single one. The series are ordered so that equal series
// from different replicas are sequential. We can now deduplicate those.
- return newDedupSeriesSet(set, q.replicaLabel), nil
+ return newDedupSeriesSet(set, q.replicaLabel), nil, nil
}
// sortDedupLabels resorts the set so that the same series with different replica
@@ -245,6 +247,7 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
})
}
+// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()
@@ -261,6 +264,12 @@ func (q *querier) LabelValues(name string) ([]string, error) {
return resp.Values, nil
}
+// LabelNames returns all the unique label names present in the queried data in sorted order.
+// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702.
+func (q *querier) LabelNames() ([]string, error) {
+ return nil, errors.New("not implemented")
+}
+
func (q *querier) Close() error {
q.cancel()
return nil
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go
index 46b51a7f3ef..980d837213e 100644
--- a/pkg/query/querier_test.go
+++ b/pkg/query/querier_test.go
@@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) {
q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil)
defer func() { testutil.Ok(t, q.Close()) }()
- res, err := q.Select(&storage.SelectParams{})
+ res, _, err := q.Select(&storage.SelectParams{})
testutil.Ok(t, err)
expected := []struct {
diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go
new file mode 100644
index 00000000000..70bc292439b
--- /dev/null
+++ b/pkg/query/test_print.go
@@ -0,0 +1,34 @@
+package query
+
+import (
+ "fmt"
+
+ "github.com/prometheus/prometheus/storage"
+)
+
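+// printSeriesSet wraps a storage.SeriesSet and dumps every series and sample
+// to stdout while the set is iterated; a small debugging helper.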
+type printSeriesSet struct {
+ set storage.SeriesSet
+}
+
+func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet {
+ return &printSeriesSet{set: set}
+}
+
+func (s *printSeriesSet) Next() bool {
+ return s.set.Next()
+}
+
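+// At prints the current series' labels and all of its samples before returning it.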
+func (s *printSeriesSet) At() storage.Series {
+ at := s.set.At()
+ fmt.Println("Series", at.Labels())
+
+ i := at.Iterator()
+ for i.Next() {
+ fmt.Println(i.At())
+ }
+ return at
+}
+
+func (s *printSeriesSet) Err() error {
+ return s.set.Err()
+}
diff --git a/pkg/reloader/example_test.go b/pkg/reloader/example_test.go
new file mode 100644
index 00000000000..7b046734d93
--- /dev/null
+++ b/pkg/reloader/example_test.go
@@ -0,0 +1,54 @@
+package reloader_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "net/url"
+
+ "github.com/improbable-eng/thanos/pkg/reloader"
+)
+
+func ExampleReloader() {
+ u, err := url.Parse("http://localhost:9090")
+ if err != nil {
+ log.Fatal(err)
+ }
+ rl := reloader.New(
+ nil,
+ reloader.ReloadURLFromBase(u),
+ "/path/to/cfg",
+ "/path/to/cfg.out",
+ []string{"/path/to/dirs"},
+ )
+
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ if err := rl.Watch(ctx); err != nil {
+ log.Fatal(err)
+ }
+ }()
+
+ reloadHandler := func(w http.ResponseWriter, req *http.Request) {
+ if _, err := io.WriteString(w, "Reloaded\n"); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+ }
+
+ http.HandleFunc("/-/reload", reloadHandler)
+ log.Fatal(http.ListenAndServe(":9090", nil))
+
+ cancel()
+}
+
+func ExampleReloadURLFromBase() {
+ u, err := url.Parse("http://localhost:9090")
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(reloader.ReloadURLFromBase(u))
+ // Output: http://localhost:9090/-/reload
+}
diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go
index c637ceb0ad0..bc71f44b8e2 100644
--- a/pkg/reloader/reloader.go
+++ b/pkg/reloader/reloader.go
@@ -1,5 +1,51 @@
// Package reloader contains helpers to trigger reloads of Prometheus instances
// on configuration changes and to substitute environment variables in config files.
+//
+// The Reloader type is useful when you want to:
+//
+// * Watch for changes on a given config file (`cfgFile`).
+// * Optionally, write the watched `cfgFile` to a different output file (`cfgOutputFile`).
+// This will also try to decompress the `cfgFile` if needed and substitute ALL the environment variables using the Kubernetes substitution format: (`$(var)`).
+// * Watch for changes in given rule directories (`ruleDirs`).
+//
+// Once any of these change, the Prometheus instance behind the given `reloadURL` is notified, causing it to reload its configuration and rules.
+//
+// A reloader can be created as follows:
+//
+// u, _ := url.Parse("http://localhost:9090")
+// rl := reloader.New(
+// nil,
+// reloader.ReloadURLFromBase(u),
+// "/path/to/cfg",
+// "/path/to/cfg.out",
+// []string{"/path/to/dirs"},
+// )
+//
+// The reload URL can be generated with the ReloadURLFromBase() function.
+// It appends the default reload path to the given base URL:
+//
+// u, _ := url.Parse("http://localhost:9090")
+// reloader.ReloadURLFromBase(u) // It will return "http://localhost:9090/-/reload"
+//
+// Start watching for changes; the watch runs until the context is canceled:
+//
+// ctx, cancel := context.WithCancel(context.Background())
+// go func() {
+// if err := rl.Watch(ctx); err != nil {
+// log.Fatal(err)
+// }
+// }()
+// // ...
+// cancel()
+//
+// By default, the reloader also periodically compares a hash of the watched config files and directories
+// with the last seen result, so changes are caught even when no filesystem event is received.
+//
+// A basic example of a configuration template with environment variables:
+//
+// global:
+// external_labels:
+// replica: '$(HOSTNAME)'
package reloader
import (
diff --git a/pkg/runutil/example_test.go b/pkg/runutil/example_test.go
new file mode 100644
index 00000000000..b6edc72728b
--- /dev/null
+++ b/pkg/runutil/example_test.go
@@ -0,0 +1,41 @@
+package runutil_test
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/improbable-eng/thanos/pkg/runutil"
+)
+
+func ExampleRepeat() {
+ // The context will stop Repeat 10 seconds later.
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // It will print out "Repeat" every 5 seconds.
+ err := runutil.Repeat(5*time.Second, ctx.Done(), func() error {
+ fmt.Println("Repeat")
+ return nil
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+}
+
+func ExampleRetry() {
+ // The context will stop Retry 10 seconds later.
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // It will print out "Retry" every 5 seconds.
+ err := runutil.Retry(5*time.Second, ctx.Done(), func() error {
+ fmt.Println("Retry")
+ return errors.New("Try to retry")
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+}
diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go
index e10732cf3ff..d6dbfe7e6a6 100644
--- a/pkg/runutil/runutil.go
+++ b/pkg/runutil/runutil.go
@@ -1,13 +1,51 @@
+// Package runutil provides helpers for advanced function scheduling control like repeat or retry.
+//
+// It is very common to need to execute some code at fixed intervals or to have it retried automatically.
+// Doing this reliably, with proper timeouts, requires arranging some careful boilerplate.
+// The functions below do it for you.
+//
+// For repeated executions, use Repeat:
+//
+// err := runutil.Repeat(10*time.Second, stopc, func() error {
+// // ...
+// })
+//
+// Retry executes the closure f repeatedly until no error is returned from f:
+//
+// err := runutil.Retry(10*time.Second, stopc, func() error {
+// // ...
+// })
+//
+// To log the error returned by each failed attempt, use RetryWithLog:
+//
+// err := runutil.RetryWithLog(logger, 10*time.Second, stopc, func() error {
+// // ...
+// })
+//
+// Another use case for the runutil package is closing a `Closer`. Every implementation of `Closer`, such as *os.File, should be closed. Commonly you would write:
+//
+// defer closer.Close()
+//
+// The problem is that Close() can return an important error: for os.File, for example, the actual file flush might happen (and fail) only in the Close method. It is important to *always* check the error. Thanos provides utility functions that log such errors, allowing them to be placed in a convenient `defer`:
+//
+// defer runutil.CloseWithLogOnErr(logger, closer, "log format message")
+//
+// To capture the error instead of logging it, use CloseWithErrCapture:
+//
+// var err error
+// defer runutil.CloseWithErrCapture(logger, &err, closer, "log format message")
+//
+// // ...
+//
+// If Close() returns an error, it is captured in err and returned through the pointer argument.
package runutil
import (
+ "fmt"
+ "io"
"os"
"time"
- "io"
-
- "fmt"
-
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go
index 5c1df9a9b7e..2163ec8b7a9 100644
--- a/pkg/shipper/shipper.go
+++ b/pkg/shipper/shipper.go
@@ -11,6 +11,8 @@ import (
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -69,7 +71,7 @@ type Shipper struct {
metrics *metrics
bucket objstore.Bucket
labels func() labels.Labels
- source block.SourceType
+ source metadata.SourceType
}
// New creates a new shipper that detects new TSDB blocks in dir and uploads them
@@ -80,7 +82,7 @@ func New(
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
- source block.SourceType,
+ source metadata.SourceType,
) *Shipper {
if logger == nil {
logger = log.NewNopLogger()
@@ -114,7 +116,7 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
minTime = math.MaxInt64
maxSyncTime = math.MinInt64
- if err := s.iterBlockMetas(func(m *block.Meta) error {
+ if err := s.iterBlockMetas(func(m *metadata.Meta) error {
if m.MinTime < minTime {
minTime = m.MinTime
}
@@ -158,7 +160,7 @@ func (s *Shipper) Sync(ctx context.Context) {
// TODO(bplotka): If there are no blocks in the system check for WAL dir to ensure we have actually
// access to real TSDB dir (!).
- if err = s.iterBlockMetas(func(m *block.Meta) error {
+ if err = s.iterBlockMetas(func(m *metadata.Meta) error {
// Do not sync a block if we already uploaded it. If it is no longer found in the bucket,
// it was generally removed by the compaction process.
if _, ok := hasUploaded[m.ULID]; !ok {
@@ -180,7 +182,7 @@ func (s *Shipper) Sync(ctx context.Context) {
}
}
-func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
+func (s *Shipper) sync(ctx context.Context, meta *metadata.Meta) (err error) {
dir := filepath.Join(s.dir, meta.ULID.String())
// We only ship of the first compacted block level.
@@ -225,7 +227,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
meta.Thanos.Labels = lset.Map()
}
meta.Thanos.Source = s.source
- if err := block.WriteMetaFile(s.logger, updir, meta); err != nil {
+ if err := metadata.Write(s.logger, updir, meta); err != nil {
return errors.Wrap(err, "write meta file")
}
return block.Upload(ctx, s.logger, s.bucket, updir)
@@ -234,7 +236,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
// iterBlockMetas calls f with the block meta for each block found in dir. It logs
// an error and continues if it cannot access a meta.json file.
// If f returns an error, the function returns with the same error.
-func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
+func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
names, err := fileutil.ReadDir(s.dir)
if err != nil {
return errors.Wrap(err, "read dir")
@@ -253,7 +255,7 @@ func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
if !fi.IsDir() {
continue
}
- m, err := block.ReadMetaFile(dir)
+ m, err := metadata.Read(dir)
if err != nil {
level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err)
continue
diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go
index 8f0e70ecd65..496d53322f3 100644
--- a/pkg/shipper/shipper_e2e_test.go
+++ b/pkg/shipper/shipper_e2e_test.go
@@ -14,6 +14,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
"github.com/improbable-eng/thanos/pkg/testutil"
@@ -32,7 +33,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
}()
extLset := labels.FromStrings("prometheus", "prom-1")
- shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, block.TestSource)
+ shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -54,7 +55,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
testutil.Ok(t, os.Mkdir(tmp, 0777))
- meta := block.Meta{
+ meta := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: timestamp.FromTime(now.Add(time.Duration(i) * time.Hour)),
MaxTime: timestamp.FromTime(now.Add((time.Duration(i) * time.Hour) + 1)),
@@ -62,7 +63,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) {
}
meta.Version = 1
meta.ULID = id
- meta.Thanos.Source = block.TestSource
+ meta.Thanos.Source = metadata.TestSource
metab, err := json.Marshal(&meta)
testutil.Ok(t, err)
diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go
index 368c5e92b99..150b5264db1 100644
--- a/pkg/shipper/shipper_test.go
+++ b/pkg/shipper/shipper_test.go
@@ -2,15 +2,13 @@ package shipper
import (
"io/ioutil"
- "os"
- "testing"
-
"math"
-
+ "os"
"path"
+ "testing"
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/prometheus/tsdb"
@@ -23,7 +21,7 @@ func TestShipperTimestamps(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()
- s := New(nil, nil, dir, nil, nil, block.TestSource)
+ s := New(nil, nil, dir, nil, nil, metadata.TestSource)
// Missing thanos meta file.
_, _, err = s.Timestamps()
@@ -41,7 +39,7 @@ func TestShipperTimestamps(t *testing.T) {
id1 := ulid.MustNew(1, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm))
- testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id1.String()), &block.Meta{
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{
Version: 1,
BlockMeta: tsdb.BlockMeta{
ULID: id1,
@@ -56,7 +54,7 @@ func TestShipperTimestamps(t *testing.T) {
id2 := ulid.MustNew(2, nil)
testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm))
- testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), path.Join(dir, id2.String()), &block.Meta{
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{
Version: 1,
BlockMeta: tsdb.BlockMeta{
ULID: id2,
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index e121e9d4081..2614bfbdca1 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -16,6 +16,8 @@ import (
"sync"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -459,6 +461,7 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}
+// blockSeries returns the requested series from the given index and chunk readers.
func (s *BucketStore) blockSeries(
ctx context.Context,
ulid ulid.ULID,
@@ -488,7 +491,6 @@ func (s *BucketStore) blockSeries(
}
// Get result postings list by resolving the postings tree.
- // TODO(bwplotka): Users are seeing panics here, because of lazyPosting being not loaded by preloadPostings.
ps, err := index.ExpandPostings(lazyPostings)
if err != nil {
return nil, stats, errors.Wrap(err, "expand postings")
@@ -504,7 +506,8 @@ func (s *BucketStore) blockSeries(
}
}
- // Preload all series index data
+ // Preload all series index data.
+ // TODO(bwplotka): Consider not keeping all series in memory all the time.
if err := indexr.preloadSeries(ps); err != nil {
return nil, stats, errors.Wrap(err, "preload series")
}
@@ -1001,7 +1004,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...labels.Matcher) ([]labels.Mat
type bucketBlock struct {
logger log.Logger
bucket objstore.BucketReader
- meta *block.Meta
+ meta *metadata.Meta
dir string
indexCache *indexCache
chunkPool *pool.BytesPool
@@ -1065,7 +1068,7 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
} else if err != nil {
return err
}
- meta, err := block.ReadMetaFile(b.dir)
+ meta, err := metadata.Read(b.dir)
if err != nil {
return errors.Wrap(err, "read meta.json")
}
@@ -1095,19 +1098,15 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
}
}()
- indexr, err := index.NewFileReader(fn)
- if err != nil {
- return errors.Wrap(err, "open index reader")
- }
- defer runutil.CloseWithLogOnErr(b.logger, indexr, "load index cache reader")
+ // Create the index cache ad hoc, directly from the index file.
- if err := block.WriteIndexCache(b.logger, cachefn, indexr); err != nil {
+ if err := block.WriteIndexCache(b.logger, fn, cachefn); err != nil {
return errors.Wrap(err, "write index cache")
}
b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn)
if err != nil {
- return errors.Wrap(err, "read index cache")
+ return errors.Wrap(err, "read fresh index cache")
}
return nil
}
@@ -1179,15 +1178,22 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB
logger: logger,
ctx: ctx,
block: block,
- dec: &index.Decoder{},
stats: &queryStats{},
cache: cache,
loadedSeries: map[uint64][]byte{},
}
- r.dec.SetSymbolTable(r.block.symbols)
+ r.dec = &index.Decoder{LookupSymbol: r.lookupSymbol}
return r
}
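+// lookupSymbol resolves a symbol table offset into its string form using the
+// block's preloaded symbols; the index decoder invokes it lazily per lookup.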
+func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
+ s, ok := r.block.symbols[o]
+ if !ok {
+ return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o)
+ }
+ return s, nil
+}
+
func (r *bucketIndexReader) preloadPostings() error {
const maxGapSize = 512 * 1024
@@ -1270,23 +1276,24 @@ func (r *bucketIndexReader) loadPostings(ctx context.Context, postings []*lazyPo
return nil
}
-func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
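+// preloadSeries fetches the raw series data for the given series refs into
+// memory, serving already-cached refs from the index cache where possible.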
+func (r *bucketIndexReader) preloadSeries(refs []uint64) error {
const maxSeriesSize = 64 * 1024
const maxGapSize = 512 * 1024
- var newIDs []uint64
+ var newRefs []uint64
- for _, id := range ids {
- if b, ok := r.cache.series(r.block.meta.ULID, id); ok {
- r.loadedSeries[id] = b
+ for _, ref := range refs {
+ if b, ok := r.cache.series(r.block.meta.ULID, ref); ok {
+ r.loadedSeries[ref] = b
continue
}
- newIDs = append(newIDs, id)
+ newRefs = append(newRefs, ref)
}
- ids = newIDs
+ refs = newRefs
- parts := partitionRanges(len(ids), func(i int) (start, end uint64) {
- return ids[i], ids[i] + maxSeriesSize
+ // Combine multiple nearby byte ranges into bigger reads so we do not get rate-limited by the object storage.
+ parts := partitionRanges(len(refs), func(i int) (start, end uint64) {
+ return refs[i], refs[i] + maxSeriesSize
}, maxGapSize)
var g run.Group
@@ -1295,7 +1302,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
i, j := p[0], p[1]
g.Add(func() error {
- return r.loadSeries(ctx, ids[i:j], ids[i], ids[j-1]+maxSeriesSize)
+ return r.loadSeries(ctx, refs[i:j], refs[i], refs[j-1]+maxSeriesSize)
}, func(err error) {
if err != nil {
cancel()
@@ -1305,7 +1312,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
return g.Run()
}
-func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error {
+func (r *bucketIndexReader) loadSeries(ctx context.Context, refs []uint64, start, end uint64) error {
begin := time.Now()
b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
@@ -1317,12 +1324,12 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
defer r.mtx.Unlock()
r.stats.seriesFetchCount++
- r.stats.seriesFetched += len(ids)
+ r.stats.seriesFetched += len(refs)
r.stats.seriesFetchDurationSum += time.Since(begin)
r.stats.seriesFetchedSizeSum += int(end - start)
- for _, id := range ids {
- c := b[id-start:]
+ for _, ref := range refs {
+ c := b[ref-start:]
l, n := binary.Uvarint(c)
if n < 1 {
@@ -1332,8 +1339,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l))
}
c = c[n : n+int(l)]
- r.loadedSeries[id] = c
- r.cache.setSeries(r.block.meta.ULID, id, c)
+ r.loadedSeries[ref] = c
+ r.cache.setSeries(r.block.meta.ULID, ref, c)
}
return nil
}
@@ -1421,6 +1428,7 @@ func (r *bucketIndexReader) SortedPostings(p index.Postings) index.Postings {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns ErrNotFound if the ref does not resolve to a known series.
+// preloadSeries must be invoked first for this method to return loaded results.
func (r *bucketIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
b, ok := r.loadedSeries[ref]
if !ok {
@@ -1438,6 +1446,11 @@ func (r *bucketIndexReader) LabelIndices() ([][]string, error) {
return nil, errors.New("not implemented")
}
+// LabelNames returns all the unique label names present in the index in sorted order.
+func (r *bucketIndexReader) LabelNames() ([]string, error) {
+ return nil, errors.New("not implemented")
+}
+
// Close released the underlying resources of the reader.
func (r *bucketIndexReader) Close() error {
r.block.pendingReaders.Done()
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index b5bbd6692a3..9b0d536f78e 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -10,6 +10,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
"github.com/improbable-eng/thanos/pkg/runutil"
@@ -66,10 +67,10 @@ func TestBucketStore_e2e(t *testing.T) {
dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
// Add labels to the meta of the second block.
- meta, err := block.ReadMetaFile(dir2)
+ meta, err := metadata.Read(dir2)
testutil.Ok(t, err)
meta.Thanos.Labels = map[string]string{"ext2": "value2"}
- testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta))
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta))
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1))
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2))
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index fa23dfc6f8e..ef531b81a5e 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -4,13 +4,11 @@ import (
"testing"
"time"
- "github.com/oklog/ulid"
-
- "github.com/improbable-eng/thanos/pkg/compact/downsample"
-
"github.com/fortytw2/leaktest"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+ "github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/oklog/ulid"
"github.com/prometheus/tsdb/labels"
)
@@ -41,7 +39,7 @@ func TestBucketBlockSet_addGet(t *testing.T) {
}
for _, in := range input {
- var m block.Meta
+ var m metadata.Meta
m.Thanos.Downsample.Resolution = in.window
m.MinTime = in.mint
m.MaxTime = in.maxt
@@ -102,7 +100,7 @@ func TestBucketBlockSet_addGet(t *testing.T) {
var exp []*bucketBlock
for _, b := range c.res {
- var m block.Meta
+ var m metadata.Meta
m.Thanos.Downsample.Resolution = b.window
m.MinTime = b.mint
m.MaxTime = b.maxt
@@ -129,7 +127,7 @@ func TestBucketBlockSet_remove(t *testing.T) {
}
for _, in := range input {
- var m block.Meta
+ var m metadata.Meta
m.ULID = in.id
m.MinTime = in.mint
m.MaxTime = in.maxt
diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go
index 807b61f8a05..4634c97263e 100644
--- a/pkg/testutil/prometheus.go
+++ b/pkg/testutil/prometheus.go
@@ -12,8 +12,9 @@ import (
"syscall"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@@ -157,7 +158,7 @@ func (p *Prometheus) SetConfig(s string) (err error) {
// Stop terminates Prometheus and clean up its data directory.
func (p *Prometheus) Stop() error {
if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
- return errors.Wrapf(err, "failed to Prometheus. Kill it manually and cleanr %s dir", p.db.Dir())
+ return errors.Wrapf(err, "failed to Prometheus. Kill it manually and clean %s dir", p.db.Dir())
}
time.Sleep(time.Second / 2)
@@ -188,7 +189,7 @@ func CreateBlock(
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
- h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000)
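+ // tsdb no longer exposes NopWAL; a nil WAL disables write-ahead logging, which is fine for a throwaway test block.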
+ h, err := tsdb.NewHead(nil, nil, nil, 10000000000)
if err != nil {
return id, errors.Wrap(err, "create head block")
}
@@ -238,15 +239,15 @@ func CreateBlock(
return id, errors.Wrap(err, "create compactor")
}
- id, err = c.Write(dir, h, mint, maxt)
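+ // The new trailing argument is the optional parent block meta; nil, as this block has no parent.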
+ id, err = c.Write(dir, h, mint, maxt, nil)
if err != nil {
return id, errors.Wrap(err, "write block")
}
- if _, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(dir, id.String()), block.ThanosMeta{
+ if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
Labels: extLset.Map(),
- Downsample: block.ThanosDownsampleMeta{Resolution: resolution},
- Source: block.TestSource,
+ Downsample: metadata.ThanosDownsample{Resolution: resolution},
+ Source: metadata.TestSource,
}, nil); err != nil {
return id, errors.Wrap(err, "finalize block")
}
diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go
index 54a20703d45..72100b15bc6 100644
--- a/pkg/verifier/index_issue.go
+++ b/pkg/verifier/index_issue.go
@@ -8,6 +8,8 @@ import (
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -94,7 +96,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
logger,
tmpdir,
id,
- block.BucketRepairSource,
+ metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,