Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
mix: update comments and clean up commented out codes, todos
  • Loading branch information
bom-d-van committed Dec 19, 2019
commit 8a3bdde501987cf69ac33f70d5c1f25a81363da0
13 changes: 10 additions & 3 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,18 @@ func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo)
}
}

// Start live aggregation. This probably has a read peformance hit.
if base := whisper.archives[0]; base != archive {
var dps []dataPoint

// Mix aggregation is triggered when block in base archive is rotated and also
// depends on the sufficiency of data points. This could results to a over
// long gap when fetching data from higer archives, depending on different
// retention policy. Therefore cwhisper needs to do live aggregation.
if whisper.aggregationMethod == Mix {
var overflow = len(dst) > 0 && dst[len(dst)-1].interval < int(end)
var baseLookupNeeded = len(dst) > 0 && dst[len(dst)-1].interval < int(end)
var inBase bool
if overflow {
if baseLookupNeeded {
bstart, bend := base.getOverallRange()
inBase = int64(bstart) <= end || end <= int64(bend)
}
Expand All @@ -385,6 +391,7 @@ func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo)
}
}

// This would benefits both mix and no-mix aggregations.
if base.hasBuffer() {
for _, p := range unpackDataPoints(base.buffer) {
if p.interval != 0 && int(start) <= p.interval && p.interval <= int(end) {
Expand All @@ -393,7 +400,6 @@ func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo)
}
}

// live aggregate
var pinterval int
var vals []float64
for i, dp := range dps {
Expand Down Expand Up @@ -514,6 +520,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points
aggregateValue := aggregate(whisper.aggregationMethod, knownValues)
point := &TimeSeriesPoint{lowerIntervalStart, aggregateValue}

// TODO: consider migrating to a non-recursive propagation implementation like mix policy
if err := whisper.archiveUpdateManyCompressed(lower, []*TimeSeriesPoint{point}); err != nil {
return err
}
Expand Down
7 changes: 0 additions & 7 deletions compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,6 @@ func TestEstimatePointSize(t *testing.T) {
return
}

// TODO: srcMix is empty, investigate why
func TestFillCompressedMix(t *testing.T) {
srcPath := "fill-mix.src.cwsp"
dstPath := "fill-mix.dst.cwsp"
Expand Down Expand Up @@ -954,11 +953,9 @@ func TestFillCompressedMix(t *testing.T) {
nowNext := func() time.Time { now++; return Now() }
defer func() { Now = func() time.Time { return time.Now() } }()

// twoMonthsAgo := Now().Add(time.Hour * 24 * -60)
limit = 300 + rand.Intn(100)
for i, end := 0, 60*60*24*80; i < end; i++ {
points = append(points, &TimeSeriesPoint{
// Time: int(twoMonthsAgo.Add(time.Second * time.Duration(i)).Unix()),
Time: int(nowNext().Unix()),
Value: rand.NormFloat64(),
})
Expand Down Expand Up @@ -1078,8 +1075,6 @@ func TestFillCompressedMix(t *testing.T) {
compare(dstMix, oldDstMix, int(Now().Add(time.Hour*24*-2+time.Hour).Unix()), int(Now().Unix()))
}

// TODO: check if there are duplicated timestamps by directly reading
// data from blocks
func TestFetchCompressedMix(t *testing.T) {
srcPath := "fetch-mix.cwsp"
os.Remove(srcPath)
Expand Down Expand Up @@ -1119,7 +1114,6 @@ func TestFetchCompressedMix(t *testing.T) {

for i := 0; i < 4*60*60; i++ {
points = append(points, &TimeSeriesPoint{
// Time: int(start.Add(time.Second * time.Duration(i)).Unix()),
Time: int(Now().Unix()),
Value: float64(i),
})
Expand All @@ -1143,7 +1137,6 @@ func TestFetchCompressedMix(t *testing.T) {
t.Error(err)
}

// data, err := srcMix.Fetch(int(start.Unix()), int(Now().Unix()))
t.Run("Check1stArchive", func(t *testing.T) {
data, err := srcMix.FetchByAggregation(now-10, now, &MixAggregationSpec{Method: Min})
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ func (archive *archiveInfo) dumpInfoCompressed() {
fmt.Printf("points_per_block: %d\n", archive.calculateSuitablePointsPerBlock(archive.whisper.pointsPerBlock))
fmt.Printf("compression_ratio: %f (%d/%d)\n", float64(archive.blockSize*archive.blockCount)/float64(archive.Size()), archive.blockSize*archive.blockCount, archive.Size())
if archive.aggregationSpec != nil {
// if archive.aggregationSpec.Method == Percentile {
// fmt.Printf("aggregation: p%.2f\n", archive.aggregationSpec.Percentile)
// } else {
// fmt.Printf("aggregation: %s\n", archive.aggregationSpec.Method)
// }
fmt.Printf("aggregation: %s\n", archive.aggregationSpec)
}

Expand Down
6 changes: 1 addition & 5 deletions whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ type archiveInfo struct {
// is necessary to have it, so it's possible to optimize away buffers in lower
// archives.
buffer []byte
bufferSize int
bufferSize int // dynamically calculated in Whisper.initMetaInfo

blockRanges []blockRange // TODO: remove: sorted by start
blockSize int
Expand Down Expand Up @@ -643,10 +643,6 @@ func (whisper *Whisper) initMetaInfo() {
}

if i == 0 {
if whisper.aggregationMethod == Mix {
// TODO
// arc.bufferSize =
}
continue
}

Expand Down