Operators use the Tasks API to monitor and manage tasks across their operational domain.
The StreamTasks (REST | gRPC)
API provides centralized visibility into task creation, updates, and status changes for all tasks in your environment.
The CancelTask (REST | gRPC)
API allows you to request cancellation of tasks that are no longer needed.
If you are using gRPC with client credentials, set up the token refresh module before running the examples on this page.
The StreamTasks (REST | gRPC)
API establishes a server-sent events (SSE) stream that notifies your application about task updates:
In the following example, we specify a prefix for the task stream:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "net/http" 7 "os" 8 9 Lattice "github.com/anduril/lattice-sdk-go/v4" 10 "github.com/anduril/lattice-sdk-go/v4/client" 11 "github.com/anduril/lattice-sdk-go/v4/option" 12 ) 13 14 func main() { 15 // Get environment variables 16 latticeEndpoint := os.Getenv("LATTICE_ENDPOINT") 17 clientSecret := os.Getenv("LATTICE_CLIENT_SECRET") 18 clientId := os.Getenv("LATTICE_CLIENT_ID") 19 sandboxesToken := os.Getenv("SANDBOXES_TOKEN") 20 21 // Check required environment variables 22 if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" { 23 fmt.Println("Missing required environment variables") 24 os.Exit(1) 25 } 26 27 // Initialize headers for sandbox authorization 28 headers := http.Header{} 29 headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken)) 30 // Create the client 31 LatticeClient := client.NewClient( 32 option.WithClientCredentials(clientId, clientSecret), 33 option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)), 34 option.WithHTTPHeader(headers), 35 ) 36 37 fmt.Println("Starting task stream...") 38 39 ctx := context.Background() 40 41 // Create task stream request 42 request := &Lattice.TaskStreamRequest{ 43 HeartbeatIntervalMs: Lattice.Int(10000), 44 ExcludePreexistingTasks: Lattice.Bool(false), 45 TaskType: &Lattice.TaskStreamRequestTaskType{ 46 TaskStreamRequestTaskTypeTaskTypePrefix: &Lattice.TaskStreamRequestTaskTypeTaskTypePrefix{ 47 // Define a prefix filter for tasks that belong to your organization 48 TaskTypePrefix: "type.googleapis.com/<your-organization>.tasks", 49 }, 50 }, 51 } 52 53 // Start the task stream 54 stream, err := LatticeClient.Tasks.StreamTasks(ctx, request) 55 if err != nil { 56 fmt.Printf("Failed to start task stream: %v\n", err) 57 return 58 } 59 defer stream.Close() 60 61 // Process the stream events as they arrive 62 for { 63 event, err := stream.Recv() 64 if err != nil { 65 fmt.Printf("Error receiving task stream event: %v\n", err) 66 return 67 } 68 69 if event.Event == "heartbeat" { 70 // Process heartbeat events 71 timestamp := *event.GetHeartbeat().GetTimestamp() 72 fmt.Printf("Heartbeat: %v\n", timestamp) 73 } else { 74 task := event.GetTaskEvent().GetTaskEvent().GetTask() 75 taskId := task.GetVersion().GetTaskID() 76 status := task.GetStatus().GetStatus() 77 78 fmt.Printf(" TaskID: %v\n", *taskId) 79 fmt.Printf(" Status: %v\n", *status) 80 } 81 } 82 }
The StreamTasks method accepts the following parameters:
The interval in milliseconds at which the server sends heartbeat events (default: 30000 ms). Use heartbeats to verify the connection is still active.
When set to true, the stream will only include tasks created after the stream starts.
When false (default), existing tasks will also be included in the stream.
Specifies which task types to include in the stream. Can filter by exact match or by prefix.
Only include tasks whose type starts with the specified prefix.
For example, type.googleapis.com/<your-organization.tasks>.
Only include tasks specified in the list. You must list the full URL of the task definition for each of the tasks you want to filter from the stream.
Process the different types of events that arrive through the stream:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "net/http" 7 "os" 8 9 Lattice "github.com/anduril/lattice-sdk-go/v4" 10 "github.com/anduril/lattice-sdk-go/v4/client" 11 "github.com/anduril/lattice-sdk-go/v4/option" 12 ) 13 14 func main() { 15 // Get environment variables 16 latticeEndpoint := os.Getenv("LATTICE_ENDPOINT") 17 clientSecret := os.Getenv("LATTICE_CLIENT_SECRET") 18 clientId := os.Getenv("LATTICE_CLIENT_ID") 19 sandboxesToken := os.Getenv("SANDBOXES_TOKEN") 20 21 // Check required environment variables 22 if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" { 23 fmt.Println("Missing required environment variables") 24 os.Exit(1) 25 } 26 27 // Initialize headers for sandbox authorization 28 headers := http.Header{} 29 headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken)) 30 // Create the client 31 LatticeClient := client.NewClient( 32 option.WithClientCredentials(clientId, clientSecret), 33 option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)), 34 option.WithHTTPHeader(headers), 35 ) 36 37 fmt.Println("Starting task stream...") 38 39 ctx := context.Background() 40 41 // Create task stream request 42 request := &Lattice.TaskStreamRequest{ 43 HeartbeatIntervalMs: Lattice.Int(10000), 44 ExcludePreexistingTasks: Lattice.Bool(false), 45 TaskType: &Lattice.TaskStreamRequestTaskType{ 46 TaskStreamRequestTaskTypeTaskTypePrefix: &Lattice.TaskStreamRequestTaskTypeTaskTypePrefix{ 47 // Define a prefix filter for tasks that belong to your organization 48 TaskTypePrefix: "type.googleapis.com/<your-organization>.tasks", 49 }, 50 }, 51 } 52 53 // Start the task stream 54 stream, err := LatticeClient.Tasks.StreamTasks(ctx, request) 55 if err != nil { 56 fmt.Printf("Failed to start task stream: %v\n", err) 57 return 58 } 59 defer stream.Close() 60 61 // Process the stream events as they arrive 62 for { 63 event, err := stream.Recv() 64 if err != nil { 65 fmt.Printf("Error receiving task stream event: %v\n", err) 66 return 67 } 68 69 if event.Event == "heartbeat" { 70 // Process heartbeat events 71 timestamp := *event.GetHeartbeat().GetTimestamp() 72 fmt.Printf("Heartbeat: %v\n", timestamp) 73 } else { 74 task := event.GetTaskEvent().GetTaskEvent().GetTask() 75 taskId := task.GetVersion().GetTaskID() 76 status := task.GetStatus().GetStatus() 77 78 fmt.Printf(" TaskID: %v\n", *taskId) 79 fmt.Printf(" Status: %v\n", *status) 80 } 81 } 82 }
The stream produces two main event types:
heartbeat: Regular server heartbeats that confirm the connection is active.task_even: Notifications about task creation, updates, or status changes.When successfully running the StreamTasks code, you’ll see output similar to the following:
$ Starting task stream $ Heartbeat: 2025-01-29T12:34:56.789Z $ Task Event: TASK_CREATED $ TaskID: task-123456 $ Status: STATUS_SENT $ Task Event: TASK_UPDATED $ TaskID: task-123456 $ Status: STATUS_MACHINE_RECEIPT $ Heartbeat: 2025-01-29T12:35:06.789Z $ Task Event: TASK_UPDATED $ TaskID: task-123456 $ Status: STATUS_EXECUTING
The CancelTask API cancels a task by marking it for cancellation in the system.
The behavior depends on the task’s current state:
STATUS_DONE_NOT_OK
with ERROR_CODE_CANCELLED).Create an asset entity that can receive and process tasks. The entity must include a
taskCatalog
with taskDefinitions
that specify which task types the asset supports:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "net/http" 7 "os" 8 "time" 9 10 Lattice "github.com/anduril/lattice-sdk-go/v4" 11 "github.com/anduril/lattice-sdk-go/v4/client" 12 "github.com/anduril/lattice-sdk-go/v4/option" 13 "github.com/google/uuid" 14 ) 15 16 func main() { 17 // Get environment variables 18 latticeEndpoint := os.Getenv("LATTICE_ENDPOINT") 19 clientSecret := os.Getenv("LATTICE_CLIENT_SECRET") 20 clientId := os.Getenv("LATTICE_CLIENT_ID") 21 22 // Remove sandboxesToken from the following statements if you are not developing on Sandboxes 23 sandboxesToken := os.Getenv("SANDBOXES_TOKEN") 24 25 // Check required environment variables 26 if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" { 27 fmt.Println("Missing required environment variables") 28 os.Exit(1) 29 } 30 31 // Initialize headers for sandbox authorization 32 headers := http.Header{} 33 headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken)) 34 35 // Create the client 36 LatticeClient := client.NewClient( 37 option.WithClientCredentials(clientId, clientSecret), 38 option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)), 39 option.WithHTTPHeader(headers), 40 ) 41 42 // Generate a unique ID for the entity 43 entityId := uuid.New().String() 44 45 // Get creation time 46 creationTime := time.Now().UTC() 47 48 // Continuously publish the entity 49 for { 50 latestTimestamp := time.Now().UTC() 51 ctx := context.Background() 52 53 // Create entity to publish 54 entity := Lattice.Entity{ 55 EntityID: &entityId, 56 Description: Lattice.String("Friendly drone asset"), 57 Aliases: &Lattice.Aliases{ 58 Name: Lattice.String("Drone 1"), 59 }, 60 IsLive: Lattice.Bool(true), 61 CreatedTime: Lattice.Time(creationTime), 62 ExpiryTime: Lattice.Time(latestTimestamp.Add(1 * time.Minute)), 63 Ontology: &Lattice.Ontology{ 64 Template: Lattice.OntologyTemplateTemplateAsset.Ptr(), 65 PlatformType: Lattice.String("UAV"), 66 }, 67 MilView: &Lattice.MilView{ 68 Disposition: Lattice.MilViewDispositionDispositionFriendly.Ptr(), 69 Environment: Lattice.MilViewEnvironmentEnvironmentAir.Ptr(), 70 }, 71 Location: &Lattice.Location{ 72 Position: &Lattice.Position{ 73 LatitudeDegrees: Lattice.Float64(50.91402185768586), 74 LongitudeDegrees: Lattice.Float64(0.79203612077257), 75 AltitudeAsfMeters: Lattice.Float64(1000), 76 }, 77 }, 78 Provenance: &Lattice.Provenance{ 79 IntegrationName: Lattice.String("your_integration_name"), 80 DataType: Lattice.String("your_data_type"), 81 SourceUpdateTime: Lattice.Time(latestTimestamp), 82 }, 83 Health: &Lattice.Health{ 84 ConnectionStatus: Lattice.HealthConnectionStatusConnectionStatusOnline.Ptr(), 85 HealthStatus: Lattice.HealthHealthStatusHealthStatusHealthy.Ptr(), 86 UpdateTime: Lattice.Time(latestTimestamp), 87 }, 88 TaskCatalog: &Lattice.TaskCatalog{ 89 TaskDefinitions: []*Lattice.TaskDefinition{ 90 {TaskSpecificationURL: Lattice.String("type.googleapis.com/anduril.tasks.v2.VisualId")}, 91 {TaskSpecificationURL: Lattice.String("type.googleapis.com/anduril.tasks.v2.Investigate")}, 92 }, 93 }, 94 } 95 96 // Publish the entity 97 _, err := LatticeClient.Entities.PublishEntity(ctx, &entity) 98 99 // Handle errors 100 if err != nil { 101 fmt.Printf("Error publishing entity: %v\n", err) 102 } else { 103 fmt.Println("Published asset: " + entityId) 104 } 105 106 // Wait before next request 107 time.Sleep(5 * time.Second) 108 } 109 }
The taskCatalog defines the task types this asset can execute.
Each TaskDefinition
specifies a taskSpecificationUrl
that identifies the task type (for example, type.googleapis.com/anduril.tasks.v2.VisualId).
Create a task processor that listens for tasks assigned to your agent and handles cancellation requests.
For demonstration purposes, this example uses a TASK_ACTIVE environment variable to control whether the agent accepts or rejects cancellations:
1 package main 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "os" 11 "strings" 12 "time" 13 14 Lattice "github.com/anduril/lattice-sdk-go/v4" 15 "github.com/anduril/lattice-sdk-go/v4/client" 16 "github.com/anduril/lattice-sdk-go/v4/option" 17 ) 18 19 // Sets whether the task is currently being processed. If so, it cannot be cancelled. 20 var taskActive bool 21 22 func main() { 23 // Get environment variables 24 latticeEndpoint := os.Getenv("LATTICE_ENDPOINT") 25 clientSecret := os.Getenv("LATTICE_CLIENT_SECRET") 26 clientId := os.Getenv("LATTICE_CLIENT_ID") 27 sandboxesToken := os.Getenv("SANDBOXES_TOKEN") 28 taskActiveStr := os.Getenv("TASK_ACTIVE") 29 30 // Check required environment variables 31 if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" { 32 fmt.Println("Missing required environment variables") 33 os.Exit(1) 34 } 35 36 // Parse TASK_ACTIVE environment variable 37 taskActive = strings.ToLower(taskActiveStr) == "true" 38 39 // Initialize headers for sandbox authorization 40 headers := http.Header{} 41 headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken)) 42 43 // Create the client 44 LatticeClient := client.NewClient( 45 option.WithClientCredentials(clientId, clientSecret), 46 option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)), 47 option.WithHTTPHeader(headers), 48 ) 49 50 // Set the entity ID to listen for tasks 51 entityId := "<AGENT_ID>" 52 fmt.Printf("Streaming tasks for entity: %s...\n", entityId) 53 54 // Create context for the request 55 ctx := context.Background() 56 57 // Create agent stream request 58 agentStreamRequest := Lattice.AgentStreamRequest{ 59 AgentSelector: &Lattice.EntityIDsSelector{ 60 EntityIDs: []string{entityId}, 61 }, 62 } 63 64 // Stream tasks 65 stream, err := LatticeClient.Tasks.StreamAsAgent(ctx, &agentStreamRequest) 66 if err != nil { 67 fmt.Printf("Error streaming tasks: %v\n", err) 68 os.Exit(1) 69 } 70 71 // Process stream events 72 for { 73 select { 74 case <-ctx.Done(): 75 log.Printf("Context canceled: %v", ctx.Err()) 76 return 77 default: 78 // Continue processing 79 } 80 81 event, err := stream.Recv() 82 83 if errors.Is(err, io.EOF) { 84 log.Println("Stream completed successfully.") 85 return 86 } 87 88 if err != nil { 89 log.Printf("Error receiving message: %v", err) 90 continue 91 } 92 93 if event.Event == "heartbeat" { 94 timestamp := *event.Heartbeat.Timestamp 95 log.Printf("Heartbeat: %s", timestamp) 96 } else { 97 request := event.GetAgentRequest() 98 if executeRequest := request.GetExecuteRequest(); executeRequest != nil { 99 task := executeRequest.GetTask() 100 if task != nil { 101 taskId := *task.GetVersion().GetTaskID() 102 taskStatusVersion := *task.GetVersion().GetStatusVersion() 103 description := *task.GetDescription() 104 105 log.Printf("Starting task %s, version %d: %s", taskId, taskStatusVersion, description) 106 107 // Update task status to STATUS_EXECUTING 108 result, err := executeTask(ctx, LatticeClient, taskId, int(taskStatusVersion), entityId) 109 if err != nil { 110 log.Printf("Error starting task: %v", err) 111 continue 112 } 113 114 log.Printf("Started task with status version: %d", *result.StatusVersion) 115 } 116 } else if completeRequest := request.GetCompleteRequest(); completeRequest != nil { 117 taskToComplete := completeRequest.GetTaskID() 118 if taskToComplete != nil { 119 log.Printf("Completing task: %s", *taskToComplete) 120 err := completeTask(ctx, LatticeClient, *taskToComplete, entityId) 121 if err != nil { 122 log.Printf("Error completing task: %v", err) 123 } 124 } 125 } else if cancelRequest := request.GetCancelRequest(); cancelRequest != nil { 126 taskToCancel := cancelRequest.GetTaskID() 127 if taskToCancel != nil { 128 log.Printf("Cancelling task: %s", *taskToCancel) 129 err := cancelTask(ctx, LatticeClient, *taskToCancel, entityId) 130 if err != nil { 131 log.Printf("Error cancelling task: %v", err) 132 } 133 } 134 } 135 } 136 137 // Sleep briefly to prevent tight looping 138 time.Sleep(100 * time.Millisecond) 139 } 140 } 141 142 // executeTask updates the task status to STATUS_EXECUTING 143 func executeTask(ctx context.Context, client *client.Client, taskId string, taskStatusVersion int, agentEntityId string) (*Lattice.TaskVersion, error) { 144 // Increment status version for the update 145 taskStatusVersion++ 146 147 // Create system principal with the agent entity ID 148 principal := Lattice.Principal{ 149 System: &Lattice.System{ 150 EntityID: &agentEntityId, 151 }, 152 } 153 154 taskStatus := Lattice.TaskStatus{ 155 Status: Lattice.TaskStatusStatusStatusExecuting.Ptr(), 156 } 157 158 // Create task status update request 159 taskStatusUpdate := Lattice.TaskStatusUpdate{ 160 TaskID: taskId, 161 StatusVersion: &taskStatusVersion, 162 NewStatus: &taskStatus, 163 Author: &principal, 164 } 165 166 // Call the UpdateTaskStatus API 167 task, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 168 if err != nil { 169 return nil, fmt.Errorf("error updating task status: %w", err) 170 } 171 172 return task.Version, nil 173 } 174 175 // cancelTask handles cancellation requests from Lattice 176 func cancelTask(ctx context.Context, client *client.Client, taskId string, entityId string) error { 177 // Get current task to retrieve status_version 178 getTaskRequest := Lattice.GetTaskRequest{ 179 TaskID: taskId, 180 } 181 task, err := client.Tasks.GetTask(ctx, &getTaskRequest) 182 if err != nil { 183 return fmt.Errorf("error getting task: %w", err) 184 } 185 if task.Status == nil || task.Version == nil || task.Version.StatusVersion == nil { 186 return fmt.Errorf("task status or version is missing") 187 } 188 currentTaskStatus := task.Status.Status 189 taskStatusVersion := *task.Version.StatusVersion 190 taskStatusVersion++ 191 192 // Create system principal with the agent entity ID 193 principal := Lattice.Principal{ 194 System: &Lattice.System{ 195 EntityID: &entityId, 196 }, 197 } 198 199 if taskActive { 200 // Reject cancellation: task is active and cannot be cancelled 201 rejectedMessage := "Task is already active, and cannot be cancelled." 202 taskStatus := Lattice.TaskStatus{ 203 // Because the cancellation is being rejected, we do not 204 // change the task status. 205 Status: currentTaskStatus, 206 TaskError: &Lattice.TaskError{ 207 Code: Lattice.TaskErrorCodeErrorCodeRejected.Ptr(), 208 Message: &rejectedMessage, 209 }, 210 } 211 212 taskStatusUpdate := Lattice.TaskStatusUpdate{ 213 TaskID: taskId, 214 StatusVersion: &taskStatusVersion, 215 NewStatus: &taskStatus, 216 Author: &principal, 217 } 218 219 _, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 220 if err != nil { 221 return fmt.Errorf("error updating task status: %w", err) 222 } 223 224 log.Println("Task could not be cancelled.") 225 } else { 226 // Accept cancellation 227 cancelledMessage := "Task cancelled by agent." 228 taskStatus := Lattice.TaskStatus{ 229 Status: Lattice.TaskStatusStatusStatusDoneNotOk.Ptr(), 230 TaskError: &Lattice.TaskError{ 231 Code: Lattice.TaskErrorCodeErrorCodeCancelled.Ptr(), 232 Message: &cancelledMessage, 233 }, 234 } 235 236 taskStatusUpdate := Lattice.TaskStatusUpdate{ 237 TaskID: taskId, 238 StatusVersion: &taskStatusVersion, 239 NewStatus: &taskStatus, 240 Author: &principal, 241 } 242 243 _, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 244 if err != nil { 245 return fmt.Errorf("error updating task status: %w", err) 246 } 247 248 log.Println("Task has been cancelled.") 249 } 250 251 return nil 252 } 253 254 // completeTask updates the task status to STATUS_DONE_OK 255 func completeTask(ctx context.Context, client *client.Client, taskId string, entityId string) error { 256 // Get current task to retrieve status_version 257 getTaskRequest := Lattice.GetTaskRequest{ 258 TaskID: taskId, 259 } 260 task, err := client.Tasks.GetTask(ctx, &getTaskRequest) 261 if err != nil { 262 return fmt.Errorf("error getting task: %w", err) 263 } 264 265 if task.Version == nil || task.Version.StatusVersion == nil { 266 return fmt.Errorf("task version is missing") 267 } 268 269 taskStatusVersion := *task.Version.StatusVersion 270 // Increment version and update to terminal state 271 taskStatusVersion++ 272 273 // Create system principal with the agent entity ID 274 principal := Lattice.Principal{ 275 System: &Lattice.System{ 276 EntityID: &entityId, 277 }, 278 } 279 280 taskStatus := Lattice.TaskStatus{ 281 Status: Lattice.TaskStatusStatusStatusDoneOk.Ptr(), 282 } 283 284 taskStatusUpdate := Lattice.TaskStatusUpdate{ 285 TaskID: taskId, 286 StatusVersion: &taskStatusVersion, 287 NewStatus: &taskStatus, 288 Author: &principal, 289 } 290 291 _, err = client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 292 if err != nil { 293 return fmt.Errorf("error updating task status: %w", err) 294 } 295 296 return nil 297 }
The StreamAsAgent API establishes a server-sent events (SSE) stream that delivers three types of requests:
executeRequest which notifies the agent to start executing a task, completeRequest which requests the agent to complete a task, and
cancelRequest which equests the agent to cancel a task.
When a cancelRequest arrives, the agent retrieves the current task state and decides whether to accept or reject the cancellation.
Rejecting cancellation:
If the task is active and cannot be cancelled, the agent rejects the cancellation by
keeping the current status and attaching a TaskError
with code ERROR_CODE_REJECTED:
1 package main 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "os" 11 "strings" 12 "time" 13 14 Lattice "github.com/anduril/lattice-sdk-go/v4" 15 "github.com/anduril/lattice-sdk-go/v4/client" 16 "github.com/anduril/lattice-sdk-go/v4/option" 17 ) 18 19 // Sets whether the task is currently being processed. If so, it cannot be cancelled. 20 var taskActive bool 21 22 func main() { 23 // Get environment variables 24 latticeEndpoint := os.Getenv("LATTICE_ENDPOINT") 25 clientSecret := os.Getenv("LATTICE_CLIENT_SECRET") 26 clientId := os.Getenv("LATTICE_CLIENT_ID") 27 sandboxesToken := os.Getenv("SANDBOXES_TOKEN") 28 taskActiveStr := os.Getenv("TASK_ACTIVE") 29 30 // Check required environment variables 31 if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" { 32 fmt.Println("Missing required environment variables") 33 os.Exit(1) 34 } 35 36 // Parse TASK_ACTIVE environment variable 37 taskActive = strings.ToLower(taskActiveStr) == "true" 38 39 // Initialize headers for sandbox authorization 40 headers := http.Header{} 41 headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken)) 42 43 // Create the client 44 LatticeClient := client.NewClient( 45 option.WithClientCredentials(clientId, clientSecret), 46 option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)), 47 option.WithHTTPHeader(headers), 48 ) 49 50 // Set the entity ID to listen for tasks 51 entityId := "<AGENT_ID>" 52 fmt.Printf("Streaming tasks for entity: %s...\n", entityId) 53 54 // Create context for the request 55 ctx := context.Background() 56 57 // Create agent stream request 58 agentStreamRequest := Lattice.AgentStreamRequest{ 59 AgentSelector: &Lattice.EntityIDsSelector{ 60 EntityIDs: []string{entityId}, 61 }, 62 } 63 64 // Stream tasks 65 stream, err := LatticeClient.Tasks.StreamAsAgent(ctx, &agentStreamRequest) 66 if err != nil { 67 fmt.Printf("Error streaming tasks: %v\n", err) 68 os.Exit(1) 69 } 70 71 // Process stream events 72 for { 73 select { 74 case <-ctx.Done(): 75 log.Printf("Context canceled: %v", ctx.Err()) 76 return 77 default: 78 // Continue processing 79 } 80 81 event, err := stream.Recv() 82 83 if errors.Is(err, io.EOF) { 84 log.Println("Stream completed successfully.") 85 return 86 } 87 88 if err != nil { 89 log.Printf("Error receiving message: %v", err) 90 continue 91 } 92 93 if event.Event == "heartbeat" { 94 timestamp := *event.Heartbeat.Timestamp 95 log.Printf("Heartbeat: %s", timestamp) 96 } else { 97 request := event.GetAgentRequest() 98 if executeRequest := request.GetExecuteRequest(); executeRequest != nil { 99 task := executeRequest.GetTask() 100 if task != nil { 101 taskId := *task.GetVersion().GetTaskID() 102 taskStatusVersion := *task.GetVersion().GetStatusVersion() 103 description := *task.GetDescription() 104 105 log.Printf("Starting task %s, version %d: %s", taskId, taskStatusVersion, description) 106 107 // Update task status to STATUS_EXECUTING 108 result, err := executeTask(ctx, LatticeClient, taskId, int(taskStatusVersion), entityId) 109 if err != nil { 110 log.Printf("Error starting task: %v", err) 111 continue 112 } 113 114 log.Printf("Started task with status version: %d", *result.StatusVersion) 115 } 116 } else if completeRequest := request.GetCompleteRequest(); completeRequest != nil { 117 taskToComplete := completeRequest.GetTaskID() 118 if taskToComplete != nil { 119 log.Printf("Completing task: %s", *taskToComplete) 120 err := completeTask(ctx, LatticeClient, *taskToComplete, entityId) 121 if err != nil { 122 log.Printf("Error completing task: %v", err) 123 } 124 } 125 } else if cancelRequest := request.GetCancelRequest(); cancelRequest != nil { 126 taskToCancel := cancelRequest.GetTaskID() 127 if taskToCancel != nil { 128 log.Printf("Cancelling task: %s", *taskToCancel) 129 err := cancelTask(ctx, LatticeClient, *taskToCancel, entityId) 130 if err != nil { 131 log.Printf("Error cancelling task: %v", err) 132 } 133 } 134 } 135 } 136 137 // Sleep briefly to prevent tight looping 138 time.Sleep(100 * time.Millisecond) 139 } 140 } 141 142 // executeTask updates the task status to STATUS_EXECUTING 143 func executeTask(ctx context.Context, client *client.Client, taskId string, taskStatusVersion int, agentEntityId string) (*Lattice.TaskVersion, error) { 144 // Increment status version for the update 145 taskStatusVersion++ 146 147 // Create system principal with the agent entity ID 148 principal := Lattice.Principal{ 149 System: &Lattice.System{ 150 EntityID: &agentEntityId, 151 }, 152 } 153 154 taskStatus := Lattice.TaskStatus{ 155 Status: Lattice.TaskStatusStatusStatusExecuting.Ptr(), 156 } 157 158 // Create task status update request 159 taskStatusUpdate := Lattice.TaskStatusUpdate{ 160 TaskID: taskId, 161 StatusVersion: &taskStatusVersion, 162 NewStatus: &taskStatus, 163 Author: &principal, 164 } 165 166 // Call the UpdateTaskStatus API 167 task, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 168 if err != nil { 169 return nil, fmt.Errorf("error updating task status: %w", err) 170 } 171 172 return task.Version, nil 173 } 174 175 // cancelTask handles cancellation requests from Lattice 176 func cancelTask(ctx context.Context, client *client.Client, taskId string, entityId string) error { 177 // Get current task to retrieve status_version 178 getTaskRequest := Lattice.GetTaskRequest{ 179 TaskID: taskId, 180 } 181 task, err := client.Tasks.GetTask(ctx, &getTaskRequest) 182 if err != nil { 183 return fmt.Errorf("error getting task: %w", err) 184 } 185 if task.Status == nil || task.Version == nil || task.Version.StatusVersion == nil { 186 return fmt.Errorf("task status or version is missing") 187 } 188 currentTaskStatus := task.Status.Status 189 taskStatusVersion := *task.Version.StatusVersion 190 taskStatusVersion++ 191 192 // Create system principal with the agent entity ID 193 principal := Lattice.Principal{ 194 System: &Lattice.System{ 195 EntityID: &entityId, 196 }, 197 } 198 199 if taskActive { 200 // Reject cancellation: task is active and cannot be cancelled 201 rejectedMessage := "Task is already active, and cannot be cancelled." 202 taskStatus := Lattice.TaskStatus{ 203 // Because the cancellation is being rejected, we do not 204 // change the task status. 205 Status: currentTaskStatus, 206 TaskError: &Lattice.TaskError{ 207 Code: Lattice.TaskErrorCodeErrorCodeRejected.Ptr(), 208 Message: &rejectedMessage, 209 }, 210 } 211 212 taskStatusUpdate := Lattice.TaskStatusUpdate{ 213 TaskID: taskId, 214 StatusVersion: &taskStatusVersion, 215 NewStatus: &taskStatus, 216 Author: &principal, 217 } 218 219 _, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 220 if err != nil { 221 return fmt.Errorf("error updating task status: %w", err) 222 } 223 224 log.Println("Task could not be cancelled.") 225 } else { 226 // Accept cancellation 227 cancelledMessage := "Task cancelled by agent." 228 taskStatus := Lattice.TaskStatus{ 229 Status: Lattice.TaskStatusStatusStatusDoneNotOk.Ptr(), 230 TaskError: &Lattice.TaskError{ 231 Code: Lattice.TaskErrorCodeErrorCodeCancelled.Ptr(), 232 Message: &cancelledMessage, 233 }, 234 } 235 236 taskStatusUpdate := Lattice.TaskStatusUpdate{ 237 TaskID: taskId, 238 StatusVersion: &taskStatusVersion, 239 NewStatus: &taskStatus, 240 Author: &principal, 241 } 242 243 _, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 244 if err != nil { 245 return fmt.Errorf("error updating task status: %w", err) 246 } 247 248 log.Println("Task has been cancelled.") 249 } 250 251 return nil 252 } 253 254 // completeTask updates the task status to STATUS_DONE_OK 255 func completeTask(ctx context.Context, client *client.Client, taskId string, entityId string) error { 256 // Get current task to retrieve status_version 257 getTaskRequest := Lattice.GetTaskRequest{ 258 TaskID: taskId, 259 } 260 task, err := client.Tasks.GetTask(ctx, &getTaskRequest) 261 if err != nil { 262 return fmt.Errorf("error getting task: %w", err) 263 } 264 265 if task.Version == nil || task.Version.StatusVersion == nil { 266 return fmt.Errorf("task version is missing") 267 } 268 269 taskStatusVersion := *task.Version.StatusVersion 270 // Increment version and update to terminal state 271 taskStatusVersion++ 272 273 // Create system principal with the agent entity ID 274 principal := Lattice.Principal{ 275 System: &Lattice.System{ 276 EntityID: &entityId, 277 }, 278 } 279 280 taskStatus := Lattice.TaskStatus{ 281 Status: Lattice.TaskStatusStatusStatusDoneOk.Ptr(), 282 } 283 284 taskStatusUpdate := Lattice.TaskStatusUpdate{ 285 TaskID: taskId, 286 StatusVersion: &taskStatusVersion, 287 NewStatus: &taskStatus, 288 Author: &principal, 289 } 290 291 _, err = client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 292 if err != nil { 293 return fmt.Errorf("error updating task status: %w", err) 294 } 295 296 return nil 297 }
Accepting cancellation:
If the task can be cancelled, the agent accepts by setting the status to STATUS_DONE_NOT_OK
with a TaskError indicating ERROR_CODE_CANCELLED:
1 package main 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "os" 11 "strings" 12 "time" 13 14 Lattice "github.com/anduril/lattice-sdk-go/v4" 15 "github.com/anduril/lattice-sdk-go/v4/client" 16 "github.com/anduril/lattice-sdk-go/v4/option" 17 ) 18 19 // Sets whether the task is currently being processed. If so, it cannot be cancelled. 20 var taskActive bool 21 22 func main() { 23 // Get environment variables 24 latticeEndpoint := os.Getenv("LATTICE_ENDPOINT") 25 clientSecret := os.Getenv("LATTICE_CLIENT_SECRET") 26 clientId := os.Getenv("LATTICE_CLIENT_ID") 27 sandboxesToken := os.Getenv("SANDBOXES_TOKEN") 28 taskActiveStr := os.Getenv("TASK_ACTIVE") 29 30 // Check required environment variables 31 if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" { 32 fmt.Println("Missing required environment variables") 33 os.Exit(1) 34 } 35 36 // Parse TASK_ACTIVE environment variable 37 taskActive = strings.ToLower(taskActiveStr) == "true" 38 39 // Initialize headers for sandbox authorization 40 headers := http.Header{} 41 headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken)) 42 43 // Create the client 44 LatticeClient := client.NewClient( 45 option.WithClientCredentials(clientId, clientSecret), 46 option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)), 47 option.WithHTTPHeader(headers), 48 ) 49 50 // Set the entity ID to listen for tasks 51 entityId := "<AGENT_ID>" 52 fmt.Printf("Streaming tasks for entity: %s...\n", entityId) 53 54 // Create context for the request 55 ctx := context.Background() 56 57 // Create agent stream request 58 agentStreamRequest := Lattice.AgentStreamRequest{ 59 AgentSelector: &Lattice.EntityIDsSelector{ 60 EntityIDs: []string{entityId}, 61 }, 62 } 63 64 // Stream tasks 65 stream, err := LatticeClient.Tasks.StreamAsAgent(ctx, &agentStreamRequest) 66 if err != nil { 67 fmt.Printf("Error streaming tasks: %v\n", err) 68 os.Exit(1) 69 } 70 71 // Process stream events 72 for { 73 select { 74 case <-ctx.Done(): 75 log.Printf("Context canceled: %v", ctx.Err()) 76 return 77 default: 78 // Continue processing 79 } 80 81 event, err := stream.Recv() 82 83 if errors.Is(err, io.EOF) { 84 log.Println("Stream completed successfully.") 85 return 86 } 87 88 if err != nil { 89 log.Printf("Error receiving message: %v", err) 90 continue 91 } 92 93 if event.Event == "heartbeat" { 94 timestamp := *event.Heartbeat.Timestamp 95 log.Printf("Heartbeat: %s", timestamp) 96 } else { 97 request := event.GetAgentRequest() 98 if executeRequest := request.GetExecuteRequest(); executeRequest != nil { 99 task := executeRequest.GetTask() 100 if task != nil { 101 taskId := *task.GetVersion().GetTaskID() 102 taskStatusVersion := *task.GetVersion().GetStatusVersion() 103 description := *task.GetDescription() 104 105 log.Printf("Starting task %s, version %d: %s", taskId, taskStatusVersion, description) 106 107 // Update task status to STATUS_EXECUTING 108 result, err := executeTask(ctx, LatticeClient, taskId, int(taskStatusVersion), entityId) 109 if err != nil { 110 log.Printf("Error starting task: %v", err) 111 continue 112 } 113 114 log.Printf("Started task with status version: %d", *result.StatusVersion) 115 } 116 } else if completeRequest := request.GetCompleteRequest(); completeRequest != nil { 117 taskToComplete := completeRequest.GetTaskID() 118 if taskToComplete != nil { 119 log.Printf("Completing task: %s", *taskToComplete) 120 err := completeTask(ctx, LatticeClient, *taskToComplete, entityId) 121 if err != nil { 122 log.Printf("Error completing task: %v", err) 123 } 124 } 125 } else if cancelRequest := request.GetCancelRequest(); cancelRequest != nil { 126 taskToCancel := cancelRequest.GetTaskID() 127 if taskToCancel != nil { 128 log.Printf("Cancelling task: %s", *taskToCancel) 129 err := cancelTask(ctx, LatticeClient, *taskToCancel, entityId) 130 if err != nil { 131 log.Printf("Error cancelling task: %v", err) 132 } 133 } 134 } 135 } 136 137 // Sleep briefly to prevent tight looping 138 time.Sleep(100 * time.Millisecond) 139 } 140 } 141 142 // executeTask updates the task status to STATUS_EXECUTING 143 func executeTask(ctx context.Context, client *client.Client, taskId string, taskStatusVersion int, agentEntityId string) (*Lattice.TaskVersion, error) { 144 // Increment status version for the update 145 taskStatusVersion++ 146 147 // Create system principal with the agent entity ID 148 principal := Lattice.Principal{ 149 System: &Lattice.System{ 150 EntityID: &agentEntityId, 151 }, 152 } 153 154 taskStatus := Lattice.TaskStatus{ 155 Status: Lattice.TaskStatusStatusStatusExecuting.Ptr(), 156 } 157 158 // Create task status update request 159 taskStatusUpdate := Lattice.TaskStatusUpdate{ 160 TaskID: taskId, 161 StatusVersion: &taskStatusVersion, 162 NewStatus: &taskStatus, 163 Author: &principal, 164 } 165 166 // Call the UpdateTaskStatus API 167 task, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 168 if err != nil { 169 return nil, fmt.Errorf("error updating task status: %w", err) 170 } 171 172 return task.Version, nil 173 } 174 175 // cancelTask handles cancellation requests from Lattice 176 func cancelTask(ctx context.Context, client *client.Client, taskId string, entityId string) error { 177 // Get current task to retrieve status_version 178 getTaskRequest := Lattice.GetTaskRequest{ 179 TaskID: taskId, 180 } 181 task, err := client.Tasks.GetTask(ctx, &getTaskRequest) 182 if err != nil { 183 return fmt.Errorf("error getting task: %w", err) 184 } 185 if task.Status == nil || task.Version == nil || task.Version.StatusVersion == nil { 186 return fmt.Errorf("task status or version is missing") 187 } 188 currentTaskStatus := task.Status.Status 189 taskStatusVersion := *task.Version.StatusVersion 190 taskStatusVersion++ 191 192 // Create system principal with the agent entity ID 193 principal := Lattice.Principal{ 194 System: &Lattice.System{ 195 EntityID: &entityId, 196 }, 197 } 198 199 if taskActive { 200 // Reject cancellation: task is active and cannot be cancelled 201 rejectedMessage := "Task is already active, and cannot be cancelled." 202 taskStatus := Lattice.TaskStatus{ 203 // Because the cancellation is being rejected, we do not 204 // change the task status. 205 Status: currentTaskStatus, 206 TaskError: &Lattice.TaskError{ 207 Code: Lattice.TaskErrorCodeErrorCodeRejected.Ptr(), 208 Message: &rejectedMessage, 209 }, 210 } 211 212 taskStatusUpdate := Lattice.TaskStatusUpdate{ 213 TaskID: taskId, 214 StatusVersion: &taskStatusVersion, 215 NewStatus: &taskStatus, 216 Author: &principal, 217 } 218 219 _, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 220 if err != nil { 221 return fmt.Errorf("error updating task status: %w", err) 222 } 223 224 log.Println("Task could not be cancelled.") 225 } else { 226 // Accept cancellation 227 cancelledMessage := "Task cancelled by agent." 228 taskStatus := Lattice.TaskStatus{ 229 Status: Lattice.TaskStatusStatusStatusDoneNotOk.Ptr(), 230 TaskError: &Lattice.TaskError{ 231 Code: Lattice.TaskErrorCodeErrorCodeCancelled.Ptr(), 232 Message: &cancelledMessage, 233 }, 234 } 235 236 taskStatusUpdate := Lattice.TaskStatusUpdate{ 237 TaskID: taskId, 238 StatusVersion: &taskStatusVersion, 239 NewStatus: &taskStatus, 240 Author: &principal, 241 } 242 243 _, err := client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 244 if err != nil { 245 return fmt.Errorf("error updating task status: %w", err) 246 } 247 248 log.Println("Task has been cancelled.") 249 } 250 251 return nil 252 } 253 254 // completeTask updates the task status to STATUS_DONE_OK 255 func completeTask(ctx context.Context, client *client.Client, taskId string, entityId string) error { 256 // Get current task to retrieve status_version 257 getTaskRequest := Lattice.GetTaskRequest{ 258 TaskID: taskId, 259 } 260 task, err := client.Tasks.GetTask(ctx, &getTaskRequest) 261 if err != nil { 262 return fmt.Errorf("error getting task: %w", err) 263 } 264 265 if task.Version == nil || task.Version.StatusVersion == nil { 266 return fmt.Errorf("task version is missing") 267 } 268 269 taskStatusVersion := *task.Version.StatusVersion 270 // Increment version and update to terminal state 271 taskStatusVersion++ 272 273 // Create system principal with the agent entity ID 274 principal := Lattice.Principal{ 275 System: &Lattice.System{ 276 EntityID: &entityId, 277 }, 278 } 279 280 taskStatus := Lattice.TaskStatus{ 281 Status: Lattice.TaskStatusStatusStatusDoneOk.Ptr(), 282 } 283 284 taskStatusUpdate := Lattice.TaskStatusUpdate{ 285 TaskID: taskId, 286 StatusVersion: &taskStatusVersion, 287 NewStatus: &taskStatus, 288 Author: &principal, 289 } 290 291 _, err = client.Tasks.UpdateTaskStatus(ctx, &taskStatusUpdate) 292 if err != nil { 293 return fmt.Errorf("error updating task status: %w", err) 294 } 295 296 return nil 297 }
The agent must first retrieve the current task using GetTask to
obtain the current statusVersion,
then increment it before calling UpdateTaskStatus.
Send a task cancellation request to Lattice. Since, in this example, the task has already been sent to an agent, Lattice routes the request to the agent:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "net/http" 7 "os" 8 9 Lattice "github.com/anduril/lattice-sdk-go/v4" 10 "github.com/anduril/lattice-sdk-go/v4/client" 11 "github.com/anduril/lattice-sdk-go/v4/option" 12 ) 13 14 func main() { 15 // Get environment variables 16 latticeEndpoint := os.Getenv("LATTICE_ENDPOINT") 17 clientSecret := os.Getenv("LATTICE_CLIENT_SECRET") 18 clientId := os.Getenv("LATTICE_CLIENT_ID") 19 sandboxesToken := os.Getenv("SANDBOXES_TOKEN") 20 21 // Check required environment variables 22 if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" { 23 fmt.Println("Missing required environment variables.") 24 os.Exit(1) 25 } 26 27 // Check for required command-line arguments 28 if len(os.Args) < 2 { 29 fmt.Println("Usage: go run cancel_task.go <task_id> [entity_id]") 30 os.Exit(1) 31 } 32 33 taskId := os.Args[1] 34 var entityId *string 35 if len(os.Args) > 2 { 36 entityId = &os.Args[2] 37 } 38 39 // Initialize headers for sandbox authorization 40 headers := http.Header{} 41 headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken)) 42 43 // Create the client 44 LatticeClient := client.NewClient( 45 option.WithClientCredentials(clientId, clientSecret), 46 option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)), 47 option.WithHTTPHeader(headers), 48 ) 49 50 // Create context for the request 51 ctx := context.Background() 52 53 // Build author based on whether entity_id is provided 54 var author *Lattice.Principal 55 if entityId != nil { 56 author = &Lattice.Principal{ 57 System: &Lattice.System{ 58 EntityID: entityId, 59 }, 60 } 61 fmt.Printf("Cancelling task: %s\n", taskId) 62 fmt.Printf("Author entity: %s\n", *entityId) 63 } else { 64 userId := "operator_1" 65 author = &Lattice.Principal{ 66 User: &Lattice.User{ 67 UserID: &userId, 68 }, 69 } 70 fmt.Printf("Cancelling task: %s\n", taskId) 71 } 72 73 // Create cancel task request 74 cancelRequest := Lattice.TaskCancellation{ 75 TaskID: taskId, 76 Author: author, 77 } 78 79 // Call the CancelTask API 80 _, err := LatticeClient.Tasks.CancelTask(ctx, &cancelRequest) 81 if err != nil { 82 fmt.Printf("Error cancelling task: %v\n", err) 83 os.Exit(1) 84 } 85 86 fmt.Printf("Cancel task response: {}\n") 87 }
The CancelTask API accepts the following parameters:
The unique identifier of the task to cancel.
Run the cancellation request with the task ID from Step 2:
$ python cancel_task.py <task_id>
After requesting cancellation, the task will transition to one of two outcomes:
Successful cancellation:
When the agent accepts the cancellation, the task transitions to
STATUS_DONE_NOT_OK with ERROR_CODE_CANCELLED:
$ Cancelling task: task-123456 $ Task has been cancelled. $ Task Status: STATUS_DONE_NOT_OK $ Error Code: ERROR_CODE_CANCELLED $ Error Message: Task cancelled by agent.
Rejected cancellation:
When the agent rejects the cancellation, the task retains its current status but includes a
TaskError with ERROR_CODE_REJECTED:
$ Cancelling task: task-123456 $ Task could not be cancelled. $ Task Status: STATUS_EXECUTING $ Error Code: ERROR_CODE_REJECTED $ Error Message: Task is already active, and cannot be cancelled.
For more information, see CancelTask in the Lattice API Reference.