mix: initial support for a new aggregation policy: mix (with percentiles)
bom-d-van committed Dec 8, 2019
commit 67dc1cb85324843c7268fbae094150e3f56c387c
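Note on the header change below: each archive that carries a mix spec serializes it as one method byte followed by a float32 percentile, the 5 bytes the code accounts for as `mixSpec`. Those bytes are carved out of the reserved `FreeCompressedArchiveInfoSize` padding so the archive-info size stays fixed, and on the read side the `i > 0` guard skips the base archive, which (judging by the write side's nil check) carries no spec. A minimal, self-contained sketch of that 5-byte layout; big-endian byte order is an assumption here, since the real code uses the package's own `packFloat32`/`unpackFloat32` helpers:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// Mirrors the on-disk shape implied by the diff: 1 method byte followed by a
// 4-byte float32 percentile = 5 bytes (the "mixSpec" the header reserves).
// Big-endian order is an assumption, not confirmed by the diff.
func packMixSpec(b []byte, method byte, percentile float32) int {
	b[0] = method
	binary.BigEndian.PutUint32(b[1:5], math.Float32bits(percentile))
	return 5
}

func unpackMixSpec(b []byte) (method byte, percentile float32) {
	return b[0], math.Float32frombits(binary.BigEndian.Uint32(b[1:5]))
}

func main() {
	buf := make([]byte, 5)
	packMixSpec(buf, 7, 99) // 7 is a hypothetical stand-in for the Percentile method constant
	m, p := unpackMixSpec(buf)
	fmt.Println(m, p) // 7 99
}
```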
154 changes: 145 additions & 9 deletions compress.go
@@ -89,6 +89,14 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) {
i += packInt(b, archive.blockCount, i)
i += packFloat32(b, archive.avgCompressedPointSize, i)

var mixSpec int
if archive.aggregationSpec != nil {
b[i] = byte(archive.aggregationSpec.Method)
i += ByteSize
i += packFloat32(b, archive.aggregationSpec.Percentile, i)
mixSpec = 5
}

i += packInt(b, archive.cblock.index, i)
i += packInt(b, archive.cblock.p0.interval, i)
i += packFloat64(b, archive.cblock.p0.value, i)
@@ -105,7 +113,11 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) {
i += packInt(b, int(archive.stats.discard.oldInterval), i)
i += packInt(b, int(archive.stats.extended), i)

i += FreeCompressedArchiveInfoSize
i += FreeCompressedArchiveInfoSize - mixSpec

if FreeCompressedArchiveInfoSize < mixSpec {
panic("out of FreeCompressedArchiveInfoSize") // a panic that should never happens
}
}

// write block_range_info and buffer
@@ -196,6 +208,14 @@ func (whisper *Whisper) readHeaderCompressed() (err error) {
arc.avgCompressedPointSize = unpackFloat32(b[offset : offset+FloatSize])
offset += FloatSize

if whisper.aggregationMethod == Mix && i > 0 {
arc.aggregationSpec = &MixAggregationSpec{}
arc.aggregationSpec.Method = AggregationMethod(b[offset])
offset += ByteSize
arc.aggregationSpec.Percentile = unpackFloat32(b[offset : offset+FloatSize])
offset += FloatSize
}

arc.cblock.index = unpackInt(b[offset : offset+IntSize])
offset += IntSize
arc.cblock.p0.interval = unpackInt(b[offset : offset+IntSize])
@@ -348,7 +368,16 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points
alignedPoints := alignPoints(archive, points)

if !archive.hasBuffer() {
return archive.appendToBlockAndRotate(alignedPoints)
rotated, err := archive.appendToBlockAndRotate(alignedPoints)
if err != nil {
return err
}

if !(whisper.aggregationMethod == Mix && rotated) {
return nil
}

return whisper.propagateToMixedArchivesCompressed()
}

baseIntervalsPerUnit, currentUnit, minInterval := archive.getBufferInfo()
@@ -391,7 +420,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points
continue
}

if err := archive.appendToBlockAndRotate(dps); err != nil {
if _, err := archive.appendToBlockAndRotate(dps); err != nil {
// TODO: record and continue?
return err
}
@@ -448,7 +477,7 @@ func (archive *archiveInfo) getBufferByUnit(unit int) []byte {
return archive.buffer[lb:ub]
}

func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) error {
func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) (rotated bool, err error) {
whisper := archive.whisper // TODO: optimize away?

blockBuffer := make([]byte, len(dps)*(MaxCompressedPointSize)+endOfBlockSize)
@@ -463,7 +492,7 @@ func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) error {
size = len(blockBuffer)
}
if err := whisper.fileWriteAt(blockBuffer[:size], int64(offset)); err != nil {
return err
return rotated, err
}

if len(left) == 0 {
@@ -487,9 +516,11 @@ func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) error {
archive.cblock = nblock
archive.blockRanges[nblock.index].start = 0
archive.blockRanges[nblock.index].end = 0

rotated = true
}

return nil
return rotated, nil
}

func (whisper *Whisper) extendIfNeeded() error {
@@ -566,7 +597,7 @@ func (whisper *Whisper) extendIfNeeded() error {
if err != nil {
return fmt.Errorf("archives[%d].blocks[%d].read: %s", i, block.index, err)
}
if err := nwhisper.archives[i].appendToBlockAndRotate(dst); err != nil {
if _, err := nwhisper.archives[i].appendToBlockAndRotate(dst); err != nil {
return fmt.Errorf("archives[%d].blocks[%d].write: %s", i, block.index, err)
}
}
@@ -1274,7 +1305,7 @@ func (whisper *Whisper) CompressTo(dstPath string) error {
// TODO: consider support moving the last data points to buffer
for i := len(whisper.archives) - 1; i >= 0; i-- {
points := pointsByArchives[i]
if err := dst.archives[i].appendToBlockAndRotate(points); err != nil {
if _, err := dst.archives[i].appendToBlockAndRotate(points); err != nil {
return err
}
}
@@ -1483,7 +1514,7 @@ func (dstw *Whisper) FillCompressed(srcw *Whisper) error {

for i := len(dstw.archives) - 1; i >= 0; i-- {
points := pointsByArchives[i]
if err := newDst.archives[i].appendToBlockAndRotate(points); err != nil {
if _, err := newDst.archives[i].appendToBlockAndRotate(points); err != nil {
return err
}
copy(newDst.archives[i].buffer, dstw.archives[i].buffer)
@@ -1506,3 +1537,108 @@ func (dstw *Whisper) FillCompressed(srcw *Whisper) error {

return nil
}

func (whisper *Whisper) propagateToMixedArchivesCompressed() error {
var largestSPP int
// var latestInterval int
var lastArchive *archiveInfo
var spps []int
for _, arc := range whisper.archives[1:] {
if arc.secondsPerPoint > largestSPP {
largestSPP = arc.secondsPerPoint
// latestInterval = arc.cblock.pn1.interval
lastArchive = arc
}

var knownSPP bool
for _, spp := range spps {
knownSPP = knownSPP || (arc.secondsPerPoint == spp)
}
if !knownSPP {
spps = append(spps, arc.secondsPerPoint)
}
}
if largestSPP == 0 {
return nil
}

var propagatable bool
var baseArchive = whisper.archives[0]
var newInterval = baseArchive.cblock.pn1.interval - mod(baseArchive.cblock.pn1.interval, lastArchive.secondsPerPoint)
if lastArchive.cblock.pn1.interval == 0 {
propagatable = baseArchive.cblock.pn1.interval-baseArchive.getSortedBlockRanges()[0].start > largestSPP
} else {
propagatable = newInterval-lastArchive.cblock.pn1.interval > largestSPP // has at least 1 point to be propagated
}

if !propagatable {
return nil
}

from := lastArchive.cblock.pn1.interval + lastArchive.secondsPerPoint
until := baseArchive.cblock.pn1.interval - mod(baseArchive.cblock.pn1.interval, lastArchive.secondsPerPoint)
dps, err := whisper.fetchCompressed(int64(from), int64(until), baseArchive)
if err != nil {
return fmt.Errorf("mix: failed to baseArchive.fetchCompressed(%d, %d): %s", from, until, err)
}

type groupedDataPoint struct {
interval int
values []float64
}
var dpsBySPP = map[int][]*groupedDataPoint{}
for _, dp := range dps {
for _, spp := range spps {
interval := dp.interval - mod(dp.interval, spp)
if len(dpsBySPP[spp]) > 0 {
gdp := dpsBySPP[spp][len(dpsBySPP[spp])-1]
if gdp.interval == interval {
gdp.values = append(gdp.values, dp.value)
continue
}

// check we have enough data points to propagate a value
knownPercent := float32(len(gdp.values)) / float32(spp/baseArchive.secondsPerPoint)
if knownPercent < whisper.xFilesFactor {
gdp.interval = interval
gdp.values = []float64{}
continue
}

// sorted for percentiles
sort.Float64s(gdp.values)
}
dpsBySPP[spp] = append(dpsBySPP[spp], &groupedDataPoint{interval: interval, values: []float64{dp.value}})
}
}

for _, arc := range whisper.archives[1:] {
gdps := dpsBySPP[arc.secondsPerPoint]
dps := make([]dataPoint, len(gdps))
for i, gdp := range gdps {
dps[i].interval = gdp.interval
if arc.aggregationSpec.Method == Percentile {
dps[i].value = getPercentile(arc.aggregationSpec.Percentile, gdp.values)
} else {
dps[i].value = aggregate(arc.aggregationSpec.Method, gdp.values)
}
}

if _, err := arc.appendToBlockAndRotate(dps); err != nil {
return fmt.Errorf("mix: failed to propagate archive %s: %s", arc.Retention, err)
}
}

return nil
}
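
Note on the propagation above: it runs only when the base archive rotates a block under the Mix policy (see `archiveUpdateManyCompressed`), checks that at least one full point of the slowest archive is available (`propagatable`), then fetches raw base-archive points, floors each interval down to every target resolution, and groups values per bucket; a bucket whose fill ratio (`knownPercent`) falls below `xFilesFactor` is discarded. A minimal sketch of the flooring step, assuming non-negative Unix timestamps so a plain `%` behaves like the package's `mod` helper on these inputs:

```go
package main

import "fmt"

// Floors a timestamp down to the archive's resolution boundary, the same
// "interval - mod(interval, spp)" pattern used throughout the function above.
func alignInterval(interval, spp int) int {
	return interval - interval%spp
}

func main() {
	// 1s base points straddling a 60s boundary, floored into 60s buckets:
	for _, ts := range []int{3658, 3659, 3660, 3661, 3662} {
		fmt.Println(ts, "->", alignInterval(ts, 60)) // 3658,3659 -> 3600; 3660..3662 -> 3660
	}
}
```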

// getPercentile assumes values are sorted and uses the nearest-rank method
func getPercentile(p float32, vals []float64) float64 {
pos := int(math.Ceil(float64(p) * float64(len(vals)) / 100))
if pos == 0 {
pos = 1
} else if pos > len(vals) {
pos = len(vals)
}
return vals[pos-1]
}
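
A quick standalone check of the nearest-rank behavior: the function body below is copied verbatim from this diff so the example runs on its own.

```go
package main

import (
	"fmt"
	"math"
)

// Copied from the diff: nearest-rank percentile over pre-sorted values.
func getPercentile(p float32, vals []float64) float64 {
	pos := int(math.Ceil(float64(p) * float64(len(vals)) / 100))
	if pos == 0 {
		pos = 1
	} else if pos > len(vals) {
		pos = len(vals)
	}
	return vals[pos-1]
}

func main() {
	vals := []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} // must already be sorted
	fmt.Println(getPercentile(50, vals))  // 5  -> ceil(0.50*10) = 5th value
	fmt.Println(getPercentile(95, vals))  // 10 -> ceil(0.95*10) = 10th value
	fmt.Println(getPercentile(0, vals))   // 1  -> pos clamped up to 1
	fmt.Println(getPercentile(100, vals)) // 10
}
```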
87 changes: 87 additions & 0 deletions compress_test.go
@@ -541,6 +541,93 @@ func TestCompressedWhisperReadWrite3(t *testing.T) {
}
}

func TestMixAggregationCompressed(t *testing.T) {
fpath := "mix.cwsp"
os.Remove(fpath)

whisper, err := CreateWithOptions(
fpath,
[]*Retention{
{secondsPerPoint: 1, numberOfPoints: 172800}, // 1s:2d
{secondsPerPoint: 60, numberOfPoints: 40320}, // 1m:28d
{secondsPerPoint: 3600, numberOfPoints: 17520}, // 1h:2y
},
Mix,
0,
&Options{
Compressed: true, PointsPerBlock: 7200, InMemory: true,
MixAggregationSpecs: []MixAggregationSpec{
{Method: Average, Percentile: 0},
{Method: Sum, Percentile: 0},
{Method: Last, Percentile: 0},
{Method: Max, Percentile: 0},
{Method: Min, Percentile: 0},
{Method: Percentile, Percentile: 50},
{Method: Percentile, Percentile: 95},
{Method: Percentile, Percentile: 99},
},
},
)
if err != nil {
panic(err)
}
// whisper.Close()

// var now = time.Now()
// var total = 60*60*24*365*2 + 37
var start = time.Now().Add(time.Duration(time.Hour * -24))
// var start = time.Unix(1491778800, 0)
Now = func() time.Time { return start }
defer func() { Now = func() time.Time { return time.Now() } }()

var ps []*TimeSeriesPoint
for i := 0; i < 24*60*60; i++ {
ps = append(ps, &TimeSeriesPoint{
Time: int(start.Unix()),
// Value: float64(rand.Intn(100000)),
Value: float64(i),
})
start = start.Add(time.Second)
}
if err := whisper.UpdateMany(ps); err != nil {
t.Fatal(err)
}
if err := whisper.Close(); err != nil {
t.Fatal(err)
}
whisper.file.(*memFile).dumpOnDisk(fpath)

// TODO: make some real data validations
{
vals, err := whisper.FetchByAggregation(int(Now().Add(time.Hour*24*-3).Unix()), int(Now().Unix()), &MixAggregationSpec{Method: Last})
if err != nil {
t.Error(err)
}
log.Printf("vals = %+v\n", vals)
}
{
vals, err := whisper.FetchByAggregation(int(Now().Add(time.Hour*24*-3).Unix()), int(Now().Unix()), &MixAggregationSpec{Method: Percentile, Percentile: 50})
if err != nil {
t.Error(err)
}
log.Printf("vals = %+v\n", vals)
}
{
vals, err := whisper.FetchByAggregation(int(Now().Add(time.Hour*24*-30).Unix()), int(Now().Unix()), &MixAggregationSpec{Method: Last})
if err != nil {
t.Error(err)
}
log.Printf("vals = %+v\n", vals)
}
{
vals, err := whisper.FetchByAggregation(int(Now().Add(time.Hour*24*-30).Unix()), int(Now().Unix()), &MixAggregationSpec{Method: Percentile, Percentile: 50})
if err != nil {
t.Error(err)
}
log.Printf("vals = %+v\n", vals)
}
}

func TestCompressTo(t *testing.T) {
fpath := "compress_to.wsp"
os.Remove(fpath)
16 changes: 16 additions & 0 deletions debug.go
@@ -115,6 +115,14 @@ func (archive *archiveInfo) dumpInfoCompressed() {
fmt.Printf("block_count: %d\n", archive.blockCount)
fmt.Printf("points_per_block: %d\n", archive.calculateSuitablePointsPerBlock(archive.whisper.pointsPerBlock))
fmt.Printf("compression_ratio: %f (%d/%d)\n", float64(archive.blockSize*archive.blockCount)/float64(archive.Size()), archive.blockSize*archive.blockCount, archive.Size())
if archive.aggregationSpec != nil {
if archive.aggregationSpec.Method == Percentile {
fmt.Printf("aggregation: p%.2f\n", archive.aggregationSpec.Percentile)
} else {
fmt.Printf("aggregation: %s\n", archive.aggregationSpec.Method)
}
}

fmt.Printf("cblock\n")
fmt.Printf(" index: %d\n", archive.cblock.index)
fmt.Printf(" p[0].interval: %d\n", archive.cblock.p0.interval)
@@ -151,6 +159,14 @@ func (arc *archiveInfo) dumpDataPointsCompressed() {
}
}

if arc.aggregationSpec != nil {
if arc.aggregationSpec.Method == Percentile {
fmt.Printf("aggregation: p%.2f\n", arc.aggregationSpec.Percentile)
} else {
fmt.Printf("aggregation: %s\n", arc.aggregationSpec.Method)
}
}

for _, block := range arc.blockRanges {
fmt.Printf("archive %s block %d @%d\n", arc.Retention, block.index, arc.blockOffset(block.index))
if block.start == 0 {