Developer
News and Updates
Get Support
Sign in
Get Support
Sign in
DOCUMENTATION
Cloud
Data Center
Resources
Sign in
Sign in
DOCUMENTATION
Cloud
Data Center
Resources
Sign in
Last updated Jun 1, 2026

Build an orchestrated connector

This page explains the Orchestration capability of the @forge/teamwork-graph SDK: what it is, why you would use it, and how to wire it into a Forge connector app. It uses a Google Drive connector as a worked example, but the patterns apply to any Forge data source.

For the underlying concepts (the three task IDs, idempotency, long-running execution, lifecycle rules, and common gotchas), see Orchestration concepts. For the SDK method reference, see Task operations.

For a runnable reference implementation, see Connect Google Drive to Teamwork Graph.

What is Orchestration?

Orchestration is the Teamwork Graph scheduling and execution layer that runs background ingestion work for a Forge connector. Instead of your app trying to ingest an entire data source in a single web request, you describe the work as a graph of tasks and hand them to Teamwork Graph.

The platform then:

  • Invokes your taskRunner function when each task is due to run.
  • Runs tasks on a recurring schedule, such as every 30 minutes for incremental sync.
  • Lets a task fan out by spawning child tasks, such as one task per sub-folder, page, or shard.
  • Retries or backs off based on the failure reason you report.

In short: you write small task handlers, and Teamwork Graph handles when, how often, and in what order they run.

Why use Orchestration?

Forge web triggers and event functions have short execution budgets and no built-in concept of resumable, recurring work. Building an ingestion pipeline directly on top of them forces you to solve several hard problems yourself.

Without OrchestrationWith Orchestration
You manage your own cron or external schedulerThe platform schedules tasks on a recurring interval
Large datasets must be processed in one invocationWork is split into a tree of root and child tasks
You build retry and backoff logic from scratchThe platform retries based on failureReason
State between runs has to be cobbled togetherpersistentTaskMetadata is delivered back on the next run

You should reach for orchestration whenever your connector needs to:

  • Periodically pull data from an external system on a schedule.
  • Walk a large or hierarchical source such as folders, projects, repositories, or pages.
  • Survive transient failures and resume on the next scheduled run.

Key Concepts

  • Task — a unit of work identified by a taskId and tagged with a taskType, usually one of the SDK-provided ForgeTaskType values.
  • Root task — the top-level task for a connection. Scheduled with graph.scheduleOrUpdateTask and given a recurring scheduleInterval. Typically created in your onConnectionChange handler when a connection is created or updated.
  • Child task — a sub-task spawned from inside a running task via graph.scheduleChildTask. It inherits the connection of its parent and is used to fan out work.
  • scanId / taskExecutionId — opaque identifiers passed to your taskRunner on every invocation. You must echo them back when scheduling child tasks or reporting status.
  • persistentTaskMetadata — arbitrary JSON delivered to your taskRunner on each execution of a child task. Use this to record what slice of the data source the task is responsible for.
  • Task status — every execution of a task must end with a call to graph.updateTaskStatus or with error handling that results in a status update. Status is success or failure; failures can additionally carry a failureReason.

How to Use Orchestration

1. Declare the task runner in manifest.yml

Under your graph:connector module, add an orchestration block pointing at the function that will execute tasks.

1
2
modules:
  graph:connector:
    - key: my-connector
      # ... datasource, objectTypes, etc ...
      orchestration:
        taskRunner:
          function: taskRunnerFn
  function:
    - key: taskRunnerFn
      handler: index.taskRunner

You can see this in the example app's manifest.yml:

1
2
orchestration:
        taskRunner:
          function: taskRunnerFn
  webtrigger:
    - key: google-ingestion-webtrigger
      function: google-ingestion
  function:
    - key: google-ingestion
      handler: index.googleIngestion
    - key: onConnectionChangeFn
      handler: index.onConnectionChange
    - key: validateConnectionFn
      handler: index.validateConnection
    - key: taskRunnerFn
      handler: index.taskRunner

2. Schedule a root task

Root tasks are typically scheduled from onConnectionChange when the connection is CREATED or UPDATED. Use graph.scheduleOrUpdateTask - it will create a new schedule on first call and update the existing one on subsequent calls if you reuse the same taskId.

1
2
export async function scheduleInitialTask(connectorConfig: ConnectorConfig, taskType: types.ForgeTaskType): Promise<void> {
    try {
        if (!connectorConfig.connectionId) {
            throw new Error('connectionId is required for scheduling initial data ingestion task');
        }
		const rootTaskId: string | undefined = await getRootTaskId(taskType);

        const taskId = rootTaskId ?? uuidv4().toString();
		console.log(`Scheduling initial ${taskType} task: ${taskId}`);
        const taskScheduleRequest: types.TaskScheduleRequest = {
            connectionId: connectorConfig.connectionId,
            scheduleInterval: {
                value: 30,
                timeUnit: 'minutes' as const
            },
			task: {
                taskType: taskType,
                taskId: taskId
            },
        };

        console.log(`Scheduling initial ${taskType} task: ${JSON.stringify(taskScheduleRequest)}`);
        
        const result: types.TaskScheduleResponse = await graph.scheduleOrUpdateTask(taskScheduleRequest);
        
        if (result.status === 'ACCEPTED') {
            console.log(`Initial ${taskType} task scheduled successfully: ${result.taskId}`);
            
            const taskInfo: TaskInfo = {
                taskId: taskId,
                taskType: taskType,
                connectionId: connectorConfig.connectionId,
                connectorName: connectorConfig.connectorName,
                status: 'ACCEPTED',
                createdAt: new Date().toISOString(),
                isRootTask: true,
				metadata: {
					folderId: connectorConfig.folderId
				}
            };
            await storeTaskInfo(taskId, taskInfo);
			await storeRootTaskId(taskType, taskId);
        } else {
            throw new Error(`Root task ${rootTaskId}: Failed to schedule initial ${taskType} task: ${result.message}`);
        }

Key points:

  • scheduleInterval defines the recurring cadence using minutes, hours, or days.
  • Always reuse a stored taskId if one already exists for the connection and task type. This prevents duplicate schedules.
  • Pick a taskType from types.FORGE_TASK_TYPES. Do not invent your own strings.

3. Export a taskRunner from your Forge app

The platform calls this function with a TaskRunnerPayload containing the taskId, scanId, taskExecutionId, connectionId, the connector form fields, and any task metadata you previously attached.

The handler should:

  1. Look up its own task record so it knows what it is doing.
  2. Dispatch on taskType.
  3. Do the work, possibly scheduling child tasks.
  4. Call graph.updateTaskStatus with success or failure.
1
2
export async function taskRunner(request: TaskRunnerPayload): Promise<Object> {
	console.log(`TS: TaskRunner request with taskId: ${request.taskId}, scanId: ${request.scanId}, 
		connectorFormFields: ${JSON.stringify(sanitizeForLogging(request.connectorFormFields))}, 
		taskMetadata: ${JSON.stringify(sanitizeForLogging(request.taskMetadata))}`);

	try {
		const taskInfo = await getTaskInfo(request.taskId);
		if (!taskInfo) {
			throw new Error(`Task ${request.taskId}: Task not found`);
		}
		let result: Object;
		
		switch (taskInfo.taskType) {
			case types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL:
				result = await executeAndScheduleChildTasks(request, taskInfo);
				break;
			case types.FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL:
				result = await executeUserSync(request);
				break;
			default:
				throw new Error(`Task ${request.taskId}: Unsupported task type: ${taskInfo.taskType}. Only ${types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL} and ${types.FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL} are supported.`);
		}
		await updateTaskStatus(request, ((result as { success: boolean })?.success) ? 'success' : 'failure');
		return result;
	} catch (error) {
		console.error(`Task ${request.taskId}: Error in taskRunner: ${error}`);
		await updateTaskStatus(request, 'failure', 'NON_RETRYABLE_ERROR');
		return {
			success: false,
			message: error instanceof Error ? error.message : 'Unknown error occurred'
		};
	}
}

4. Schedule child tasks from inside the task runner

When a running task discovers more work, fan out by calling graph.scheduleChildTask. Echo back the scanId and taskExecutionId from the current invocation, and attach persistentTaskMetadata describing the slice of work this child should do.

1
2
async function scheduleSubTasks(folders: GoogleDriveFile[], request: TaskRunnerPayload, childTaskResults: { folderId: string; status: string; error?: string; }[]) {
	for (const folder of folders) {
		try {
			const childTaskId = uuidv4().toString();
			const childTaskRequest: types.ChildTaskScheduleRequest = {
				scanId: request.scanId,
				taskExecutionId: request.taskExecutionId,
				connectionId: request.connectionId,
				task: {
					parentTaskId: request.taskId,
					taskType: types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL,
					taskId: childTaskId,
					persistentTaskMetadata: {
						folderId: folder.id,
						folderName: folder.name
					}
				}
			};

			console.log(`Task ${request.taskId}: Scheduling child task for folder: ${folder.name} (${folder.id})`);
			const childTaskResponse: types.ChildTaskScheduleResponse = await graph.scheduleChildTask(childTaskRequest);

Key points:

  • You do not provide scheduleInterval for child tasks. The recurring cadence is configured on the root task, while child tasks are queued from a running execution to fan out additional work.
  • persistentTaskMetadata is the right place for defining what slice of data a task is responsible for.
  • Use a unique taskId per child and store enough local info to dispatch the child correctly when it executes.

5. Report task status

Every execution must end with a status update so the platform can decide whether to retry. Use graph.updateTaskStatus with the same scanId and taskExecutionId you received.

1
2
async function updateTaskStatus(
	request: TaskRunnerPayload,
	status: 'success' | 'failure',
	failureReason?: types.TaskStatusUpdateRequest['failureReason']
): Promise<void> {
	try {
		const statusUpdateRequest: types.TaskStatusUpdateRequest = {
			scanId: request.scanId,
			taskExecutionId: request.taskExecutionId,
			status: status,
			task: {
				taskId: request.taskId
			},
			connectionId: request.connectionId
		};

		if (status === 'failure' && failureReason) {
			statusUpdateRequest.failureReason = failureReason;
		}

For failures, classify the error and pass an appropriate failureReason. The example maps common error signatures such as HTTP 401, 429, 404, and token revoked or expired conditions to standard reasons:

1
2
let failureReason: types.TaskStatusUpdateRequest['failureReason'] = 'NON_RETRYABLE_ERROR';

if (error instanceof Error) {
	const errorMessage = error.message.toLowerCase();
	if (errorMessage.includes('unauthorized') || errorMessage.includes('401')) {
		failureReason = 'UNAUTHORIZED';
	} else if (errorMessage.includes('token') && (errorMessage.includes('revoked') || errorMessage.includes('expired'))) {
		failureReason = errorMessage.includes('revoked') ? 'AUTH_TOKEN_REVOKED' : 'AUTH_TOKEN_EXPIRED';
	} else if (errorMessage.includes('rate limit') || errorMessage.includes('429')) {
		failureReason = 'RATE_LIMITED';
	} else if (errorMessage.includes('not found') || errorMessage.includes('404')) {
		failureReason = 'ENTITY_NOT_FOUND';
	}
}

6. Persist task metadata between executions

Two complementary stores work well in practice:

  • persistentTaskMetadata on the task itself - best for what this task is responsible for. It rides along with the task and is delivered back to your runner on every execution.
  • App KVS via @forge/kvs - best for cross-task bookkeeping, such as tracking which taskId is the current root for a given task type on a connection.

The example wraps root task helpers like this:

1
2
export async function getRootTaskId(taskType: types.ForgeTaskType): Promise<string | undefined> {
	const taskKey = `root_task_${taskType}`;
	const result = await kvs.get(taskKey);
	return result as string | undefined;
}

export async function storeRootTaskId(taskType: types.ForgeTaskType, taskId: string): Promise<void> {
	const taskKey = `root_task_${taskType}`;
	await kvs.set(taskKey, taskId);
}

export async function deleteRootTaskId(taskType: types.ForgeTaskType): Promise<void> {
	const taskKey = `root_task_${taskType}`;
	await kvs.delete(taskKey);
}

When the connection is deleted, clean up stored root task IDs so a fresh install starts cleanly:

1
2
case 'DELETED':
	console.log('Connection deleted:', request.name);
	await kvs.deleteSecret(request.name);
	await deleteRootTaskId(types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL);
	await deleteRootTaskId(types.FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL);
	break;

Task Types

Task types come from the SDK as types.FORGE_TASK_TYPES. They communicate to the platform what kind of ingestion the task represents and whether it is a full or incremental run.

ConstantValue
FORGE_TASK_TYPES.USER_INGESTION_FULL"user_ingestion_full"
FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL"user_ingestion_incremental"
FORGE_TASK_TYPES.GROUP_INGESTION_FULL"group_ingestion_full"
FORGE_TASK_TYPES.GROUP_INGESTION_INCREMENTAL"group_ingestion_incremental"
FORGE_TASK_TYPES.ENTITY_INGESTION_FULL"entity_ingestion_full"
FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL"entity_ingestion_incremental"
FORGE_TASK_TYPES.RELATIONSHIP_INGESTION_FULL"relationship_ingestion_full"
FORGE_TASK_TYPES.RELATIONSHIP_INGESTION_INCREMENTAL"relationship_ingestion_incremental"

Always reference task types via types.FORGE_TASK_TYPES.<NAME>. Do not hard-code the string literals or define a parallel enum in your app.

Failure Reasons

failureReason values understood by the platform are:

ReasonUse when
UNAUTHORIZEDAuthentication failed, such as an HTTP 401. This can surface re-auth prompts to the admin.
AUTH_TOKEN_REVOKEDThe credential was revoked by the upstream system.
AUTH_TOKEN_EXPIREDThe credential expired and refresh is required.
RATE_LIMITEDThe upstream rate limit was hit, such as HTTP 429. The platform can back off.
ENTITY_NOT_FOUNDThe referenced resource no longer exists upstream, such as HTTP 404.
RETRYABLE_ERRORA transient error occurred and the platform may retry on the schedule.
RETRY_LIMIT_EXCEEDEDYou have decided to stop retrying after repeated failures.
NON_RETRYABLE_ERRORDefault for unclassified errors; the task fails this run.

End-to-End Flow

This is the lifecycle for a single connection in the example app:

  1. Connection created. The user installs the connector and provides config. Forge calls onConnectionChange with action: 'CREATED'.

  2. Root tasks scheduled. onConnectionChange calls scheduleInitialTask twice - once for ENTITY_INGESTION_INCREMENTAL and once for USER_INGESTION_INCREMENTAL. Each call invokes graph.scheduleOrUpdateTask with a 30-minute interval and stores the new taskId in KVS.

  3. Task fires on schedule. Teamwork Graph invokes taskRunner with the root task's taskId, scanId, taskExecutionId, connector form fields, and for child tasks, taskMetadata.

  4. Dispatch by type.

    • ENTITY_INGESTION_INCREMENTAL calls logic that queries the Drive folder, transforms files into Teamwork Graph documents, uploads them via graph.setObjects in batches, and schedules a child task per sub-folder.
    • USER_INGESTION_INCREMENTAL calls logic that uploads users via graph.setUsers.
  5. Status reported. The runner finishes with graph.updateTaskStatus using success or failure plus optional failureReason.

  6. Child tasks run with task metadata. Each child task receives its persistentTaskMetadata and may fan out further.

  7. Connection deleted. onConnectionChange with action: 'DELETED' cleans up KVS entries, including stored root task IDs.

Best Practices

  • Make scheduling idempotent. Always look up an existing root taskId for the connection and task type before calling scheduleOrUpdateTask.
  • Keep tasks small. A task should do a bounded amount of work and fan out the rest as children.
  • Use persistentTaskMetadata for task identity. Store what slice of data the task owns there, not mutable execution state.
  • Always report status. Wrap the runner body in try/catch and ensure graph.updateTaskStatus is called from both success and failure paths.
  • Classify failures precisely. Recognizing conditions such as UNAUTHORIZED, RATE_LIMITED, and AUTH_TOKEN_EXPIRED lets the platform and admin UI respond intelligently.
  • Do not throw from your status updater. A failure to update status should be logged, not propagated, so the original task result is not hidden.
  • Sanitize logs. Connector form fields can contain secrets such as API keys or OAuth tokens.
  • Use the SDK's FORGE_TASK_TYPES directly. Avoid maintaining a parallel enum.

SDK Reference

All three orchestration methods live on the graph export from @forge/teamwork-graph:

1
2
import { graph, types } from '@forge/teamwork-graph';

// Schedule (or update) a recurring root task for a connection.
graph.scheduleOrUpdateTask(request: types.TaskScheduleRequest)
  : Promise<types.TaskScheduleResponse>;

// Spawn a child task from inside a running task.
graph.scheduleChildTask(request: types.ChildTaskScheduleRequest)
  : Promise<types.ChildTaskScheduleResponse>;

// Report success/failure for the current task execution.
graph.updateTaskStatus(request: types.TaskStatusUpdateRequest)
  : Promise<types.TaskStatusUpdateResponse>;

Key request shapes from the SDK type definitions:

1
2
type TaskScheduleRequest = {
  connectionId: string;
  scheduleInterval: { value: number; timeUnit: 'minutes' | 'hours' | 'days' };
  task: { taskType: ForgeTaskType; taskId: string };
};

type ChildTaskScheduleRequest = {
  scanId: string;
  taskExecutionId: string;
  connectionId: string;
  task: {
    parentTaskId: string;
    taskType: ForgeTaskType;
    taskId: string;
    persistentTaskMetadata?: any;
    temporalTaskMetadata?: any;
  };
};

type TaskStatusUpdateRequest = {
  scanId: string;
  taskExecutionId: string;
  status: 'success' | 'failure';
  task: { taskId: string };
  failureReason?:
    | 'ENTITY_NOT_FOUND'
    | 'RETRY_LIMIT_EXCEEDED'
    | 'NON_RETRYABLE_ERROR'
    | 'UNAUTHORIZED'
    | 'AUTH_TOKEN_REVOKED'
    | 'AUTH_TOKEN_EXPIRED'
    | 'RATE_LIMITED'
    | 'RETRYABLE_ERROR';
  connectionId: string;
};

The payload delivered to your taskRunner looks like this:

1
2
interface TaskRunnerPayload {
  connectionId: string;
  appId: string;
  extensionKey: string;
  environmentId: string;
  scanId: string;
  taskExecutionId: string;
  taskId: string;
  connectorFormFields: Record<string, any>;
  taskDescription?: string;
  taskMetadata?: any;
}

For a runnable end-to-end example, see the Google Drive connector example app, including its src/task-management.ts, src/connection-management.ts, and manifest.yml.

Rate this page: