-
Notifications
You must be signed in to change notification settings - Fork 22
Add Go (supersedes other PR) #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
d789f8c
45aca64
ca97341
b78919c
80a2641
1ac2786
ee30251
5426918
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| # Go SDK Advanced Features | ||
|
|
||
| ## Schedules | ||
|
|
||
| Create recurring workflow executions using the Schedule API. | ||
|
|
||
| ```go | ||
| scheduleHandle, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{ | ||
| ID: "daily-report", | ||
| Spec: client.ScheduleSpec{ | ||
| CronExpressions: []string{"0 9 * * *"}, | ||
| }, | ||
| Action: &client.ScheduleWorkflowAction{ | ||
| ID: "daily-report-workflow", | ||
| Workflow: DailyReportWorkflow, | ||
| TaskQueue: "reports", | ||
| }, | ||
| }) | ||
| ``` | ||
|
|
||
| Using intervals instead of cron: | ||
|
|
||
| ```go | ||
| scheduleHandle, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{ | ||
| ID: "hourly-sync", | ||
| Spec: client.ScheduleSpec{ | ||
| Intervals: []client.ScheduleIntervalSpec{ | ||
| {Every: time.Hour}, | ||
| }, | ||
| }, | ||
| Action: &client.ScheduleWorkflowAction{ | ||
| ID: "hourly-sync-workflow", | ||
| Workflow: SyncWorkflow, | ||
| TaskQueue: "sync", | ||
| }, | ||
| }) | ||
| ``` | ||
|
|
||
| Manage schedules: | ||
|
|
||
| ```go | ||
| handle := c.ScheduleClient().GetHandle(ctx, "daily-report") | ||
|
|
||
| // Pause / unpause | ||
| handle.Pause(ctx, client.SchedulePauseOptions{Note: "Maintenance window"}) | ||
| handle.Unpause(ctx, client.ScheduleUnpauseOptions{Note: "Maintenance complete"}) | ||
|
|
||
| // Trigger immediately | ||
| handle.Trigger(ctx, client.ScheduleTriggerOptions{}) | ||
|
|
||
| // Describe | ||
| desc, err := handle.Describe(ctx) | ||
|
|
||
| // Delete | ||
| handle.Delete(ctx) | ||
| ``` | ||
|
|
||
| ## Async Activity Completion | ||
|
|
||
| For activities that complete asynchronously (e.g., human tasks, external callbacks). | ||
| If you configure a heartbeat_timeout on this activity, the external completer is responsible for sending heartbeats via the async handle. | ||
| If you do NOT set a heartbeat_timeout, no heartbeats are required. | ||
|
|
||
| **Note:** If the external system that completes the asynchronous action can reliably be trusted to do the task and Signal back with the result, and it doesn't need to Heartbeat or receive Cancellation, then consider using **signals** instead. | ||
|
|
||
| **Step 1: Return `activity.ErrResultPending` from the activity.** | ||
|
|
||
| ```go | ||
| func RequestApproval(ctx context.Context, requestID string) (string, error) { | ||
| activityInfo := activity.GetInfo(ctx) | ||
| taskToken := activityInfo.TaskToken | ||
|
|
||
| // Store taskToken externally (e.g., database) for later completion | ||
| err := storeTaskToken(requestID, taskToken) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| // Signal that this activity will be completed externally | ||
| return "", activity.ErrResultPending | ||
| } | ||
| ``` | ||
|
|
||
| **Step 2: Complete from another process using the task token.** | ||
|
|
||
| ```go | ||
| temporalClient, err := client.Dial(client.Options{}) | ||
|
|
||
| // Complete the activity | ||
| err = temporalClient.CompleteActivity(ctx, taskToken, "approved", nil) | ||
|
|
||
| // Or fail it | ||
| err = temporalClient.CompleteActivity(ctx, taskToken, nil, errors.New("rejected")) | ||
| ``` | ||
|
|
||
| Or complete by ID (no task token needed): | ||
|
|
||
| ```go | ||
| err = temporalClient.CompleteActivityByID(ctx, namespace, workflowID, runID, activityID, "approved", nil) | ||
| ``` | ||
|
|
||
| ## Worker Tuning | ||
|
|
||
| Configure `worker.Options` for production workloads: | ||
|
|
||
| ```go | ||
| w := worker.New(c, "my-task-queue", worker.Options{ | ||
| // Max concurrent activity executions (default: 1000) | ||
| MaxConcurrentActivityExecutionSize: 500, | ||
|
|
||
| // Max concurrent workflow task executions (default: 1000) | ||
| MaxConcurrentWorkflowTaskExecutionSize: 500, | ||
|
|
||
| // Max concurrent activity task pollers (default: 2) | ||
| MaxConcurrentActivityTaskPollers: 4, | ||
|
|
||
| // Max concurrent workflow task pollers (default: 2) | ||
| MaxConcurrentWorkflowTaskPollers: 4, | ||
|
|
||
| // Graceful shutdown timeout (default: 0) | ||
| WorkerStopTimeout: 30 * time.Second, | ||
| }) | ||
| ``` | ||
|
|
||
| Scale pollers based on task queue throughput. If you observe high schedule-to-start latency, increase the number of pollers or add more workers. | ||
|
|
||
| ## Sessions | ||
|
donald-pinckney marked this conversation as resolved.
|
||
|
|
||
| Go-specific feature for routing multiple activities to the same worker. All activities using the session context execute on the same worker host. | ||
|
|
||
| **Enable on the worker:** | ||
|
|
||
| ```go | ||
| w := worker.New(c, "fileprocessing", worker.Options{ | ||
| EnableSessionWorker: true, | ||
| MaxConcurrentSessionExecutionSize: 100, // default: 1000 | ||
| }) | ||
| ``` | ||
|
|
||
| **Use in a workflow:** | ||
|
|
||
| ```go | ||
| func FileProcessingWorkflow(ctx workflow.Context, file FileParam) error { | ||
| ao := workflow.ActivityOptions{ | ||
| StartToCloseTimeout: time.Minute, | ||
| } | ||
| ctx = workflow.WithActivityOptions(ctx, ao) | ||
|
|
||
| sessionCtx, err := workflow.CreateSession(ctx, &workflow.SessionOptions{ | ||
| CreationTimeout: time.Minute, | ||
| ExecutionTimeout: 10 * time.Minute, | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer workflow.CompleteSession(sessionCtx) | ||
|
|
||
| // All three activities run on the same worker | ||
| var downloadResult string | ||
| err = workflow.ExecuteActivity(sessionCtx, DownloadFile, file.URL).Get(sessionCtx, &downloadResult) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| var processResult string | ||
| err = workflow.ExecuteActivity(sessionCtx, ProcessFile, downloadResult).Get(sessionCtx, &processResult) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = workflow.ExecuteActivity(sessionCtx, UploadFile, processResult).Get(sessionCtx, nil) | ||
| return err | ||
| } | ||
| ``` | ||
|
|
||
| Key points: | ||
| - `workflow.ErrSessionFailed` is returned if the worker hosting the session dies | ||
| - `CompleteSession` releases resources -- always call it (use `defer`) | ||
| - Use case: file processing (download, process, upload on same host), GPU workloads, or any pipeline needing local state | ||
| - `MaxConcurrentSessionExecutionSize` on `worker.Options` limits how many sessions a single worker can handle | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,183 @@ | ||
| # Go SDK Data Handling | ||
|
|
||
| ## Overview | ||
|
|
||
| The Go SDK uses the `converter.DataConverter` interface to serialize/deserialize workflow inputs, outputs, and activity parameters. The default converter converts values to JSON. | ||
|
|
||
| ## Default Data Converter | ||
|
|
||
| The default `CompositeDataConverter` applies converters in order until one returns a non-nil Payload: | ||
|
|
||
| 1. `converter.NewNilPayloadConverter()` -- nil values | ||
| 2. `converter.NewByteSlicePayloadConverter()` -- `[]byte` | ||
| 3. `converter.NewProtoJSONPayloadConverter()` -- Protobuf messages as JSON | ||
| 4. `converter.NewProtoPayloadConverter()` -- Protobuf messages as binary | ||
| 5. `converter.NewJSONPayloadConverter()` -- anything JSON-serializable | ||
|
|
||
| Structs must have exported fields to be serialized. | ||
|
|
||
| ## Custom Data Converter | ||
|
|
||
| Implement the `converter.DataConverter` interface (`ToPayload`, `FromPayload`, `ToPayloads`, `FromPayloads`, `ToString`, `ToStrings`) or compose a new converter from existing ones. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More of a meta question to skills in general (as someone not super up-to-speed), this line reminds me that some of these docs are basically the same as the code itself, and now introduce a new place we have to keep in sync with the code itself, like if we happen to add a new method to this interface. I think someone could argue it's equally as fast to look at the code itself to see what methods the interface has. Is this skill basically organizing the codebase in a AI digestible way so it doesn't need to dig through the code itself? Is adding information like this proven to be beneficial to the skill? Would the gain from this be comparable to if we ensured our docs were thorough and up to date?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lots of questions here, I'll do what i can to break it down :). Q1: What value does a skill provide vs. docs vs. the SDK code itself? For reference, you can currently use these 2 MCP servers:
So the question is, what value does a skill provide compared to those two MCP servers? Primarily, its about providing a more tightly curated "garden path" to show to the agent, compared to the much broader in scope docs and SDK code. The aim is to make it both more digestable (= faster for the agent to index into the content that matters), and be able to force items into its context. As an example, consider the data converters. Just by loading the top level Q2: How do we know this is beneficial to the agent? The only way we will know for sure is by doing extensive evaluations. I'm currently working on this, but if you want to help, the most helpful thing you can do is note down Temporal coding tasks that a coding agent performed poorly on. Given difficult tasks, we can then observe that the agent completes them successfully given the skill. Let me know if you know of any tricky tasks for agents! Q3: How do we keep skill in sync? This is discussed a bit in the PRD, but essentially I will track what SDK versions the skill was currently written for, and then in the future can diff changelogs, and identify and fix stale areas of the skill. I'd also like to investigate snipsync sorts of stuff for managing code snippets better.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That being said, I don't like this
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Has been changed to a code sample. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the answers! I'll take a closer look at the PRD as well
sounds good, will keep an eye out and remember to give you feedback when I come across this 😃 |
||
|
|
||
| Pass the custom converter via client options: | ||
|
|
||
| ```go | ||
| c, err := client.Dial(client.Options{ | ||
| DataConverter: myCustomDataConverter, | ||
| }) | ||
| ``` | ||
|
|
||
| ## Composition of Payload Converters | ||
|
|
||
| Use `converter.NewCompositeDataConverter` to chain type-specific converters. The first converter that can handle the type wins. | ||
|
|
||
| ```go | ||
| dataConverter := converter.NewCompositeDataConverter( | ||
| converter.NewNilPayloadConverter(), | ||
| converter.NewByteSlicePayloadConverter(), | ||
| converter.NewProtoJSONPayloadConverter(), | ||
| converter.NewProtoPayloadConverter(), | ||
| YourCustomPayloadConverter(), | ||
| converter.NewJSONPayloadConverter(), | ||
| ) | ||
| ``` | ||
|
|
||
| ## Protobuf Support | ||
|
|
||
| Binary protobuf: | ||
| ```go | ||
| converter.NewProtoPayloadConverter() | ||
| ``` | ||
|
|
||
| JSON protobuf: | ||
| ```go | ||
| converter.NewProtoJSONPayloadConverter() | ||
| ``` | ||
|
|
||
| Both are included in the default data converter. SDK v1.26.0+ migrated from gogo/protobuf to google/protobuf. If you need backward compatibility with older payloads encoded with gogo, use the `LegacyTemporalProtoCompat` option. | ||
|
donald-pinckney marked this conversation as resolved.
Outdated
|
||
|
|
||
| ## Payload Encryption | ||
|
|
||
| Implement the `converter.PayloadCodec` interface (`Encode` and `Decode`) and wrap the default data converter: | ||
|
|
||
| ```go | ||
| // Codec implements converter.PayloadCodec for encryption. | ||
| type Codec struct{} | ||
|
|
||
| func (Codec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { | ||
| result := make([]*commonpb.Payload, len(payloads)) | ||
| for i, p := range payloads { | ||
| origBytes, err := p.Marshal() | ||
| if err != nil { | ||
| return payloads, err | ||
| } | ||
| encrypted := encrypt(origBytes) // your encryption logic | ||
| result[i] = &commonpb.Payload{ | ||
| Metadata: map[string][]byte{converter.MetadataEncoding: []byte("binary/encrypted")}, | ||
| Data: encrypted, | ||
| } | ||
| } | ||
| return result, nil | ||
| } | ||
|
|
||
| func (Codec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { | ||
| result := make([]*commonpb.Payload, len(payloads)) | ||
| for i, p := range payloads { | ||
| if string(p.Metadata[converter.MetadataEncoding]) != "binary/encrypted" { | ||
| result[i] = p | ||
| continue | ||
| } | ||
| decrypted := decrypt(p.Data) // your decryption logic | ||
| result[i] = &commonpb.Payload{} | ||
| err := result[i].Unmarshal(decrypted) | ||
| if err != nil { | ||
| return payloads, err | ||
| } | ||
| } | ||
| return result, nil | ||
| } | ||
| ``` | ||
|
|
||
| Wrap with `CodecDataConverter` and pass to client: | ||
|
|
||
| ```go | ||
| var DataConverter = converter.NewCodecDataConverter( | ||
| converter.GetDefaultDataConverter(), | ||
| &Codec{}, | ||
| ) | ||
|
|
||
| c, err := client.Dial(client.Options{ | ||
| DataConverter: DataConverter, | ||
| }) | ||
| ``` | ||
|
|
||
| ## Search Attributes | ||
|
|
||
| Set at workflow start: | ||
|
|
||
| ```go | ||
| handle, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ | ||
| ID: "order-123", | ||
| TaskQueue: "orders", | ||
| SearchAttributes: map[string]interface{}{ | ||
| "OrderStatus": "pending", | ||
| "CustomerId": "cust-456", | ||
| }, | ||
| }, OrderWorkflow, input) | ||
| ``` | ||
|
|
||
| Upsert from within a workflow: | ||
|
|
||
| ```go | ||
| err := workflow.UpsertSearchAttributes(ctx, map[string]interface{}{ | ||
| "OrderStatus": "completed", | ||
| }) | ||
| ``` | ||
|
|
||
| Typed search attributes (v1.26.0+, preferred): | ||
|
|
||
| ```go | ||
| var OrderStatusKey = temporal.NewSearchAttributeKeyKeyword("OrderStatus") | ||
|
|
||
| err := workflow.UpsertTypedSearchAttributes(ctx, OrderStatusKey.ValueSet("completed")) | ||
| ``` | ||
|
|
||
| Query workflows by search attributes: | ||
|
|
||
| ```go | ||
| resp, err := c.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ | ||
| Query: `OrderStatus = "pending" AND CustomerId = "cust-456"`, | ||
| }) | ||
| ``` | ||
|
|
||
| ## Workflow Memo | ||
|
|
||
| Set in start options: | ||
|
|
||
| ```go | ||
| handle, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ | ||
| ID: "order-123", | ||
| TaskQueue: "orders", | ||
| Memo: map[string]interface{}{ | ||
| "customerName": "Alice", | ||
| "notes": "Priority customer", | ||
| }, | ||
| }, OrderWorkflow, input) | ||
| ``` | ||
|
|
||
| Read memo from workflow info. Upsert memo (Go SDK only): | ||
|
|
||
| ```go | ||
| err := workflow.UpsertMemo(ctx, map[string]interface{}{ | ||
| "notes": "Updated notes", | ||
| }) | ||
| ``` | ||
|
|
||
| ## Best Practices | ||
|
|
||
| 1. Use structs with exported fields for inputs and outputs | ||
| 2. Prefer JSON for readability during development, protobuf for performance in production | ||
| 3. Keep payloads small -- see `references/core/gotchas.md` for limits | ||
| 4. Use `PayloadCodec` for encryption; never store sensitive data unencrypted | ||
| 5. Configure the same data converter on both client and worker | ||
Uh oh!
There was an error while loading. Please reload this page.