Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ const (
dataFlagDatabasePassword = "password"
dataFlagFilterTags = "filter-tags"
dataFlagTimeout = "timeout"
dataFlagCollectionType = "collection-type"
dataFlagPipelineName = "pipeline-name"
dataFlagIndexName = "index-name"
dataFlagIndexSpecFile = "index-path"

datapipelineFlagSchedule = "schedule"
datapipelineFlagMQL = "mql"
Expand Down Expand Up @@ -1381,6 +1385,92 @@ var app = &cli.App{
},
},
},
{
Name: "index",
Usage: "manage indexes for hot data stores and pipeline sinks",
Copy link
Member Author

@danielbotros danielbotros Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to hear opinions on better usage messages

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one let's do manage indexes for hot data and pipeline sink collections

UsageText: createUsageText("data index", nil, false, true),
HideHelpCommand: true,
Subcommands: []*cli.Command{
{
Name: "create",
Usage: "create an index for a data collection",
UsageText: createUsageText(
"data index create", []string{generalFlagOrgID, dataFlagCollectionType, dataFlagIndexSpecFile}, true, false,
),
Flags: []cli.Flag{
&cli.StringFlag{
Name: generalFlagOrgID,
Required: true,
Usage: "org ID of the data collection",
},
&cli.StringFlag{
Name: dataFlagCollectionType,
Required: true,
Usage: formatAcceptedValues("collection type", "hot_store", "pipeline_sink"),
},
&cli.StringFlag{
Name: dataFlagPipelineName,
Required: false,
Usage: "name of the pipeline associated with the index when collection type is 'pipeline_sink'",
},
&cli.PathFlag{
Name: dataFlagIndexSpecFile,
Required: true,
Usage: "path to index specification JSON file",
TakesFile: true,
},
},
Action: createCommandWithT[createCustomIndexArgs](CreateCustomIndexAction),
},
{
Name: "delete",
Usage: "delete an index from a data collection",
UsageText: createUsageText("data index delete", []string{generalFlagOrgID, dataFlagCollectionType, dataFlagIndexName}, true, false),
Flags: []cli.Flag{
&cli.StringFlag{
Name: generalFlagOrgID,
Required: true,
Usage: "org ID of the data collection",
},
&cli.StringFlag{
Name: dataFlagCollectionType,
Required: true,
Usage: formatAcceptedValues("collection type", "hot_store", "pipeline_sink"),
},
&cli.StringFlag{
Name: dataFlagPipelineName,
Required: false,
Usage: "name of the pipeline associated with the index when collection type is 'pipeline_sink'",
},
&cli.StringFlag{
Name: dataFlagIndexName,
Required: true,
Usage: "name of the index to delete",
},
},
Before: createCommandWithT[deleteCustomIndexArgs](DeleteCustomIndexConfirmation),
Action: createCommandWithT[deleteCustomIndexArgs](DeleteCustomIndexAction),
},
{
	Name:  "list",
	Usage: "list indexes for a data collection",
	// The optional-flags argument is true here, mirroring the create and delete
	// subcommands, which carry the same optional pipeline-name flag.
	UsageText: createUsageText("data index list", []string{generalFlagOrgID, dataFlagCollectionType}, true, false),
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:     generalFlagOrgID,
			Required: true,
			Usage:    "org ID of the data collection",
		},
		&cli.StringFlag{
			Name:     dataFlagCollectionType,
			Required: true,
			Usage:    formatAcceptedValues("collection type", "hot_store", "pipeline_sink"),
		},
		// Collection-type validation rejects 'pipeline_sink' without a pipeline
		// name, so the flag has to be registered here for that collection type
		// to be listable at all.
		&cli.StringFlag{
			Name:     dataFlagPipelineName,
			Required: false,
			Usage:    "name of the pipeline associated with the indexes when collection type is 'pipeline_sink'",
		},
	},
	Action: createCommandWithT[listCustomIndexesArgs](ListCustomIndexesAction),
},
},
},
},
},
{
Expand Down
207 changes: 207 additions & 0 deletions cli/customindex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package cli

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"

"github.com/urfave/cli/v2"
pb "go.viam.com/api/app/data/v1"
)

const (
// Short aliases for the pb.IndexableCollection enum values used by the index commands.
hotStoreCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_HOT_STORE
pipelineSinkCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_PIPELINE_SINK
unspecifiedCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_UNSPECIFIED

// Accepted string values for the collection type; validateCollectionTypeArgs maps
// them to the enum aliases above.
hotStoreCollectionTypeStr = "hot_store"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: i dont remember what the cli convention is for strings we accept but it seems like the flags we have are normally hyphenated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that seems right

pipelineSinkCollectionTypeStr = "pipeline_sink"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"pipeline" might be better

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gloriacai01

How do we refer to pipelines in customer facing docs/features? Are they familiar with pipeline_sinks? Or just the general concept of pipelines and collections?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think pipeline sink makes sense, esp since we reference "pipeline sink" rather than "pipeline" as a data source type users can query from. also nit but i wonder if it should be "hot storage" instead of "hot store"? i know the api says hot store but for tabular data source type, we reference it as hot storage
@RobertXu we've talked about this, but at least for customer facing features these could be aligned? wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, pipeline sink and hot storage both sound good to me to keep things consistent for customers!

)

// Sentinel errors returned by validateCollectionTypeArgs.
var (
// errInvalidCollectionType is returned when the collection type string matches
// neither accepted value.
errInvalidCollectionType = errors.New("invalid collection type, must be one of: hot_store, pipeline_sink")
// errPipelineNameRequired is returned when the collection type is 'pipeline_sink'
// and no pipeline name was supplied.
errPipelineNameRequired = errors.New("--pipeline-name is required when --collection-type is 'pipeline_sink'")
// errPipelineNameNotAllowed is returned when a pipeline name was supplied for a
// collection type other than 'pipeline_sink'.
errPipelineNameNotAllowed = errors.New("--pipeline-name can only be used when --collection-type is 'pipeline_sink'")
)

// createCustomIndexArgs holds the arguments passed to CreateCustomIndexAction.
type createCustomIndexArgs struct {
// OrgID is sent as the request's organization ID.
OrgID string
// CollectionType is the collection type string checked by validateCollectionTypeArgs.
CollectionType string
// PipelineName is sent as the request's pipeline name.
PipelineName string
// IndexSpecPath is the path of the JSON file the index spec is read from.
// NOTE(review): the corresponding flag is registered as "index-path"; confirm
// that the flag-to-field mapping actually populates a field named IndexSpecPath.
IndexSpecPath string
}

// CreateCustomIndexAction creates a custom index for the organization and
// collection type given in args, using the index specification read from the
// JSON file at args.IndexSpecPath.
//
// It returns an error if the collection type / pipeline name combination is
// invalid, if the spec file cannot be read or parsed, if the client cannot be
// created, or if the CreateIndex call fails.
func CreateCustomIndexAction(c *cli.Context, args createCustomIndexArgs) error {
	// Check the flags and load the spec first so that bad input is rejected
	// before any client setup happens.
	collectionType, err := validateCollectionTypeArgs(c, args.CollectionType)
	if err != nil {
		return err
	}

	indexSpec, err := readJSONToByteSlices(args.IndexSpecPath)
	if err != nil {
		return fmt.Errorf("failed to read index spec from file: %w", err)
	}

	client, err := newViamClient(c)
	if err != nil {
		return err
	}

	// PipelineName is an optional request field: leave it unset (nil) when no
	// name was given instead of marking it present with an empty string.
	var pipelineName *string
	if args.PipelineName != "" {
		pipelineName = &args.PipelineName
	}

	_, err = client.dataClient.CreateIndex(context.Background(), &pb.CreateIndexRequest{
		OrganizationId: args.OrgID,
		CollectionType: collectionType,
		PipelineName:   pipelineName,
		IndexSpec:      indexSpec,
	})
	if err != nil {
		return fmt.Errorf("failed to create index: %w", err)
	}

	printf(c.App.Writer, "Create index request sent successfully")

	return nil
}

// deleteCustomIndexArgs holds the arguments passed to DeleteCustomIndexAction and
// DeleteCustomIndexConfirmation.
type deleteCustomIndexArgs struct {
// OrgID is sent as the request's organization ID.
OrgID string
// CollectionType is the collection type string checked by validateCollectionTypeArgs.
CollectionType string
// PipelineName is sent as the request's pipeline name.
PipelineName string
// IndexName is the name of the index to delete; it is also echoed in the
// confirmation prompt and the success message.
IndexName string
}

// DeleteCustomIndexAction deletes the custom index named args.IndexName for the
// organization and collection type given in args.
//
// It returns an error if the collection type / pipeline name combination is
// invalid, if the client cannot be created, or if the DeleteIndex call fails.
func DeleteCustomIndexAction(c *cli.Context, args deleteCustomIndexArgs) error {
	// Check the flags first so that bad input is rejected before any client
	// setup happens.
	collectionType, err := validateCollectionTypeArgs(c, args.CollectionType)
	if err != nil {
		return err
	}

	client, err := newViamClient(c)
	if err != nil {
		return err
	}

	// PipelineName is an optional request field: leave it unset (nil) when no
	// name was given instead of marking it present with an empty string.
	var pipelineName *string
	if args.PipelineName != "" {
		pipelineName = &args.PipelineName
	}

	_, err = client.dataClient.DeleteIndex(context.Background(), &pb.DeleteIndexRequest{
		OrganizationId: args.OrgID,
		CollectionType: collectionType,
		PipelineName:   pipelineName,
		IndexName:      args.IndexName,
	})
	if err != nil {
		return fmt.Errorf("failed to delete index: %w", err)
	}

	printf(c.App.Writer, "Index (name: %s) deleted successfully", args.IndexName)

	return nil
}

// DeleteCustomIndexConfirmation prompts the user for confirmation before deleting a custom index.
// It writes a y/N prompt naming args.IndexName to c.App.Writer, reads one line from
// c.App.Reader, and returns nil only when the trimmed answer is "y" or "Y".
// Any other answer yields an "aborted" error; read errors are returned as-is.
func DeleteCustomIndexConfirmation(c *cli.Context, args deleteCustomIndexArgs) error {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be okay to skip a confirmation step here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I didn't know we had this pattern.

I think in this case, it's ok to skip the confirmation step. I don't think people will accidentally delete an index since the action requires quite specific params.

There are 2 risks with deleting an index

  1. worse query performance- we can easily add back the index if this proves to be a problem
  2. they delete a unique index which allows duplicate data to appear

Case 1 is pretty manageable, Case 2 is not great, however, we don't perform any validation when they create a unique index, so it makes sense to not do the same when deleting (Note: I don't love the lack of validation, but this is what came up during the doc review process).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sounds good seems not to important to validate this

printf(c.App.Writer, "Are you sure you want to delete index (name: %s)? This action cannot be undone. (y/N): ", args.IndexName)
// NOTE(review): presumably c.Err() reports the command context's cancellation
// error, so a cancelled command stops before waiting on input — confirm.
if err := c.Err(); err != nil {
return err
}

// ReadString returns an error (often io.EOF) when the input ends before a
// newline is seen, so an answer without a trailing newline is reported as a
// read error rather than being compared below.
rawInput, err := bufio.NewReader(c.App.Reader).ReadString('\n')
if err != nil {
return err
}

// Surrounding whitespace is ignored and the comparison is case-insensitive;
// anything other than a lone "y" aborts.
if input := strings.ToUpper(strings.TrimSpace(rawInput)); input != "Y" {
return errors.New("aborted")
}
return nil
}

// listCustomIndexesArgs holds the arguments passed to ListCustomIndexesAction.
type listCustomIndexesArgs struct {
// OrgID is sent as the request's organization ID.
OrgID string
// CollectionType is the collection type string checked by validateCollectionTypeArgs.
CollectionType string
// PipelineName is sent as the request's pipeline name.
// NOTE(review): this can only be non-empty if the list command registers a
// pipeline-name flag — verify against the command definition.
PipelineName string
}

// ListCustomIndexesAction lists all custom indexes for a specified organization and collection type.
// It creates a client, validates the collection type / pipeline name combination,
// calls ListIndexes, and writes the result to c.App.Writer: "No indexes found" when
// the response is empty, otherwise the name and spec of each index.
// Errors from client creation and validation are returned as-is; a failed
// ListIndexes call is wrapped with "failed to list indexes".
func ListCustomIndexesAction(c *cli.Context, args listCustomIndexesArgs) error {
client, err := newViamClient(c)
if err != nil {
return err
}

collectionType, err := validateCollectionTypeArgs(c, args.CollectionType)
if err != nil {
return err
}

// PipelineName is always passed as a pointer to the args field, even when empty.
resp, err := client.dataClient.ListIndexes(context.Background(), &pb.ListIndexesRequest{
OrganizationId: args.OrgID,
CollectionType: collectionType,
PipelineName: &args.PipelineName,
})
if err != nil {
return fmt.Errorf("failed to list indexes: %w", err)
}

if len(resp.Indexes) == 0 {
printf(c.App.Writer, "No indexes found")
return nil
}

printf(c.App.Writer, "Indexes:\n")
for _, index := range resp.Indexes {
printf(c.App.Writer, "- Name: %s\n", index.IndexName)
// NOTE(review): IndexSpec is formatted with %s; its type is not visible here —
// confirm this renders readably.
printf(c.App.Writer, " Spec: %s\n", index.IndexSpec)
Comment on lines +137 to +138
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we wanna print more info here like what the collection type / pipeline name for each index is

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, name and spec should be fine for now. The collection type / pipeline name will be the same for each index, and the customer will have just entered those in the request, so those fields would make the response longer without adding new info.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True

}

return nil
}

// validateCollectionTypeArgs maps the collection type string to its
// pb.IndexableCollection value and checks that the pipeline-name flag on c is
// consistent with it.
//
// It returns errInvalidCollectionType for an unrecognized collection type,
// errPipelineNameRequired when the type is pipeline sink and no pipeline name
// is set, and errPipelineNameNotAllowed when a pipeline name is set for any
// other type. On error the returned enum is unspecifiedCollectionType.
func validateCollectionTypeArgs(c *cli.Context, collectionType string) (pb.IndexableCollection, error) {
	var collectionTypeProto pb.IndexableCollection
	switch collectionType {
	case hotStoreCollectionTypeStr:
		collectionTypeProto = hotStoreCollectionType
	case pipelineSinkCollectionTypeStr:
		collectionTypeProto = pipelineSinkCollectionType
	default:
		return unspecifiedCollectionType, errInvalidCollectionType
	}

	// Decide from the value that was just validated rather than re-reading the
	// collection-type flag, so there is a single source of truth for it.
	isPipelineSink := collectionTypeProto == pipelineSinkCollectionType
	hasPipelineName := c.String(dataFlagPipelineName) != ""

	switch {
	case isPipelineSink && !hasPipelineName:
		return unspecifiedCollectionType, errPipelineNameRequired
	case !isPipelineSink && hasPipelineName:
		return unspecifiedCollectionType, errPipelineNameNotAllowed
	}

	return collectionTypeProto, nil
}

// readJSONToByteSlices reads the file at filePath, decodes it as a JSON array, and
// returns the raw bytes of each array element in order. It returns the underlying
// error if the file cannot be read or is not valid JSON. Because the target is a
// slice, a file whose top-level value is a single JSON object fails to decode.
func readJSONToByteSlices(filePath string) ([][]byte, error) {
//nolint:gosec // filePath is a user-provided path for a JSON file containing an index spec
data, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}

// json.RawMessage defers decoding, so each element keeps its original encoded bytes.
var rawMessages []json.RawMessage
if err = json.Unmarshal(data, &rawMessages); err != nil {
return nil, err
}

result := make([][]byte, len(rawMessages))
for i, raw := range rawMessages {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we'll need to specifically parse out the key and index properties from the JSON file.

Here's an example index spec JSON from the scope document:

{
  "key": {
    "resource_name": 1,
    "method_name": 1
  },
 "options": {
    "sparse": true
  }
}

We have to do this weird conversion b/c people used to writing MongoDB index specs use JSON objects, but MongoDB's index creation API requires BSON.D slices to preserve key ordering for compound indexes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah you're right I did this and then looked at the SDK bug and now realizing I did the same thing lol

result[i] = []byte(raw)
}

return result, nil
}
Loading
Loading