Guava services tutorial with examples

Introduction

This article introduces commonly used implementations of the Guava Service interface in version 13.0.1, with concrete examples from CDAP. It contains material adapted or quoted from https://github.com/google/guava/wiki/ServiceExplained and from the Javadoc in the Guava source code; individual citations are omitted for readability and convenience.

 

Overview

The Guava Service interface represents an object with an operational state. Its start() and stop() methods return a ListenableFuture that represents the result of an asynchronous transition to the desired state, while the synchronous startAndWait() and stopAndWait() methods wait for that transition to complete. For example, webservers, RPC servers, and timers can implement the Service interface. Managing the state of services like these, which require proper startup and shutdown management, can be nontrivial, especially if multiple threads or scheduling are involved. Guava provides some skeletons to manage the state logic and synchronization details for you.

Principles of Using a Service

 1. A Service implementation must implement only the valid state transitions:

The normal lifecycle of a Service is

  •  Service.State.NEW  to
  •  Service.State.STARTING  to
  •  Service.State.RUNNING  to
  •  Service.State.STOPPING  to
  •  Service.State.TERMINATED

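A failure while the service is in Service.State.STARTING, Service.State.RUNNING or Service.State.STOPPING moves it to Service.State.FAILED. The lifecycle also deviates from the normal sequence if Service#stop() is called before the service reaches Service.State.RUNNING; for example, calling Service#stop() on a service that is still in Service.State.NEW moves it directly to Service.State.TERMINATED. Service.State.TERMINATED and Service.State.FAILED are terminal states, and a service never leaves either of them.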

Service#addListener(Listener listener, Executor executor) registers a Service.Listener that is notified of every state transition of the Service; its callbacks are run on the given Executor. The set of legal transitions forms a DAG (http://en.wikipedia.org/wiki/Directed_acyclic_graph), so each method of the listener is called at most once. An implementation of the Service interface must follow the state transitions as defined here.
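The lifecycle described above can be pictured as a small transition table. The following is a minimal sketch using only the JDK; the class LifecycleModel, its nested State enum and the methods successors(), canMoveTo() and isTerminal() are hypothetical names chosen for illustration and are not part of Guava. The table is deliberately simplified and only covers the transitions discussed in this article; the Service Javadoc remains the authoritative description.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of the lifecycle; Guava's real enum is Service.State.
class LifecycleModel {

  enum State {
    NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED;

    // States that may directly follow this one in the simplified model.
    Set<State> successors() {
      switch (this) {
        case NEW:
          return EnumSet.of(STARTING, TERMINATED); // TERMINATED: stop() called before start()
        case STARTING:
          return EnumSet.of(RUNNING, STOPPING, FAILED);
        case RUNNING:
          return EnumSet.of(STOPPING, FAILED);
        case STOPPING:
          return EnumSet.of(TERMINATED, FAILED);
        default:
          return EnumSet.noneOf(State.class); // TERMINATED and FAILED have no successors
      }
    }

    boolean canMoveTo(State next) {
      return successors().contains(next);
    }

    boolean isTerminal() {
      return successors().isEmpty();
    }
  }

  public static void main(String[] args) {
    State[] normal = {State.NEW, State.STARTING, State.RUNNING, State.STOPPING, State.TERMINATED};
    // Walk the normal lifecycle and report whether each step is legal in the model.
    for (int i = 0; i + 1 < normal.length; i++) {
      System.out.println(normal[i] + " -> " + normal[i + 1] + " legal: "
          + normal[i].canMoveTo(normal[i + 1]));
    }
    // A terminal state has no way back.
    System.out.println("TERMINATED -> RUNNING legal: "
        + State.TERMINATED.canMoveTo(State.RUNNING));
  }
}
```

Because every edge in the table points "forward" and the terminal states have no successors, no state can be entered twice, which is why a listener callback tied to a state fires at most once.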

 2. All state transitions must complete within a finite amount of time and must not block:

A service may make use of other services. However, each service should perform its state transitions independently, without waiting for conditions on other services. Otherwise, services that wait on each other can end up in a deadlock-like situation in which state transitions are blocked indefinitely.

 3. A Service implementation must respond to start() or startAndWait() by transitioning to Service.State.RUNNING 

When start() or startAndWait() is called for the first time on a Service, the service responds by transitioning from Service.State.NEW through Service.State.STARTING to Service.State.RUNNING. Any exception during this transition moves the service to Service.State.FAILED. start() returns a ListenableFuture representing the asynchronous state transition, whereas startAndWait() blocks until the transition completes. Therefore, once startAndWait() returns without throwing, the service can be expected to be in Service.State.RUNNING.

 4. A Service implementation must respond to stop() or stopAndWait() by transitioning to a terminal state, and close all the threads and objects initialized by it:

Whatever its current state, a service that has not yet reached a terminal state must respond to stop() asynchronously, and to stopAndWait() synchronously, by transitioning to one of the terminal states: Service.State.TERMINATED on a clean shutdown, or Service.State.FAILED if an exception occurs. Since a service can never leave a terminal state, all threads and objects initialized when the service started must be properly closed by the time it reaches Service.State.FAILED or Service.State.TERMINATED.

Correctly following all the requirements listed above can be challenging. It is therefore strongly recommended to extend one of the following abstract classes in Guava, which implement this interface and make threading and state management easier.

Implementations

AbstractService

When you need to do your own manual thread management, extend AbstractService directly. Typically, you should be well served by one of the implementations below, but extending AbstractService is recommended when, for example, you are modeling something that provides its own threading semantics as a Service, or when you have your own specific threading requirements.

To implement AbstractService you must implement two methods:

  •  doStart()  is called directly by the first call to start(). The service is in Service.State.STARTING when doStart() is called. doStart() should perform all initialization and eventually MUST call notifyStarted() to transition the service into Service.State.RUNNING if startup succeeded. startAndWait() only returns normally after notifyStarted() has been called. Any Throwable thrown by doStart() causes notifyFailed() to be called, which transitions the service into Service.State.FAILED.
  •  doStop()  is called by the first call to stop() if the service is in Service.State.RUNNING. If the service is still in Service.State.STARTING, doStop() is deferred until notifyStarted() has been called, so doStop() only runs after startup has succeeded. doStop() should shut down your service and eventually MUST call notifyStopped() if shutdown succeeded. stopAndWait() only returns normally after notifyStopped() has been called. Any Throwable thrown by doStop() causes notifyFailed() to be called, which transitions the service into Service.State.FAILED.

The start() and stop() methods of AbstractService run in the calling thread, and so do the doStart() and doStop() calls they make. Your doStart() and doStop() methods should be fast, because the service object is locked while they run, and no other state transition can happen while the lock is held. If you need to do expensive initialization, such as reading files, opening network connections, or any operation that might block, consider moving that work to another thread.
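The idea of handing slow startup work to another thread can be sketched with the JDK alone. The class below is a hypothetical stand-in (SlowStartModel, expensiveInit() and the 200 ms sleep are illustrative assumptions), not a Guava AbstractService: its start() plays the role of doStart(), and completing the CompletableFuture plays the role of notifyStarted() or notifyFailed().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in that mimics the shape of an AbstractService subclass.
class SlowStartModel {
  // Completed by the init thread; plays the role of notifyStarted()/notifyFailed().
  private final CompletableFuture<String> startup = new CompletableFuture<>();

  // Plays the role of doStart(): hands the slow work to another thread and returns at once.
  // For brevity this sketch does not guard against start() being called twice.
  CompletableFuture<String> start() {
    Thread initThread = new Thread(() -> {
      try {
        expensiveInit();
        startup.complete("RUNNING");          // analogous to notifyStarted()
      } catch (Throwable t) {
        startup.completeExceptionally(t);     // analogous to notifyFailed(t)
      }
    }, "slow-start-init");
    initThread.setDaemon(true);
    initThread.start();
    return startup;
  }

  // Placeholder for reading files, opening network connections, and so on.
  private void expensiveInit() throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(200);
  }

  public static void main(String[] args) throws Exception {
    SlowStartModel service = new SlowStartModel();
    CompletableFuture<String> future = service.start();
    // start() has returned; whether initialization has finished is reported here.
    System.out.println("done right after start(): " + future.isDone());
    // Blocking on the future is the analogue of startAndWait().
    System.out.println("state after waiting: " + future.get(5, TimeUnit.SECONDS));
  }
}
```

In a real AbstractService subclass the same shape applies: doStart() returns quickly, and the worker thread calls notifyStarted() or notifyFailed(Throwable) when initialization ends.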

An example implementation of AbstractService in CDAP is RetryOnStartFailureService. It wraps another Service so that, if the wrapped service fails to start, the start is retried according to the RetryStrategy.

Notice that RetryOnStartFailureService#doStart() creates a new thread startupThread and immediately calls notifyStarted() after the asynchronous start of the startupThread, regardless of whether the wrapped service currentDelegate has successfully started. Therefore, calling RetryOnStartFailureService#startAndWait() can only guarantee that this RetryOnStartFailureService is in  Service.State.RUNNING but makes no guarantee that the wrapped service in RetryOnStartFailureService has finished startup. 

RetryOnStartFailureService
public class RetryOnStartFailureService extends AbstractService {
...
  private volatile Service currentDelegate;
...
 
  /**
  * Creates a new instance.
  *
  * @param delegate a {@link Supplier} that gives new instance of the delegating Service.
  * @param retryStrategy strategy to use for retrying
  */
  public RetryOnStartFailureService(Supplier<Service> delegate, RetryStrategy retryStrategy) {
    this.delegate = delegate;
    this.currentDelegate = delegate.get();
    this.delegateServiceName = currentDelegate.getClass().getSimpleName();
    this.retryStrategy = retryStrategy;
  }

  @Override
  protected void doStart() {
    startupThread = new Thread("Endure-Service-" + delegateServiceName) {
      @Override
      public void run() {
        int failures = 0;
        long startTime = System.currentTimeMillis();
        long delay = 0L;

        while (delay >= 0 && !stopped) {
          try {
            currentDelegate.start().get();
            // Only assigned the delegate if and only if the delegate service started successfully
            startedService = currentDelegate;
            break;
          } catch (InterruptedException e) {
            // This thread will be interrupted from the doStop() method. Don't reset the interrupt flag.
          } catch (Throwable t) {
...
          }
        }
      }
    };
    startupThread.start();
    notifyStarted();
  }

  @Override
  protected void doStop() {
    // doStop() won't be called until doStart() returns, hence the startupThread would never be null
    stopped = true;
    startupThread.interrupt();
    Uninterruptibles.joinUninterruptibly(startupThread);

    // Stop the started service if it exists and propagate the stop state
    // There could be a small race between the delegate service started successfully and
    // the setting of the startedService field. When that happens, the stop failure state is not propagated.
    // Nevertheless, there won't be any service left behind without stopping.
    if (startedService != null) {
      Futures.addCallback(startedService.stop(), new FutureCallback<State>() {
        @Override
        public void onSuccess(State result) {
          notifyStopped();
        }

        @Override
        public void onFailure(Throwable t) {
          notifyFailed(t);
        }
      }, Threads.SAME_THREAD_EXECUTOR);
      return;
    }

    // If there is no started service, stop the current delegate, but no need to propagate the stop state
    // because if the underlying service is not yet started due to failure, it shouldn't affect the stop state
    // of this retrying service.
    if (currentDelegate != null) {
      currentDelegate.stop().addListener(new Runnable() {
        @Override
        public void run() {
          notifyStopped();
        }
      }, Threads.SAME_THREAD_EXECUTOR);
      return;
    }

    // Otherwise, if nothing has been started yet, just notify this service is stopped
    notifyStopped();
  }
...
}

AbstractIdleService

The  AbstractIdleService skeleton implements a Service  which does not need to perform any action while in the "running" state -- and therefore does not need a thread while running -- but has startup and shutdown actions to perform. Implementing such a service is as easy as extending  AbstractIdleService  and implementing the  startUp()  and  shutDown()  methods.

When AbstractIdleService#start() or AbstractIdleService#stop() is called, a new thread is created to call startUp() or shutDown() respectively. Call AbstractIdleService#startAndWait() or AbstractIdleService#stopAndWait() to wait until startUp() or shutDown() has completed. shutDown() is only called if startUp() completed successfully.

ATTENTION: Although AbstractIdleService runs startUp() and shutDown() in a new thread, notifyStarted() and notifyStopped() are only called after the respective method has returned. AbstractIdleService#startAndWait() and AbstractIdleService#stopAndWait() therefore do not return until notifyStarted() or notifyStopped() has been called.

 

An example implementation of AbstractIdleService in CDAP is AbstractNotificationService. Notice that there is no flag to indicate whether startUp() has completed, but all other methods in AbstractNotificationService can only run after startUp() completed. This is guaranteed by  AbstractNotificationService#startAndWait() being called before other methods in AbstractNotificationService are called. 
AbstractNotificationService
public abstract class AbstractNotificationService extends AbstractIdleService implements NotificationService {
...
  @Override
  protected void startUp() throws Exception {
    transactionSystemClient.startAndWait();
  }

  @Override
  protected void shutDown() throws Exception {
    transactionSystemClient.stopAndWait();
  }
...
}


AbstractExecutionThreadService

An  AbstractExecutionThreadService  performs startup, running, and shutdown actions in a single thread. You must override the  run()  method, and it must respond to stop requests. For example, you might perform actions in a work loop:

public void run() {
  while (isRunning()) {
    // perform a unit of work
  }
}

 

Alternately, you may override  triggerShutdown()  in any way that causes  run()  to return.
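One way to make a blocked work loop return without interrupting its thread is a "poison pill": a sentinel object placed on the queue the loop is waiting on. The sketch below models this with plain JDK threads; PoisonPillModel, its methods and the POISON sentinel are hypothetical names. In a real AbstractExecutionThreadService the loop would live in run() and the sentinel would be enqueued from triggerShutdown().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-thread model of the pattern; not a Guava service.
class PoisonPillModel {
  // Hypothetical sentinel. A distinct instance is created so that it can be
  // recognized by identity and never confused with a submitted item.
  private static final String POISON = new String("<shutdown>");

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final List<String> processed = new ArrayList<>();
  private final Thread worker = new Thread(this::runLoop, "poison-pill-worker");

  void start() {
    worker.start();
  }

  void submit(String item) {
    queue.add(item);
  }

  // Role of triggerShutdown(): wake the blocked loop and make it return.
  void triggerShutdown() {
    queue.add(POISON);
  }

  // Role of run(): block for work until the sentinel arrives.
  private void runLoop() {
    try {
      while (true) {
        String item = queue.take();
        if (item == POISON) {
          return;
        }
        processed.add(item); // perform a unit of work
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Requests shutdown, waits for the worker to finish, and returns what it processed.
  List<String> stopAndWait() {
    triggerShutdown();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return processed;
  }

  public static void main(String[] args) {
    PoisonPillModel model = new PoisonPillModel();
    model.start();
    model.submit("a");
    model.submit("b");
    System.out.println("processed: " + model.stopAndWait());
  }
}
```

Because the queue is first-in, first-out, items submitted before the sentinel are still processed before the loop exits, which an interrupt-based shutdown does not guarantee.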

Overriding  startUp()  and  shutDown()  is optional, but the service state will be managed for you.  startUp()  is guaranteed to be called before  run() and  shutDown()  is guaranteed to be called after  run() returns or throws any exception. 

Note that start() creates a thread for you and invokes startUp() and then run() in that thread. stop() calls triggerShutdown(), and once run() returns, shutDown() is executed in the same thread, after which the thread dies.

An example implementation of AbstractExecutionThreadService in CDAP is AggregatedMetricsCollectionService. triggerShutdown() calls runThread.interrupt() to cause run() to return. run() responds to stop() either by catching the InterruptedException raised by that interrupt, or by leaving the while loop when isRunning() returns false. isRunning() returns false once stop() has been called, because stop() transitions the service to Service.State.STOPPING. shutDown() performs a final flush of the metrics, since nothing else will flush them after run() returns.

 

AggregatedMetricsCollectionService
public abstract class AggregatedMetricsCollectionService extends AbstractExecutionThreadService
                                                         implements MetricsCollectionService {
...

  @Override
  protected void startUp() throws Exception {
    runThread = Thread.currentThread();
  }

  @Override
  protected final void run() {
    long sleepMillis = getInitialDelayMillis();
    while (isRunning()) {
      try {
        TimeUnit.MILLISECONDS.sleep(sleepMillis);
 ...
      } catch (InterruptedException e) {
        // Expected when stop is called.
        break;
      }
    }
  }

...

  @Override
  protected void shutDown() throws Exception {
    // Flush the metrics when shutting down.
    publishMetrics(System.currentTimeMillis());
  }

  @Override
  protected void triggerShutdown() {
    if (runThread != null) {
      runThread.interrupt();
    }
  }
...
}

 

AbstractScheduledService

An  AbstractScheduledService  performs some periodic task while running. Subclasses implement  runOneIteration()  to specify one iteration of the task, as well as the familiar  startUp()  and  shutDown()  methods.

executor() returns the executor (a ScheduledExecutorService) that will be used to run this service. The default implementation returns a new executor that sets the name of its threads to the string returned by the getServiceName() method. Subclasses may override this method to supply a custom executor, for example one with a specific thread name, thread group or priority. The startUp() and shutDown() methods are run in the executor returned by executor().

 Unlike in AbstractExecutionThreadService, it's not guaranteed that startUp() has been called when shutDown() is called. 

To describe the execution schedule, you must implement the scheduler() method. Typically, you will use one of the provided schedules from AbstractScheduledService.Scheduler, either newFixedRateSchedule(initialDelay, period, TimeUnit) or newFixedDelaySchedule(initialDelay, delay, TimeUnit), corresponding to the familiar scheduleAtFixedRate and scheduleWithFixedDelay methods in ScheduledExecutorService. Custom schedules can be implemented using CustomScheduler; see the Javadoc for details.
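The difference between the two provided schedules is the same as the difference between the two ScheduledExecutorService methods they correspond to. The following JDK-only sketch shows both calls side by side; ScheduleKindsModel, completesRuns() and the chosen intervals are hypothetical, and the Runnable stands in for runOneIteration().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of the two scheduling styles; not a Guava service.
class ScheduleKindsModel {

  // Schedules a periodic task, waits until it has run `runs` times (at most 5 seconds),
  // cancels it, and reports whether all runs happened in time.
  static boolean completesRuns(boolean fixedRate, int runs, long intervalMillis) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch latch = new CountDownLatch(runs);
    Runnable oneIteration = latch::countDown; // stands in for runOneIteration()
    try {
      ScheduledFuture<?> handle;
      if (fixedRate) {
        // Fixed rate: the next run is due `interval` after the previous run STARTED.
        handle = executor.scheduleAtFixedRate(
            oneIteration, 0, intervalMillis, TimeUnit.MILLISECONDS);
      } else {
        // Fixed delay: the next run is due `interval` after the previous run FINISHED.
        handle = executor.scheduleWithFixedDelay(
            oneIteration, 0, intervalMillis, TimeUnit.MILLISECONDS);
      }
      boolean completed = latch.await(5, TimeUnit.SECONDS);
      handle.cancel(false);
      return completed;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("fixed rate completed:  " + completesRuns(true, 3, 20));
    System.out.println("fixed delay completed: " + completesRuns(false, 3, 20));
  }
}
```

With a fast task the two styles behave almost identically; they diverge when an iteration takes a noticeable fraction of the interval, in which case a fixed delay spaces runs further apart while a fixed rate tries to keep the original cadence.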

An example implementation of AbstractScheduledService in CDAP is AbstractResourceReporter. It writes out resource metrics every reportInterval seconds, as defined in scheduler().

AbstractResourceReporter
public abstract class AbstractResourceReporter extends AbstractScheduledService implements ProgramResourceReporter {
...

  protected void runOneIteration() throws Exception {
    reportResources();
  }

  @Override
  protected void shutDown() throws Exception {
    if (executor != null) {
      executor.shutdownNow();
    }
  }

  protected Scheduler scheduler() {
    return Scheduler.newFixedRateSchedule(0, reportInterval, TimeUnit.SECONDS);
  }

  @Override
  protected final ScheduledExecutorService executor() {
    executor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("reporter-scheduler"));
    return executor;
  }
}