Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ const (
dataFlagDatabasePassword = "password"
dataFlagFilterTags = "filter-tags"
dataFlagTimeout = "timeout"
dataFlagCollectionType = "collection-type"
dataFlagPipelineName = "pipeline-name"
dataFlagIndexName = "index-name"
dataFlagIndexSpecFile = "index-path"

datapipelineFlagSchedule = "schedule"
datapipelineFlagMQL = "mql"
Expand Down Expand Up @@ -1381,6 +1385,91 @@ var app = &cli.App{
},
},
},
{
Name: "index",
Usage: "manage indexes for hot data and pipeline sink collections",
UsageText: createUsageText("data index", nil, false, true),
HideHelpCommand: true,
Subcommands: []*cli.Command{
{
Name: "create",
Usage: "create an index for a data collection",
UsageText: createUsageText(
"data index create", []string{generalFlagOrgID, dataFlagCollectionType, dataFlagIndexSpecFile}, true, false,
),
Flags: []cli.Flag{
&cli.StringFlag{
Name: generalFlagOrgID,
Required: true,
Usage: "org ID of the data collection",
},
&cli.StringFlag{
Name: dataFlagCollectionType,
Required: true,
Usage: formatAcceptedValues("collection type", "hot-storage", "pipeline-sink"),
},
&cli.StringFlag{
Name: dataFlagPipelineName,
Required: false,
Usage: "name of the pipeline associated with the index when collection type is 'pipeline-sink'",
},
&cli.PathFlag{
Name: dataFlagIndexSpecFile,
Required: true,
Usage: "path to index specification JSON file",
TakesFile: true,
},
},
Action: createCommandWithT[createCustomIndexArgs](CreateCustomIndexAction),
},
{
Name: "delete",
Usage: "delete an index from a data collection",
UsageText: createUsageText("data index delete", []string{generalFlagOrgID, dataFlagCollectionType, dataFlagIndexName}, true, false),
Flags: []cli.Flag{
&cli.StringFlag{
Name: generalFlagOrgID,
Required: true,
Usage: "org ID of the data collection",
},
&cli.StringFlag{
Name: dataFlagCollectionType,
Required: true,
Usage: formatAcceptedValues("collection type", "hot-storage", "pipeline-sink"),
},
&cli.StringFlag{
Name: dataFlagPipelineName,
Required: false,
Usage: "name of the pipeline associated with the index when collection type is 'pipeline-sink'",
},
&cli.StringFlag{
Name: dataFlagIndexName,
Required: true,
Usage: "name of the index to delete",
},
},
Action: createCommandWithT[deleteCustomIndexArgs](DeleteCustomIndexAction),
},
{
Name: "list",
Usage: "list indexes for a data collection",
UsageText: createUsageText("data index list", []string{generalFlagOrgID, dataFlagCollectionType}, false, false),
Flags: []cli.Flag{
&cli.StringFlag{
Name: generalFlagOrgID,
Required: true,
Usage: "org ID of the data collection",
},
&cli.StringFlag{
Name: dataFlagCollectionType,
Required: true,
Usage: formatAcceptedValues("collection type", "hot-storage", "pipeline-sink"),
},
},
Action: createCommandWithT[listCustomIndexesArgs](ListCustomIndexesAction),
},
},
},
},
},
{
Expand Down
197 changes: 197 additions & 0 deletions cli/customindex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package cli

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"

"github.com/urfave/cli/v2"
pb "go.viam.com/api/app/data/v1"
)

const (
hotStoreCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_HOT_STORE
pipelineSinkCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_PIPELINE_SINK
unspecifiedCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_UNSPECIFIED

hotStoreCollectionTypeStr = "hot-storage"
pipelineSinkCollectionTypeStr = "pipeline-sink"
)

var (
errInvalidCollectionType = errors.New("invalid collection type, must be one of: hot-storage, pipeline-sink")
errPipelineNameRequired = errors.New("--pipeline-name is required when --collection-type is 'pipeline-sink'")
errPipelineNameNotAllowed = errors.New("--pipeline-name can only be used when --collection-type is 'pipeline-sink'")
)

type createCustomIndexArgs struct {
OrgID string
CollectionType string
PipelineName string
IndexSpecPath string
}

// CreateCustomIndexAction creates a custom index for a specified organization and collection type
// using the provided index specification file in the arguments.
func CreateCustomIndexAction(c *cli.Context, args createCustomIndexArgs) error {
client, err := newViamClient(c)
if err != nil {
return err
}

collectionType, err := validateCollectionTypeArgs(c, args.CollectionType)
if err != nil {
return err
}

indexSpec, err := readJSONToByteSlices(args.IndexSpecPath)
if err != nil {
return fmt.Errorf("failed to read index spec from file: %w", err)
}

_, err = client.dataClient.CreateIndex(context.Background(), &pb.CreateIndexRequest{
OrganizationId: args.OrgID,
CollectionType: collectionType,
PipelineName: &args.PipelineName,
IndexSpec: indexSpec,
})
if err != nil {
return fmt.Errorf("failed to create index: %w", err)
}

printf(c.App.Writer, "Create index request sent successfully")

return nil
}

type deleteCustomIndexArgs struct {
OrgID string
CollectionType string
PipelineName string
IndexName string
}

// DeleteCustomIndexAction deletes a custom index for a specified organization and collection type using the provided index name.
func DeleteCustomIndexAction(c *cli.Context, args deleteCustomIndexArgs) error {
client, err := newViamClient(c)
if err != nil {
return err
}

collectionType, err := validateCollectionTypeArgs(c, args.CollectionType)
if err != nil {
return err
}

_, err = client.dataClient.DeleteIndex(context.Background(), &pb.DeleteIndexRequest{
OrganizationId: args.OrgID,
CollectionType: collectionType,
PipelineName: &args.PipelineName,
IndexName: args.IndexName,
})
if err != nil {
return fmt.Errorf("failed to delete index: %w", err)
}

printf(c.App.Writer, "Index (name: %s) deleted successfully", args.IndexName)

return nil
}

type listCustomIndexesArgs struct {
OrgID string
CollectionType string
PipelineName string
}

// ListCustomIndexesAction lists all custom indexes for a specified organization and collection type.
func ListCustomIndexesAction(c *cli.Context, args listCustomIndexesArgs) error {
client, err := newViamClient(c)
if err != nil {
return err
}

collectionType, err := validateCollectionTypeArgs(c, args.CollectionType)
if err != nil {
return err
}

resp, err := client.dataClient.ListIndexes(context.Background(), &pb.ListIndexesRequest{
OrganizationId: args.OrgID,
CollectionType: collectionType,
PipelineName: &args.PipelineName,
})
if err != nil {
return fmt.Errorf("failed to list indexes: %w", err)
}

if len(resp.Indexes) == 0 {
printf(c.App.Writer, "No indexes found")
return nil
}

printf(c.App.Writer, "Indexes:\n")
for _, index := range resp.Indexes {
printf(c.App.Writer, "- Name: %s\n", index.IndexName)
printf(c.App.Writer, " Spec: %s\n", index.IndexSpec)
Comment on lines +137 to +138
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we wanna print more info here like what the collection type / pipeline name for each index is

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, name and spec should be fine for now. The collection type / pipeline name will be the same for each index, and the customer will have just entered those in the request, so those fields would make the response longer without adding new info.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True

}

return nil
}

func validateCollectionTypeArgs(c *cli.Context, collectionType string) (pb.IndexableCollection, error) {
var collectionTypeProto pb.IndexableCollection
switch collectionType {
case hotStoreCollectionTypeStr:
collectionTypeProto = hotStoreCollectionType
case pipelineSinkCollectionTypeStr:
collectionTypeProto = pipelineSinkCollectionType
default:
return unspecifiedCollectionType, errInvalidCollectionType
}

collectionTypeFlag := c.String(dataFlagCollectionType)
pipelineName := c.String(dataFlagPipelineName)

if collectionTypeFlag == pipelineSinkCollectionTypeStr && pipelineName == "" {
return unspecifiedCollectionType, errPipelineNameRequired
}

if collectionTypeFlag != pipelineSinkCollectionTypeStr && pipelineName != "" {
return unspecifiedCollectionType, errPipelineNameNotAllowed
}

return collectionTypeProto, nil
}

func readJSONToByteSlices(filePath string) ([][]byte, error) {
//nolint:gosec // filePath is a user-provided path for a JSON file containing an index spec
data, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}

var indexSpec struct {
Key json.RawMessage `json:"key"`
Options json.RawMessage `json:"options,omitempty"`
}

if err = json.Unmarshal(data, &indexSpec); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}

if len(indexSpec.Key) == 0 {
return nil, fmt.Errorf("missing required 'key' field in index spec")
}

result := make([][]byte, 0, 2)
result = append(result, []byte(indexSpec.Key))

if len(indexSpec.Options) > 0 {
result = append(result, []byte(indexSpec.Options))
}

return result, nil
}
Loading
Loading