  1. Public interface to emit the preview data:

    Code Block
    language: java
    public interface PreviewEmitter {

      /**
       * Emit the property specified by name and value for the given key.
       * Values emitted with the same key and propertyName are grouped in a list.
       *
       * @param key the key under which properties are stored
       * @param propertyName the name of the property
       * @param propertyValue the value associated with the property
       */
      void emit(String key, String propertyName, Object propertyValue);
    }

    2. Preview Context API

    Code Block
    public interface PreviewContext {
      /**
       * Indicates whether preview is enabled.
       */
      boolean isPreviewEnabled();

      /**
       * Get the PreviewEmitter, which can be used to emit objects collected by key and field name.
       */
      PreviewEmitter getPreviewEmitter(String emitterName);
    }
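
    The Javadoc in section 1 says that values emitted with the same key and propertyName are grouped in a list. A minimal in-memory sketch of that grouping is shown here. InMemoryPreviewEmitter and its snapshot() method are hypothetical names used only for illustration; the actual storage behind the emitter is not specified in this design.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InMemoryPreviewSketch {

  /** Mirrors the PreviewEmitter interface from section 1. */
  interface PreviewEmitter {
    void emit(String key, String propertyName, Object propertyValue);
  }

  /** Hypothetical in-memory emitter: key -> propertyName -> list of values. */
  static final class InMemoryPreviewEmitter implements PreviewEmitter {
    private final Map<String, Map<String, List<Object>>> data = new LinkedHashMap<>();

    @Override
    public synchronized void emit(String key, String propertyName, Object propertyValue) {
      // Create the nested map and list on first use, then append the value.
      data.computeIfAbsent(key, k -> new LinkedHashMap<>())
          .computeIfAbsent(propertyName, p -> new ArrayList<>())
          .add(propertyValue);
    }

    /** Returns a copy so callers cannot mutate the emitter's state. */
    synchronized Map<String, Map<String, List<Object>>> snapshot() {
      Map<String, Map<String, List<Object>>> copy = new LinkedHashMap<>();
      data.forEach((key, props) -> {
        Map<String, List<Object>> propsCopy = new LinkedHashMap<>();
        props.forEach((name, values) -> propsCopy.put(name, new ArrayList<>(values)));
        copy.put(key, propsCopy);
      });
      return copy;
    }
  }

  public static void main(String[] args) {
    InMemoryPreviewEmitter emitter = new InMemoryPreviewEmitter();
    // Two values for the same (key, propertyName) pair end up in one list.
    emitter.emit("csvParser", "inputData", "a,b");
    emitter.emit("csvParser", "inputData", "c,d");
    emitter.emit("csvParser", "outputData", "record-1");
    System.out.println(emitter.snapshot());
  }
}
```

    The nested map has the same shape as the value returned by PreviewManager.getData in section 4, which is why it is used here.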

    3. How the application will get access to the PreviewEmitter

    Code Block
    language: java
    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
      private TransformRunner<Object, Object> transformRunner;  
    
      @Override
      public void initialize(MapReduceTaskContext<Object, Object> context) throws Exception {
        // get source, transform, sink ids from program properties
        Map<String, String> properties = context.getSpecification().getProperties();
        if (Boolean.valueOf(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
          LogStageInjector.start();
        }
        transformRunner = new TransformRunner<>(context, mapperMetrics);
    
      }
    
      @Override
      public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
          transformRunner.transform(key, value);
      }
    ...
    }
     
    TrackedTransform.java
     
    /**
     * A {@link Transformation} that delegates transform operations while emitting metrics
     * around how many records were input into the transform and output by it.
     *
     * @param <IN> Type of input object
     * @param <OUT> Type of output object
     */
    public class TrackedTransform<IN, OUT> implements Transformation<IN, OUT>, Destroyable {

      private final PreviewContext previewContext;
      private final String stageName;

      public TrackedTransform(Transformation<IN, OUT> transform, StageMetrics metrics,
                              PreviewContext previewContext, String stageName,
                              @Nullable String metricInName, @Nullable String metricOutName) {
        ...
        this.previewContext = previewContext;
        this.stageName = stageName;
        ...
      }

      @Override
      public void transform(IN input, Emitter<OUT> emitter) throws Exception {
        if (metricInName != null) {
          metrics.count(metricInName, 1);
        }
        if (previewContext.isPreviewEnabled()) {
          // emit input data for preview
          previewContext.getPreviewEmitter().emit(stageName, "inputData", input);
        }
        transform.transform(input, new TrackedEmitter<>(emitter, metrics, metricOutName, stageName, previewContext));
      }
    }
    
    
     
    ... TrackedEmitter.java
    
    
    @Override
    public void emit(T value) {
      delegate.emit(value);
      stageMetrics.count(emitMetricName, 1);
      if (previewContext.isPreviewEnabled()) {
        // emit output data for preview
        previewContext.getPreviewEmitter().emit(stageName, "outputData", value);
      }
    }

    @Override
    public void emitError(InvalidEntry<T> value) {
      delegate.emitError(value);
      stageMetrics.count("records.error", 1);
      if (previewContext.isPreviewEnabled()) {
        // emit error data for preview
        previewContext.getPreviewEmitter().emit(stageName, "errorData", value);
      }
    }

     

    The PreviewContext implementation will use the previewId to create a preview emitter, which programs obtain by calling getPreviewEmitter. Programs can call isPreviewEnabled to check whether preview is enabled before emitting.
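
    One way to picture a context bound to a previewId is sketched here. DefaultPreviewContext, its emitted list, and the convention that a null previewId means "preview disabled" are assumptions made for illustration, and the emitterName parameter from section 2 is omitted to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class PreviewContextSketch {

  interface PreviewEmitter {
    void emit(String key, String propertyName, Object propertyValue);
  }

  /** Simplified PreviewContext; the emitterName parameter is left out. */
  interface PreviewContext {
    boolean isPreviewEnabled();
    PreviewEmitter getPreviewEmitter();
  }

  /** Hypothetical context for one run; a null previewId means preview is off. */
  static final class DefaultPreviewContext implements PreviewContext {
    private final String previewId;
    final List<String> emitted = new ArrayList<>();

    DefaultPreviewContext(String previewId) {
      this.previewId = previewId;
    }

    @Override
    public boolean isPreviewEnabled() {
      return previewId != null;
    }

    @Override
    public PreviewEmitter getPreviewEmitter() {
      if (previewId == null) {
        throw new IllegalStateException("Preview is not enabled");
      }
      // Tag every entry with the previewId so that preview runs stay isolated.
      return (key, name, value) -> emitted.add(previewId + "/" + key + "/" + name + "=" + value);
    }
  }

  /** Stand-in for a pipeline stage: upper-cases input and emits preview data if enabled. */
  static String transform(String stageName, String input, PreviewContext context) {
    if (context.isPreviewEnabled()) {
      context.getPreviewEmitter().emit(stageName, "inputData", input);
    }
    String output = input.toUpperCase(Locale.ROOT);
    if (context.isPreviewEnabled()) {
      context.getPreviewEmitter().emit(stageName, "outputData", output);
    }
    return output;
  }

  public static void main(String[] args) {
    DefaultPreviewContext preview = new DefaultPreviewContext("preview-1");
    transform("upper", "hello", preview);
    System.out.println(preview.emitted);

    // With preview disabled, the stage still runs but nothing is emitted.
    DefaultPreviewContext regular = new DefaultPreviewContext(null);
    transform("upper", "hello", regular);
    System.out.println(regular.emitted.isEmpty());
  }
}
```

    Guarding each emit with isPreviewEnabled keeps regular runs free of preview overhead, which matches the pattern used in TrackedTransform and TrackedEmitter.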

    4. How will CDAP get data from the preview?

    Code Block
    /**
     * Represents the state of the preview.
     */
    public class PreviewStatus {
      public enum Status {
        RUNNING,
        COMPLETED,
        DEPLOY_FAILED,
        RUNTIME_FAILED
      }

      Status previewStatus;
      @Nullable
      String failureMessage;
    }
     
    // This is an internal interface that REST handlers will use
    // to retrieve the preview information.
    public interface PreviewManager {

      /**
       * Get the status of the preview represented by previewId.
       */
      PreviewStatus getStatus(PreviewId previewId);

      /**
       * Get the data associated with the preview represented by previewId.
       */
      Map<String, Map<String, List<Object>>> getData(PreviewId previewId);

      /**
       * Get all metrics associated with the preview represented by previewId.
       */
      Collection<MetricTimeSeries> getMetrics(PreviewId previewId);

      /**
       * Get all logs associated with the preview represented by previewId.
       */
      List<LogEntry> getLogs(PreviewId previewId);
    }
     
    class PreviewId extends EntityId implements NamespacedId, ParentedId<NamespaceId> {
      NamespaceId namespace;
      String preview;
    }
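
    The following sketch illustrates how a REST handler might combine getStatus and getData. It is an assumption-laden illustration: previewId is reduced to a plain String, metrics and logs are left out, and the respond and fixed helpers and the response strings are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PreviewManagerSketch {

  enum Status { RUNNING, COMPLETED, DEPLOY_FAILED, RUNTIME_FAILED }

  static final class PreviewStatus {
    final Status status;
    final String failureMessage; // null unless the preview failed

    PreviewStatus(Status status, String failureMessage) {
      this.status = status;
      this.failureMessage = failureMessage;
    }
  }

  /** Reduced PreviewManager: String ids, no metrics or logs. */
  interface PreviewManager {
    PreviewStatus getStatus(String previewId);
    Map<String, Map<String, List<Object>>> getData(String previewId);
  }

  /** Hypothetical handler logic: return data only once the preview has completed. */
  static String respond(PreviewManager manager, String previewId) {
    PreviewStatus status = manager.getStatus(previewId);
    switch (status.status) {
      case COMPLETED:
        return "data: " + manager.getData(previewId);
      case RUNNING:
        return "preview still running";
      default:
        // DEPLOY_FAILED and RUNTIME_FAILED both carry a failure message.
        return status.status + ": " + status.failureMessage;
    }
  }

  /** Test double that always reports the same status and data. */
  static PreviewManager fixed(PreviewStatus status, Map<String, Map<String, List<Object>>> data) {
    return new PreviewManager() {
      @Override
      public PreviewStatus getStatus(String previewId) {
        return status;
      }

      @Override
      public Map<String, Map<String, List<Object>>> getData(String previewId) {
        return data;
      }
    };
  }

  public static void main(String[] args) {
    Map<String, List<Object>> stageData = new HashMap<>();
    stageData.put("inputData", Arrays.<Object>asList("a", "b"));
    Map<String, Map<String, List<Object>>> data = Collections.singletonMap("parser", stageData);

    System.out.println(respond(fixed(new PreviewStatus(Status.COMPLETED, null), data), "preview-1"));
    System.out.println(respond(fixed(new PreviewStatus(Status.RUNNING, null), data), "preview-2"));
  }
}
```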

...

| Service | Standalone (Yes/No) | Preview (Yes/No) | Description |
|---|---|---|---|
| userInterfaceService | Yes | No | We don't want to run the UI separately. |
| trackerAppCreationService | Yes | No | The Tracker app is for exploring metadata; this should be done on real data (standalone), not preview data. |
| router | Yes | No | We don't want to run another router; the existing router should be able to discover and route to the preview service. |
| streamService | Yes | No | |
| exploreExecutorService | Yes | No | No requirement to explore data in preview. |
| exploreClient | Yes | No | No requirement to explore data in preview. |
| metadataService | Yes | No | The metadata service just starts a service with the Metadata and Lineage handlers, which users use to add user-level metadata. The CDAP system uses the Metadata Store to emit system-level metadata. Since we use the remote dataset framework, datasets in the user namespace should have metadata by default; for system-level datasets, we need to check whether that is enough to emit metadata into the system dataset or whether we need to share the metadata store. |
| serviceStore (set/get service instances) | Yes | No | The preview service runs as a single instance and works on a small input set, so it doesn't need many instances and we wouldn't need a serviceStore to increase or decrease preview instances. |
| appFabricServer | Yes | No | AppFabric has many services that we wouldn't need; PreviewServer can include just the required services. |
| previewServer | No | Yes | New addition. |
| datasetService | Yes | Yes | We have a new shared dataset framework and need the dataset service to handle dataset requests. |
| metricsQueryService | Yes | No (can call MetricStore query) | We can use MetricStore to query directly, as our requirement for metrics is straightforward: we will return all metrics emitted by a preview-id. |
| txService | Yes | No (can use standalone's tx service) | |
| externalAuthenticationServer (if security enabled) | Yes | No | |
| logAppenderInitializer | Yes | Yes | |
| kafkaClient (if audit enabled) | Yes | No | |
| zkClient (if audit enabled) | Yes | No | |
| authorizerInstantiator (started by default) | Yes | No | |

 

AppFabricServer vs PreviewServer:

...

| Services | AppFabricServer | PreviewServer |
|---|---|---|
| notificationService | Yes | No |
| schedulerService | Yes | No |
| applicationLifecycleService | Yes | Yes |
| systemArtifactLoader | Yes | Yes |
| programRuntimeService | Yes | Yes |
| streamCoordinatorClient | Yes | Yes |
| programLifecycleService | Yes | Yes |
| pluginService | Yes | No (PluginService is needed only during config and not during preview) |
| handlerHttpService | Yes | Yes (but only with the preview handler). The CDAP Router should route calls for preview here. |
| metricsCollectionService | Yes | Yes |
| defaultNamespaceEnsurer | Yes | No |
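
The comparison above suggests that PreviewServer is a composite of a subset of the AppFabricServer services. A rough sketch of such a composite follows. The Lifecycle interface, RecordingService, CompositeServer, and the start order are all hypothetical; CDAP's real service classes and their dependencies are not modeled, and only a few of the services marked "Yes" for PreviewServer are listed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PreviewServerSketch {

  /** Hypothetical minimal lifecycle; CDAP's real service classes are not modeled. */
  interface Lifecycle {
    void start();
    void stop();
  }

  /** Records lifecycle calls so that start/stop ordering can be inspected. */
  static final class RecordingService implements Lifecycle {
    private final String name;
    private final List<String> log;

    RecordingService(String name, List<String> log) {
      this.name = name;
      this.log = log;
    }

    @Override
    public void start() {
      log.add("start " + name);
    }

    @Override
    public void stop() {
      log.add("stop " + name);
    }
  }

  /** Starts members in the given order and stops them in reverse order. */
  static final class CompositeServer implements Lifecycle {
    private final List<Lifecycle> members;

    CompositeServer(List<Lifecycle> members) {
      this.members = new ArrayList<>(members);
    }

    @Override
    public void start() {
      for (Lifecycle member : members) {
        member.start();
      }
    }

    @Override
    public void stop() {
      for (int i = members.size() - 1; i >= 0; i--) {
        members.get(i).stop();
      }
    }
  }

  /** Builds a preview server from some services marked "Yes" in the PreviewServer column. */
  static CompositeServer previewServer(List<String> log) {
    List<Lifecycle> members = new ArrayList<>();
    // The order below is an assumption, not something the design specifies.
    for (String name : Arrays.asList("applicationLifecycleService", "systemArtifactLoader",
                                     "programRuntimeService", "programLifecycleService",
                                     "metricsCollectionService")) {
      members.add(new RecordingService(name, log));
    }
    return new CompositeServer(members);
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    CompositeServer server = previewServer(log);
    server.start();
    server.stop();
    log.forEach(System.out::println);
  }
}
```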

PreviewDatasetFramework

Requirements:

...