Available: Confluence 5.6 and later
The Confluence Journal Service provides access to journals, which are durable FIFO queues with the following behavior:
Journals are accessed through com.atlassian.confluence.api.service.journal.JournalService.
Each journal has its own journal identifier. Journals do not need to be explicitly created or removed by client code.
```java
private static final JournalIdentifier JOURNAL_ID = new JournalIdentifier("com.mycompany.myplugin_myjournal");
```
Journal entries have five fields:

- id: Monotonically increasing unique identifier for the entry. It is assigned automatically when an entry is queued. An entry with a larger id is considered to have happened after entries with a smaller id.
- journalId: Identifies the journal that the entry belongs to. Maximum length is 255 characters. May only contain lower-case letters (English alphabet), digits, underscores and dot characters.
- creationDate: Point in time when the entry was created. It is assigned automatically when an entry is queued.
- type: Type of the entry. This is a journal-specific free-form field. Maximum length is 255 characters.
- message: Message of the entry. This is a journal-specific free-form field. Maximum length is 2047 characters.

```java
JournalEntry entry = new JournalEntry(JOURNAL_ID, "my_entry_type", "my_message");
journalService.enqueue(entry);
```
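The journalId naming constraints above (lower-case letters, digits, underscores and dots, at most 255 characters) can be checked up front. The following helper class is not part of the Confluence API; it is a small stand-alone sketch for illustration:

```java
import java.util.regex.Pattern;

public class JournalIdValidator {
    // Lower-case English letters, digits, underscores and dots; 1 to 255 chars
    private static final Pattern VALID_ID = Pattern.compile("[a-z0-9_.]{1,255}");

    public static boolean isValidJournalId(String id) {
        return id != null && VALID_ID.matcher(id).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidJournalId("com.mycompany.myplugin_myjournal")); // true
        System.out.println(isValidJournalId("MyJournal")); // false: upper-case not allowed
    }
}
```

Validating the identifier once at plugin start-up fails fast instead of surfacing an error on the first enqueue.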
Entries should be added to the journal as the last thing in a transaction. If the transaction is not committed shortly after adding an entry, some nodes might miss the entry.
Entries are processed by calling the JournalService#processEntries method. It has the following signature:
```java
<V> V processEntries(@Nonnull JournalIdentifier journalId, int maxEntries,
        @Nonnull Function<Iterable<JournalEntry>, EntryProcessorResult<V>> entryProcessor) throws ServiceException;
```
processEntries will call the given entryProcessor function with at most maxEntries entries which have the given journalId. entryProcessor will in turn create an instance of EntryProcessorResult that contains a result value that is returned to the caller of processEntries. Depending on the entryProcessor return value, four things can happen on the next call to processEntries:
- EntryProcessorResult.success(result) causes none of the processed entries to be processed again.
- EntryProcessorResult.partial(result, lastSuccessfulId) causes entries with an id larger than lastSuccessfulId to be processed again.
- EntryProcessorResult.failure(result, failedEntryId) causes entries with an id equal to or larger than failedEntryId to be processed again.
- A RuntimeException causes all the entries to be processed again.

Do as much processing as possible in entryProcessor. There is no way to process entries again if something goes wrong after returning from entryProcessor.
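The difference between the partial and failure results is easy to get wrong: partial retries entries strictly after lastSuccessfulId, while failure retries the failed entry itself as well. The following stand-alone sketch (illustrative helper names, not the Confluence API) makes the two rules concrete by computing which ids of a processed batch would be handed out again:

```java
import java.util.ArrayList;
import java.util.List;

public class ReprocessDemo {
    // partial(result, lastSuccessfulId): only ids strictly larger than
    // lastSuccessfulId are handed out again on the next call.
    static List<Long> reprocessedAfterPartial(List<Long> batchIds, long lastSuccessfulId) {
        List<Long> again = new ArrayList<>();
        for (long id : batchIds) {
            if (id > lastSuccessfulId) {
                again.add(id);
            }
        }
        return again;
    }

    // failure(result, failedEntryId): the failed entry itself is retried,
    // together with every later entry in the batch.
    static List<Long> reprocessedAfterFailure(List<Long> batchIds, long failedEntryId) {
        List<Long> again = new ArrayList<>();
        for (long id : batchIds) {
            if (id >= failedEntryId) {
                again.add(id);
            }
        }
        return again;
    }
}
```

For a batch with ids 1, 2, 3, partial with lastSuccessfulId = 2 retries only entry 3, while failure with failedEntryId = 2 retries entries 2 and 3.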
Entries should be processed in small batches (i.e. a small maxEntries value) in order to reduce memory consumption during processing. The following example shows how to process all the entries in batches. It also demonstrates how to ensure that already processed entries will not be processed again if a RuntimeException is thrown when processing an entry.
```java
int successCount = 0;
do {
    successCount = journalService.processEntries(JOURNAL_ID, BATCH_SIZE,
            new Function<Iterable<JournalEntry>, EntryProcessorResult<Integer>>() {
        @Override
        public EntryProcessorResult<Integer> apply(Iterable<JournalEntry> entries) {
            int count = 0;
            for (JournalEntry entry : entries) {
                try {
                    // Do something with the entry
                } catch (RuntimeException e) {
                    log.warn("Failed to process entry '" + entry + "'", e);
                    // Entries before the failed one stay processed; the failed
                    // entry and everything after it will be retried.
                    return EntryProcessorResult.failure(count, entry.getId());
                }
                count++;
            }
            return EntryProcessorResult.success(count);
        }
    });
} while (successCount == BATCH_SIZE);
```
JournalService also offers methods for retrieving the current entries in the journal without removing them (peek), removing entries from the journal without processing them (reset) and counting the number of entries in the journal (countEntries).
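Those three operations follow ordinary FIFO-queue semantics: peek does not consume, reset discards everything, and countEntries reports the current size. As an illustration of the semantics only (not the Confluence implementation), a minimal in-memory sketch:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class InMemoryJournal {
    private final Deque<String> entries = new ArrayDeque<>();

    public void enqueue(String message) {
        entries.addLast(message);
    }

    // peek: return up to maxEntries current entries without removing them
    public List<String> peek(int maxEntries) {
        List<String> result = new ArrayList<>();
        for (String entry : entries) {
            if (result.size() >= maxEntries) {
                break;
            }
            result.add(entry);
        }
        return result;
    }

    // reset: remove all entries without processing them
    public void reset() {
        entries.clear();
    }

    // countEntries: number of entries currently in the journal
    public long countEntries() {
        return entries.size();
    }
}
```

peek is useful for diagnostics, and reset for discarding a backlog that is no longer worth processing; neither advances the per-node processing position the way processEntries does.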
Each cluster node keeps track of which entries have been processed on that node. This means each entry is processed on every node in the cluster.
A compatibility library is available for plugins that want to use Journal Service but need to stay compatible with older Confluence versions. In older versions of Confluence (5.5 and earlier) entries are stored in main memory. This means that unprocessed entries will be forgotten during restart and that entries are only processed on the local node in a cluster.
Maven dependency
```xml
<dependency>
    <groupId>com.atlassian.confluence.journal</groupId>
    <artifactId>confluence-journal-compat</artifactId>
    <version>1.0</version>
</dependency>
```
In addition to the Maven dependency, the compatibility library depends on the Journal Service on Confluence 5.6 and later, so the related packages need to be dynamically imported in the OSGi instructions section of the plugin pom.xml:
Maven OSGi instructions
```xml
<DynamicImport-Package>
    com.atlassian.confluence.api.model.journal;version="5.6",
    com.atlassian.confluence.api.service.journal;version="5.6"
</DynamicImport-Package>
```
com.atlassian.confluence.journal.compat.JournalServiceFactory is the entry point to the compatibility library, and it can be used to get a JournalService instance:
```java
JournalService journalService = JournalServiceFactory.getInstance();
```
Normally tests would invoke JournalService#waitForRecentEntriesToBecomeVisible() after running code that added entries and before running code that needs to process those entries. If there are a lot of tests like this, the wait time can become significant. The following example code shows how to use the com.atlassian.confluence.test.JournalManagerBackdoor service for temporarily disabling this wait. It should only be used in test code.
```java
long originalIgnoreSinceMillis = journalManagerBackdoor.getIgnoreWithinMillis();
try {
    // This is safe as long as other threads are not adding journal
    // entries concurrently
    journalManagerBackdoor.setIgnoreWithinMillis(0);
    // Call code that processes entries
} finally {
    journalManagerBackdoor.setIgnoreWithinMillis(originalIgnoreSinceMillis);
}
```