taskRunner is the function module the platform invokes for every scheduled task in your
connector. It is the receiving end of scheduleOrUpdateTask and scheduleChildTask. Unlike the
other task operations, taskRunner is not a method you call on the SDK; it is a Forge function
your connector exposes and the platform calls into.
You declare it once in your manifest.yml under graph:connector.orchestration.taskRunner. The
platform then invokes it on every cadence tick of a root task and on every child task scheduled
from inside a root invocation.
```yaml
modules:
  graph:connector:
    - key: my-connector
      # ... datasource, capabilities, objectTypes, onConnectionChange ...
      orchestration:
        taskRunner:
          function: taskRunnerFn
  function:
    - key: taskRunnerFn
      handler: index.taskRunner
```
The function key under orchestration.taskRunner references a function entry by key. The
handler is the dotted path to the exported function in your code.
The default per-invocation execution budget is in the tens of seconds. To opt your taskRunner
into a 15-minute budget, add timeoutSeconds: 900 to the function entry:
```yaml
function:
  - key: taskRunnerFn
    handler: index.taskRunner
    timeoutSeconds: 900
```
timeoutSeconds is only valid on a function that is actively referenced by a consumer module
(such as graph:connector.orchestration.taskRunner). Adding it to an unreferenced function fails
manifest validation. See Long-running tasks.
```typescript
export async function taskRunner(
  request: TaskRunnerPayload,
): Promise<TaskRunnerResult>;
```
TaskRunnerPayload
The payload the platform passes on every invocation. Define it in your code (the SDK does not currently export it as a named type):

```typescript
interface TaskRunnerPayload {
  connectionId: string;        // The connection this task belongs to
  appId: string;               // The Forge app id
  extensionKey: string;        // The graph:connector module key
  environmentId: string;       // The Forge environment (development, staging, production)
  scanId: string;              // The scheduled run this invocation is part of
  taskExecutionId: string;     // Unique to this invocation attempt
  taskId: string;              // The task being executed (root or child)
  connectorFormFields: Record<string, any>; // The form values configured for the connection
  taskDescription?: string;    // Optional description string
  taskMetadata?: any;          // Optional metadata payload (see "Metadata propagation" below)
}
```
TaskRunnerResult
The shape your handler returns. Define it in your code:

```typescript
interface TaskRunnerResult {
  success: boolean;
  message?: string;
}
```
The platform reads this to log the outcome and surface it in the connector UI. The return value
is independent of the updateTaskStatus call: returning { success: true } does not by itself
report success to the orchestrator. You must also call updateTaskStatus before returning.
```typescript
import { graph, types } from '@forge/teamwork-graph';
import { kvs } from '@forge/kvs';

interface TaskInfo {
  taskType: types.ForgeTaskType;
  connectionId: string;
  metadata?: Record<string, any>;
  completed?: boolean;
}

export async function taskRunner(request: TaskRunnerPayload) {
  const taskInfo = (await kvs.get(`taskInfo:${request.taskId}`)) as TaskInfo | undefined;

  if (!taskInfo) {
    await graph.updateTaskStatus({
      scanId: request.scanId,
      taskExecutionId: request.taskExecutionId,
      connectionId: request.connectionId,
      status: 'failure',
      failureReason: 'ENTITY_NOT_FOUND',
      task: { taskId: request.taskId },
    });
    return { success: false, message: 'Task not found' };
  }

  if (taskInfo.completed) {
    // Duplicate delivery of an already-finished task. Report success without
    // re-running side effects. See "At-least-once delivery" in concepts.
    await graph.updateTaskStatus({
      scanId: request.scanId,
      taskExecutionId: request.taskExecutionId,
      connectionId: request.connectionId,
      status: 'success',
      task: { taskId: request.taskId },
    });
    return { success: true, message: 'Already completed (duplicate delivery)' };
  }

  try {
    switch (taskInfo.taskType) {
      case types.FORGE_TASK_TYPES.ENTITY_INGESTION_INCREMENTAL:
        await runEntityIngestion(request, taskInfo);
        break;
      case types.FORGE_TASK_TYPES.USER_INGESTION_INCREMENTAL:
        await runUserSync(request);
        break;
      default:
        throw new Error(`Unsupported task type: ${taskInfo.taskType}`);
    }

    // Mark the task finished so a duplicate delivery takes the early-return path above.
    await kvs.set(`taskInfo:${request.taskId}`, { ...taskInfo, completed: true });

    await graph.updateTaskStatus({
      scanId: request.scanId,
      taskExecutionId: request.taskExecutionId,
      connectionId: request.connectionId,
      status: 'success',
      task: { taskId: request.taskId },
    });
    return { success: true };
  } catch (error) {
    await graph.updateTaskStatus({
      scanId: request.scanId,
      taskExecutionId: request.taskExecutionId,
      connectionId: request.connectionId,
      status: 'failure',
      failureReason: 'RETRYABLE_ERROR',
      task: { taskId: request.taskId },
    });
    return { success: false, message: String(error) };
  }
}
```
runEntityIngestion handles both the root tick and per-folder children for the same
ENTITY_INGESTION_INCREMENTAL taskType; it inspects taskInfo.metadata.folderId to know which
folder this invocation should walk.
The platform invokes the same taskRunner function for every scheduled task on the connection,
regardless of whether it is a root tick or a child fan-out. Children typically share their
parent's taskType, so the same taskRunner switch case handles both. Use TaskInfo.metadata
(read from KVS in the example above) to carry the per-invocation context that distinguishes the
root from a child, for example which folder this invocation should walk.
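One way to make the root-versus-child distinction explicit is to resolve it once, at the top of the handler. The sketch below is illustrative only: `IngestionTarget` and `resolveIngestionTarget` are hypothetical names, the local `TaskInfo` is a simplified copy of the one in the example, and it assumes children always carry a string `folderId` in their metadata while the root never does.

```typescript
// Simplified local copy of the TaskInfo record used in the example above.
interface TaskInfo {
  taskType: string;
  connectionId: string;
  metadata?: Record<string, any>;
}

// Hypothetical discriminated union describing what this invocation should do.
type IngestionTarget =
  | { kind: 'root' }
  | { kind: 'child'; folderId: string };

// Assumption: a child task stores a non-empty string folderId in metadata;
// anything else is treated as the root tick.
function resolveIngestionTarget(taskInfo: TaskInfo): IngestionTarget {
  const folderId = taskInfo.metadata?.folderId;
  if (typeof folderId === 'string' && folderId.length > 0) {
    return { kind: 'child', folderId };
  }
  return { kind: 'root' };
}
```

A handler such as runEntityIngestion could then branch on `kind` instead of probing metadata in several places.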
TaskRunnerPayload exposes an optional taskMetadata field, but in practice you should not
depend on it to carry the per-invocation context your handler needs. The temporalTaskMetadata
and persistentTaskMetadata fields you can pass on scheduleChildTask are platform-facing
inputs (the orchestrator may consume specific keys for its own scheduling logic); their
propagation back to the runner is not a stable contract today.
The reliable channel is to persist the context you need to KVS keyed by the child taskId
before scheduling, and read it back inside taskRunner. See
TaskInfo persistence.
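The persist-then-schedule order can be sketched as follows. This is a minimal illustration, not the SDK's API: `KeyValueStore` and `InMemoryStore` stand in for `@forge/kvs`, `ScheduleChild` is a hypothetical callback wrapping your real scheduleChildTask call, and `persistThenSchedule` and `loadTaskInfo` are made-up helper names. The choice not to overwrite an existing row is an assumption of this sketch, made so that a repeated schedule does not reset a `completed` flag.

```typescript
// Minimal key-value interface; the in-memory class stands in for @forge/kvs.
interface KeyValueStore {
  get(key: string): Promise<unknown>;
  set(key: string, value: unknown): Promise<void>;
}

class InMemoryStore implements KeyValueStore {
  private rows = new Map<string, unknown>();
  async get(key: string): Promise<unknown> {
    return this.rows.get(key);
  }
  async set(key: string, value: unknown): Promise<void> {
    this.rows.set(key, value);
  }
}

interface TaskInfo {
  taskType: string;
  connectionId: string;
  metadata?: Record<string, any>;
  completed?: boolean;
}

// Hypothetical callback that wraps the real scheduleChildTask call.
type ScheduleChild = (taskId: string) => Promise<void>;

// Write the child's context first, then schedule it, so the row already
// exists by the time the platform invokes taskRunner for the child.
async function persistThenSchedule(
  store: KeyValueStore,
  taskId: string,
  info: TaskInfo,
  scheduleChild: ScheduleChild,
): Promise<void> {
  const key = `taskInfo:${taskId}`;
  const existing = await store.get(key);
  if (existing === undefined) {
    await store.set(key, info);
  }
  await scheduleChild(taskId);
}

// Read the context back inside taskRunner.
async function loadTaskInfo(
  store: KeyValueStore,
  taskId: string,
): Promise<TaskInfo | undefined> {
  return (await store.get(`taskInfo:${taskId}`)) as TaskInfo | undefined;
}
```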
connectorFormFields is always populated
Both root and child invocations carry the connection's form configuration in
connectorFormFields. Use it for connection-scoped configuration (API keys, root folder IDs,
toggles). It is the same shape you receive in validateConnection and onConnectionChange.
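Because connectorFormFields is typed as `Record<string, any>`, it can help to narrow it once into a typed object before using it. The sketch below assumes a form with three hypothetical fields (`apiKey`, `rootFolderId`, `includeArchived`); your connector's field names will differ, and `readConnectionConfig` is an illustrative helper, not part of the SDK.

```typescript
// Hypothetical typed view of the connection's form values.
interface ConnectionConfig {
  apiKey: string;
  rootFolderId: string;
  includeArchived: boolean;
}

// Validate and narrow the untyped form fields. Throws if a required
// field is missing so the caller can report a failure status.
function readConnectionConfig(fields: Record<string, any>): ConnectionConfig {
  const apiKey = fields.apiKey;
  const rootFolderId = fields.rootFolderId;
  if (typeof apiKey !== 'string' || apiKey.length === 0) {
    throw new Error('Missing form field: apiKey');
  }
  if (typeof rootFolderId !== 'string' || rootFolderId.length === 0) {
    throw new Error('Missing form field: rootFolderId');
  }
  return {
    apiKey,
    rootFolderId,
    // Treat anything other than an explicit true as false.
    includeArchived: fields.includeArchived === true,
  };
}
```

Inside taskRunner this would be called as `readConnectionConfig(request.connectorFormFields)`.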
The platform may invoke your taskRunner more than once for the same (taskId, scanId) with a
fresh taskExecutionId each time, and does not deduplicate child schedules by taskId
server-side. Your handler must be idempotent, and child taskIds must be deterministic so
duplicates land on the same KVS row your runner consults. See
At-least-once delivery and idempotency.
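A deterministic child taskId can be derived purely from values that are stable across duplicate deliveries. The sketch below is one possible scheme under stated assumptions: it combines the parent's scanId with a folder identifier, and it assumes the platform accepts the resulting characters in a taskId, which this page does not confirm. `childTaskId` and `planChildTaskIds` are hypothetical helper names.

```typescript
// Derive the same id for the same (scanId, folderId) pair on every call.
// encodeURIComponent escapes ':' so the separator stays unambiguous.
function childTaskId(scanId: string, folderId: string): string {
  return `${encodeURIComponent(scanId)}:folder:${encodeURIComponent(folderId)}`;
}

// Map a list of folders to child ids, collapsing repeated folders so the
// root invocation schedules each child at most once per run.
function planChildTaskIds(scanId: string, folderIds: string[]): string[] {
  const ids = folderIds.map((folderId) => childTaskId(scanId, folderId));
  return Array.from(new Set(ids));
}
```

If the root tick is delivered twice, both deliveries compute identical ids, so the runner consults the same `taskInfo:` row for each child.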
Always call updateTaskStatus before returning from taskRunner. The handler return value is
independent of the platform's view of task health; only updateTaskStatus updates that.
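To make it hard to forget the status call, the reporting can be wrapped around the work itself. This is a sketch under assumptions: `ReportStatus` is a hypothetical callback that would wrap `graph.updateTaskStatus` with the request's ids filled in, `runAndReport` is an illustrative helper name, and mapping every thrown error to `RETRYABLE_ERROR` mirrors the example above rather than a documented rule.

```typescript
type TaskStatus = 'success' | 'failure';

interface TaskRunnerResult {
  success: boolean;
  message?: string;
}

// Hypothetical callback wrapping graph.updateTaskStatus for one invocation.
type ReportStatus = (status: TaskStatus, failureReason?: string) => Promise<void>;

// Run the work and report exactly one status before returning.
// A failure of the report call itself is not swallowed; it propagates.
async function runAndReport(
  work: () => Promise<void>,
  report: ReportStatus,
): Promise<TaskRunnerResult> {
  try {
    await work();
  } catch (error) {
    await report('failure', 'RETRYABLE_ERROR');
    return { success: false, message: String(error) };
  }
  await report('success');
  return { success: true };
}
```

Keeping the success report outside the `try` block means an error thrown while reporting success is not misreported as a task failure.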
taskRunner
The platform stops invoking your taskRunner for a given connection or task in any of these
situations:
- orchestration.taskRunner and you redeploy.
- failureReason you reported via updateTaskStatus.
See Lifecycle.