diff --git a/cli/app.go b/cli/app.go index 3773b45b936..274ce47c2d8 100644 --- a/cli/app.go +++ b/cli/app.go @@ -124,6 +124,10 @@ const ( dataFlagDatabasePassword = "password" dataFlagFilterTags = "filter-tags" dataFlagTimeout = "timeout" + dataFlagCollectionType = "collection-type" + dataFlagPipelineName = "pipeline-name" + dataFlagIndexName = "index-name" + dataFlagIndexSpecFile = "index-path" datapipelineFlagSchedule = "schedule" datapipelineFlagMQL = "mql" @@ -1381,6 +1385,91 @@ var app = &cli.App{ }, }, }, + { + Name: "index", + Usage: "manage indexes for hot data and pipeline sink collections", + UsageText: createUsageText("data index", nil, false, true), + HideHelpCommand: true, + Subcommands: []*cli.Command{ + { + Name: "create", + Usage: "create an index for a data collection", + UsageText: createUsageText( + "data index create", []string{generalFlagOrgID, dataFlagCollectionType, dataFlagIndexSpecFile}, true, false, + ), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: generalFlagOrgID, + Required: true, + Usage: "org ID of the data collection", + }, + &cli.StringFlag{ + Name: dataFlagCollectionType, + Required: true, + Usage: formatAcceptedValues("collection type", "hot-storage", "pipeline-sink"), + }, + &cli.StringFlag{ + Name: dataFlagPipelineName, + Required: false, + Usage: "name of the pipeline associated with the index when collection type is 'pipeline-sink'", + }, + &cli.PathFlag{ + Name: dataFlagIndexSpecFile, + Required: true, + Usage: "path to index specification JSON file", + TakesFile: true, + }, + }, + Action: createCommandWithT[createCustomIndexArgs](CreateCustomIndexAction), + }, + { + Name: "delete", + Usage: "delete an index from a data collection", + UsageText: createUsageText("data index delete", []string{generalFlagOrgID, dataFlagCollectionType, dataFlagIndexName}, true, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: generalFlagOrgID, + Required: true, + Usage: "org ID of the data collection", + }, + &cli.StringFlag{ + Name: dataFlagCollectionType, + Required: true, + Usage: formatAcceptedValues("collection type", "hot-storage", "pipeline-sink"), + }, + &cli.StringFlag{ + Name: dataFlagPipelineName, + Required: false, + Usage: "name of the pipeline associated with the index when collection type is 'pipeline-sink'", + }, + &cli.StringFlag{ + Name: dataFlagIndexName, + Required: true, + Usage: "name of the index to delete", + }, + }, + Action: createCommandWithT[deleteCustomIndexArgs](DeleteCustomIndexAction), + }, + { + Name: "list", + Usage: "list indexes for a data collection", + UsageText: createUsageText("data index list", []string{generalFlagOrgID, dataFlagCollectionType}, false, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: generalFlagOrgID, + Required: true, + Usage: "org ID of the data collection", + }, + &cli.StringFlag{ + Name: dataFlagCollectionType, + Required: true, + Usage: formatAcceptedValues("collection type", "hot-storage", "pipeline-sink"), + }, + }, + Action: createCommandWithT[listCustomIndexesArgs](ListCustomIndexesAction), + }, + }, + }, }, }, { diff --git a/cli/customindex.go b/cli/customindex.go new file mode 100644 index 00000000000..12611db7c42 --- /dev/null +++ b/cli/customindex.go @@ -0,0 +1,197 @@ +package cli + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + + "github.com/urfave/cli/v2" + pb "go.viam.com/api/app/data/v1" +) + +const ( + hotStoreCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_HOT_STORE + pipelineSinkCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_PIPELINE_SINK + unspecifiedCollectionType = pb.IndexableCollection_INDEXABLE_COLLECTION_UNSPECIFIED + + hotStoreCollectionTypeStr = "hot-storage" + pipelineSinkCollectionTypeStr = "pipeline-sink" +) + +var ( + errInvalidCollectionType = errors.New("invalid collection type, must be one of: hot-storage, pipeline-sink") + errPipelineNameRequired = errors.New("--pipeline-name is required when --collection-type is 'pipeline-sink'") + errPipelineNameNotAllowed = errors.New("--pipeline-name can only be used when --collection-type is 'pipeline-sink'") +) + +type createCustomIndexArgs struct { + OrgID string + CollectionType string + PipelineName string + IndexSpecPath string +} + +// CreateCustomIndexAction creates a custom index for a specified organization and collection type +// using the provided index specification file in the arguments. +func CreateCustomIndexAction(c *cli.Context, args createCustomIndexArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + collectionType, err := validateCollectionTypeArgs(c, args.CollectionType) + if err != nil { + return err + } + + indexSpec, err := readJSONToByteSlices(args.IndexSpecPath) + if err != nil { + return fmt.Errorf("failed to read index spec from file: %w", err) + } + + _, err = client.dataClient.CreateIndex(context.Background(), &pb.CreateIndexRequest{ + OrganizationId: args.OrgID, + CollectionType: collectionType, + PipelineName: &args.PipelineName, + IndexSpec: indexSpec, + }) + if err != nil { + return fmt.Errorf("failed to create index: %w", err) + } + + printf(c.App.Writer, "Create index request sent successfully") + + return nil +} + +type deleteCustomIndexArgs struct { + OrgID string + CollectionType string + PipelineName string + IndexName string +} + +// DeleteCustomIndexAction deletes a custom index for a specified organization and collection type using the provided index name. +func DeleteCustomIndexAction(c *cli.Context, args deleteCustomIndexArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + collectionType, err := validateCollectionTypeArgs(c, args.CollectionType) + if err != nil { + return err + } + + _, err = client.dataClient.DeleteIndex(context.Background(), &pb.DeleteIndexRequest{ + OrganizationId: args.OrgID, + CollectionType: collectionType, + PipelineName: &args.PipelineName, + IndexName: args.IndexName, + }) + if err != nil { + return fmt.Errorf("failed to delete index: %w", err) + } + + printf(c.App.Writer, "Index (name: %s) deleted successfully", args.IndexName) + + return nil +} + +type listCustomIndexesArgs struct { + OrgID string + CollectionType string + PipelineName string +} + +// ListCustomIndexesAction lists all custom indexes for a specified organization and collection type. +func ListCustomIndexesAction(c *cli.Context, args listCustomIndexesArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + collectionType, err := validateCollectionTypeArgs(c, args.CollectionType) + if err != nil { + return err + } + + resp, err := client.dataClient.ListIndexes(context.Background(), &pb.ListIndexesRequest{ + OrganizationId: args.OrgID, + CollectionType: collectionType, + PipelineName: &args.PipelineName, + }) + if err != nil { + return fmt.Errorf("failed to list indexes: %w", err) + } + + if len(resp.Indexes) == 0 { + printf(c.App.Writer, "No indexes found") + return nil + } + + printf(c.App.Writer, "Indexes:\n") + for _, index := range resp.Indexes { + printf(c.App.Writer, "- Name: %s\n", index.IndexName) + printf(c.App.Writer, " Spec: %s\n", index.IndexSpec) + } + + return nil +} + +func validateCollectionTypeArgs(c *cli.Context, collectionType string) (pb.IndexableCollection, error) { + var collectionTypeProto pb.IndexableCollection + switch collectionType { + case hotStoreCollectionTypeStr: + collectionTypeProto = hotStoreCollectionType + case pipelineSinkCollectionTypeStr: + collectionTypeProto = pipelineSinkCollectionType + default: + return unspecifiedCollectionType, errInvalidCollectionType + } + + collectionTypeFlag := c.String(dataFlagCollectionType) + pipelineName := c.String(dataFlagPipelineName) + + if collectionTypeFlag == pipelineSinkCollectionTypeStr && pipelineName == "" { + return unspecifiedCollectionType, errPipelineNameRequired + } + + if collectionTypeFlag != pipelineSinkCollectionTypeStr && pipelineName != "" { + return unspecifiedCollectionType, errPipelineNameNotAllowed + } + + return collectionTypeProto, nil +} + +func readJSONToByteSlices(filePath string) ([][]byte, error) { + //nolint:gosec // filePath is a user-provided path for a JSON file containing an index spec + data, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + var indexSpec struct { + Key json.RawMessage `json:"key"` + Options json.RawMessage `json:"options,omitempty"` + } + + if err = json.Unmarshal(data, &indexSpec); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON: %w", err) + } + + if len(indexSpec.Key) == 0 { + return nil, fmt.Errorf("missing required 'key' field in index spec") + } + + result := make([][]byte, 0, 2) + result = append(result, []byte(indexSpec.Key)) + + if len(indexSpec.Options) > 0 { + result = append(result, []byte(indexSpec.Options)) + } + + return result, nil +} diff --git a/cli/customindex_test.go b/cli/customindex_test.go new file mode 100644 index 00000000000..0174a26af8f --- /dev/null +++ b/cli/customindex_test.go @@ -0,0 +1,189 @@ +package cli + +import ( + "encoding/json" + "errors" + "flag" + "os" + "testing" + + "github.com/urfave/cli/v2" + pb "go.viam.com/api/app/data/v1" + "go.viam.com/test" +) + +func TestValidateCollectionTypeArgs(t *testing.T) { + testCases := map[string]struct { + collectionType string + pipelineName string + expectedType pb.IndexableCollection + expectedError error + }{ + "hot_store_without_pipeline": { + collectionType: "hot-storage", + pipelineName: "", + expectedType: hotStoreCollectionType, + expectedError: nil, + }, + "pipeline_sink_with_pipeline": { + collectionType: "pipeline-sink", + pipelineName: "my-pipeline", + expectedType: pipelineSinkCollectionType, + expectedError: nil, + }, + "pipeline_sink_without_pipeline": { + collectionType: "pipeline-sink", + pipelineName: "", + expectedType: unspecifiedCollectionType, + expectedError: errPipelineNameRequired, + }, + "hot_store_with_pipeline": { + collectionType: "hot-storage", + pipelineName: "my-pipeline", + expectedType: unspecifiedCollectionType, + expectedError: errPipelineNameNotAllowed, + }, + "unknown_collection_type": { + collectionType: "unknown", + pipelineName: "", + expectedType: unspecifiedCollectionType, + expectedError: errInvalidCollectionType, + }, + "empty_collection_type": { + collectionType: "", + pipelineName: "", + expectedType: unspecifiedCollectionType, + expectedError: errInvalidCollectionType, + }, + "invalid_case_sensitivity": { + collectionType: "HOT-STORAGE", + pipelineName: "", + expectedType: unspecifiedCollectionType, + expectedError: errInvalidCollectionType, + }, + "pipeline_sink_invalid_case_with_pipeline": { + collectionType: "PIPELINE-SINK", + pipelineName: "my-pipeline", + expectedType: unspecifiedCollectionType, + expectedError: errInvalidCollectionType, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + app := &cli.App{} + set := flag.NewFlagSet("test", 0) + set.String(dataFlagCollectionType, tc.collectionType, "") + set.String(dataFlagPipelineName, tc.pipelineName, "") + set.Parse(nil) + ctx := cli.NewContext(app, set, nil) + + collectionType, err := validateCollectionTypeArgs(ctx, tc.collectionType) + + if tc.expectedError != nil { + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err, test.ShouldEqual, tc.expectedError) + } else { + test.That(t, err, test.ShouldBeNil) + } + test.That(t, collectionType, test.ShouldEqual, tc.expectedType) + }) + } +} + +func TestReadJSONToByteSlices(t *testing.T) { + testCases := map[string]struct { + fileContent string + expectedResult [][]byte + expectedError bool + }{ + "valid_with_key_and_options": { + fileContent: `{ + "key": {"resource_name": 1, "method_name": 1}, + "options": {"sparse": true} + }`, + expectedResult: [][]byte{ + []byte(`{"resource_name": 1, "method_name": 1}`), + []byte(`{"sparse": true}`), + }, + expectedError: false, + }, + "valid_with_key_only": { + fileContent: `{ + "key": {"name": 1, "email": -1} + }`, + expectedResult: [][]byte{ + []byte(`{"name": 1, "email": -1}`), + }, + expectedError: false, + }, + "valid_with_empty_options": { + fileContent: `{ + "key": {"timestamp": -1}, + "options": {} + }`, + expectedResult: [][]byte{ + []byte(`{"timestamp": -1}`), + []byte(`{}`), + }, + expectedError: false, + }, + "missing_key_field": { + fileContent: `{ + "options": {"unique": true} + }`, + expectedError: true, + }, + "invalid_json_structure": { + fileContent: `[ + {"key": {"name": 1}} + ]`, + expectedError: true, + }, + "malformed_json": { + fileContent: `{"key": {"name": 1}`, + expectedError: true, + }, + "empty_object": { + fileContent: `{}`, + expectedError: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + tmpFile, err := os.CreateTemp("", "test-*.json") + test.That(t, err, test.ShouldBeNil) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(tc.fileContent) + test.That(t, err, test.ShouldBeNil) + tmpFile.Close() + + result, err := readJSONToByteSlices(tmpFile.Name()) + if tc.expectedError { + test.That(t, err, test.ShouldNotBeNil) + return + } + + test.That(t, err, test.ShouldBeNil) + test.That(t, len(result), test.ShouldEqual, len(tc.expectedResult)) + + // Compare each byte slice (ignoring whitespace differences in JSON) + for i := range result { + var resultJSON, expectedJSON interface{} + err := json.Unmarshal(result[i], &resultJSON) + test.That(t, err, test.ShouldBeNil) + err = json.Unmarshal(tc.expectedResult[i], &expectedJSON) + test.That(t, err, test.ShouldBeNil) + test.That(t, resultJSON, test.ShouldResemble, expectedJSON) + } + }) + } + + t.Run("file_not_found", func(t *testing.T) { + _, err := readJSONToByteSlices("nonexistent-file.json") + test.That(t, err, test.ShouldNotBeNil) + test.That(t, errors.Is(err, os.ErrNotExist), test.ShouldBeTrue) + }) +}