Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,53 @@ func TestArtifactResolutionWhenSkippedDAG(t *testing.T) {
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

func TestExpandTaskWithParam(t *testing.T) {
ctx := logging.TestContext(t.Context())
task := wfv1.DAGTask{
Name: "fanout-param",
Template: "tmpl",
Arguments: wfv1.Arguments{
Parameters: []wfv1.Parameter{{
Name: "msg",
Value: wfv1.AnyStringPtr("{{item}}"),
}},
},
WithParam: `[1234, "foo\tbar", true, []]`,
}

expanded, err := expandTask(ctx, task, map[string]string{})
require.NoError(t, err)
require.Len(t, expanded, 4)

expectedExpandedTasks := []struct {
Name string
Parameter string
}{
{
Name: "fanout-param(0:1234)",
Parameter: "1234",
},
{
Name: `fanout-param(1:foo\tbar)`,
Parameter: "foo\tbar",
},
{
Name: "fanout-param(2:true)",
Parameter: "true",
},
{
Name: "fanout-param(3:[])",
Parameter: "[]",
},
}

for i, expected := range expectedExpandedTasks {
assert.Equal(t, expected.Name, expanded[i].Name)
assert.Equal(t, "tmpl", expanded[i].Template)
assert.Equal(t, expected.Parameter, expanded[i].Arguments.Parameters[0].Value.String())
}
}

func TestEvaluateDependsLogic(t *testing.T) {
testTasks := []wfv1.DAGTask{
{
Expand Down
5 changes: 4 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3783,9 +3783,12 @@ func processItem(ctx context.Context, tmpl template.Template, name string, index
var newName string

switch item.GetType() {
case wfv1.String, wfv1.Number, wfv1.Bool:
case wfv1.Number, wfv1.Bool:
replaceMap["item"] = fmt.Sprintf("%v", item)
newName = generateNodeName(name, index, item)
case wfv1.String:
replaceMap["item"] = item.GetStrVal()
newName = generateNodeName(name, index, item)
case wfv1.Map:
// Handle the case when withItems is a list of maps.
// vals holds stringified versions of the map items which are incorporated as part of the step name.
Expand Down
85 changes: 74 additions & 11 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6948,19 +6948,82 @@ func TestFailSuspendedAndPendingNodesAfterShutdown(t *testing.T) {

func Test_processItem(t *testing.T) {
ctx := logging.TestContext(t.Context())
task := wfv1.DAGTask{
WithParam: `[{"number": 2, "string": "foo", "list": [0, "1"], "json": {"number": 2, "string": "foo", "list": [0, "1"]}}]`,

tests := []struct {
name string
withParam string
expectedName string
expectedParam string
}{
{
name: "Test string",
withParam: `["string"]`,
expectedName: `task-name(0:string)`,
expectedParam: `string`,
},
{
name: "Test multiline string",
withParam: `["alpha\nbeta"]`,
expectedName: `task-name(0:alpha\nbeta)`,
expectedParam: "alpha\nbeta",
},
{
name: "Test number",
withParam: `[42]`,
expectedName: `task-name(0:42)`,
expectedParam: `42`,
},
{
name: "Test boolean",
withParam: `[true]`,
expectedName: `task-name(0:true)`,
expectedParam: `true`,
},
{
name: "Test map",
withParam: `[{"number": 2, "string": "foo", "list": [0, "1"], "json": {"number": 2, "string": "foo", "list": [0, "1"]}}]`,
expectedName: `task-name(0:json:{"list":[0,"1"],"number":2,"string":"foo"},list:[0,"1"],number:2,string:foo)`,
expectedParam: `{"json":{"list":[0,"1"],"number":2,"string":"foo"},"list":[0,"1"],"number":2,"string":"foo"}`,
},
{
name: "Test list",
withParam: `[[1, "two", 3]]`,
expectedName: `task-name(0:[1 two 3])`,
expectedParam: `[1,"two",3]`,
},
}
taskBytes, err := json.Marshal(task)
require.NoError(t, err)
var items []wfv1.Item
wfv1.MustUnmarshal([]byte(task.WithParam), &items)

var newTask wfv1.DAGTask
tmpl, _ := template.NewTemplate(string(taskBytes))
newTaskName, err := processItem(ctx, tmpl, "task-name", 0, items[0], &newTask, "", map[string]string{})
require.NoError(t, err)
assert.Equal(t, `task-name(0:json:{"list":[0,"1"],"number":2,"string":"foo"},list:[0,"1"],number:2,string:foo)`, newTaskName)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
task := wfv1.DAGTask{
WithParam: tt.withParam,
Arguments: wfv1.Arguments{
Parameters: []wfv1.Parameter{
{
Name: "item",
Value: wfv1.AnyStringPtr("{{item}}"),
},
},
},
}

taskBytes, err := json.Marshal(task)
require.NoError(t, err)

tmpl, err := template.NewTemplate(string(taskBytes))
require.NoError(t, err)

var items []wfv1.Item
wfv1.MustUnmarshal([]byte(tt.withParam), &items)

var newTask wfv1.DAGTask
newTaskName, err := processItem(ctx, tmpl, "task-name", 0, items[0], &newTask, "", map[string]string{})

require.NoError(t, err)
assert.Equal(t, tt.expectedName, newTaskName)
assert.Equal(t, tt.expectedParam, newTask.Arguments.Parameters[0].Value.String())
})
}
}

var stepTimeoutWf = `
Expand Down
89 changes: 89 additions & 0 deletions workflow/controller/steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,95 @@ func TestStepsWithParamAndGlobalParam(t *testing.T) {
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
}

var stepsWithParam = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-with-params-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: use-with-param
template: whalesay
arguments:
parameters:
- name: message
value: "{{item}}"
withParam: "[1234, \"foo\\tbar\", true, []]"
`

func TestExpandStepGroupWithParam(t *testing.T) {
ctx := logging.TestContext(t.Context())
wf := wfv1.MustUnmarshalWorkflow(stepsWithParam)
woc := newWoc(ctx, *wf)

expanded, err := woc.expandStepGroup(ctx, "[0]", wf.Spec.Templates[0].Steps[0].Steps, &stepsContext{scope: createScope(&wf.Spec.Templates[0])})
require.NoError(t, err)
require.Len(t, expanded, 4)

expectedExpandedTasks := []struct {
Name string
Parameter string
}{
{
Name: "use-with-param(0:1234)",
Parameter: "1234",
},
{
Name: `use-with-param(1:foo\tbar)`,
Parameter: "foo\tbar",
},
{
Name: "use-with-param(2:true)",
Parameter: "true",
},
{
Name: "use-with-param(3:[])",
Parameter: "[]",
},
}

for i, expected := range expectedExpandedTasks {
assert.Equal(t, expected.Name, expanded[i].Name)
require.Len(t, expanded[i].Arguments.Parameters, 1)
assert.Equal(t, expected.Parameter, expanded[i].Arguments.Parameters[0].Value.String())
}
}

var stepsWithItems = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-with-items-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: use-with-items
template: whalesay
arguments:
parameters:
- name: message
value: "{{item}}"
withItems:
- Hello"Argo
`

func TestExpandStepGroupWithItems(t *testing.T) {
ctx := logging.TestContext(t.Context())
wf := wfv1.MustUnmarshalWorkflow(stepsWithItems)
woc := newWoc(ctx, *wf)

expanded, err := woc.expandStepGroup(ctx, "[0]", wf.Spec.Templates[0].Steps[0].Steps, &stepsContext{scope: createScope(&wf.Spec.Templates[0])})
require.NoError(t, err)
require.Len(t, expanded, 1)

assert.Equal(t, `Hello"Argo`, expanded[0].Arguments.Parameters[0].Value.String())
}

func TestResourceDurationMetric(t *testing.T) {
nodeStatus := `
boundaryID: many-items-z26lj
Expand Down
Loading