Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
remove goprocess from api
  • Loading branch information
gammazero committed Feb 20, 2025
commit fd12b84daeb175e9ce62e70e813a74d13fffc4e2
37 changes: 9 additions & 28 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,6 @@ type Results interface {
NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted
Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
Close() error // client may call Close to signal early exit

// Process returns a goprocess.Process associated with these results.
// most users will not need this function (Close is all they want),
// but it's here in case you want to connect the results to other
// goprocess-friendly things.
Process() goprocess.Process
}

// results implements Results
Expand Down Expand Up @@ -186,10 +180,6 @@ func (r *results) Rest() ([]Entry, error) {
return es, nil
}

func (r *results) Process() goprocess.Process {
return r.proc
}

func (r *results) Close() error {
return r.proc.Close()
}
Expand All @@ -210,15 +200,15 @@ func (r *results) Query() Query {
// an early close signal from the client.
type ResultBuilder struct {
Query Query
Process goprocess.Process
process goprocess.Process
Output chan Result
}

// Results returns a Results to to this builder.
func (rb *ResultBuilder) Results() Results {
return &results{
query: rb.Query,
proc: rb.Process,
proc: rb.process,
res: rb.Output,
}
}
Expand All @@ -235,7 +225,7 @@ func NewResultBuilder(q Query) *ResultBuilder {
Query: q,
Output: make(chan Result, bufSize),
}
b.Process = goprocess.WithTeardown(func() error {
b.process = goprocess.WithTeardown(func() error {
close(b.Output)
return nil
})
Expand All @@ -248,7 +238,7 @@ func NewResultBuilder(q Query) *ResultBuilder {
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
// will leave anything trying to write to the result channel hanging.
func ResultsWithChan(q Query, res <-chan Result) Results {
return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) {
proc := func(worker goprocess.Process, out chan<- Result) {
for {
select {
case <-worker.Closing(): // client told us to close early
Expand All @@ -265,20 +255,16 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
}
}
}
})
}
}

// ResultsWithProcess returns a Results object with the results generated by the
// passed subprocess.
func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results {
b := NewResultBuilder(q)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
b.process.Go(func(worker goprocess.Process) {
proc(worker, b.Output)
})

go b.Process.CloseAfterChildren() //nolint
go b.process.CloseAfterChildren() //nolint
return b.Results()
}

Expand Down Expand Up @@ -378,11 +364,6 @@ func (r *resultsIter) Rest() ([]Entry, error) {
return es, nil
}

func (r *resultsIter) Process() goprocess.Process {
r.useLegacyResults()
return r.legacyResults.Process()
}

func (r *resultsIter) Close() error {
if r.legacyResults != nil {
return r.legacyResults.Close()
Expand All @@ -403,7 +384,7 @@ func (r *resultsIter) useLegacyResults() {
b := NewResultBuilder(r.query)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
b.process.Go(func(worker goprocess.Process) {
defer r.close()
for {
e, ok := r.next()
Expand All @@ -418,7 +399,7 @@ func (r *resultsIter) useLegacyResults() {
}
})

go b.Process.CloseAfterChildren() //nolint
go b.process.CloseAfterChildren() //nolint

r.legacyResults = b.Results().(*results)
}