Runtimes
Web triggers
Async functions
Product REST APIs
Fetch APIs

Forge Long Running Compute is now accessible through Forge's Early Access Program (EAP). For details on how to sign up for the EAP, see the changelog announcement.

EAPs are offered to selected users for testing and feedback purposes. APIs and features under EAP are unsupported and subject to change without notice. APIs and features under EAP are not recommended for use in production environments.

For more details, see Forge EAP, Preview, and GA.

Async events API

This API enables Forge apps to push events and execute them asynchronously in the background.

One use case for async events is an app with a function that requires more than the Forge function maximum of 25 seconds to run, such as an AI client or import function. The app can use the async events API to send events to a consumer function with a longer maximum runtime that allows more processing before timeout.

Get started

To import the Forge events package and instantiate the queue in your app, run:

1
2
import { Queue } from '@forge/events';

const queue = new Queue({ key: 'queue-name' });

Push API

Pushes events to the queue to be processed later. The events processing can be delayed up to 15 minutes using the delayInSeconds setting. A maximum of 50 events can be pushed per request, up to a maximum combined payload of 200kb.

Method signature

1
2
type Payload = string | number | boolean | { [key: string]: Payload };
type PushSettings = { delayInSeconds: number }

/**
 * @returns Id of the job created
 * */
await queue.push(Payload | Payload[], PushSettings)

Example

To push event(s) to the queue:

1
2
// Push a single event with string payload
await queue.push('hello world');

// Push a single event with JSON payload
await queue.push({ "hello": "world" });

// Push multiple events to the queue
await queue.push(["hello", "world"]);

// Delay the processing of the event by 5 seconds
await queue.push("hello world", {
	delayInSeconds: 5
})

Event consumer

To create an event consumer module in the app manifest, use:

1
2
modules:
  consumer:
    - key: queue-consumer
      # Name of the queue for which this consumer will be invoked
      queue: queue-name
      resolver:
        function: consumer-function
        # resolver function to be called with payload
        method: event-listener
  function:
    - key: consumer-function
      handler: consumer.handler

If you are in the Forge Long Running Compute EAP, you have the option of including an optional timeoutSeconds parameter that specifies a maximum runtime for that async event consumer function so that it can run longer than the default of 55 seconds. You can specify up to 900 seconds (15 minutes) as follows:

1
2
modules:
  consumer:
    - key: queue-consumer
      # Name of the queue for which this consumer will be invoked
      queue: queue-name
      resolver:
        function: consumer-function
        # resolver function to be called with payload
        method: event-listener
  function:
    - key: consumer-function
      handler: consumer.handler
      timeoutSeconds: 600

To create the resolver for the consumer module, use:

1
2
import Resolver from "@forge/resolver";
const resolver = new Resolver();

resolver.define("event-listener", async ({ payload, context }) => {
	// process the event
});

export const handler = resolver.getDefinitions();

Tracking progress of events

When you push events to the queue, a new job is created. This job’s id is returned from the push API. To get the job’s stats using the id, use:

1
2
// Get the job ID
const jobId = await queue.push(['event1', 'event2']);

// Get the JobProgress object
const jobProgress = queue.getJob(jobId);

// Get stats of a particular job
const response = await jobProgress.getStats();
const {success, inProgress, failed} = await response.json();

Job ID is also accessible via the context parameter of your resolver function:

1
2
resolver.define("event-listener", async ({ payload, context }) => {
  // Get the JobProgress object
  const jobProgress = queue.getJob(context.jobId);
});

Cancel a job in progress

You can cancel a job that’s in progress using its JobProgress instance. When a job is canceled, events of that job are no longer processed.

1
2
resolver.define("event-listener", async ({ payload, context }) => {
  // Get the JobProgress object
  const jobProgress = queue.getJob(context.jobId);

  try {
    // process the event
  } catch (error) {
    // You can cancel the job when an error happens
    await jobProgress.cancel()
  }
});

Retry events

1. Retry for app level errors

You can request a retry for a product event trigger by returning an InvocationError object. This is defined in the @forge/events package

You can only retry an event for a maximum of four times.

Additional options can be included in the InvocationError via a RetryOptions object, allowing you to provide more information about what went wrong and configure the retry.

Event payload schema

1
2
RetryOptions {
  retryAfter: number // retry trigger after in seconds
  retryReason: InvocationErrorCode // reason why the error occured
  retryData: any // additional data to assist retry logic
}

enum InvocationErrorCode {
  // There was a rate limit upstream that caused the Application to fail.
  FUNCTION_UPSTREAM_RATE_LIMITED = "FUNCTION_UPSTREAM_RATE_LIMITED",
  // Some application level error occurred and a retry is warranted
  FUNCTION_RETRY_REQUEST = "FUNCTION_RETRY_REQUEST"
} 

retryAfter limitation

The maximum retryAfter value is 900 seconds (15 minutes). Any retryAfter values exceeding this limit are lowered to 900 seconds.

retryReason values

ValueDescription
FUNCTION_UPSTREAM_RATE_LIMITEDRate limit upstream that caused the app to fail
FUNCTION_RETRY_REQUESTUnclassified error occurred during the app that the developer would like to retry

Example for requesting a retry

In the following sample code, the app calls an external API and is rate limited. A retry is requested with a timeout that is equal to the backoff time provided by the external API and the retry reason is FUNCTION_UPSTREAM_RATE_LIMITED as there was an upstream dependency that was rate limited.

1
2
import Resolver from '@forge/resolver';
import {InvocationError, InvocationErrorCode} from "@forge/events";

resolver.define('consumeMyQueue', async ({payload, context}) => {
  const userName = 'john';
  const response = await callExternalApi(username);

  if(response.headers.has('Retry-After')){
    return new InvocationError({

      // The App can request the retry to happen after a certain time period elapses

      retryAfter: parseInt(response.headers.get('Retry-After')),

      // The App should provide a reason as to why they are retrying.

      // This reason will be feedback to the event payload on the retry

      // and is to help the developer discern the initial failure reason.

      retryReason: InvocationErrorCode.FUNCTION_UPSTREAM_RATE_LIMITED,
      retryData: {
        userName: userName
      }
    });
  }
});

Example for handling a retry

In the following sample code the app checks whether or not the retryContext field exists in the event payload. If it exists, it means that the app is currently handling a retry and the app can handle the retry accordingly. The app can request another retry if another retryable error occurs, but note that the event can only be retried up to 4 times.

Properties

There will be one extra object retryContext in the event payload for a retry. This object contains three properties:

  • retryCount: number of times (max 4) the app requested for retry
  • retryReason: reason why the error occurred
  • retryData: additional data to assist retry logic

retryReason and retryData will be populated with the same values as were given when requesting the retry with InvocationError.

1
2
import { RetryOptions, InvocationError, InvocationErrorCode } from "@forge/events"

resolver.define('consumeMyQueue', async ({payload, context}) => {
  try {
    // retryContext will be populated if this is a retry
    if (payload.retryContext) {
      const { retryCount, retryReason, retryData } = payload.retryContext;
      handleRetry(retryCount, retryReason, retryData, payload);
    }
    else {
      handleEvent(payload);
    }
  } catch(error) {
    // If the event is retryable, the App can request for another retry
    // although, note that the maximum number of retries is 4 
    if (e instanceof RetryableException) {
      const retryOptions: RetryOptions = {
        retryAfter: calculateBackOffTime(payload.retryContext),
        retryReason: InvocationErrorCode.FUNCTION_RETRY_REQUEST,
        retryData: getRetryData(e, payload)
      }
      return new InvocationError(retryOptions);
    }
  }
});

2. Retry for platform level errors

Platform level errors cannot be captured by the app. Examples include timeouts and Out of memory(OOM) errors. If a platform error occurs, the Forge platform will automatically retry the event on behalf of you.

retryReason values

ValueDescription
FUNCTION_OUT_OF_MEMORYThe function ran out of memory (allocated by Forge Platform/Lambda) in the previous attempt
FUNCTION_TIME_OUTThe function timed out during the previous attempt
FUNCTION_PLATFORM_RATE_LIMITEDAn infrastructure Quota/Rate Limit occurred during the previous attempt at running the function
FUNCTION_PLATFORM_UNKNOWN_ERRORAn undefined error occurred during the previous attempt at running the function

Rate this page: