Confluence Journal Service

Available:

Confluence 5.6 and later

Introduction

Confluence Journal Service provides access to journals that are durable FIFO queues with the following behavior:

  • Journal entries are processed on all nodes when running in a cluster.
  • Entries are durable - receiving nodes do not need to be running when the event is queued.
  • New cluster nodes can process entries that were queued before the node was created.
  • Entries are guaranteed to be delivered in the same order that they were added.
  • Journals are bounded in size by the age of the entries, i.e. entries that are older than a set limit are cleared regularly.

Usage

Journals are accessed through com.atlassian.confluence.api.service.journal.JournalService.

Each journal has its own journal identifier. Journals do not need to be explicitly created or removed by client code. 

private static final JournalIdentifier JOURNAL_ID = new JournalIdentifier("com.mycompany.myplugin_myjournal");

Creating journal entries

Journal entries have five fields:

  1. id: Monotonically increasing unique identifier for the entry. It is assigned automatically when an entry is queued. An entry with a larger id is considered to have happened after entries with a smaller id.
  2. journalId: Identifies the journal that the entry belongs to. Maximum length is 255 characters. May only contain lower-case letters (English alphabet), digits, underscores and dot characters.
  3. creationDate: Point in time when the entry was created. It is assigned automatically when an entry is queued.
  4. type: Type of the entry. This is a journal specific free-form field. Maximum length is 255 characters.
  5. message: Message of the entry. This is a journal specific free-form field. Maximum length is 2047 characters.
JournalEntry entry = new JournalEntry(JOURNAL_ID, "my_entry_type", "my_message");
journalService.enqueue(entry);

Entries should be added to the journal as the last thing in a transaction. If the transaction is not committed shortly after adding an entry, some nodes might miss the entry.

Processing journal entries

Entries are processed by calling JournalService#processEntries method. It has the following signature:

<V> V processEntries(@Nonnull JournalIdentifier journalId,
                     int maxEntries,
                     @Nonnull Function<Iterable<JournalEntry>, EntryProcessorResult<V>> entryProcessor)
            throws ServiceException;

processEntries will call the given entryProcessor function with at most maxEntries entries which have the given journalId.  entryProcessor will in turn create an instance of EntryProcessorResult that contains a result value that is returned to the caller of processEntries. Depending on entryProcessor return value, four things can happen on next call to processEntries:

  1. Returning EntryProcessorResult.success(result) causes none of the processed entries to be processed again.
  2. Returning EntryProcessorResult.partial(result, lastSuccessfulId) causes entries with an id larger than lastSuccessfulId to be processed again.
  3. Returning EntryProcessorResult.failure(result, failedEntryId) causes entries with an id equal to or larger than failedEntryId to be processed again.
  4. Throwing a RuntimeException causes all the entries to be processed again.

Do as much processing as possible in entryProcessor. There is no way to process entries again if something goes wrong after returning from entryProcessor.

Entries should be processed in small batches (i.e. small maxEntries value) in order to reduce memory consumption during the processing. The following example shows how to process all the entries in batches. It also demonstrates how to ensure that already processed entries will not be processed again if a RuntimeException is thrown when processing an entry.

int successCount = 0;
do
{
    successCount = journalService.processEntries(JOURNAL_ID, BATCH_SIZE, new Function<Iterable<JournalEntry>, EntryProcessorResult<Integer>>()
    {
        @Override
        public EntryProcessorResult<Integer> apply(Iterable<JournalEntry> entries)
        {
            int count = 0;
            for (JournalEntry entry : entries)
            {
                try
                {
                    // Do something with the entry
                }
                catch (RuntimeException e)
                {
                    log.warn("Failed to process edge index task for entry '" + entry + "'", e);
                    return EntryProcessorResult.failure(count, entry.getId());
                }
                count++;
            }
            return EntryProcessorResult.success(count);
        }
    });
}
while (successCount == BATCH_SIZE);

Other operations

JournalService also offers methods for retrieving current entries in the journal without removing them from journal (peek), removing entries from journal without processing them (reset) and counting the number of entries in the journal (countEntries). 

Usage in cluster

Each cluster node keeps track of the entries that have been processed on the same node. This means each entry is processed on all cluster nodes.

Backwards compatibility

A compatibility library is available for plugins that want to use Journal Service but need to stay compatible with older Confluence versions. In older versions of Confluence (5.5 and earlier) entries are stored in main memory. This means that unprocessed entries will be forgotten during restart and that entries are only processed on the local node in a cluster.

Maven dependency
<dependency>
    <groupId>com.atlassian.confluence.journal</groupId>
    <artifactId>confluence-journal-compat</artifactId>
    <version>1.0</version>
</dependency>

In addition to maven dependency, the compatibility library depends on Journal Service on Confluence 5.6 and later, thus related packaged need to be dynamically imported in OSGi instructions section in the plugin pom.xml:

Maven OSGi instructions
<DynamicImport-Package>
    com.atlassian.confluence.api.model.journal;version="5.6",
    com.atlassian.confluence.api.service.journal;version="5.6"
</DynamicImport-Package>

com.atlassian.confluence.journal.compat.JournalServiceFactory is the entry point to the compatibility library, and it can be used to get a JournalService instance:

JournalService journalService = JournalServiceFactory.getInstance();

Testing

Normally tests would invoke JournalService#waitForRecentEntriesToBecomeVisible() after running code that added entries and before running code that needs to process those entries. If there are a lot of tests like this, the wait time can become significant. The following example code shows how to use the com.atlassian.confluence.test.JournalManagerBackdoor service for temporarily disabling this wait. It should only be used in test code.

long originalIgnoreSinceMillis = journalManagerBackdoor.getIgnoreWithinMillis();
try
{
    // This is safe as long as other threads are not adding journal
    // entries concurrently
    journalManagerBackdoor.setIgnoreWithinMillis(0);
    
    // Call code that processes entries
}
finally
{
    journalManagerBackdoor.setIgnoreWithinMillis(originalIgnoreSinceMillis);
}
Was this page helpful?

Have a question about this article?

See questions about this article

Powered by Confluence and Scroll Viewport