Improving microservice resilience in Liberty
We can use the MicroProfile Fault Tolerance feature to make service invocation more resilient. This feature implements versions 1.0, 1.1, and 2.0 of the Eclipse MicroProfile Fault Tolerance specification. It provides a programming model to support resilient microservices through patterns that include retries, circuit breakers, bulkheads, timeouts, and fallbacks.
The latest information about MicroProfile Fault Tolerance is available on the Open Liberty website.
For more information about the open source MicroProfile specification that is implemented by the MicroProfile Fault Tolerance feature, see Eclipse MicroProfile Fault Tolerance Specification 2.0.
- Add the mpFaultTolerance-1.0, mpFaultTolerance-1.1, or
mpFaultTolerance-2.0 feature to a featureManager element in your
server.xml file.
<featureManager>
    <feature>mpFaultTolerance-2.0</feature>
</featureManager>
- Use a code snippet to improve microservice resilience.
A fault tolerance circuit breaker provides a way for systems to fail fast. It temporarily
disables the running of a service to prevent the service from overloading a system.
- Circuit breaker code snippet 1: Create a CircuitBreakerBean with configured CircuitBreaker and Timeout
- Circuit breaker code snippet 2: Use the CircuitBreakerBean
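To make the interaction between requestVolumeThreshold, failureRatio, and delay more concrete, here is a minimal plain-Java sketch of a rolling-window circuit breaker. It is an illustration only, not how Liberty implements the feature: the class name RollingWindowBreaker and its methods are hypothetical, the clock is passed in as a parameter, and the half-open state is simplified to an immediate reset.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only; this is not the Liberty implementation.
class RollingWindowBreaker {
    private final int requestVolumeThreshold;
    private final double failureRatio;
    private final long delayMillis;
    // Outcomes of the most recent calls; true means the call failed.
    private final Deque<Boolean> window = new ArrayDeque<>();
    // Time at which the circuit opened, or -1 while it is closed.
    private long openedAt = -1;

    RollingWindowBreaker(int requestVolumeThreshold, double failureRatio, long delayMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.failureRatio = failureRatio;
        this.delayMillis = delayMillis;
    }

    // Returns true if a call may proceed at time nowMillis.
    boolean allowCall(long nowMillis) {
        if (openedAt < 0) {
            return true;
        }
        if (nowMillis - openedAt >= delayMillis) {
            // The delay has elapsed, so allow fresh attempts
            // (simplification: a real breaker would go half-open first).
            openedAt = -1;
            window.clear();
            return true;
        }
        return false;
    }

    // Records one outcome. The circuit opens only when the window is full
    // and the proportion of failures in it reaches failureRatio.
    void record(boolean failed, long nowMillis) {
        window.addLast(failed);
        if (window.size() > requestVolumeThreshold) {
            window.removeFirst();
        }
        if (window.size() == requestVolumeThreshold) {
            long failures = window.stream().filter(f -> f).count();
            if ((double) failures / requestVolumeThreshold >= failureRatio) {
                openedAt = nowMillis;
            }
        }
    }
}
```

With a threshold of 3 and a ratio of 1.0, three consecutive failures open the circuit, and calls are rejected until the delay has passed.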
A fault tolerance retry policy provides a way to configure when to retry a service. If the main service still fails, a fault tolerance fallback can specify an alternative service to use.
- Fallback and Retry code snippet 1: FTServiceBean with configured FallbackHandler and Retry policy
- Fallback and Retry code snippet 2: A FallbackHandler, the code that is driven if the main service fails
- Fallback and Retry code snippet 3: Use the FTServiceBean
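The order in which retries and the fallback apply can be sketched in plain Java as follows. This is only a conceptual model of what @Retry combined with @Fallback does, not the feature's implementation; the RetryWithFallback class and its call method are hypothetical names.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only; this is not the Liberty implementation.
class RetryWithFallback {
    // Runs the primary action once, retries it up to maxRetries more times
    // on failure, and calls the fallback only when every attempt has failed.
    static <T> T call(Supplier<T> primary, int maxRetries, Function<Exception, T> fallback) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return primary.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        return fallback.apply(last);
    }
}
```

With maxRetries set to 2, a service that always fails is invoked three times in total before the fallback result is returned.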
Fault tolerance can run methods asynchronously when they are annotated with @Asynchronous and return Future or (if using mpFaultTolerance-2.0) CompletionStage. This functionality can make other fault tolerance annotations more powerful, for example by allowing a @Bulkhead to queue waiting method calls, or by allowing a @Timeout to return a result as soon as the time limit is reached, even if the method does not respond to being interrupted.
- Asynchronous code snippet 1: Create an AsynchronousBean with methods which return Future or CompletionStage
- Asynchronous code snippet 2: Use the AsynchronousBean
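The reason asynchronous execution helps a timeout can be shown with a small plain-Java sketch: because the work runs on another thread, the caller can stop waiting at the time limit even when the work ignores interruption. This is a conceptual model under stated assumptions, not the feature's implementation; AsyncTimeoutSketch and callWithTimeout are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; this is not the Liberty implementation.
class AsyncTimeoutSketch {
    // Runs the work on a separate daemon thread and waits at most timeoutMillis.
    static String callWithTimeout(Callable<String> work, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // do not keep the JVM alive for abandoned work
            return t;
        });
        try {
            Future<String> future = executor.submit(work);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Ask the work to stop; the caller returns now whether or not it does.
                future.cancel(true);
                return "timed out";
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

If the same work ran on the calling thread, the caller could not return until the work itself noticed the interrupt.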
A fault tolerance bulkhead limits the number of concurrent calls to a service. The bulkhead limits the amount of system resource that service invocations can use.
- Bulkhead code snippet 1: Create a BulkheadBean with configured Bulkhead
- Bulkhead code snippet 2: Use the BulkheadBean
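As a rough mental model, a bulkhead behaves like a counting semaphore around the service call. The plain-Java sketch below is not the feature's implementation, and SemaphoreBulkhead is a hypothetical name. It rejects calls immediately when full and signals that with IllegalStateException, whereas the real feature throws BulkheadException and, when combined with @Asynchronous, can queue waiting calls instead.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical illustration only; this is not the Liberty implementation.
class SemaphoreBulkhead {
    private final Semaphore permits;

    SemaphoreBulkhead(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    // Runs the action if a permit is free; otherwise rejects the call at once.
    <T> T call(Supplier<T> action) {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("Bulkhead full");
        }
        try {
            return action.get();
        } finally {
            permits.release(); // always free the slot, even if the action fails
        }
    }
}
```

Capping the number of permits is what limits how much of the system one slow service can occupy.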
- Configure annotation parameters
- We can override the parameters of any Fault Tolerance annotation at run time by using MicroProfile Config. For more information on this feature, see Spec Chapter 12.
- (When using mpFaultTolerance-1.1 or later) Metrics are automatically exported for each method that is annotated with a fault tolerance annotation so that it can be monitored. For more information on this feature, see Spec Chapter 11 and this OpenLiberty Blog Post.
- (When using mpFaultTolerance-2.0) Fault Tolerance is implemented using a CDI interceptor. If the application also uses other CDI interceptors, we can adjust the priority of the Fault Tolerance interceptor to configure how it interacts with them. For information on how to do this, see Spec Chapter 3.
- For a summary of the changes between different versions of Fault Tolerance, see the Spec release notes.
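As a sketch of the configuration options in this list, the following MicroProfile Config properties could be placed, for example, in META-INF/microprofile-config.properties. The property key formats follow the Fault Tolerance specification; the package name com.example and all of the values are assumptions chosen for illustration.

```properties
# Override one parameter for one method: <class>/<method>/<annotation>/<parameter>
# (com.example is a hypothetical package name)
com.example.FTServiceBean/serviceA/Retry/maxRetries=5

# Override one parameter everywhere it is used: <annotation>/<parameter>
Bulkhead/value=4

# (mpFaultTolerance-1.1 or later) Turn off the automatically exported metrics
MP_Fault_Tolerance_Metrics_Enabled=false

# (mpFaultTolerance-2.0) Set the priority of the Fault Tolerance interceptor
# (4100 is an arbitrary example value)
mp.fault.tolerance.interceptor.priority=4100
```

The same keys can also be supplied through any other MicroProfile Config source, such as system properties.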
Example
Circuit breaker code snippet 1: Create a CircuitBreakerBean with configured CircuitBreaker and Timeout.
import java.time.temporal.ChronoUnit;
import javax.enterprise.context.RequestScoped;
import org.eclipse.microprofile.faulttolerance.CircuitBreaker;
import org.eclipse.microprofile.faulttolerance.Timeout;

@RequestScoped
public class CircuitBreakerBean {

    private int executionCounterA = 0;

    // The combined effect of the specified requestVolumeThreshold and failureRatio is that 3
    // failures will trigger the circuit to open.
    // After a 1 second delay the Circuit will allow fresh attempts to invoke the service.
    @CircuitBreaker(delay = 1, delayUnit = ChronoUnit.SECONDS, requestVolumeThreshold = 3, failureRatio = 1.0)
    // A service is considered to have timed out after 3 seconds
    @Timeout(value = 3, unit = ChronoUnit.SECONDS)
    public String serviceA() {
        executionCounterA++;

        if (executionCounterA <= 3) {
            // Sleep for 10 secs to force a timeout
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                System.out.println("serviceA interrupted");
            }
        }
        // The fourth and later calls succeed and report the call count
        return "serviceA: " + executionCounterA;
    }
}
Circuit breaker code snippet 2: Use the CircuitBreakerBean.
@Inject
CircuitBreakerBean bean;

// TimeoutException and CircuitBreakerOpenException come from the
// org.eclipse.microprofile.faulttolerance.exceptions package.

// FaultTolerance bean with circuit breaker, should fail 3 times
for (int i = 0; i < 3; i++) {
    try {
        bean.serviceA();
        throw new AssertionError("TimeoutException not caught");
    } catch (TimeoutException e) {
        // expected
    }
}

// The CircuitBreaker should be open, so calling serviceA should generate a
// CircuitBreakerOpenException.
try {
    bean.serviceA();
    throw new AssertionError("CircuitBreakerOpenException not caught");
} catch (CircuitBreakerOpenException e) {
    // expected
}

// Allow time for the circuit to re-close
Thread.sleep(3000);

// The CircuitBreaker should be closed and serviceA should now succeed.
String res = bean.serviceA();
if (!"serviceA: 4".equals(res)) {
    throw new AssertionError("Bad Result: " + res);
}
Fallback and Retry code snippet 1: FTServiceBean with configured FallbackHandler and Retry policy.
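import javax.enterprise.context.RequestScoped;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Retry;

@RequestScoped
public class FTServiceBean {

    // Annotate serviceA with a named FallbackHandler and a Retry policy specifying the
    // number of retries.
    @Retry(maxRetries = 2)
    @Fallback(StringFallbackHandler.class)
    public String serviceA() {
        // Always fail so that the retries and then the fallback are driven
        throw new RuntimeException("Connection failed");
    }
}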
Fallback and Retry code snippet 2: A FallbackHandler, the code that is driven if the main service fails.
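import javax.enterprise.context.Dependent;
import org.eclipse.microprofile.faulttolerance.ExecutionContext;
import org.eclipse.microprofile.faulttolerance.FallbackHandler;

@Dependent
public class StringFallbackHandler implements FallbackHandler<String> {

    @Override
    public String handle(ExecutionContext context) {
        return "fallback for " + context.getMethod().getName();
    }
}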
Fallback and Retry code snippet 3: Use the FTServiceBean.
@Inject
private FTServiceBean ftServiceBean;

try {
    // Call serviceA, which will be retried twice in the event of failure, after which
    // the FallbackHandler will be driven.
    String result = ftServiceBean.serviceA();
    if (!result.contains("serviceA")) {
        throw new AssertionError("The message should be \"fallback for serviceA\"");
    }
} catch (RuntimeException ex) {
    throw new AssertionError("serviceA should not throw a RuntimeException");
}
Asynchronous code snippet 1: Create an AsynchronousBean with methods which return Future or CompletionStage.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import javax.enterprise.context.RequestScoped;
import org.eclipse.microprofile.faulttolerance.Asynchronous;

@RequestScoped
public class AsynchronousBean {

    @Asynchronous
    public Future<String> serviceA() {
        try {
            // Sleep to simulate work
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException("serviceA interrupted", e);
        }
        // Return the result in a completed CompletableFuture
        return CompletableFuture.completedFuture("serviceA OK");
    }

    // Note: returning a CompletionStage requires Fault Tolerance 2.0
    @Asynchronous
    public CompletionStage<String> serviceB() {
        try {
            // Sleep to simulate work
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException("serviceB interrupted", e);
        }
        // Return the result in a completed CompletableFuture (which implements CompletionStage)
        return CompletableFuture.completedFuture("serviceB OK");
    }
}
Asynchronous code snippet 2: Use the AsynchronousBean.
@Inject
AsynchronousBean asyncBean;

// serviceA and serviceB methods will run in parallel because they are annotated with @Asynchronous
Future<String> resultA = asyncBean.serviceA();
CompletionStage<String> resultB = asyncBean.serviceB();

// The CompletionStage returned from serviceB allows us to add actions which take place
// when the serviceB method finishes
resultB.thenAccept((r) -> System.out.println("ServiceB result: " + r))
       .exceptionally((ex) -> {
           System.out.println("ServiceB failed");
           ex.printStackTrace();
           return null;
       });

// For the Future returned from serviceA, we need to wait for it to finish, then we can
// handle the result
try {
    System.out.println("serviceA result: " + resultA.get());
} catch (ExecutionException ex) {
    System.out.println("ServiceA failed");
    ex.printStackTrace();
} catch (InterruptedException ex) {
    System.out.println("Interrupted waiting for serviceA");
}
Bulkhead code snippet 1: Create a BulkheadBean with configured Bulkhead.
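import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.enterprise.context.RequestScoped;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.Bulkhead;

@RequestScoped
@Asynchronous
public class BulkheadBean {

    private final AtomicInteger connectATokens = new AtomicInteger(0);

    // Configure a Bulkhead that supports at most 2 concurrent threads.
    // The limit is set through the annotation's value parameter.
    @Bulkhead(value = 2)
    public Future<Boolean> connectA(String data) throws InterruptedException {
        System.out.println("connectA starting " + data);
        int token = connectATokens.incrementAndGet();
        try {
            if (token > 2) {
                throw new RuntimeException("Too many threads in connectA[" + data + "]: " + token);
            }
            Thread.sleep(5000);
            return CompletableFuture.completedFuture(Boolean.TRUE);
        } finally {
            connectATokens.decrementAndGet();
            System.out.println("connectA complete " + data);
        }
    }
}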
Bulkhead code snippet 2: Use the BulkheadBean.
@Inject
BulkheadBean bean;

// connectA has a bulkhead size of 2
// The first two calls to connectA should be run straight away, in parallel, each around
// 5 seconds
Future<Boolean> future1 = bean.connectA("One");
Thread.sleep(100);
Future<Boolean> future2 = bean.connectA("Two");
Thread.sleep(100);

// The next two calls to connectA should wait until the first 2 have finished
Future<Boolean> future3 = bean.connectA("Three");
Thread.sleep(100);
Future<Boolean> future4 = bean.connectA("Four");
Thread.sleep(100);

// Total time should be just over 10s
Thread.sleep(11000);

if (!future1.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future1 did not complete properly");
}
if (!future2.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future2 did not complete properly");
}
if (!future3.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future3 did not complete properly");
}
if (!future4.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future4 did not complete properly");
}