Graceful Shutdown
Camel supports a pluggable shutdown strategy using
org.apache.camel.spi.ShutdownStrategy
. Its responsible for shutting
down routes in a graceful manner. The other resources will still be
handled by CamelContext to shutdown. This leaves
the problem at hand with properly shutting down all the routes in a
reliable manner to the ShutdownStrategy
.
Camel provides a default strategy in the
org.apache.camel.impl.engine.DefaultShutdownStrategy
which is capable of
doing that.
DefaultShutdownStrategy
The default strategy will gracefully shutdown routes:
-
in the reverse order they was started. The option
shutdownRoutesInReverseOrder
can be used to use the old behavior. -
let pending and current in flight exchanges run to completion before shutting down
-
using a timeout of 45 seconds which then forces a shutdown now
You can configure the timeout, and whether it should shutdown now remaining routes when the timeout occurred or ignore. See the setters on the class.
It will output to log the progress during graceful shutdown as shown in an example below
2009-12-20 10:56:53,055 [main ] INFO DefaultCamelContext - Apache Camel
(CamelContext:camel-1) is stopping 2009-12-20 10:56:53,056 [main ] INFO
DefaultShutdownStrategy - Starting to graceful shutdown routes (timeout
300 seconds) 2009-12-20 10:56:53,059 [1: ShutdownTask] INFO
DefaultShutdownStrategy - Waiting as there are still 5 inflight
exchanges to complete before we can shutdown 2009-12-20 10:56:54,060 [1:
ShutdownTask] INFO DefaultShutdownStrategy - Waiting as there are still
4 inflight exchanges to complete before we can shutdown 2009-12-20
10:56:55,061 [1: ShutdownTask] INFO DefaultShutdownStrategy - Waiting as
there are still 3 inflight exchanges to complete before we can shutdown
2009-12-20 10:56:56,065 [1: ShutdownTask] INFO DefaultShutdownStrategy -
Waiting as there are still 2 inflight exchanges to complete before we
can shutdown 2009-12-20 10:56:57,066 [1: ShutdownTask] INFO
DefaultShutdownStrategy - Waiting as there are still 1 inflight
exchanges to complete before we can shutdown 2009-12-20 10:56:58,069
[main ] INFO DefaultShutdownStrategy - Graceful shutdown of routes
complete in 5 seconds. 2009-12-20 10:56:58,072 [main ] INFO
DefaultInflightRepository - Shutting down with no inflight exchanges.
2009-12-20 10:56:58,077 [main ] INFO DefaultCamelContext - Apache Camel
(CamelContext:camel-1) stopped
Notice how it waits while there are inflight exchanges still being processed before it can shutdown.
Suppressing logging due to timeout not allowing all inflight messages to complete
If a graceful shutdown could not shutdown cleanly within the given timeout period, then Camel performs a more aggressive shutdown by forcing routes and thread pools etc to shutdown. And as well the routing engine will reject continue processing Exchanges. If this happens you may see WARN logs about Exchanges being rejected and other failures due the forced shutdown.
If you do not want to see these logs, you can suppress this by setting the option SuppressLoggingOnTimeout to true.
context.getShutdownStrategy().setSuppressLoggingOnTimeout(true);
Notice the suppress is a "best effort" though there may still be some logs coming from 3rd party libraries and whatnot, which Camel cannot control.
Logging inflight exchange information on timeout
If a graceful shutdown could not shutdown cleanly within the given timeout period, then Camel performs a more aggressive shutdown by forcing routes and thread pools etc to shutdown. When the timeout happens, then Camel logs information about the current inflight exchanges, which shows from which route the exchange origins, and where it currently is being routed. For example the logging below, shows that there is 1 inflight exchange, that origins from route1, and currently is still in route1 at the "delay1" node. The elapsed is time in millis how long at the current node (eg delay1) and duration is total time in mills.
If you enable DEBUG logging level
on org.apache.camel.impl.engine.DefaultShutdownStrategy
then it logs the same
inflight exchange information during graceful shutdown
2015-01-12 13:23:23,656 [ - ShutdownTask] INFO DefaultShutdownStrategy -
There are 1 inflight exchanges: InflightExchange:
[exchangeId=ID-davsclaus-air-62213-1421065401253-0-3,
fromRouteId=route1, routeId=route1, nodeId=delay1, elapsed=2007,
duration=2017]
If you do not want to see these logs, you can turn this off by setting the option logInflightExchangesOnTimeout to false.
context.getShutdownStrategy().setLogInflightExchangesOnTimeout(false);
Controlling ordering of routes
You can configure the order in which routes should be started, and thus also the same order they are being shutdown. See more at Configuring route startup ordering and autostartup.
Fine grained configuration
You can control two areas that influence graceful shutdown in the Camel routing:
-
ShutdownRoute
-
ShutdownRunningTask
These options can be configured on two scopes: context
and route
.
Where a route will fallback to the context
scoped option, if not
explicit configured. (same principle as Error
Handler, etc.).
ShutdownRoute
This option can control how a given route should act during graceful
shutdown. It has two values Default
and Defer
. The Default
is
obviously the default option which lets Camel shutdown the route as
early as possible. The Defer
is used to defer shutting down this route
to a later stage. This is useful when other routes are dependent upon
it. For example an internal route which other routes reuse.
For example in the route below we have two routes, where route 1 is
dependent upon route 2. At shutdown we want route 1 to complete all its
current messages and we also want the 2nd route to do this as well. So
we can mark both routes to Defer
but since route 1 is a
SEDA based route its Defer
by default (it uses
ShutdownAware
).
A Java DSL based example to defer shutting down the 2nd:
package org.apache.camel.processor;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Component;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.FileConsumer;
import org.apache.camel.component.file.FileEndpoint;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.Test;
import static org.apache.camel.ShutdownRoute.Defer;
public class ShutdownDeferTest extends ContextTestSupport {
private static final AtomicBoolean CONSUMER_SUSPENDED = new AtomicBoolean();
@Override
@Before
public void setUp() throws Exception {
deleteDirectory("target/deferred");
super.setUp();
}
@Test
public void testShutdownDeferred() throws Exception {
MockEndpoint bar = getMockEndpoint("mock:bar");
bar.expectedMinimumMessageCount(1);
template.sendBody("seda:foo", "A");
template.sendBody("seda:foo", "B");
template.sendBody("seda:foo", "C");
template.sendBody("seda:foo", "D");
template.sendBody("seda:foo", "E");
assertMockEndpointsSatisfied();
Thread.sleep(50);
context.stop();
assertFalse("Should not have been suspended", CONSUMER_SUSPENDED.get());
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
// START SNIPPET: e1
public void configure() throws Exception {
from("seda:foo")
.startupOrder(1)
.to("file://target/deferred");
// use file component to transfer files from route 1 -> route 2 as it
// will normally suspend, but by deferring this we can let route 1
// complete while shutting down
MyDeferFileEndpoint defer = new MyDeferFileEndpoint("file://target/deferred?initialDelay=0&delay=10", getContext().getComponent("file"));
defer.setFile(new File("target/deferred"));
from(defer)
// defer shutting down this route as the 1st route depends upon it
.startupOrder(2).shutdownRoute(Defer)
.to("mock:bar");
}
// END SNIPPET: e1
};
}
private static final class MyDeferFileEndpoint extends FileEndpoint {
private MyDeferFileEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
}
@Override
protected FileConsumer newFileConsumer(Processor processor, GenericFileOperations<File> operations) {
return new FileConsumer(this, processor, operations, createGenericFileStrategy()) {
@Override
protected void doSuspend() throws Exception {
CONSUMER_SUSPENDED.set(true);
super.doSuspend();
}
};
}
}
}
Defer shutting down internal routes only
Its best to only defer shutting down internal routes only. As public routes should shutdown as quickly as possible otherwise it will just keep intake new messages which will delay the shutdown processor. Or even have it timeout if a lot of new messages keep coming in.
ShutdownRunningTask
This option control how a given route consumer acts during shutdown.
Most route consumer will only operate on a single task (message),
however the Batch Consumer can operate on many
messages (in a batch). This option is for those kind of consumers. By
default it uses the option CompleteCurrentTaskOnly
which mean that the
current in progress task (message) will be completed and then the
consumer will shutdown. The other option CompleteAllTasks
allows the
consumer to complete all the tasks (messages) before shutting down. For
example a File consumer will process all the pending
files it has picked up before shutting down.
package org.apache.camel.processor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Before;
import org.junit.Test;
public class ShutdownCompleteAllTasksTest extends ContextTestSupport {
private static String url = "file:target/pending?initialDelay=0&delay=10";
private static AtomicInteger counter = new AtomicInteger();
private static CountDownLatch latch = new CountDownLatch(2);
@Override
@Before
public void setUp() throws Exception {
deleteDirectory("target/pending");
super.setUp();
template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt");
template.sendBodyAndHeader(url, "B", Exchange.FILE_NAME, "b.txt");
template.sendBodyAndHeader(url, "C", Exchange.FILE_NAME, "c.txt");
template.sendBodyAndHeader(url, "D", Exchange.FILE_NAME, "d.txt");
template.sendBodyAndHeader(url, "E", Exchange.FILE_NAME, "e.txt");
}
@Test
public void testShutdownCompleteAllTasks() throws Exception {
// give it 30 seconds to shutdown
context.getShutdownStrategy().setTimeout(30);
// start route
context.startRoute("foo");
MockEndpoint bar = getMockEndpoint("mock:bar");
bar.expectedMinimumMessageCount(1);
assertMockEndpointsSatisfied();
int batch = bar.getReceivedExchanges().get(0).getProperty(Exchange.BATCH_SIZE, int.class);
// wait for latch
latch.await(10, TimeUnit.SECONDS);
// shutdown during processing
context.stop();
// should route all
assertEquals("Should complete all messages", batch, counter.get());
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
// START SNIPPET: e1
public void configure() throws Exception {
from(url).routeId("foo").noAutoStartup()
// let it complete all tasks during shutdown
.shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
.process(new MyProcessor())
.to("mock:bar");
}
// END SNIPPET: e1
};
}
public static class MyProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
counter.incrementAndGet();
latch.countDown();
}
}
}
JMX managed
The ShutdownStrategy
is JMX aware as well so you can manage it from a
JMX console. For example you can change the timeout value.
Shutting down individual routes
It is possible to gracefully shutdown an individual route using
shutdownRoute(routeId)
method on CamelContext
. Its also possible to
provide a specific timeout to use instead of the default timeout
settings using shutdownRoute(routeId, timeout, timeUnit)
.
Developer related
If you develop your own Camel component or want to implement your own shutdown strategy then read this section for details.
ShutdownStrategy
You can implement your own strategy to control the shutdown by
implementing the org.apache.camel.spi.ShutdownStrategy
and the set it
on the CamelContext
using the setShutdownStrategy
method.
When using Spring XML you then just define a spring bean which
implements the org.apache.camel.spi.ShutdownStrategy
and Camel will
look it up at startup and use it instead of its default. See more at
Advanced
configuration of CamelContext using Spring.
ShutdownAware
The interface org.apache.camel.spi.ShutdownAware
is an optional
interface consumers can implement to have fine grained control during
shutdown. The ShutdownStrategy
must be able to deal with consumers
which implement this interface. This interface was introduced to cater
for in memory consumers such as SEDA which potentially
have a number of pending messages on its internal in memory queues. What
this allows is to let it control the shutdown process to let it complete
its pending messages.
The method getPendingExchangesSize
should return the number of pending
messages which reside on the in memory queues.
The method deferShutdown
should return true
to defer the shutdown
to a later stage, when there are no more pending and inflight messages.
Batch Consumer should implement
ShutdownAware
so they properly support the ShutdownRunningTask
option. See GenericFileConsumer
for an example.
See Also
When Camel is starting up, then its Route Controller that handles this to ensure all the routes are started with support for managing troublesome routes with restaring.