Listen for tasks

Subscribing to tasks using the Lattice SDK

A taskable agent is an asset entity, or a group of entities, that can perform tasks.

An agent calls one of the following APIs to listen for tasks that Lattice assigns to it:

  • StreamAsAgent — For monitoring tasks routed to the agent using REST.
  • ListenAsAgent — For monitoring tasks routed to the agent using gRPC.

Before you begin

  • To publish taskable entities and subscribe to tasks, set up your Lattice environment.
  • Familiarize yourself with entities and different entity types.

gRPC authentication

If you are using gRPC with client credentials, set up the token refresh module before running the examples on this page.
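
To illustrate what a token refresh step generally does, here is a minimal sketch that caches an access token and requests a new one shortly before the cached one expires. It is not the Lattice token refresh module: the `tokenCache` type, the `fetchFunc` callback, and the refresh margin are all hypothetical names introduced for this example, and the fetch function is a stand-in for a real client-credentials request.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetchFunc is a hypothetical callback that requests a fresh access token
// and reports how long that token remains valid.
type fetchFunc func() (token string, ttl time.Duration, err error)

// tokenCache caches a token and refreshes it shortly before it expires.
// It is an illustration only, not part of the Lattice SDK.
type tokenCache struct {
	mu      sync.Mutex
	fetch   fetchFunc
	now     func() time.Time // injectable clock, useful in tests
	margin  time.Duration    // refresh this long before expiry
	token   string
	expires time.Time
}

func newTokenCache(fetch fetchFunc, margin time.Duration) *tokenCache {
	return &tokenCache{fetch: fetch, now: time.Now, margin: margin}
}

// Token returns the cached token, fetching a new one when the cached
// token is missing or within the refresh margin of its expiry.
func (c *tokenCache) Token() (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.token != "" && c.now().Before(c.expires.Add(-c.margin)) {
		return c.token, nil
	}
	token, ttl, err := c.fetch()
	if err != nil {
		return "", fmt.Errorf("refresh token: %w", err)
	}
	c.token = token
	c.expires = c.now().Add(ttl)
	return c.token, nil
}

// demo exercises the cache with a fake clock and a counting fetch function.
func demo() (first, second, third string, calls int) {
	clock := time.Unix(0, 0)
	cache := newTokenCache(func() (string, time.Duration, error) {
		calls++
		return fmt.Sprintf("token-%d", calls), time.Hour, nil
	}, time.Minute)
	cache.now = func() time.Time { return clock }

	first, _ = cache.Token()  // first call fetches a token
	second, _ = cache.Token() // second call is served from the cache
	// Move the clock inside the one-minute refresh margin.
	clock = clock.Add(59*time.Minute + 30*time.Second)
	third, _ = cache.Token() // the cache fetches a replacement
	return first, second, third, calls
}

func main() {
	first, second, third, calls := demo()
	fmt.Println(first, second, third, calls)
}
```

Refreshing a little before the expiry time, rather than exactly at it, avoids sending a token that expires while a request is in flight.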

Integrate an agent

An asset is an entity under your control, or under the control of another operator or system. Assets may accept tasks such as search or tracking. An agent is an asset, or a group of assets, that can complete a specific set of defined tasks.

To publish an agent, do the following:

1

Define a TaskCatalog

The entity model’s TaskCatalog component defines the tasks that an asset can execute. An operator, using the Lattice UI or a programmatic SDK integration, can use an entity override to modify the asset’s TaskCatalog.

For example, publish an asset with the following catalog so that it can listen for and execute the VisualId and Investigate tasks assigned to it:

"taskCatalog": {
	// Define the tasks the asset can perform.
	"taskDefinitions": [
		{
			// Set the task specification URL.
			"taskSpecificationUrl": "type.googleapis.com/anduril.tasks.v2.VisualId"
		},
		{
			// Set the task specification URL.
			"taskSpecificationUrl": "type.googleapis.com/anduril.tasks.v2.Investigate"
		}
	]
}
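
If you build the catalog programmatically, the JSON shape above can be produced from a list of task names. The following is a minimal sketch that uses only the Go standard library. The `taskDefinition` and `taskCatalog` structs and the `buildCatalog` and `catalogJSON` helpers are local illustration types and names, not SDK types, and the shared URL prefix is an assumption taken from the two task types shown in this guide.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskDefinition and taskCatalog are local illustration types that mirror
// the JSON shape of the catalog in this guide. They are not SDK types.
type taskDefinition struct {
	TaskSpecificationURL string `json:"taskSpecificationUrl"`
}

type taskCatalog struct {
	TaskDefinitions []taskDefinition `json:"taskDefinitions"`
}

// specPrefix is the type URL prefix shared by the two task types in this
// guide. Other task types may use a different prefix.
const specPrefix = "type.googleapis.com/anduril.tasks.v2."

// buildCatalog returns a catalog with one definition per task name.
func buildCatalog(taskNames ...string) taskCatalog {
	defs := make([]taskDefinition, 0, len(taskNames))
	for _, name := range taskNames {
		defs = append(defs, taskDefinition{TaskSpecificationURL: specPrefix + name})
	}
	return taskCatalog{TaskDefinitions: defs}
}

// catalogJSON renders the catalog as compact JSON.
func catalogJSON(c taskCatalog) (string, error) {
	out, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	out, err := catalogJSON(buildCatalog("VisualId", "Investigate"))
	if err != nil {
		fmt.Println("encode catalog:", err)
		return
	}
	fmt.Println(out)
}
```
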
2

Publish the agent

Use the PublishEntity method to publish a taskable agent:

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"time"

	Lattice "github.com/anduril/lattice-sdk-go/v4"
	"github.com/anduril/lattice-sdk-go/v4/client"
	"github.com/anduril/lattice-sdk-go/v4/option"
	"github.com/google/uuid"
)

func main() {
	// Get environment variables
	latticeEndpoint := os.Getenv("LATTICE_ENDPOINT")
	clientSecret := os.Getenv("LATTICE_CLIENT_SECRET")
	clientId := os.Getenv("LATTICE_CLIENT_ID")

	// Remove sandboxesToken from the following statements if you are not developing on Sandboxes
	sandboxesToken := os.Getenv("SANDBOXES_TOKEN")

	// Check required environment variables
	if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" {
		fmt.Println("Missing required environment variables")
		os.Exit(1)
	}

	// Initialize headers for sandbox authorization
	headers := http.Header{}
	headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken))

	// Create the client
	LatticeClient := client.NewClient(
		option.WithClientCredentials(clientId, clientSecret),
		option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)),
		option.WithHTTPHeader(headers),
	)

	// Generate a unique ID for the entity
	entityId := uuid.New().String()

	// Get creation time
	creationTime := time.Now().UTC()

	// Continuously publish the entity
	for {
		latestTimestamp := time.Now().UTC()
		ctx := context.Background()

		// Create entity to publish
		entity := Lattice.Entity{
			EntityID:    &entityId,
			Description: Lattice.String("Friendly drone asset"),
			Aliases: &Lattice.Aliases{
				Name: Lattice.String("Drone 1"),
			},
			IsLive:      Lattice.Bool(true),
			CreatedTime: Lattice.Time(creationTime),
			ExpiryTime:  Lattice.Time(latestTimestamp.Add(1 * time.Minute)),
			Ontology: &Lattice.Ontology{
				Template:     Lattice.OntologyTemplateTemplateAsset.Ptr(),
				PlatformType: Lattice.String("UAV"),
			},
			MilView: &Lattice.MilView{
				Disposition: Lattice.MilViewDispositionDispositionFriendly.Ptr(),
				Environment: Lattice.MilViewEnvironmentEnvironmentAir.Ptr(),
			},
			Location: &Lattice.Location{
				Position: &Lattice.Position{
					LatitudeDegrees:   Lattice.Float64(50.91402185768586),
					LongitudeDegrees:  Lattice.Float64(0.79203612077257),
					AltitudeAsfMeters: Lattice.Float64(1000),
				},
			},
			Provenance: &Lattice.Provenance{
				IntegrationName:  Lattice.String("your_integration_name"),
				DataType:         Lattice.String("your_data_type"),
				SourceUpdateTime: Lattice.Time(latestTimestamp),
			},
			Health: &Lattice.Health{
				ConnectionStatus: Lattice.HealthConnectionStatusConnectionStatusOnline.Ptr(),
				HealthStatus:     Lattice.HealthHealthStatusHealthStatusHealthy.Ptr(),
				UpdateTime:       Lattice.Time(latestTimestamp),
			},
			TaskCatalog: &Lattice.TaskCatalog{
				TaskDefinitions: []*Lattice.TaskDefinition{
					{TaskSpecificationURL: Lattice.String("type.googleapis.com/anduril.tasks.v2.VisualId")},
					{TaskSpecificationURL: Lattice.String("type.googleapis.com/anduril.tasks.v2.Investigate")},
				},
			},
		}

		// Publish the entity
		_, err := LatticeClient.Entities.PublishEntity(ctx, &entity)

		// Handle errors
		if err != nil {
			fmt.Printf("Error publishing entity: %v\n", err)
		} else {
			fmt.Println("Published asset: " + entityId)
		}

		// Wait before next request
		time.Sleep(5 * time.Second)
	}
}

If successful, you see the entity ID in the console. Copy the ID:

Published asset: <entity-id>
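
The example above republishes the entity in an endless loop with a fixed sleep. If you want the loop to stop cleanly, for example on shutdown, one option is to drive it with a context and a ticker. The following is a sketch under stated assumptions: `publishFunc` is a hypothetical stand-in for a call such as PublishEntity, and `publishLoop` and `demo` are names introduced here for illustration only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publishFunc is a hypothetical stand-in for a call such as PublishEntity.
type publishFunc func(ctx context.Context) error

// publishLoop calls publish immediately, then once per interval, until ctx
// is canceled. It returns how many publishes succeeded and how many failed.
func publishLoop(ctx context.Context, interval time.Duration, publish publishFunc) (ok, failed int) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		// Stop before publishing again if the context was canceled.
		if ctx.Err() != nil {
			return ok, failed
		}
		if err := publish(ctx); err != nil {
			failed++ // keep going: a later publish may succeed
		} else {
			ok++
		}
		select {
		case <-ctx.Done():
			return ok, failed
		case <-ticker.C:
		}
	}
}

// demo runs the loop with a fake publish function that fails on the second
// call and cancels the context during the third call.
func demo() (ok, failed int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	return publishLoop(ctx, time.Millisecond, func(context.Context) error {
		calls++
		if calls == 3 {
			cancel()
		}
		if calls == 2 {
			return errors.New("simulated publish failure")
		}
		return nil
	})
}

func main() {
	ok, failed := demo()
	fmt.Printf("published=%d failed=%d\n", ok, failed)
}
```

In a real integration, the context could come from `signal.NotifyContext` so that an interrupt signal ends the loop. The interval should stay well below the entity's expiry time, as it does in the example above (5 seconds against 1 minute).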
3

Stream tasks as an agent

Call StreamAsAgent, or ListenAsAgent if you use gRPC, and replace <AGENT_ID> with the ID of the agent you just published. The agent then subscribes to, and listens for, the tasks that Lattice routes to it:

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	Lattice "github.com/anduril/lattice-sdk-go/v4"
	"github.com/anduril/lattice-sdk-go/v4/client"
	"github.com/anduril/lattice-sdk-go/v4/option"
)

func main() {
	// Get environment variables
	latticeEndpoint := os.Getenv("LATTICE_ENDPOINT")
	clientSecret := os.Getenv("LATTICE_CLIENT_SECRET")
	clientId := os.Getenv("LATTICE_CLIENT_ID")

	// Remove sandboxesToken from the following statements if you are not developing on Sandboxes
	sandboxesToken := os.Getenv("SANDBOXES_TOKEN")

	// Check required environment variables
	if latticeEndpoint == "" || clientId == "" || clientSecret == "" || sandboxesToken == "" {
		fmt.Println("Missing required environment variables")
		os.Exit(1)
	}

	// Initialize headers for sandbox authorization
	headers := http.Header{}
	headers.Add("Anduril-Sandbox-Authorization", fmt.Sprintf("Bearer %s", sandboxesToken))

	// Create the client
	LatticeClient := client.NewClient(
		option.WithClientCredentials(clientId, clientSecret),
		option.WithBaseURL(fmt.Sprintf("https://%s", latticeEndpoint)),
		option.WithHTTPHeader(headers),
	)

	// Entity ID to listen for tasks
	entityId := "<AGENT_ID>"
	fmt.Printf("Streaming tasks for entity %s...\n", entityId)

	// Create context for the request
	ctx := context.Background()

	// Create agent selector
	agentStreamRequest := Lattice.AgentStreamRequest{
		AgentSelector: &Lattice.EntityIDsSelector{
			EntityIDs: []string{entityId},
		},
	}

	// Stream tasks
	stream, err := LatticeClient.Tasks.StreamAsAgent(ctx, &agentStreamRequest)
	if err != nil {
		fmt.Printf("Error streaming tasks: %v\n", err)
		os.Exit(1)
	}

	// Process stream events
	for {
		select {
		case <-ctx.Done():
			log.Printf("Context canceled: %v", ctx.Err())
			return
		default:
			// Continue processing
		}
		event, err := stream.Recv()

		if errors.Is(err, io.EOF) {
			log.Println("Stream completed successfully.")
			return
		}

		if err != nil {
			log.Printf("Error receiving message: %v", err)
			continue
		}

		if event.Event == "heartbeat" {
			timestamp := *event.Heartbeat.Timestamp
			log.Printf("Heartbeat: %s", timestamp)
		} else {
			request := event.GetAgentRequest()
			if executeRequest := request.GetExecuteRequest(); executeRequest != nil {
				task := executeRequest.GetTask()
				if task != nil {
					log.Printf("Received task: %s", *task.GetVersion().GetTaskID())
					log.Printf("Task description: %s", *task.GetDescription())
				}
			} else if completeRequest := request.GetCompleteRequest(); completeRequest != nil {
				taskToComplete := completeRequest.GetTaskID()
				if taskToComplete != nil {
					log.Printf("Completing task: %s", *taskToComplete)
				}
			} else if cancelRequest := request.GetCancelRequest(); cancelRequest != nil {
				taskToCancel := cancelRequest.GetTaskID()
				if taskToCancel != nil {
					log.Printf("Canceling task: %s", *taskToCancel)
				}
			}
		}
	}
}

If successful, you see the following output:

Streaming tasks for entity <entity-id>...
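
The stream delivers three kinds of agent request: execute, complete, and cancel. The example above dereferences several optional fields directly, which panics if a field is unset. The following is a minimal sketch of a dispatch step that guards each optional field. The `task` and `agentRequest` structs are simplified local stand-ins, not the SDK's types, and `describe`, `deref`, and `strPtr` are hypothetical helper names.

```go
package main

import "fmt"

// task and agentRequest are simplified local stand-ins for the request
// shapes handled in this guide. They are not the SDK's types.
type task struct {
	ID          *string
	Description *string
}

type agentRequest struct {
	Execute  *task   // set when the agent should start a task
	Complete *string // ID of the task to complete
	Cancel   *string // ID of the task to cancel
}

// deref returns the pointed-to string, or fallback when the pointer is nil.
func deref(s *string, fallback string) string {
	if s == nil {
		return fallback
	}
	return *s
}

// describe maps a request to a log line without dereferencing nil pointers.
func describe(req *agentRequest) string {
	switch {
	case req == nil:
		return "empty request"
	case req.Execute != nil:
		return fmt.Sprintf("execute task %s: %s",
			deref(req.Execute.ID, "<no id>"),
			deref(req.Execute.Description, "<no description>"))
	case req.Complete != nil:
		return "complete task " + *req.Complete
	case req.Cancel != nil:
		return "cancel task " + *req.Cancel
	default:
		return "unknown request"
	}
}

// strPtr returns a pointer to s, for building optional fields.
func strPtr(s string) *string { return &s }

func main() {
	fmt.Println(describe(&agentRequest{Execute: &task{ID: strPtr("task-1")}}))
	fmt.Println(describe(&agentRequest{Cancel: strPtr("task-1")}))
}
```

Keeping the dispatch in one function that returns a value makes it straightforward to unit test without a live stream.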
4

Assign a task using the Lattice UI

To assign a task, do the following:

  1. Open your environment’s Lattice UI, and select the entity you published in the Entity Explorer.
  2. From the entity’s detail page, click Assign Task:
    [Image: an entity's detail page with the Assign Task button in the Lattice Developer Console.]
  3. Select a task from the Available Tasks list, fill in the fields under Task Configuration, then click Submit Task. The form is generated from the task’s schema, and submitting it sends a real CreateTask request:
    [Image: the task configuration form in the Lattice Developer Console.]
5

Verify the response

If successful, you see the following output back in your local development console:

<timestamp> Received task: <task-id>
<timestamp> Task description: Maintain sensor awareness on specified group or object.

Endpoints for operators

Operators use the following endpoints from the Lattice UI or a third-party UI:

  • CreateTask: Creates a new task. Lattice calls the CreateTask endpoint after a task’s details have been populated in the UI and an operator presses “Execute Task” for the first time.
  • GetTask: Fetches the state of an existing task.
  • QueryTasks: Finds tasks that match the specified criteria.

What’s next