Resequence
The Resequencer from the EIP patterns allows you to reorganise messages based on some comparator.
By default, Camel uses an Expression to create the comparator, so you can compare by a message header, the body, or a part of the message.
Options
The Resequence EIP supports 1 option, which is listed below:
Name | Description | Default | Type |
---|---|---|---|
resequencerConfig | To configure the resequencer using either batch or stream configuration. Uses batch configuration by default. | | ResequencerConfig |
Camel supports two resequencing algorithms:
- Batch resequencing collects messages into a batch, sorts the messages and sends them to their output.
- Stream resequencing re-orders (continuous) message streams based on the detection of gaps between messages.
By default, the Resequencer does not support duplicate messages: if several messages evaluate to the same expression value, only the last one is kept. In batch mode, however, you can configure it to allow duplicates.
Batch Resequencing
The following example shows how to use the batch-processing resequencer so that messages are sorted in order of the body() expression. That is, messages are collected into a batch (either by a maximum number of messages per batch or by a timeout), sorted, and then sent to their output.
from("direct:start")
.resequence().body()
.to("mock:result");
This is equivalent to:
from("direct:start")
.resequence(body()).batch()
.to("mock:result");
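To make the batch behaviour concrete, the following plain-Java sketch mimics what happens to one collected batch: the messages are sorted by the value of the expression, and a later message replaces an earlier one with the same value, as in the default described above. It is an illustration only and not how Camel implements it; the class name BatchResequenceSketch and the method flush are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Plain-Java illustration of batch resequencing; not Camel's implementation. */
final class BatchResequenceSketch {

    /**
     * Sorts one collected batch by the key that keyOf extracts from each message.
     * When two messages share a key, the later one replaces the earlier one.
     */
    static <M, K extends Comparable<K>> List<M> flush(List<M> batch, Function<M, K> keyOf) {
        Map<K, M> sorted = new TreeMap<>();
        for (M message : batch) {
            sorted.put(keyOf.apply(message), message); // same key: last one wins
        }
        return new ArrayList<>(sorted.values());
    }

    public static void main(String[] args) {
        // Sort by the whole body, like resequence(body())
        System.out.println(flush(List.of("c", "a", "b", "a"), body -> body)); // prints [a, b, c]
    }
}
```

The batch size and timeout decide when a batch is flushed; the sketch only covers what happens at the moment of flushing.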
The batch-processing resequencer can be further configured via the size() and timeout() methods.
from("direct:start")
.resequence(body()).batch().size(300).timeout(4000L)
.to("mock:result");
This sets the batch size to 300 and the batch timeout to 4000 ms (by default, the batch size is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.
from("direct:start")
.resequence(body()).batch(new BatchResequencerConfig(300, 4000L))
.to("mock:result");
So the above example will reorder messages from the endpoint direct:start in order of their bodies and send them to the endpoint mock:result.
Typically you’d use a header rather than the body to order things, or maybe a part of the body. So you could replace this expression with
resequence(header("mySeqNo"))
for example, to reorder messages using a custom sequence number in the header mySeqNo.
You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages.
And an example in XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <resequence>
      <simple>${body}</simple>
      <to uri="mock:result" />
      <!--
        batch-config can be omitted for default (batch) resequencer settings
      -->
      <batch-config batchSize="300" batchTimeout="4000" />
    </resequence>
  </route>
</camelContext>
Allow Duplicates
In batch mode, you can allow duplicates. In the Java DSL there is an allowDuplicates() method, and in Spring XML there is an allowDuplicates="true" attribute on <batch-config/> that you can use to enable it.
Reverse
In batch mode, you can reverse the expression ordering. By default the order is based on 0..9,A..Z, so messages with low numbers are ordered first and therefore sent out first. In some cases you want the reverse order.
In the Java DSL there is a reverse() method, and in Spring XML there is a reverse="true" attribute on <batch-config/> that you can use to enable it.
Resequence JMS messages based on JMSPriority
You can use the Resequencer to resequence messages from JMS queues based on JMSPriority. For that to work, you need to use the two options allowDuplicates and reverse.
from("jms:queue:foo")
// sort by JMSPriority by allowing duplicates (message can have same JMSPriority)
// and use reverse ordering so 9 is first output (most important), and 0 is last
// use batch mode and fire every 3rd second
.resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse()
.to("mock:result");
Notice this is only possible in the batch mode of the Resequencer.
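The combined effect of allowDuplicates and reverse on one batch can be sketched in plain Java. This is an illustration under stated assumptions, not Camel code: Msg is a hypothetical stand-in for a JMS message, and keeping messages of equal priority in arrival order is an assumption of the sketch rather than something the text above promises.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java illustration of allowDuplicates() plus reverse(); not Camel code. */
final class PriorityOrderSketch {

    /** Hypothetical stand-in for a JMS message: an id and its JMSPriority (0..9). */
    static final class Msg {
        final String id;
        final int priority;

        Msg(String id, int priority) {
            this.id = id;
            this.priority = priority;
        }
    }

    /** Returns the ids of one batch, highest priority first, keeping duplicates. */
    static List<String> flush(List<Msg> batch) {
        List<Msg> sorted = new ArrayList<>(batch); // every message is kept
        // Reverse ordering: 9 comes out first, 0 last. List.sort is stable, so
        // messages with equal priority stay in arrival order (sketch assumption).
        sorted.sort(Comparator.comparingInt((Msg m) -> m.priority).reversed());
        List<String> ids = new ArrayList<>();
        for (Msg m : sorted) {
            ids.add(m.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Msg> batch = List.of(new Msg("a", 4), new Msg("b", 9), new Msg("c", 4), new Msg("d", 0));
        System.out.println(flush(batch)); // prints [b, a, c, d]
    }
}
```

Without allowDuplicates, only one of the two priority-4 messages would survive, which is why both options are needed for JMSPriority.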
Ignore invalid exchanges
The Resequencer EIP throws a CamelExchangeException if the incoming Exchange is not valid for the resequencer, i.e. the expression cannot be evaluated, for example because a header is missing.
You can use the option ignoreInvalidExchanges to ignore these exceptions, which means the Resequencer will skip the invalid Exchange.
from("direct:start")
.resequence(header("seqno")).batch().timeout(1000)
// ignore invalid exchanges (they are discarded)
.ignoreInvalidExchanges()
.to("mock:result");
This option is available for both the batch and the stream resequencer.
Reject Old Exchanges
This option can be used to prevent out-of-order messages from being sent, regardless of the event that delivered messages downstream (capacity, timeout, etc.). If enabled using rejectOld(), the Resequencer will throw a MessageRejectedException when an incoming Exchange is "older" (based on the Comparator) than the last delivered message. This provides an extra level of control with regard to delayed message ordering.
from("direct:start")
.onException(MessageRejectedException.class).handled(true).to("mock:error").end()
.resequence(header("seqno")).stream().timeout(1000).rejectOld()
.to("mock:result");
This option is available for the stream resequencer only.
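The check behind rejectOld can be pictured with a few lines of plain Java. This is only a sketch of the idea and not the actual implementation: the class and method names are invented, sequence numbers are assumed to be longs, and where the real Resequencer throws a MessageRejectedException the sketch simply returns a boolean.

```java
/** Plain-Java illustration of the rejectOld() check; not Camel's implementation. */
final class RejectOldSketch {
    private Long lastDelivered; // null until the first message has been delivered

    /** Records that the message with this sequence number went downstream. */
    void markDelivered(long seqNo) {
        lastDelivered = seqNo;
    }

    /** Returns true if a newly arriving message is older than the last delivered one. */
    boolean isTooOld(long seqNo) {
        return lastDelivered != null && seqNo < lastDelivered;
    }

    public static void main(String[] args) {
        RejectOldSketch check = new RejectOldSketch();
        check.markDelivered(5);
        System.out.println(check.isTooOld(3)); // prints true: 3 arrives after 5 was sent
        System.out.println(check.isTooOld(6)); // prints false
    }
}
```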
Stream Resequencing
The next example shows how to use the stream-processing resequencer. Messages are re-ordered based on their sequence numbers given by a seqnum header using gap detection and timeouts on the level of individual messages.
from("direct:start").resequence(header("seqnum")).stream().to("mock:result");
The stream-processing resequencer can be further configured via the capacity() and timeout() methods.
from("direct:start")
.resequence(header("seqnum")).stream().capacity(5000).timeout(4000L)
.to("mock:result");
This sets the resequencer’s capacity to 5000 and the timeout to 4000 ms (by default, the capacity is 1000 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.
from("direct:start")
.resequence(header("seqnum")).stream(new StreamResequencerConfig(5000, 4000L))
.to("mock:result");
The stream-processing resequencer algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence (i.e. the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor are known. For example, a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).
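The 2,3,5 example can be traced with a small plain-Java sketch of gap detection. It is a simplified illustration, not Camel's resequencer engine: the names are invented, the sketch is told up front which sequence number was delivered last, and a timeout is triggered by calling a method instead of by a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java illustration of gap detection; not Camel's implementation. */
final class GapDetectionSketch {
    private final TreeSet<Long> pending = new TreeSet<>(); // retained messages, sorted
    private long lastDelivered;

    GapDetectionSketch(long lastDelivered) {
        this.lastDelivered = lastDelivered; // sketch assumption: known at start
    }

    /** Adds a sequence number and returns everything that can now be delivered. */
    List<Long> offer(long seqNo) {
        pending.add(seqNo);
        return drain();
    }

    /** Simulated timeout: stop waiting for the gap in front of the lowest pending message. */
    List<Long> timeout() {
        List<Long> out = new ArrayList<>();
        if (!pending.isEmpty()) {
            lastDelivered = pending.pollFirst();
            out.add(lastDelivered);
            out.addAll(drain());
        }
        return out;
    }

    /** Delivers pending messages for as long as each one is the successor of the last. */
    private List<Long> drain() {
        List<Long> out = new ArrayList<>();
        while (!pending.isEmpty() && pending.first() == lastDelivered + 1) {
            lastDelivered = pending.pollFirst();
            out.add(lastDelivered);
        }
        return out;
    }

    public static void main(String[] args) {
        GapDetectionSketch r = new GapDetectionSketch(1);
        System.out.println(r.offer(2)); // prints [2]
        System.out.println(r.offer(3)); // prints [3]
        System.out.println(r.offer(5)); // prints []     (4 is missing, 5 is retained)
        System.out.println(r.offer(4)); // prints [4, 5] (gap closed)
    }
}
```

If message 4 never arrived, calling timeout() would release 5 on its own. The sketch has no capacity limit and does nothing special with messages that arrive after their successors were delivered.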
If the maximum time difference between messages (with successor/predecessor relationship with respect to the sequence number) in a message stream is known, then the resequencer’s timeout parameter should be set to this value. In this case, all messages of a stream are guaranteed to be delivered in correct order to the next processor. The lower the timeout value is compared to the out-of-sequence time difference, the higher the probability that this resequencer delivers messages out of sequence. Large timeout values should be supported by sufficiently high capacity values. The capacity parameter is used to prevent the resequencer from running out of memory.
By default, the stream resequencer expects long sequence numbers, but other sequence number types can be supported as well by providing a custom expression.
import org.apache.camel.Exchange;
import org.apache.camel.Expression;

public class MyFileNameExpression implements Expression {
    // the file name is carried in the message body
    public String getFileName(Exchange exchange) {
        return exchange.getIn().getBody(String.class);
    }
    public Object evaluate(Exchange exchange) {
        // parse the file name with YYYYMMDD-DNNN pattern
        String fileName = getFileName(exchange);
        String[] files = fileName.split("-D");
        // YYYYMMDD * 1000 + NNN gives a long sequence number
        Long answer = Long.parseLong(files[0]) * 1000 + Long.parseLong(files[1]);
        return answer;
    }
    public <T> T evaluate(Exchange exchange, Class<T> type) {
        Object result = evaluate(exchange);
        return exchange.getContext().getTypeConverter().convertTo(type, result);
    }
}
from("direct:start")
.resequence(new MyFileNameExpression()).stream().timeout(100).to("mock:result");
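As a worked example of the arithmetic in MyFileNameExpression, the following stand-alone sketch repeats the same conversion without any Camel types. The class name and the sample file names are made up for illustration.

```java
/** Stand-alone version of the arithmetic in MyFileNameExpression, without Camel types. */
final class FileNameSequenceSketch {

    /** Converts a name in YYYYMMDD-DNNN form into a long sequence number. */
    static long toSequenceNumber(String fileName) {
        String[] parts = fileName.split("-D");    // e.g. ["20240101", "042"]
        long date = Long.parseLong(parts[0]);     // YYYYMMDD
        long counter = Long.parseLong(parts[1]);  // NNN, leading zeros are ignored
        return date * 1000 + counter;             // three digits reserved for NNN
    }

    public static void main(String[] args) {
        System.out.println(toSequenceNumber("20240101-D042")); // prints 20240101042
        System.out.println(toSequenceNumber("20240101-D043")); // prints 20240101043
    }
}
```

File names with the same date and consecutive NNN values map to consecutive longs, so the stream resequencer can treat them as predecessor and successor.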
Alternatively, you can provide a custom comparator via the comparator() method:
ExpressionResultComparator<Exchange> comparator = new MyComparator();
from("direct:start")
.resequence(header("seqnum")).stream().comparator(comparator)
.to("mock:result");
or via a StreamResequencerConfig object:
ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, comparator);
from("direct:start")
.resequence(header("seqnum")).stream(config)
.to("mock:result");
And an example in XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <simple>${header.seqnum}</simple>
      <to uri="mock:result" />
      <stream-config capacity="5000" timeout="4000"/>
    </resequence>
  </route>
</camelContext>