This page explains the Orchestration capability of the @forge/teamwork-graph SDK: what it is, why you would use it, and how to wire it into a Forge connector app. It uses a Google Drive connector as a worked example, but the patterns apply to any Forge data source.
For the underlying concepts (the three task IDs, idempotency, long-running execution, lifecycle rules, and common gotchas), see Orchestration concepts. For the SDK method reference, see Task operations.
For a runnable reference implementation, see Connect Google Drive to Teamwork Graph.
Orchestration is the Teamwork Graph scheduling and execution layer that runs background ingestion work for a Forge connector. Instead of your app trying to ingest an entire data source in a single web request, you describe the work as a graph of tasks and hand them to Teamwork Graph.
The platform then:
- Invokes your taskRunner function when each task is due to run.

In short: you write small task handlers, and Teamwork Graph handles when, how often, and in what order they run.
Forge web triggers and event functions have short execution budgets and no built-in concept of resumable, recurring work. Building an ingestion pipeline directly on top of them forces you to solve several hard problems yourself.
| Without Orchestration | With Orchestration |
|---|---|
| You manage your own cron or external scheduler | The platform schedules tasks on a recurring interval |
| Large datasets must be processed in one invocation | Work is split into a tree of root and child tasks |
| You build retry and backoff logic from scratch | The platform retries based on failureReason |
| State between runs has to be cobbled together | persistentTaskMetadata is delivered back on the next run |
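You should reach for Orchestration whenever your connector needs recurring schedules, work that is split across many invocations, platform-managed retries, or state carried from one run to the next.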
Key terms used on this page:
- Task: a unit of work identified by a taskId and tagged with a taskType, usually one of the SDK-provided ForgeTaskType values.
- Root task: a task scheduled with graph.scheduleOrUpdateTask and given a recurring scheduleInterval. Typically created in your onConnectionChange handler when a connection is created or updated.
- Child task: a task scheduled from a running task with graph.scheduleChildTask. It inherits the connection of its parent and is used to fan out work.
- scanId / taskExecutionId: opaque identifiers passed to your taskRunner on every invocation. You must echo them back when scheduling child tasks or reporting status.
- persistentTaskMetadata: arbitrary JSON delivered to your taskRunner on each execution of a child task. Use this to record what slice of the data source the task is responsible for.
- Status update: every execution ends with a call to graph.updateTaskStatus, or with error handling that results in a status update. Status is success or failure; failures can additionally carry a failureReason.

In manifest.yml, under your graph:connector module, add an orchestration block pointing at the function that will execute tasks.
```yaml
modules:
  graph:connector:
    - key: my-connector
      # ... datasource, objectTypes, etc ...
      orchestration:
        taskRunner:
          function: taskRunnerFn
  function:
    - key: taskRunnerFn
      handler: index.taskRunner
```
You can see this in the example app's manifest.yml:
```yaml
# Excerpt: the orchestration block sits inside the graph:connector entry;
# webtrigger and function are module types under modules.
orchestration:
  taskRunner:
    function: taskRunnerFn

webtrigger:
  - key: google-ingestion-webtrigger
    function: google-ingestion

function:
  - key: google-ingestion
    handler: index.googleIngestion
  - key: onConnectionChangeFn
    handler: index.onConnectionChange
  - key: validateConnectionFn
    handler: index.validateConnection
  - key: taskRunnerFn
    handler: index.taskRunner
```
Root tasks are typically scheduled from onConnectionChange when the connection is CREATED or UPDATED. Use graph.scheduleOrUpdateTask - it will create a new schedule on first call and update the existing one on subsequent calls if you reuse the same taskId.
```typescript
import { graph, types } from '@forge/teamwork-graph';
import { v4 as uuidv4 } from 'uuid';
// ConnectorConfig, TaskInfo, getRootTaskId, storeRootTaskId, and storeTaskInfo are app-local helpers.

export async function scheduleInitialTask(
  connectorConfig: ConnectorConfig,
  taskType: types.ForgeTaskType
): Promise<void> {
  try {
    if (!connectorConfig.connectionId) {
      throw new Error('connectionId is required for scheduling initial data ingestion task');
    }

    // Reuse the stored root taskId so repeated calls update the existing schedule.
    const rootTaskId: string | undefined = await getRootTaskId(taskType);
    const taskId = rootTaskId ?? uuidv4();
    console.log(`Scheduling initial ${taskType} task: ${taskId}`);

    const taskScheduleRequest: types.TaskScheduleRequest = {
      connectionId: connectorConfig.connectionId,
      scheduleInterval: { value: 30, timeUnit: 'minutes' as const },
      task: { taskType: taskType, taskId: taskId },
    };
    console.log(`Scheduling initial ${taskType} task: ${JSON.stringify(taskScheduleRequest)}`);

    const result: types.TaskScheduleResponse = await graph.scheduleOrUpdateTask(taskScheduleRequest);
    if (result.status === 'ACCEPTED') {
      console.log(`Initial ${taskType} task scheduled successfully: ${result.taskId}`);
      const taskInfo: TaskInfo = {
        taskId: taskId,
        taskType: taskType,
        connectionId: connectorConfig.connectionId,
        connectorName: connectorConfig.connectorName,
        status: 'ACCEPTED',
        createdAt: new Date().toISOString(),
        isRootTask: true,
        metadata: { folderId: connectorConfig.folderId }
      };
      // Persist the task details and remember this taskId as the root for the task type.
      await storeTaskInfo(taskId, taskInfo);
      await storeRootTaskId(taskType, taskId);
    } else {
      // Report the taskId that was actually sent; rootTaskId is undefined on the first call.
      throw new Error(`Root task ${taskId}: Failed to schedule initial ${taskType} task: ${result.message}`);
    }
  } catch (error) {
    // Assumed completion of the excerpt: log and rethrow so the caller can react.
    console.error(`Failed to schedule initial ${taskType} task: ${error}`);
    throw error;
  }
}
```
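The request-building step can be separated from the SDK call so that the reuse-or-create rule for the taskId is easy to test on its own. The following is a minimal sketch, not part of the example app: `buildRootScheduleRequest` and `RootScheduleRequest` are hypothetical names, the request shape is a local mirror of the one listed in the SDK reference on this page, and the positive-value check on the interval is an assumption rather than a documented rule.

```typescript
type TimeUnit = 'minutes' | 'hours' | 'days';

// Local mirror of the TaskScheduleRequest shape, so the sketch runs without the SDK.
interface RootScheduleRequest {
  connectionId: string;
  scheduleInterval: { value: number; timeUnit: TimeUnit };
  task: { taskType: string; taskId: string };
}

// Hypothetical helper: builds the root task request, reusing a stored taskId when one exists.
function buildRootScheduleRequest(
  connectionId: string,
  taskType: string,
  storedTaskId: string | undefined,
  generateId: () => string,
  interval: { value: number; timeUnit: TimeUnit } = { value: 30, timeUnit: 'minutes' },
): RootScheduleRequest {
  if (!connectionId) {
    throw new Error('connectionId is required for scheduling a root task');
  }
  // Assumption: a non-positive interval is never meaningful, so reject it early.
  if (!(interval.value > 0)) {
    throw new Error('scheduleInterval.value must be positive');
  }
  return {
    connectionId,
    scheduleInterval: interval,
    // Only generate a new id when nothing is stored; this keeps rescheduling idempotent.
    task: { taskType, taskId: storedTaskId ?? generateId() },
  };
}
```

In an app, `storedTaskId` would come from a lookup such as getRootTaskId and `generateId` from a UUID generator; the resulting object would then be passed to graph.scheduleOrUpdateTask.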
Key points:
- scheduleInterval defines the recurring cadence using minutes, hours, or days.
- Reuse the existing taskId if one already exists for the connection and task type. This prevents duplicate schedules.
- Use a taskType from types.FORGE_TASK_TYPES. Do not invent your own strings.

Export taskRunner from your Forge app. The platform calls this function with a TaskRunnerPayload containing the taskId, scanId, taskExecutionId, connectionId, the connector form fields, and any task metadata you previously attached.
The handler should:
- Dispatch on the task's taskType.
- Finish by calling graph.updateTaskStatus with success or failure.

```typescript
export async function taskRunner(request: TaskRunnerPayload): Promise<Object> {
  console.log(
    `TS: TaskRunner request with taskId: ${request.taskId}, scanId: ${request.scanId}, ` +
    `connectorFormFields: ${JSON.stringify(sanitizeForLogging(request.connectorFormFields))}, ` +
    `taskMetadata: ${JSON.stringify(sanitizeForLogging(request.taskMetadata))}`
  );
  try {
    // Look up the locally stored task details to find out which kind of task this is.
    const taskInfo = await getTaskInfo(request.taskId);
    if (!taskInfo) {
      throw new Error(`Task ${request.taskId}: Task not found`);
    }

    let result: Object;
    switch (taskInfo.taskType) {
      case types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL:
        result = await executeAndScheduleChildTasks(request, taskInfo);
        break;
      case types.FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL:
        result = await executeUserSync(request);
        break;
      default:
        throw new Error(
          `Task ${request.taskId}: Unsupported task type: ${taskInfo.taskType}. ` +
          `Only ${types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL} and ` +
          `${types.FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL} are supported.`
        );
    }

    // Report the outcome of this execution to the platform.
    await updateTaskStatus(request, ((result as { success: boolean })?.success) ? 'success' : 'failure');
    return result;
  } catch (error) {
    console.error(`Task ${request.taskId}: Error in taskRunner: ${error}`);
    await updateTaskStatus(request, 'failure', 'NON_RETRYABLE_ERROR');
    return {
      success: false,
      message: error instanceof Error ? error.message : 'Unknown error occurred'
    };
  }
}
```
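The rule that every execution ends with exactly one status report can be factored into a small wrapper. This is a sketch under assumptions, not the example app's code: `runAndReport`, `RunOutcome`, and `StatusReporter` are hypothetical names, and the reporter is injected so that the sketch runs without the SDK. In a connector, the reporter would wrap graph.updateTaskStatus.

```typescript
type RunOutcome = { success: boolean; message?: string };
type StatusReporter = (status: 'success' | 'failure', failureReason?: string) => Promise<void>;

// Hypothetical wrapper: runs a task handler and reports its status exactly once,
// whether the handler returns normally or throws.
async function runAndReport(
  handler: () => Promise<RunOutcome>,
  report: StatusReporter,
): Promise<RunOutcome> {
  let outcome: RunOutcome;
  let failureReason: string | undefined;
  try {
    outcome = await handler();
  } catch (error) {
    // Unclassified errors fall back to NON_RETRYABLE_ERROR, as in the runner above.
    outcome = {
      success: false,
      message: error instanceof Error ? error.message : 'Unknown error occurred',
    };
    failureReason = 'NON_RETRYABLE_ERROR';
  }
  // Single reporting point: reached on both the success and the failure path.
  await report(outcome.success ? 'success' : 'failure', failureReason);
  return outcome;
}
```

Keeping the report outside the try block means a failing status call cannot trigger a second, contradictory report from the catch branch.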
When a running task discovers more work, fan out by calling graph.scheduleChildTask. Echo back the scanId and taskExecutionId from the current invocation, and attach persistentTaskMetadata describing the slice of work this child should do.
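```typescript
async function scheduleSubTasks(
  folders: GoogleDriveFile[],
  request: TaskRunnerPayload,
  childTaskResults: { folderId: string; status: string; error?: string }[]
) {
  for (const folder of folders) {
    try {
      const childTaskId = uuidv4();
      const childTaskRequest: types.ChildTaskScheduleRequest = {
        // Echo back the identifiers of the current execution.
        scanId: request.scanId,
        taskExecutionId: request.taskExecutionId,
        connectionId: request.connectionId,
        task: {
          parentTaskId: request.taskId,
          taskType: types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL,
          taskId: childTaskId,
          // The slice of work this child is responsible for.
          persistentTaskMetadata: { folderId: folder.id, folderName: folder.name }
        }
      };
      console.log(`Task ${request.taskId}: Scheduling child task for folder: ${folder.name} (${folder.id})`);
      const childTaskResponse: types.ChildTaskScheduleResponse = await graph.scheduleChildTask(childTaskRequest);

      // Assumed completion of the excerpt: record the outcome per folder.
      // The status strings below are illustrative.
      console.log(`Task ${request.taskId}: Child task ${childTaskId} response: ${JSON.stringify(childTaskResponse)}`);
      childTaskResults.push({ folderId: folder.id, status: 'scheduled' });
    } catch (error) {
      // Record the failure and continue with the remaining folders.
      childTaskResults.push({
        folderId: folder.id,
        status: 'failed',
        error: error instanceof Error ? error.message : String(error)
      });
    }
  }
}
```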
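Building the child requests as plain data before any SDK call makes the fan-out easy to inspect and test. The sketch below is illustrative only: `buildChildRequests`, `FolderRef`, `RunContext`, and `ChildRequest` are hypothetical names, and the request shape is a local mirror of ChildTaskScheduleRequest as listed in the SDK reference on this page.

```typescript
interface FolderRef { id: string; name: string }

// The fields of the current execution that every child request must echo back.
interface RunContext {
  scanId: string;
  taskExecutionId: string;
  connectionId: string;
  taskId: string;
}

interface ChildRequest {
  scanId: string;
  taskExecutionId: string;
  connectionId: string;
  task: {
    parentTaskId: string;
    taskType: string;
    taskId: string;
    persistentTaskMetadata: { folderId: string; folderName: string };
  };
}

// Hypothetical helper: one child request per folder, each with its own taskId.
function buildChildRequests(
  ctx: RunContext,
  folders: readonly FolderRef[],
  taskType: string,
  generateId: () => string,
): ChildRequest[] {
  return folders.map((folder) => ({
    scanId: ctx.scanId,
    taskExecutionId: ctx.taskExecutionId,
    connectionId: ctx.connectionId,
    task: {
      parentTaskId: ctx.taskId,
      taskType,
      taskId: generateId(),
      // Identity of the slice, not mutable progress state.
      persistentTaskMetadata: { folderId: folder.id, folderName: folder.name },
    },
  }));
}
```

Each returned object could then be passed to graph.scheduleChildTask in a loop, with per-folder error handling as in the function above.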
Key points:
- Do not set a scheduleInterval for child tasks. The recurring cadence is configured on the root task, while child tasks are queued from a running execution to fan out additional work.
- persistentTaskMetadata is the right place for defining what slice of data a task is responsible for.
- Generate a unique taskId per child and store enough local info to dispatch the child correctly when it executes.

Every execution must end with a status update so the platform can decide whether to retry. Use graph.updateTaskStatus with the same scanId and taskExecutionId you received.
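```typescript
async function updateTaskStatus(
  request: TaskRunnerPayload,
  status: 'success' | 'failure',
  failureReason?: types.TaskStatusUpdateRequest['failureReason']
): Promise<void> {
  try {
    const statusUpdateRequest: types.TaskStatusUpdateRequest = {
      // Echo back the identifiers of the execution being reported.
      scanId: request.scanId,
      taskExecutionId: request.taskExecutionId,
      status: status,
      task: { taskId: request.taskId },
      connectionId: request.connectionId
    };
    if (status === 'failure' && failureReason) {
      statusUpdateRequest.failureReason = failureReason;
    }

    // Assumed completion of the excerpt: send the update and log, rather than rethrow, any error.
    await graph.updateTaskStatus(statusUpdateRequest);
  } catch (error) {
    console.error(`Task ${request.taskId}: Failed to update task status: ${error}`);
  }
}
```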
For failures, classify the error and pass an appropriate failureReason. The example maps common error signatures such as HTTP 401, 429, 404, and token revoked or expired conditions to standard reasons:
```typescript
let failureReason: types.TaskStatusUpdateRequest['failureReason'] = 'NON_RETRYABLE_ERROR';
if (error instanceof Error) {
  const errorMessage = error.message.toLowerCase();
  if (errorMessage.includes('unauthorized') || errorMessage.includes('401')) {
    failureReason = 'UNAUTHORIZED';
  } else if (errorMessage.includes('token') && (errorMessage.includes('revoked') || errorMessage.includes('expired'))) {
    failureReason = errorMessage.includes('revoked') ? 'AUTH_TOKEN_REVOKED' : 'AUTH_TOKEN_EXPIRED';
  } else if (errorMessage.includes('rate limit') || errorMessage.includes('429')) {
    failureReason = 'RATE_LIMITED';
  } else if (errorMessage.includes('not found') || errorMessage.includes('404')) {
    failureReason = 'ENTITY_NOT_FOUND';
  }
}
```
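Matching on message substrings can misfire, for example when an identifier in the message happens to contain "404". Where the upstream client exposes a numeric HTTP status, classifying on that value is one possible alternative. The sketch below is an assumption-laden variant, not the example app's logic: `classifyFailure` and `UpstreamError` are hypothetical, and treating 5xx responses as RETRYABLE_ERROR is a choice made for this sketch.

```typescript
type FailureReason =
  | 'ENTITY_NOT_FOUND'
  | 'RETRY_LIMIT_EXCEEDED'
  | 'NON_RETRYABLE_ERROR'
  | 'UNAUTHORIZED'
  | 'AUTH_TOKEN_REVOKED'
  | 'AUTH_TOKEN_EXPIRED'
  | 'RATE_LIMITED'
  | 'RETRYABLE_ERROR';

// Hypothetical error shape: an optional HTTP status plus a message.
interface UpstreamError {
  httpStatus?: number;
  message: string;
}

function classifyFailure(err: UpstreamError): FailureReason {
  const text = err.message.toLowerCase();
  // Token conditions are checked first so that a 401 caused by an expired or
  // revoked token gets the more specific reason.
  if (text.includes('token') && text.includes('revoked')) return 'AUTH_TOKEN_REVOKED';
  if (text.includes('token') && text.includes('expired')) return 'AUTH_TOKEN_EXPIRED';
  switch (err.httpStatus) {
    case 401:
      return 'UNAUTHORIZED';
    case 404:
      return 'ENTITY_NOT_FOUND';
    case 429:
      return 'RATE_LIMITED';
    default:
      break;
  }
  // Assumption: upstream 5xx responses are transient.
  if (err.httpStatus !== undefined && err.httpStatus >= 500) return 'RETRYABLE_ERROR';
  return 'NON_RETRYABLE_ERROR';
}
```

Note the ordering difference from the snippet above, which tests for 401 before the token conditions.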
Two complementary stores work well in practice:
- persistentTaskMetadata on the task itself: best for what this task is responsible for. It rides along with the task and is delivered back to your runner on every execution.
- @forge/kvs: best for cross-task bookkeeping, such as tracking which taskId is the current root for a given task type on a connection.

The example wraps root task helpers like this:
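```typescript
import { kvs } from '@forge/kvs';
import { types } from '@forge/teamwork-graph';

// Read the stored root taskId for a task type, if any.
export async function getRootTaskId(taskType: types.ForgeTaskType): Promise<string | undefined> {
  const taskKey = `root_task_${taskType}`;
  const result = await kvs.get(taskKey);
  return result as string | undefined;
}

// Remember which taskId is the current root for a task type.
export async function storeRootTaskId(taskType: types.ForgeTaskType, taskId: string): Promise<void> {
  const taskKey = `root_task_${taskType}`;
  await kvs.set(taskKey, taskId);
}

// Forget the stored root taskId, for example when the connection is deleted.
export async function deleteRootTaskId(taskType: types.ForgeTaskType): Promise<void> {
  const taskKey = `root_task_${taskType}`;
  await kvs.delete(taskKey);
}
```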
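The helpers above key the stored ID by task type only. If an installation can hold more than one connection, those keys would collide, so one option is to include the connection in the key. The sketch below shows that idea; `rootTaskKey` is a hypothetical helper, the `:` separator is an arbitrary choice, and it assumes the connectionId contains only characters your key-value store accepts in keys.

```typescript
// Hypothetical helper: a storage key that is unique per connection and task type.
function rootTaskKey(connectionId: string, taskType: string): string {
  if (!connectionId || !taskType) {
    throw new Error('connectionId and taskType are both required');
  }
  return `root_task:${connectionId}:${taskType}`;
}
```

With this scheme, deleting a connection only needs to remove the keys built from that connection's ID, and root tasks of other connections stay untouched.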
When the connection is deleted, clean up stored root task IDs so a fresh install starts cleanly:
```typescript
case 'DELETED':
  console.log('Connection deleted:', request.name);
  await kvs.deleteSecret(request.name);
  await deleteRootTaskId(types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL);
  await deleteRootTaskId(types.FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL);
  break;
```
Task types come from the SDK as types.FORGE_TASK_TYPES. They communicate to the platform what kind of ingestion the task represents and whether it is a full or incremental run.
| Constant | Value |
|---|---|
| FORGE_TASK_TYPES.USER_INGESTION_FULL | "user_ingestion_full" |
| FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL | "user_ingestion_incremental" |
| FORGE_TASK_TYPES.GROUP_INGESTION_FULL | "group_ingestion_full" |
| FORGE_TASK_TYPES.GROUP_INGESTION_INCREMENTAL | "group_ingestion_incremental" |
| FORGE_TASK_TYPES.ENTITY_INGESTION_FULL | "entity_ingestion_full" |
| FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL | "entity_ingestion_incremental" |
| FORGE_TASK_TYPES.RELATIONSHIP_INGESTION_FULL | "relationship_ingestion_full" |
| FORGE_TASK_TYPES.RELATIONSHIP_INGESTION_INCREMENTAL | "relationship_ingestion_incremental" |
Always reference task types via types.FORGE_TASK_TYPES.<NAME>. Do not hard-code the string literals or define a parallel enum in your app.
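A task type read back from storage is an untyped string, so it can help to validate it against the SDK constants before dispatching on it. The sketch below builds a type guard from whatever constant object it is given, which avoids maintaining a parallel enum. `makeValueGuard` is a hypothetical helper, and `SAMPLE_TASK_TYPES` is a two-entry stand-in used only so the sketch runs without the SDK. It assumes types.FORGE_TASK_TYPES is a plain object of string constants, as the table above suggests.

```typescript
// Hypothetical helper: builds a type guard that accepts only the values of `constants`.
function makeValueGuard<T extends Record<string, string>>(constants: T) {
  const allowed = new Set<string>(Object.values(constants));
  return (value: unknown): value is T[keyof T] =>
    typeof value === 'string' && allowed.has(value);
}

// Stand-in for the SDK constants; in an app, pass types.FORGE_TASK_TYPES instead.
const SAMPLE_TASK_TYPES = {
  ENTITY_INGESTION_INCREMENTAL: 'entity_ingestion_incremental',
  USER_INGESTION_INCREMENTAL: 'user_ingestion_incremental',
} as const;

const isKnownTaskType = makeValueGuard(SAMPLE_TASK_TYPES);
```

A runner could call `isKnownTaskType(taskInfo.taskType)` before its switch statement and report a failure for anything that does not pass.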
failureReason values understood by the platform are:
| Reason | Use when |
|---|---|
| UNAUTHORIZED | Authentication failed, such as an HTTP 401. This can surface re-auth prompts to the admin. |
| AUTH_TOKEN_REVOKED | The credential was revoked by the upstream system. |
| AUTH_TOKEN_EXPIRED | The credential expired and refresh is required. |
| RATE_LIMITED | The upstream rate limit was hit, such as HTTP 429. The platform can back off. |
| ENTITY_NOT_FOUND | The referenced resource no longer exists upstream, such as HTTP 404. |
| RETRYABLE_ERROR | A transient error occurred and the platform may retry on the schedule. |
| RETRY_LIMIT_EXCEEDED | You have decided to stop retrying after repeated failures. |
| NON_RETRYABLE_ERROR | Default for unclassified errors; the task fails this run. |
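Because RETRY_LIMIT_EXCEEDED is a decision your app makes, the app needs its own count of consecutive transient failures. The sketch below shows one way to turn such a count into a reason. `reasonForTransientFailure` is hypothetical, and where the counter lives (for example in @forge/kvs) and what limit is sensible are left as assumptions.

```typescript
type TransientReason = 'RETRYABLE_ERROR' | 'RETRY_LIMIT_EXCEEDED';

// Hypothetical helper. `previousFailures` counts consecutive failed runs before
// the current one; `maxAttempts` is the total number of attempts you allow.
function reasonForTransientFailure(previousFailures: number, maxAttempts: number): TransientReason {
  if (!Number.isInteger(previousFailures) || previousFailures < 0) {
    throw new RangeError('previousFailures must be a non-negative integer');
  }
  if (!Number.isInteger(maxAttempts) || maxAttempts < 1) {
    throw new RangeError('maxAttempts must be a positive integer');
  }
  // The current run is failure number previousFailures + 1.
  return previousFailures + 1 >= maxAttempts ? 'RETRY_LIMIT_EXCEEDED' : 'RETRYABLE_ERROR';
}
```

The counter would be reset to zero after any successful run so that only consecutive failures count toward the limit.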
This is the lifecycle for a single connection in the example app:
1. Connection created. The user installs the connector and provides config. Forge calls onConnectionChange with action: 'CREATED'.
2. Root tasks scheduled. onConnectionChange calls scheduleInitialTask twice: once for ENTITY_INGESTION_INCREMENTAL and once for USER_INGESTION_INCREMENTAL. Each call invokes graph.scheduleOrUpdateTask with a 30-minute interval and stores the new taskId in KVS.
3. Task fires on schedule. Teamwork Graph invokes taskRunner with the root task's taskId, scanId, taskExecutionId, connector form fields, and, for child tasks, taskMetadata.
4. Dispatch by type.
   - ENTITY_INGESTION_INCREMENTAL calls logic that queries the Drive folder, transforms files into Teamwork Graph documents, uploads them via graph.setObjects in batches, and schedules a child task per sub-folder.
   - USER_INGESTION_INCREMENTAL calls logic that uploads users via graph.setUsers.
5. Status reported. The runner finishes with graph.updateTaskStatus using success or failure plus optional failureReason.
6. Child tasks run with task metadata. Each child task receives its persistentTaskMetadata and may fan out further.
7. Connection deleted. onConnectionChange with action: 'DELETED' cleans up KVS entries, including stored root task IDs.
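The lifecycle mentions uploading documents in batches. A generic chunking helper is enough to express that step; the sketch below is one such helper. `chunk` is a hypothetical utility, not an SDK function, and the right batch size for graph.setObjects is not stated on this page, so it is left as a parameter.

```typescript
// Hypothetical utility: splits `items` into consecutive batches of at most `size` elements.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}
```

A runner would loop over `chunk(documents, batchSize)` and upload one batch per iteration, which keeps each request small and lets a failure be attributed to a specific batch.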
- Look up the stored taskId for the connection and task type before calling scheduleOrUpdateTask.
- Use persistentTaskMetadata for task identity. Store what slice of data the task owns there, not mutable execution state.
- Wrap the runner in try/catch and ensure graph.updateTaskStatus is called from both success and failure paths.
- Classify failures. Reporting a specific reason such as UNAUTHORIZED, RATE_LIMITED, or AUTH_TOKEN_EXPIRED lets the platform and admin UI respond intelligently.
- Use FORGE_TASK_TYPES directly. Avoid maintaining a parallel enum.

All three orchestration methods live on the graph export from @forge/teamwork-graph:
```typescript
import { graph, types } from '@forge/teamwork-graph';

// Schedule (or update) a recurring root task for a connection.
graph.scheduleOrUpdateTask(request: types.TaskScheduleRequest): Promise<types.TaskScheduleResponse>;

// Spawn a child task from inside a running task.
graph.scheduleChildTask(request: types.ChildTaskScheduleRequest): Promise<types.ChildTaskScheduleResponse>;

// Report success/failure for the current task execution.
graph.updateTaskStatus(request: types.TaskStatusUpdateRequest): Promise<types.TaskStatusUpdateResponse>;
```
Key request shapes from the SDK type definitions:
```typescript
type TaskScheduleRequest = {
  connectionId: string;
  scheduleInterval: { value: number; timeUnit: 'minutes' | 'hours' | 'days' };
  task: { taskType: ForgeTaskType; taskId: string };
};

type ChildTaskScheduleRequest = {
  scanId: string;
  taskExecutionId: string;
  connectionId: string;
  task: {
    parentTaskId: string;
    taskType: ForgeTaskType;
    taskId: string;
    persistentTaskMetadata?: any;
    temporalTaskMetadata?: any;
  };
};

type TaskStatusUpdateRequest = {
  scanId: string;
  taskExecutionId: string;
  status: 'success' | 'failure';
  task: { taskId: string };
  failureReason?:
    | 'ENTITY_NOT_FOUND'
    | 'RETRY_LIMIT_EXCEEDED'
    | 'NON_RETRYABLE_ERROR'
    | 'UNAUTHORIZED'
    | 'AUTH_TOKEN_REVOKED'
    | 'AUTH_TOKEN_EXPIRED'
    | 'RATE_LIMITED'
    | 'RETRYABLE_ERROR';
  connectionId: string;
};
```
The payload delivered to your taskRunner looks like this:
```typescript
interface TaskRunnerPayload {
  connectionId: string;
  appId: string;
  extensionKey: string;
  environmentId: string;
  scanId: string;
  taskExecutionId: string;
  taskId: string;
  connectorFormFields: Record<string, any>;
  taskDescription?: string;
  taskMetadata?: any;
}
```
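Since taskMetadata is typed as any and is absent on root task runs, a runner benefits from reading it defensively. The sketch below resolves which folder an execution should process: the child's metadata if present, otherwise a root folder from the connector form fields. `resolveFolderId` and `RunnerPayloadLike` are hypothetical, and the presence of a folderId form field is an assumption based on the Google Drive example.

```typescript
// Local subset of the payload fields this sketch needs.
interface RunnerPayloadLike {
  taskId: string;
  connectorFormFields: Record<string, unknown>;
  taskMetadata?: unknown;
}

// Hypothetical helper: picks the folder for this execution without trusting untyped input.
function resolveFolderId(payload: RunnerPayloadLike): string {
  const meta = payload.taskMetadata;
  if (typeof meta === 'object' && meta !== null) {
    // Child task: the slice was attached as persistentTaskMetadata when it was scheduled.
    const fromMeta = (meta as Record<string, unknown>)['folderId'];
    if (typeof fromMeta === 'string' && fromMeta.length > 0) {
      return fromMeta;
    }
  }
  // Root task: fall back to the folder configured on the connection (assumed field name).
  const fromForm = payload.connectorFormFields['folderId'];
  if (typeof fromForm === 'string' && fromForm.length > 0) {
    return fromForm;
  }
  throw new Error(`Task ${payload.taskId}: no folderId in taskMetadata or connectorFormFields`);
}
```

Failing loudly when neither source has a folder lets the runner report a failure instead of silently ingesting the wrong slice.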
For a runnable end-to-end example, see the Google Drive connector example app, including its src/task-management.ts, src/connection-management.ts, and manifest.yml.