Java API Design and Implementation Details.

  1. Public interface to emit the preview data:

    public interface PreviewEmitter {
     
    	/**
     	 * Emit the property specified by name and value for the given key.
    	 * Values emitted with the same key and propertyName are grouped in a list.
    	 * @param propertyName the name of the property
    	 * @param propertyValue the value associated with the property
     	 */
    	 void emit(String propertyName, Object propertyValue);
    } 
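
To illustrate the grouping behaviour described in the Javadoc, here is a minimal in-memory sketch. InMemoryPreviewEmitter and getValues are hypothetical names used only for illustration, and the sketch groups by property name alone, since the two-argument emit carries no separate key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Mirrors the two-argument PreviewEmitter interface shown above.
interface PreviewEmitter {
  void emit(String propertyName, Object propertyValue);
}

// Hypothetical in-memory emitter (an assumption, not part of the design):
// values emitted under the same property name are appended to one list.
class InMemoryPreviewEmitter implements PreviewEmitter {
  private final Map<String, List<Object>> values = new LinkedHashMap<>();

  @Override
  public synchronized void emit(String propertyName, Object propertyValue) {
    values.computeIfAbsent(propertyName, name -> new ArrayList<>()).add(propertyValue);
  }

  // Returns a copy so that callers cannot mutate the emitter's internal state.
  public synchronized Map<String, List<Object>> getValues() {
    Map<String, List<Object>> copy = new LinkedHashMap<>();
    for (Map.Entry<String, List<Object>> entry : values.entrySet()) {
      copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
    }
    return copy;
  }
}

class PreviewEmitterExample {
  public static void main(String[] args) {
    InMemoryPreviewEmitter emitter = new InMemoryPreviewEmitter();
    emitter.emit("inputData", "record-1");
    emitter.emit("inputData", "record-2");
    emitter.emit("outputData", "RECORD-1");
    System.out.println(emitter.getValues());
  }
}
```

Both "inputData" values end up in the same list, in emission order, while "outputData" gets its own list.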

    2. Preview Context API

    public interface Debugger {
    	/**
    	 * Returns true if preview mode is enabled, false otherwise.
    	 */
    	boolean isPreviewEnabled();
     
    	/**
    	 * Get the PreviewEmitter, which can be used to emit objects collected by key and field names.
    	 */
        PreviewEmitter getPreviewEmitter(String emitterName);
    }

    3. How the application will get access to the PreviewEmitter

    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
      private TransformRunner<Object, Object> transformRunner;  
    
      @Override
      public void initialize(MapReduceTaskContext<Object, Object> context) throws Exception {
        // get source, transform, sink ids from program properties
        Map<String, String> properties = context.getSpecification().getProperties();
        if (Boolean.valueOf(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
          LogStageInjector.start();
        }
        transformRunner = new TransformRunner<>(context, mapperMetrics);
    
      }
    
      @Override
      public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
          transformRunner.transform(key, value);
      }
    ...
    }
     
    TrackedTransform.java
     
    /**
     * A {@link Transformation} that delegates transform operations while emitting metrics
     * around how many records were input into the transform and output by it.
     *
     * @param <IN> Type of input object
     * @param <OUT> Type of output object
     */
    public class TrackedTransform<IN, OUT> implements Transformation<IN, OUT>, Destroyable {
    
    
    	private final PreviewContext previewContext;
    	private final String stageName;
     
    	public TrackedTransform(Transformation<IN, OUT> transform, StageMetrics metrics, PreviewContext previewContext, String stageName,
        	                    @Nullable String metricInName, @Nullable String metricOutName) {
    		...
      		this.previewContext = previewContext;
            this.stageName = stageName;
    		...
    	}
    
    	@Override
    	public void transform(IN input, Emitter<OUT> emitter) throws Exception {
      		if (metricInName != null) {
        		metrics.count(metricInName, 1);
      		}
      		if (previewContext.isPreviewEnabled()) {
    			// emitting input data to preview
    			previewContext.getPreviewEmitter().emit(stageName, "inputData", input); 
      		}
      		transform.transform(input, new TrackedEmitter<>(emitter, metrics, metricOutName, stageName, previewContext));
    	}
    }
    
    
     
    ... TrackedEmitter.java
    
    
    @Override
    public void emit(T value) {
      delegate.emit(value);
      stageMetrics.count(emitMetricName, 1);
      if (previewContext.isPreviewEnabled()) {
    	  //emitting output data for preview
    	  previewContext.getPreviewEmitter().emit(stageName, "outputData", value);
      }
    }
    
    @Override
    public void emitError(InvalidEntry<T> value) {
      delegate.emitError(value);
      stageMetrics.count("records.error", 1);
      if (previewContext.isPreviewEnabled()) {
    
    	  // emitting error data for preview
      	  previewContext.getPreviewEmitter().emit(stageName, "errorData", value);
      }
    }	

     

    The PreviewContext implementation will use the previewId to create a preview emitter, which programs can obtain through getPreviewEmitter. Programs can call isPreviewEnabled to check whether preview is enabled before emitting.
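
The guard-before-emit pattern can be sketched as follows. This is only an illustration under stated assumptions: it uses the three-argument emit(stageName, propertyName, value) form that the TrackedTransform and TrackedEmitter snippets call, plus a no-argument getPreviewEmitter(). NoOpPreviewContext, RecordingPreviewContext and GuardedStage are hypothetical names, and the "recording" context merely stands in for a real preview-side implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Three-argument emit, as called from the TrackedTransform and TrackedEmitter snippets.
interface PreviewEmitter {
  void emit(String stageName, String propertyName, Object propertyValue);
}

// Assumed shape of the context, with a no-argument getPreviewEmitter().
interface PreviewContext {
  boolean isPreviewEnabled();
  PreviewEmitter getPreviewEmitter();
}

// Hypothetical no-op context for regular runs: preview is off and emits are dropped.
class NoOpPreviewContext implements PreviewContext {
  @Override
  public boolean isPreviewEnabled() {
    return false;
  }

  @Override
  public PreviewEmitter getPreviewEmitter() {
    return (stageName, propertyName, propertyValue) -> { };
  }
}

// Hypothetical preview-side context that records each emit as a string.
class RecordingPreviewContext implements PreviewContext {
  final List<String> recorded = new ArrayList<>();

  @Override
  public boolean isPreviewEnabled() {
    return true;
  }

  @Override
  public PreviewEmitter getPreviewEmitter() {
    return (stageName, propertyName, propertyValue) ->
        recorded.add(stageName + "." + propertyName + "=" + propertyValue);
  }
}

class GuardedStage {
  // Same guard as in TrackedTransform.transform: emit only when preview is enabled.
  static void process(String stageName, Object input, PreviewContext context) {
    if (context.isPreviewEnabled()) {
      context.getPreviewEmitter().emit(stageName, "inputData", input);
    }
    // The wrapped transform would run here.
  }
}
```

With the no-op context the guard skips the emit entirely, so regular runs do not pay for building or storing preview records.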

    4. How will CDAP get data from the preview?

    /**
     * Represents the state of the preview.
     */
    public class PreviewStatus {
      public enum Status {
    	RUNNING,
    	COMPLETED,
    	DEPLOY_FAILED,
    	RUNTIME_FAILED	
      };
     
      Status previewStatus;
      @Nullable	
      String failureMessage;			
    }
     
    // This is an internal interface which will be used by REST handlers
    // to retrieve the preview information.
    public interface PreviewManager {
     
        /**
    	 * Get the status of the preview represented by previewId.
         */
    	PreviewStatus getStatus(PreviewId previewId);
     
    	/**
    	 * Get the data associated with the preview represented by previewId.
    	 */
        Map<String, Map<String, List<Object>>> getData(PreviewId previewId);
     
    	/**
    	 * Get all metrics associated with the preview represented by previewId.
    	 */
    	Collection<MetricTimeSeries> getMetrics(PreviewId previewId);
      
     	/**
    	 * Get all logs associated with the preview represented by previewId.
    	 */
    	List<LogEntry> getLogs(PreviewId previewId);
    }
     
    class PreviewId extends EntityId implements NamespaceId, ParentId<NamespaceId> {
    	NamespaceId namespace;
        String preview;
    }
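
The nested return type of getData can be read as "outer key, then inner key, then list of values". The sketch below assumes the outer key is the stage name and the inner key is the property name ("inputData", "outputData", "errorData"), matching the emit calls in TrackedTransform and TrackedEmitter. PreviewDataCollector and its methods are hypothetical and only illustrate the shape of the data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical collector (not part of the design): stage name -> property name -> values.
class PreviewDataCollector {
  private final Map<String, Map<String, List<Object>>> data = new LinkedHashMap<>();

  synchronized void record(String stageName, String propertyName, Object value) {
    data.computeIfAbsent(stageName, stage -> new LinkedHashMap<>())
        .computeIfAbsent(propertyName, property -> new ArrayList<>())
        .add(value);
  }

  // Returns a deep copy in the same shape as PreviewManager.getData.
  synchronized Map<String, Map<String, List<Object>>> getData() {
    Map<String, Map<String, List<Object>>> copy = new LinkedHashMap<>();
    for (Map.Entry<String, Map<String, List<Object>>> stage : data.entrySet()) {
      Map<String, List<Object>> properties = new LinkedHashMap<>();
      for (Map.Entry<String, List<Object>> property : stage.getValue().entrySet()) {
        properties.put(property.getKey(), new ArrayList<>(property.getValue()));
      }
      copy.put(stage.getKey(), properties);
    }
    return copy;
  }
}

class PreviewDataExample {
  public static void main(String[] args) {
    PreviewDataCollector collector = new PreviewDataCollector();
    collector.record("source", "outputData", "row-1");
    collector.record("transform", "inputData", "row-1");
    collector.record("transform", "outputData", "ROW-1");
    System.out.println(collector.getData());
  }
}
```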

SDK:

Preview Execution Isolation:

Requirement:

  1. Program runs, datasets created for preview purposes, and logs and metrics emitted during preview must be isolated from the regular Standalone execution, which is used to publish and run the pipeline.
  2. In preview, a pipeline could have a transform with lookup datasets that read from datasets in Standalone, so we need a way to share Standalone datasets with preview.
  3. In preview, we want to skip writing metadata and lineage information, as they are unnecessary.

Preview Injector vs Standalone Injector:

| Service | Standalone (Yes/No) | Preview (Yes/No) | Description |
|---|---|---|---|
| userInterfaceService | Yes | No | We don't want to run the UI separately. |
| trackerAppCreationService | Yes | No | Tracker app is for exploring metadata; this should be on real data (standalone) and not preview data. |
| router | Yes | No | We don't want to run another router; the existing router should be able to discover and route to the preview service. |
| streamService | Yes | No | |
| exploreExecutorService | Yes | No | No requirement to explore data in preview. |
| exploreClient | Yes | No | No requirement to explore data in preview. |
| metadataService | Yes | No | Metadata service just starts a service with the Metadata and Lineage handlers, which users use to add user-level metadata. CDAP System uses the Metadata Store to emit system-level metadata. Since we use the remote dataset framework for system-level datasets, we need to check whether that will be enough to emit metadata in the system dataset or whether we need to share the metadata store. |
| serviceStore (set/get service instances) | Yes | No | Preview service runs as a single instance and works on a small input set, so it doesn't need many instances and we wouldn't need a serviceStore to increase/decrease preview instances. |
| appFabricServer | Yes | No | AppFabric has many services which we wouldn't need; PreviewServer can include just the required services. |
| previewServer | No | Yes | New addition. |
| datasetService | Yes | Yes | We have a new shared dataset framework and need the dataset service to handle dataset requests. |
| metricsQueryService | Yes | No | We can use MetricStore to query directly, as our requirement for metrics is straightforward: we will return all metrics emitted by a preview-id. |
| txService | Yes | No | |
| externalAuthenticationServer (if security enabled) | Yes | No | |
| logAppenderInitializer | Yes | Yes | |
| kafkaClient (if audit enabled) | Yes | No | |
| zkClient (if audit enabled) | Yes | No | |
| authorizerInstantiator (started by default) | Yes | No | |

 

AppFabricServer vs PreviewServer :

This is a subset of services started in app-fabric server.

| Services | AppFabricServer | PreviewServer |
|---|---|---|
| notificationService | Yes | No |
| schedulerService | Yes | No |
| applicationLifecycleService | Yes | Yes |
| systemArtifactLoader | Yes | Yes |
| programRuntimeService | Yes | Yes |
| streamCoordinatorClient | Yes | Yes |
| programLifecycleService | Yes | Yes |
| pluginService | Yes | No (PluginService is needed only during config and not during preview) |
| handlerHttpService | Yes | Yes (but only with preview handler). CDAP Router should route calls for preview here. |
| metricsCollectionService | Yes | Yes |
| defaultNamespaceEnsurer | Yes | No |

PreviewDatasetFramework

Requirements:

1) A pipeline wants to read from a dataset source, write to a dataset sink, or use a lookup table in a transform. These datasets are in the CDAP Standalone space.

2) Pipeline run records, pipeline run metrics, program status, etc. are stored in system datasets in the Preview space.

3) Error dataset: It's not clear whether using an error dataset should create an error dataset in the CDAP Standalone space. I feel it might not need to be created in the Standalone space. In that case, if it's a dataset, it is the only user-level dataset that has to be created in the Preview space; we could instead have an in-memory implementation for maintaining error records.

 

Assumptions :

1) All Datasets in System Namespace will be using the "LocalDatasetFramework"

2) All Datasets in User's Namespaces will be using the "RemoteDatasetFramework"

 

PreviewDatasetFramework snippet:

    @Nullable
    @Override
    public <T extends Dataset> T getDataset(Id.DatasetInstance datasetInstanceId,
                                            @Nullable Map<String, String> arguments,
                                            @Nullable ClassLoader classLoader)
      throws DatasetManagementException, IOException {
      // System-namespace datasets are served locally; all others go to the remote framework.
      if (datasetInstanceId.getNamespace().equals(Id.Namespace.SYSTEM)) {
        return localDatasetFramework.getDataset(datasetInstanceId, arguments, classLoader);
      } else {
        return remoteDatasetFramework.getDataset(datasetInstanceId, arguments, classLoader);
      }
    }

 

 

Adapting to Cluster 

Having a LocalDatasetFramework for the system namespace also helps when adapting preview to a cluster: the container's local directory can store the system datasets, and the RemoteDatasetFramework of the CDAP master can serve datasets in other namespaces.

 

End to End flow:

  1. When CDAP standalone is started, it will start PreviewService (which has PreviewHttpHandler) along with other required services. When CDAP shuts down, PreviewService will be terminated.
  2. A no-op implementation of PreviewContext will be injected into the SDK, and BasicPreviewContext will be injected into preview.
  3. DatasetFramework and DiscoveryService from SDK will be used by Preview. 
    1. DiscoveryService will be used for registering preview service so that it can be discovered by router.
    2. DatasetFramework will be used for accessing the datasets in the user namespace. 
  4. The user will submit the preview request through the preview REST endpoint.
  5. We will have a rule in the cdap-router which will forward preview requests to the PreviewHttpHandler.
  6. PreviewHttpHandler will receive the request with the preview configuration and generate a unique preview id for it, which will be used as the app id.
  7. When the app gets configured during LocalArtifactLoaderStage, the application will replace the config object with the object updated for preview:

    public class DataPipelineApp extends AbstractApplication<ETLBatchConfig> {
    	public void configure() {
      		ETLBatchConfig config = getConfig();
    		if (config.isPreviewMode()) {
    			// This method should be responsible to create new pipeline configuration for example: replacing source with mock source
    			config = config.getConfigForPreview();
    		}
     
    		PipelineSpecGenerator<ETLBatchConfig, BatchPipelineSpec> specGenerator = new BatchPipelineSpecGenerator(...);
    		BatchPipelineSpec spec = specGenerator.generateSpec(config);
    		PipelinePlanner planner = new PipelinePlanner(...);
    		PipelinePlan plan = planner.plan(spec);
    
    		addWorkflow(new SmartWorkflow(spec, plan, getConfigurer(), config.getEngine()));
    		scheduleWorkflow(...);
    	}   
    }
  8. There will be an inconsistency between the application JSON configuration and the programs created for the application, since programs are created after the app configuration is updated with the preview config. - OPEN QUESTION

  9. Preview application deployment pipeline

    Stage Name                   | Regular Application | Preview Application
    -----------------------------|---------------------|--------------------
    LocalArtifactLoaderStage     | Yes                 | Yes
    ApplicationVerificationStage | Yes                 | Yes
    DeployDatasetModulesStage    | Yes                 | No
    CreateDatasetInstanceStage   | Yes                 | No
    CreateStreamsStage           | Yes                 | No
    DeleteProgramHandlerStage    | Yes                 | No
    ProgramGenerationStage       | Yes                 | Yes
    ApplicationRegistrationStage | Yes                 | Yes
    CreateSchedulesStage         | Yes                 | No
    SystemMetadataWriterStage    | Yes                 | No
  10. If there is a failure in the deploy pipeline, PreviewHttpHandler will return a 500 status code with the deploy failure reason.

  11. Once deployment is successful, the preview handler will start the program and return the preview id as the response. Currently we will start the SmartWorkflow in DataPipelineApp; however, the preview configuration can be extended to accept the program type and program name to start.
  12. During runtime, when a program emits preview data using PreviewContext, its implementation (BasicPreviewContext) will write that data to PreviewStore.
  13. PreviewStore can store data in memory. It cannot be a Table dataset because we want the intermediate data even if the transaction failed. It also cannot be a Fileset dataset, because if a MapReduce program fails it cleans up the files. (Potentially we can use a non-transactional table like Metrics.)
  14. Logs: TBD
  15. Metrics for preview will be stored in the Metric dataset created for preview.
  16. Deletion of the preview data: We can maintain an LRU cache of the preview data for different preview ids. In 3.5 we can restrict the LRU cache size to 1.
  17. Get preview data: PreviewManager will be used by PreviewHttpHandler to query for preview data from the preview store, logs and metrics.
  18. PreviewStore: PreviewStore will be responsible for storing the preview data. The implementation of PreviewStore will store the data in memory for 3.5. In future we can think of storing it in a LevelDB dataset.
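
Steps 13, 16 and 18 describe an in-memory store that keeps preview data in an LRU cache keyed by preview id. One way this could look, as a sketch only: LruPreviewStore and its methods are hypothetical, preview ids are simplified to strings, and the eviction relies on the standard LinkedHashMap access-order mode.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory store (not the actual PreviewStore): keeps data for at most
// maxPreviews preview ids and evicts the least recently used one.
class LruPreviewStore<V> {
  private final Map<String, V> cache;

  LruPreviewStore(final int maxPreviews) {
    // accessOrder=true keeps entries ordered from least to most recently accessed.
    this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        return size() > maxPreviews;
      }
    };
  }

  synchronized void put(String previewId, V data) {
    cache.put(previewId, data);
  }

  // Returns null if the preview id was never stored or has been evicted.
  synchronized V get(String previewId) {
    return cache.get(previewId);
  }

  synchronized int size() {
    return cache.size();
  }
}

class LruPreviewStoreExample {
  public static void main(String[] args) {
    // Size 1, as proposed for 3.5: only the latest preview is retained.
    LruPreviewStore<String> store = new LruPreviewStore<>(1);
    store.put("preview-1", "data of first preview");
    store.put("preview-2", "data of second preview");
    System.out.println(store.get("preview-1")); // evicted
    System.out.println(store.get("preview-2")); // still present
  }
}
```

With a cache size of 1, starting a new preview implicitly deletes the data of the previous one, which matches the deletion behaviour described in step 16.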

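The deployment-pipeline table in step 9 can also be expressed as data, which makes it easy to derive the list of stages for a preview deployment. The enum below is a hypothetical sketch: only the stage names and the Yes/No values come from the table, while DeployStage, runsInPreview and stagesFor are names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical encoding of the step 9 table: every stage runs for a regular
// application; the flag records whether it also runs for a preview application.
enum DeployStage {
  LOCAL_ARTIFACT_LOADER(true),
  APPLICATION_VERIFICATION(true),
  DEPLOY_DATASET_MODULES(false),
  CREATE_DATASET_INSTANCE(false),
  CREATE_STREAMS(false),
  DELETE_PROGRAM_HANDLER(false),
  PROGRAM_GENERATION(true),
  APPLICATION_REGISTRATION(true),
  CREATE_SCHEDULES(false),
  SYSTEM_METADATA_WRITER(false);

  private final boolean runsInPreview;

  DeployStage(boolean runsInPreview) {
    this.runsInPreview = runsInPreview;
  }

  boolean runsInPreview() {
    return runsInPreview;
  }

  // Returns the stages to execute, in pipeline order.
  static List<DeployStage> stagesFor(boolean preview) {
    List<DeployStage> stages = new ArrayList<>();
    for (DeployStage stage : values()) {
      if (!preview || stage.runsInPreview()) {
        stages.add(stage);
      }
    }
    return stages;
  }
}

class DeployStageExample {
  public static void main(String[] args) {
    System.out.println("Preview: " + DeployStage.stagesFor(true));
    System.out.println("Regular: " + DeployStage.stagesFor(false));
  }
}
```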
 

 

Implementation Plan:

1) Preview Service (Shankar)

2) Preview REST API (Sagar)

3) Storage (Sagar)

4) Non-static LevelDBService - Possible Test Case Failures (Done!!!)

5) Java APIs (Sagar)

6) Hydrator changes: App with MapReduce and Spark (Shankar)

---

7) Logs and Metrics

8) ETL Config update for preview

9) Mock source plugin

 
