Introduction
This article introduces commonly used implementations of the Guava Service interface in version 13.0.1, with concrete examples from CDAP. It contains material adapted or quoted from https://github.com/google/guava/wiki/ServiceExplained and from the Javadoc in the Guava source code; individual citations are omitted for readability.
Overview
The Guava Service interface represents an object with an operational state, with start() and stop() methods that return a ListenableFuture representing the result of an asynchronous transition to a desired state. The synchronous startAndWait() and stopAndWait() methods wait for the transition to the desired state to complete. Webservers, RPC servers, and timers, for example, can implement the Service interface. Managing the state of services like these, which require proper startup and shutdown management, can be nontrivial, especially if multiple threads or scheduling is involved. Guava provides some skeletons that manage the state logic and synchronization details for you.
Principles of Using a Service
1. A Service implementation must implement only the valid state transitions:
The normal lifecycle of a Service is Service.State.NEW to Service.State.STARTING to Service.State.RUNNING to Service.State.STOPPING to Service.State.TERMINATED. Any exception thrown in the states Service.State.NEW, Service.State.STARTING, Service.State.RUNNING or Service.State.STOPPING leads to Service.State.FAILED.
The lifecycle also deviates from this path if Service#stop() is called before the Service reaches the Service.State.RUNNING state. The Service#addListener(Listener listener, Executor executor) method registers a Service.Listener that is notified of every state transition of the Service and runs its callbacks on the given Executor. The set of legal transitions forms a DAG (http://en.wikipedia.org/wiki/Directed_acyclic_graph), so every method of the listener is called at most once. An implementation of the Service interface must implement all the state transitions as defined here.
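The rules above can be captured as a transition table. The sketch below is a simplified, hypothetical model (LifecycleState and LifecycleModel are illustrative names, not part of Guava). It assumes only the normal path plus failure edges from STARTING, RUNNING and STOPPING, and it leaves out the deviations caused by stopping a service before it is running. The isAcyclic() check illustrates why each listener method fires at most once: no state can be re-entered.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the lifecycle described above; this is NOT
// Guava's Service.State and it omits the "stop before RUNNING" deviations.
enum LifecycleState { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

final class LifecycleModel {
  private static final Map<LifecycleState, Set<LifecycleState>> LEGAL =
      new EnumMap<>(LifecycleState.class);

  static {
    // Normal path: NEW -> STARTING -> RUNNING -> STOPPING -> TERMINATED,
    // with a failure edge out of every started, non-terminal state.
    LEGAL.put(LifecycleState.NEW, EnumSet.of(LifecycleState.STARTING));
    LEGAL.put(LifecycleState.STARTING, EnumSet.of(LifecycleState.RUNNING, LifecycleState.FAILED));
    LEGAL.put(LifecycleState.RUNNING, EnumSet.of(LifecycleState.STOPPING, LifecycleState.FAILED));
    LEGAL.put(LifecycleState.STOPPING, EnumSet.of(LifecycleState.TERMINATED, LifecycleState.FAILED));
    // Terminal states have no outgoing edges: a service never leaves them.
    LEGAL.put(LifecycleState.TERMINATED, EnumSet.noneOf(LifecycleState.class));
    LEGAL.put(LifecycleState.FAILED, EnumSet.noneOf(LifecycleState.class));
  }

  private LifecycleModel() {}

  static boolean isLegal(LifecycleState from, LifecycleState to) {
    return LEGAL.get(from).contains(to);
  }

  static boolean isTerminal(LifecycleState state) {
    return LEGAL.get(state).isEmpty();
  }

  // True if no state can reach itself again, i.e. the transition graph is a DAG.
  static boolean isAcyclic() {
    for (LifecycleState start : LifecycleState.values()) {
      if (reachableFrom(start).contains(start)) {
        return false;
      }
    }
    return true;
  }

  // All states reachable from 'from' through one or more legal transitions.
  private static Set<LifecycleState> reachableFrom(LifecycleState from) {
    Set<LifecycleState> seen = EnumSet.noneOf(LifecycleState.class);
    Deque<LifecycleState> stack = new ArrayDeque<>(LEGAL.get(from));
    while (!stack.isEmpty()) {
      LifecycleState next = stack.pop();
      if (seen.add(next)) {
        stack.addAll(LEGAL.get(next));
      }
    }
    return seen;
  }
}
```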
2. All state transitions must complete within a finite amount of time and must not block indefinitely:
A service may make use of other services. However, services should perform their state transitions independently, without waiting for conditions in other services. Otherwise, services waiting on each other can form a deadlock, blocking their state transitions indefinitely.
3. A Service implementation must respond to start() or startAndWait() by transitioning to Service.State.RUNNING:
When start() or startAndWait() is called for the first time on a Service, the service responds by transitioning from Service.State.NEW through Service.State.STARTING to Service.State.RUNNING. Any exception during this transition causes the service to transition to Service.State.FAILED.
start() returns a ListenableFuture representing the asynchronous computation of this state transition. startAndWait(), on the other hand, waits for the state transition to complete. Therefore, when startAndWait() returns without an exception, the service should be in Service.State.RUNNING.
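The difference between the asynchronous start() and the synchronous startAndWait() can be sketched with a JDK CompletableFuture standing in for Guava's ListenableFuture. SketchService and its initialize() hook are hypothetical names, and the class models only startup, not stopping.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the start()/startAndWait() contract; a JDK
// CompletableFuture stands in for Guava's ListenableFuture.
class SketchService {
  enum State { NEW, STARTING, RUNNING, FAILED }

  private volatile State state = State.NEW;
  private CompletableFuture<State> startup;

  // Asynchronous: only the first call triggers the transition; every call
  // returns the same future, which completes once the service is RUNNING.
  synchronized CompletableFuture<State> start() {
    if (startup == null) {
      state = State.STARTING;
      startup = CompletableFuture.supplyAsync(() -> {
        try {
          initialize();
          state = State.RUNNING;
          return State.RUNNING;
        } catch (RuntimeException e) {
          state = State.FAILED;   // an exception during startup leads to FAILED
          throw e;
        }
      });
    }
    return startup;
  }

  // Synchronous: blocks until the transition completes; join() rethrows a
  // startup failure as an unchecked CompletionException.
  State startAndWait() {
    return start().join();
  }

  State state() {
    return state;
  }

  // Hypothetical hook for the real initialization work.
  protected void initialize() {}
}
```

Returning the same future from every call mirrors the rule that only the first start() triggers the transition.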
4. A Service implementation must respond to stop() or stopAndWait() by transitioning to a terminal state, and close all the threads and objects it initialized:
Unless it is already in a terminal state or in Service.State.NEW, a service must respond to stop() asynchronously, and to stopAndWait() synchronously, by transitioning to one of the terminal states: Service.State.TERMINATED, or Service.State.FAILED if any exception occurs. Since a service can never leave a terminal state, all the threads and objects initialized when the service started must be properly closed by the time it reaches Service.State.TERMINATED or Service.State.FAILED.
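A minimal sketch of this requirement, assuming a hypothetical PooledWorkerService that creates a thread pool when it starts: whichever terminal state stopAndWait() reaches, the pool is shut down, because the service can never leave that state to clean up later. For brevity only the RUNNING case is handled; this is an illustration, not Guava's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical service owning a worker pool; not Guava's implementation.
class PooledWorkerService {
  enum State { NEW, RUNNING, TERMINATED, FAILED }

  private State state = State.NEW;
  private ExecutorService workers;

  synchronized void startAndWait() {
    if (state == State.NEW) {
      workers = Executors.newFixedThreadPool(2);   // resource initialized at startup
      state = State.RUNNING;
    }
  }

  // Simplified: only a RUNNING service is stopped; other states are returned as-is.
  synchronized State stopAndWait() {
    if (state != State.RUNNING) {
      return state;
    }
    workers.shutdown();
    try {
      // TERMINATED on a clean shutdown, FAILED if the workers did not finish in time.
      state = workers.awaitTermination(5, TimeUnit.SECONDS) ? State.TERMINATED : State.FAILED;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      state = State.FAILED;
    } finally {
      workers.shutdownNow();   // no worker thread may outlive the terminal state
    }
    return state;
  }

  synchronized boolean workersTerminated() {
    return workers != null && workers.isTerminated();
  }
}
```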
Following all of these requirements correctly can be challenging, so it is strongly recommended to use the following abstract classes in Guava, which implement this interface and make threading and state management easier.
Implementations
AbstractService
When you need to do your own manual thread management, extend AbstractService directly. Typically, one of the implementations below will serve you well, but extending AbstractService is recommended when, for example, you are modeling something that provides its own threading semantics as a Service, or when you have your own specific threading requirements.
To extend AbstractService you must implement two methods:
doStart(): doStart() is called directly by the first call to start(). The service is in the Service.State.STARTING state when doStart() is called. doStart() should perform all initialization and then eventually call notifyStarted() to transition the service into the Service.State.RUNNING state if startup succeeded, or notifyFailed() to transition it into the Service.State.FAILED state if startup failed.
doStop(): doStop() is called directly by the first call to stop(), and only if the service is in the Service.State.RUNNING or Service.State.STARTING state, which means doStart() must have completed successfully. Your doStop() method should shut down your service and then eventually call notifyStopped() if shutdown succeeded, or notifyFailed() if shutdown failed.
The start() and stop() methods of AbstractService run in the thread from which they are called.
Your doStart() and doStop() methods should be fast, because the service object is locked while doStart() or doStop() is running, and no other state transition can happen while it is locked. If you need to do expensive initialization, such as reading files, opening network connections, or any operation that might block, you should consider moving that work to another thread.
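That advice can be illustrated without Guava. MiniAbstractService below is a hypothetical stand-in that imitates the contract described above: start() calls doStart() under the object's lock, and the subclass reports the outcome through notifyStarted() or notifyFailed(). It is not Guava's class, and doStop() is omitted. The hypothetical SlowInitService moves its slow initialization into a separate thread, so doStart() returns, and releases the lock, immediately.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical mini-skeleton imitating the AbstractService contract described
// above; it is not Guava's AbstractService, and doStop() is left out.
abstract class MiniAbstractService {
  enum State { NEW, STARTING, RUNNING, FAILED }

  private State state = State.NEW;
  private final CountDownLatch startupDone = new CountDownLatch(1);

  // Only the first call triggers doStart(), which runs while this object is locked.
  final synchronized void start() {
    if (state != State.NEW) {
      return;
    }
    state = State.STARTING;
    doStart();
  }

  final synchronized State state() {
    return state;
  }

  // Blocks until notifyStarted() or notifyFailed() has been called.
  final void awaitStartup() {
    try {
      startupDone.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  protected final synchronized void notifyStarted() {
    state = State.RUNNING;
    startupDone.countDown();
  }

  protected final synchronized void notifyFailed(Throwable cause) {
    state = State.FAILED;
    startupDone.countDown();
  }

  protected abstract void doStart();
}

// doStart() hands the slow work to a new thread and returns at once, so the
// lock is released; the worker reports the outcome when it finishes.
class SlowInitService extends MiniAbstractService {
  @Override
  protected void doStart() {
    new Thread(() -> {
      try {
        Thread.sleep(50);   // stands in for reading files or opening connections
        notifyStarted();
      } catch (InterruptedException e) {
        notifyFailed(e);
      }
    }, "slow-init").start();
  }
}
```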
An example implementation of AbstractService in CDAP is RetryOnStartFailureService. It wraps another Service so that, if the wrapped service fails to start, it is restarted according to a RetryStrategy.
Notice that RetryOnStartFailureService#doStart() creates a new thread, startupThread, and calls notifyStarted() immediately after starting startupThread, regardless of whether the wrapped service currentDelegate has started successfully. Therefore, calling RetryOnStartFailureService#startAndWait() only guarantees that the RetryOnStartFailureService itself is in Service.State.RUNNING; it makes no guarantee that the wrapped service has finished starting up.
public class RetryOnStartFailureService extends AbstractService {
  ...
  private volatile Service currentDelegate;
  ...

  /**
   * Creates a new instance.
   *
   * @param delegate a {@link Supplier} that gives new instance of the delegating Service.
   * @param retryStrategy strategy to use for retrying
   */
  public RetryOnStartFailureService(Supplier<Service> delegate, RetryStrategy retryStrategy) {
    this.delegate = delegate;
    this.currentDelegate = delegate.get();
    this.delegateServiceName = currentDelegate.getClass().getSimpleName();
    this.retryStrategy = retryStrategy;
  }

  @Override
  protected void doStart() {
    startupThread = new Thread("Endure-Service-" + delegateServiceName) {
      @Override
      public void run() {
        int failures = 0;
        long startTime = System.currentTimeMillis();
        long delay = 0L;

        while (delay >= 0 && !stopped) {
          try {
            currentDelegate.start().get();
            // Only assign the delegate if and only if the delegate service started successfully
            startedService = currentDelegate;
            break;
          } catch (InterruptedException e) {
            // This thread will be interrupted from the doStop() method. Don't reset the interrupt flag.
          } catch (Throwable t) {
            ...
          }
        }
      }
    };
    startupThread.start();
    notifyStarted();
  }

  @Override
  protected void doStop() {
    // doStop() won't be called until doStart() returns, hence the startupThread would never be null
    stopped = true;
    startupThread.interrupt();
    Uninterruptibles.joinUninterruptibly(startupThread);

    // Stop the started service if it exists and propagate the stop state
    // There could be a small race between the delegate service started successfully and
    // the setting of the startedService field. When that happens, the stop failure state is not propagated.
    // Nevertheless, there won't be any service left behind without stopping.
    if (startedService != null) {
      Futures.addCallback(startedService.stop(), new FutureCallback<State>() {
        @Override
        public void onSuccess(State result) {
          notifyStopped();
        }

        @Override
        public void onFailure(Throwable t) {
          notifyFailed(t);
        }
      }, Threads.SAME_THREAD_EXECUTOR);
      return;
    }

    // If there is no started service, stop the current delegate, but no need to propagate the stop state
    // because if the underlying service is not yet started due to failure, it shouldn't affect the stop state
    // of this retrying service.
    if (currentDelegate != null) {
      currentDelegate.stop().addListener(new Runnable() {
        @Override
        public void run() {
          notifyStopped();
        }
      }, Threads.SAME_THREAD_EXECUTOR);
      return;
    }

    // Otherwise, if nothing has been started yet, just notify this service is stopped
    notifyStopped();
  }
  ...
}
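The behaviour described above, reporting RUNNING before the wrapped service has started, can be reproduced in a stripped-down, hypothetical RetryingStarter. It is not CDAP's class: the RetryStrategy is replaced by an assumed fixed (maxAttempts, delayMillis) pair, and starting the delegate is modeled as a Runnable that throws on failure.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, stripped-down illustration of the retry-on-start idea; it is
// not CDAP's RetryOnStartFailureService and uses a fixed retry policy.
class RetryingStarter {
  private final Runnable startDelegate;   // throws a RuntimeException if startup fails
  private final int maxAttempts;
  private final long delayMillis;
  private final CompletableFuture<Integer> delegateStarted = new CompletableFuture<>();
  private volatile boolean running;

  RetryingStarter(Runnable startDelegate, int maxAttempts, long delayMillis) {
    this.startDelegate = startDelegate;
    this.maxAttempts = maxAttempts;
    this.delayMillis = delayMillis;
  }

  // Assumed to be called once in this sketch.
  synchronized void start() {
    Thread startupThread = new Thread(() -> {
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          startDelegate.run();
          delegateStarted.complete(attempt);   // records the attempt that succeeded
          return;
        } catch (RuntimeException e) {
          if (attempt == maxAttempts) {
            break;
          }
          try {
            Thread.sleep(delayMillis);         // fixed delay before the next attempt
          } catch (InterruptedException ie) {
            break;                             // a real doStop() would interrupt here
          }
        }
      }
      delegateStarted.completeExceptionally(new IllegalStateException("delegate did not start"));
    }, "retrying-starter");
    startupThread.start();
    running = true;   // like notifyStarted(): does not wait for the delegate
  }

  boolean isRunning() {
    return running;
  }

  CompletableFuture<Integer> delegateStarted() {
    return delegateStarted;
  }
}
```

Callers that need the wrapped service itself must wait on something like delegateStarted(), not on the wrapper's own start.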
AbstractIdleService
The AbstractIdleService skeleton implements a Service that does not need to perform any action while in the "running" state (and therefore does not need a thread while running) but has startup and shutdown actions to perform. Implementing such a service is as easy as extending AbstractIdleService and implementing the startUp() and shutDown() methods.
When AbstractIdleService#start() is called, a new thread is created to call startUp(). When AbstractIdleService#stop() is called, another new thread is created to call shutDown(). Call AbstractIdleService#start().get() to make sure all the actions in startUp() have completed. shutDown() is only called if startUp() completed successfully.
ATTENTION:
The default implementation of AbstractIdleService#executor(final State state) returns an executor that runs startUp() in a new thread. Therefore, when AbstractIdleService#start() returns, there is no guarantee that startUp() has finished, unless AbstractIdleService#executor(final State state) is overridden with an executor that runs the runnable in the calling thread. For the same reason, there is no guarantee that shutDown() has finished when AbstractIdleService#stop() returns. AbstractIdleService#startAndWait() and AbstractIdleService#stopAndWait(), in contrast, wait for startUp() or shutDown() to complete.
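How the executor choice affects start() can be sketched without Guava. IdleServiceSketch is a hypothetical stand-in for AbstractIdleService's startup path: start() hands a runnable that calls startUp() to executor() and returns a future that completes only after startUp() returns. With the default new-thread executor, start() may return first; with a same-thread executor (Runnable::run), startUp() has already finished when start() returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-in for AbstractIdleService's startup path; not Guava's code.
abstract class IdleServiceSketch {
  private final CompletableFuture<Void> started = new CompletableFuture<>();

  // Default: one new thread per transition, mirroring the behaviour described above.
  protected Executor executor() {
    return command -> new Thread(command, getClass().getSimpleName() + " STARTING").start();
  }

  // Simplified: assumes start() is called only once.
  final CompletableFuture<Void> start() {
    executor().execute(() -> {
      try {
        startUp();
        started.complete(null);             // the transition to RUNNING
      } catch (Exception e) {
        started.completeExceptionally(e);   // the transition to FAILED
      }
    });
    return started;                         // may be returned before startUp() finishes
  }

  protected abstract void startUp() throws Exception;
}
```

Waiting on the returned future (here with join(), in Guava with startAndWait() or start().get()) is what makes it safe to rely on startUp() having run.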
If an AbstractIdleService is in the Service.State.RUNNING state, it never transitions into Service.State.TERMINATED until stop() or stopAndWait() is called.
An example implementation of AbstractIdleService in CDAP is AbstractNotificationService. When AbstractNotificationService#start() returns, there is no guarantee that startUp() has completed, but all other methods in AbstractNotificationService can only run after startUp() has completed. This is guaranteed by calling AbstractNotificationService#startAndWait() before any other method of AbstractNotificationService.
public abstract class AbstractNotificationService extends AbstractIdleService implements NotificationService {
  ...
  @Override
  protected void startUp() throws Exception {
    transactionSystemClient.startAndWait();
  }

  @Override
  protected void shutDown() throws Exception {
    transactionSystemClient.stopAndWait();
  }
  ...
}
AbstractExecutionThreadService
An AbstractExecutionThreadService performs startup, running, and shutdown actions in a single thread. You must override the run() method, and it must respond to stop requests. For example, you might perform actions in a work loop:
public void run() {
  while (isRunning()) {
    // perform a unit of work
  }
}
Alternatively, you may override triggerShutdown() in any way that causes run() to return.
Overriding startUp() and shutDown() is optional, but the service state will be managed for you. startUp() is guaranteed to be called before run(), and shutDown() is guaranteed to be called after run() returns or throws an exception.
Note that start() creates a thread for you, and that thread calls your startUp() method and then invokes run(). stop() calls triggerShutdown(); once run() returns, shutDown() is executed in the same thread, and then the thread eventually dies.
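The ordering guarantees above can be modelled in plain Java. ExecutionThreadSketch is a hypothetical skeleton, not Guava's AbstractExecutionThreadService: start() creates one thread that runs startUp(), then run(), then shutDown(); stopAndWait() flags the stop, calls triggerShutdown(), and joins the thread. Its isRunning() is simplified to "no stop requested yet", and stopAndWait() assumes start() was called first.

```java
// Hypothetical single-thread skeleton mirroring the ordering described above:
// startUp(), run() and shutDown() all execute on one thread created by start(),
// and stopAndWait() calls triggerShutdown(). Not Guava's implementation.
abstract class ExecutionThreadSketch {
  private volatile boolean stopRequested;
  private volatile Thread thread;

  final synchronized void start() {
    thread = new Thread(() -> {
      try {
        startUp();
      } catch (Exception e) {
        return;                  // startup failed: run() and shutDown() are skipped
      }
      try {
        if (isRunning()) {
          run();                 // this class's run(), not Runnable#run
        }
      } catch (Exception e) {
        // a real service would transition to FAILED here
      } finally {
        try {
          shutDown();            // always called after run() returns or throws
        } catch (Exception ignored) {
          // a real service would record the failure
        }
      }
    }, getClass().getSimpleName());
    thread.start();
  }

  // Requests a stop, triggers shutdown, and waits (uninterruptibly) for the thread to die.
  // Assumes start() has already been called.
  final void stopAndWait() {
    stopRequested = true;
    triggerShutdown();
    boolean interrupted = false;
    while (thread.isAlive()) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  // Simplified: true until a stop has been requested.
  protected final boolean isRunning() {
    return !stopRequested;
  }

  protected void startUp() throws Exception {}

  protected abstract void run() throws Exception;

  protected void shutDown() throws Exception {}

  protected void triggerShutdown() {}
}
```

A subclass would typically record the worker thread in startUp(), loop on isRunning() in run(), and interrupt the recorded thread in triggerShutdown().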
An example implementation of AbstractExecutionThreadService in CDAP is AggregatedMetricsCollectionService. Its triggerShutdown() calls runThread.interrupt() (runThread is recorded in startUp()) to cause run() to return. run() responds to stop() in two ways: it breaks out of the while loop when it catches the InterruptedException raised by the interrupt in triggerShutdown(), and it exits the loop when isRunning() returns false. isRunning() returns false once stop() is called, because stop() transitions the service to Service.State.STOPPING. shutDown() performs a final flush of the metrics, since run() no longer flushes them once it has returned.
public abstract class AggregatedMetricsCollectionService extends AbstractExecutionThreadService
  implements MetricsCollectionService {
  ...
  @Override
  protected void startUp() throws Exception {
    runThread = Thread.currentThread();
  }

  @Override
  protected final void run() {
    long sleepMillis = getInitialDelayMillis();
    while (isRunning()) {
      try {
        TimeUnit.MILLISECONDS.sleep(sleepMillis);
        ...
      } catch (InterruptedException e) {
        // Expected when stop is called.
        break;
      }
    }
  }
  ...
  @Override
  protected void shutDown() throws Exception {
    // Flush the metrics when shutting down.
    publishMetrics(System.currentTimeMillis());
  }

  @Override
  protected void triggerShutdown() {
    if (runThread != null) {
      runThread.interrupt();
    }
  }
  ...
}
AbstractScheduledService
An AbstractScheduledService performs some periodic task while running. Subclasses implement runOneIteration() to specify one iteration of the task, as well as the familiar startUp() and shutDown() methods.
executor() returns the ScheduledExecutorService that will be used to run this service. The default implementation returns a new executor that sets the name of its threads to the string returned by the getServiceName() method. Subclasses may override this method to use a custom executor with a specific thread name, thread group or priority. The startUp() and shutDown() methods are run in the executor returned by executor().
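Overriding executor() usually comes down to supplying a ThreadFactory. The JDK-only sketch below builds a single-thread scheduled executor whose threads carry a given name prefix, are daemons, and run at a given priority; NamedExecutors and namedScheduler are hypothetical names. CDAP's AbstractResourceReporter, shown later, does something similar with its own Threads.createDaemonThreadFactory helper.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing what a custom executor() could return:
// a single-thread scheduled executor with named, daemon, prioritized threads.
final class NamedExecutors {
  private NamedExecutors() {}

  static ScheduledExecutorService namedScheduler(String prefix, int priority) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
      t.setDaemon(true);        // do not keep the JVM alive after the service stops
      t.setPriority(priority);  // must not exceed the thread group's maximum priority
      return t;
    };
    return Executors.newSingleThreadScheduledExecutor(factory);
  }
}
```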
Unlike in AbstractExecutionThreadService, it is not guaranteed that startUp() has been called when shutDown() is called.
To describe the execution schedule, you must implement the scheduler() method. Typically, you will use one of the provided schedules from AbstractScheduledService.Scheduler, either newFixedRateSchedule(initialDelay, delay, TimeUnit) or newFixedDelaySchedule(initialDelay, delay, TimeUnit), corresponding to the familiar methods in ScheduledExecutorService. Custom schedules can be implemented using CustomScheduler; see the Javadoc for details.
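Because the two built-in schedules correspond to ScheduledExecutorService#scheduleAtFixedRate and ScheduledExecutorService#scheduleWithFixedDelay, the two modes can be demonstrated with the JDK alone. ScheduleDemo and runIterations are hypothetical names, and the counting Runnable stands in for runOneIteration().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the two JDK scheduling modes behind the built-in schedules.
final class ScheduleDemo {
  private ScheduleDemo() {}

  // Runs a stand-in for runOneIteration() until it has executed 'times' times
  // (or 5 seconds pass), then cancels it and shuts the executor down.
  static boolean runIterations(boolean fixedRate, int times) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch remaining = new CountDownLatch(times);
    Runnable iteration = remaining::countDown;
    ScheduledFuture<?> handle = fixedRate
        // fixed rate: start times are spaced 10 ms apart
        ? executor.scheduleAtFixedRate(iteration, 0, 10, TimeUnit.MILLISECONDS)
        // fixed delay: 10 ms between the end of one run and the start of the next
        : executor.scheduleWithFixedDelay(iteration, 0, 10, TimeUnit.MILLISECONDS);
    try {
      return remaining.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      handle.cancel(false);
      executor.shutdownNow();
    }
  }
}
```

The choice matters when an iteration is slow: with a fixed rate, a late iteration delays the next start but the executor keeps the original cadence where it can, whereas with a fixed delay the gap after each iteration is always at least the configured delay.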
An example implementation of AbstractScheduledService in CDAP is AbstractResourceReporter. It writes out resource metrics every reportInterval seconds, as defined in scheduler().
public abstract class AbstractResourceReporter extends AbstractScheduledService
  implements ProgramResourceReporter {
  ...
  protected void runOneIteration() throws Exception {
    reportResources();
  }

  @Override
  protected void shutDown() throws Exception {
    if (executor != null) {
      executor.shutdownNow();
    }
  }

  protected Scheduler scheduler() {
    return Scheduler.newFixedRateSchedule(0, reportInterval, TimeUnit.SECONDS);
  }

  @Override
  protected final ScheduledExecutorService executor() {
    executor = Executors.newSingleThreadScheduledExecutor(
      Threads.createDaemonThreadFactory("reporter-scheduler"));
    return executor;
  }
}