  1. Public interface to emit the preview data:

    public interface PreviewEmitter {

      /**
       * Put the map of properties corresponding to the given key.
       * @param key the key under which properties are stored
       * @param propertyValues the map of property values to be stored under the given key
       */
      void put(String key, Map<String, List<Object>> propertyValues);

      /**
       * Emit the property specified by name and value for the given key.
       * @param key the key under which properties are stored
       * @param propertyName the name of the property
       * @param propertyValue the value associated with the property
       */
      void emit(String key, String propertyName, Object propertyValue);

    }
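The interface does not say how emitted values are stored. As a minimal sketch, assuming an in-memory store is acceptable for preview, an implementation could collect values per key and property name as below. The class name `InMemoryPreviewEmitter`, the `snapshot` method, and the choice that `put` replaces existing values are assumptions made for illustration, not part of the design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Copy of the interface above so that the sketch compiles on its own.
interface PreviewEmitter {
  void put(String key, Map<String, List<Object>> propertyValues);
  void emit(String key, String propertyName, Object propertyValue);
}

// Hypothetical in-memory implementation; the name and behavior are assumptions.
class InMemoryPreviewEmitter implements PreviewEmitter {
  private final Map<String, Map<String, List<Object>>> data = new HashMap<>();

  @Override
  public synchronized void put(String key, Map<String, List<Object>> propertyValues) {
    // Assumption: put() replaces whatever was stored under the key.
    data.put(key, copyOf(propertyValues));
  }

  @Override
  public synchronized void emit(String key, String propertyName, Object propertyValue) {
    // Append the value to the list kept for (key, propertyName), creating it if needed.
    data.computeIfAbsent(key, k -> new HashMap<>())
        .computeIfAbsent(propertyName, n -> new ArrayList<>())
        .add(propertyValue);
  }

  // Hypothetical read accessor: returns a copy so callers cannot mutate the store.
  public synchronized Map<String, Map<String, List<Object>>> snapshot() {
    Map<String, Map<String, List<Object>>> result = new HashMap<>();
    for (Map.Entry<String, Map<String, List<Object>>> entry : data.entrySet()) {
      result.put(entry.getKey(), copyOf(entry.getValue()));
    }
    return result;
  }

  private static Map<String, List<Object>> copyOf(Map<String, List<Object>> source) {
    Map<String, List<Object>> copy = new HashMap<>();
    for (Map.Entry<String, List<Object>> entry : source.entrySet()) {
      copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
    }
    return copy;
  }
}
```

With this sketch, repeated `emit` calls for the same key and property name accumulate in emission order, which matches the `Map<String, List<Object>>` shape used by `put`.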

    2. Preview Context API

    public interface PreviewContext {
      /**
       * Returns true if preview is enabled, false otherwise.
       */
      boolean isPreviewEnabled();

      /**
       * Returns the PreviewEmitter, which can be used to emit objects collected by key and field name.
       */
      PreviewEmitter getPreviewEmitter();
    }

    3. How the application will get access to the PreviewEmitter

    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
      private TransformRunner<Object, Object> transformRunner;  
      private PreviewEmitter previewEmitter;
    
      @Override
      public void initialize(MapReduceTaskContext<Object, Object> context) throws Exception {
        // get source, transform, sink ids from program properties
        Map<String, String> properties = context.getSpecification().getProperties();
        if (Boolean.valueOf(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
          LogStageInjector.start();
        }
        transformRunner = new TransformRunner<>(context, mapperMetrics);
    
      }
    
      @Override
      public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
          transformRunner.transform(key, value);
      }
    ...
    }
     
    TrackedTransform.java
     
    /**
     * A {@link Transformation} that delegates transform operations while emitting metrics
     * around how many records were input into the transform and output by it.
     *
     * @param <IN> Type of input object
     * @param <OUT> Type of output object
     */
    public class TrackedTransform<IN, OUT> implements Transformation<IN, OUT>, Destroyable {
    
    
    	private final PreviewContext previewContext;
    	private final String stageName;
     
    	public TrackedTransform(Transformation<IN, OUT> transform, StageMetrics metrics, PreviewContext previewContext, String stageName,
        	                    @Nullable String metricInName, @Nullable String metricOutName) {
    		...
      		this.previewContext = previewContext;
            this.stageName = stageName;
    		...
    	}
    
    	@Override
    	public void transform(IN input, Emitter<OUT> emitter) throws Exception {
      		if (metricInName != null) {
        		metrics.count(metricInName, 1);
      		}
      		if (previewContext.isPreviewEnabled()) {
    			// emitting input data to preview
                previewContext.getPreviewEmitter().emit(stageName, "inputData", input);
      		}
      		transform.transform(input, metricOutName == null ? emitter : 
    							new TrackedEmitter<>(emitter, metrics, metricOutName, stageName, previewContext));
    	}
    }
    
    
     
    ... TrackedEmitter.java
    
    
    @Override
    public void emit(T value) {
      delegate.emit(value);
      stageMetrics.count(emitMetricName, 1);
      if (previewContext.isPreviewEnabled()) {
    	  //emitting output data for preview
    	  previewContext.getPreviewEmitter().emit(stageName, "outputData", value);
      }
    }
    
    @Override
    public void emitError(InvalidEntry<T> value) {
      delegate.emitError(value);
      stageMetrics.count("records.error", 1);
      if (previewContext.isPreviewEnabled()) {
    
    	  // emitting error data for preview
      	  previewContext.getPreviewEmitter().emit(stageName, "errorData", value);
      }
    }	

     

    The PreviewContext implementation will use the previewId to create a PreviewEmitter, which programs obtain by calling getPreviewEmitter. Programs should call isPreviewEnabled to check that preview is enabled before emitting.
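One way such a PreviewContext implementation could look is sketched below. It assumes the previewId is available as a plain `String` (null when the program is not running in preview) and that some factory can build a PreviewEmitter for a given previewId. The class name `BasicPreviewContext`, the factory parameter, and the no-op emitter returned when preview is disabled are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Copies of the interfaces above so that the sketch compiles on its own.
interface PreviewEmitter {
  void put(String key, Map<String, List<Object>> propertyValues);
  void emit(String key, String propertyName, Object propertyValue);
}

interface PreviewContext {
  boolean isPreviewEnabled();
  PreviewEmitter getPreviewEmitter();
}

// Hypothetical implementation; the class name and constructor are assumptions.
class BasicPreviewContext implements PreviewContext {
  // Emitter that drops everything, returned when preview is disabled.
  private static final PreviewEmitter NO_OP = new PreviewEmitter() {
    @Override
    public void put(String key, Map<String, List<Object>> propertyValues) { }

    @Override
    public void emit(String key, String propertyName, Object propertyValue) { }
  };

  private final boolean enabled;
  private final PreviewEmitter emitter;

  // Assumption: a null previewId means the program is not running in preview.
  BasicPreviewContext(String previewId, Function<String, PreviewEmitter> emitterFactory) {
    this.enabled = previewId != null;
    this.emitter = enabled ? emitterFactory.apply(previewId) : NO_OP;
  }

  @Override
  public boolean isPreviewEnabled() {
    return enabled;
  }

  @Override
  public PreviewEmitter getPreviewEmitter() {
    return emitter;
  }
}
```

Returning a no-op emitter when preview is disabled is a design choice of this sketch: callers that forget the isPreviewEnabled check do not hit a null emitter, although checking first still avoids the cost of building values that would be dropped.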

    4. How will CDAP get data from the preview?

    /**
     * Represents the state of the preview.
     */
    public class PreviewStatus {
      public enum Status {
    	RUNNING,
    	COMPLETED,
    	DEPLOY_FAILED,
    	RUNTIME_FAILED	
      };
     
      Status previewStatus;
      @Nullable	
      String failureMessage;			
    }
     
    // This is an internal interface which will be used by REST handlers
    // to retrieve the preview information.
    public interface PreviewManager {
     
        /**
    	 * Get the state of the preview represented by previewId.
         */
    	PreviewStatus getStatus(PreviewId previewId);
     
    	/**
    	 * Get the data associated with the preview represented by previewId.
    	 */
        Map<String, Map<String, List<Object>>> getData(PreviewId previewId);
     
    	/**
    	 * Get all metrics associated with the preview represented by previewId.
    	 */
    	Collection<MetricTimeSeries> getMetrics(PreviewId previewId);
      
     	/**
    	 * Get all logs associated with the preview represented by previewId.
    	 */
    	List<LogEntry> getLogs(PreviewId previewId);
    }
     
    class PreviewId extends EntityId implements NamespaceId, ParentId<NamespaceId> {
    	NamespaceId namespace;
        String preview;
    }
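A caller of PreviewManager, such as a REST handler or a test, would typically wait until the preview leaves the RUNNING state before reading its data. The sketch below shows one possible polling helper. To stay self-contained it takes a `Supplier` in place of a call to `getStatus(previewId)` and redefines the status enum locally; the class name `PreviewPoller`, the method name, and the timeout behavior are assumptions.

```java
import java.util.function.Supplier;

// Local copy of PreviewStatus.Status so that the sketch compiles on its own.
enum Status { RUNNING, COMPLETED, DEPLOY_FAILED, RUNTIME_FAILED }

// Hypothetical helper; not part of the proposed API.
final class PreviewPoller {
  private PreviewPoller() { }

  /**
   * Polls the supplier until it reports a status other than RUNNING or the timeout elapses.
   * Returns the last status seen, which is still RUNNING if the wait timed out.
   */
  static Status awaitTerminal(Supplier<Status> statusSupplier, long pollMillis, long timeoutMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    Status status = statusSupplier.get();
    while (status == Status.RUNNING && System.nanoTime() - deadline < 0) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting.
        Thread.currentThread().interrupt();
        return status;
      }
      status = statusSupplier.get();
    }
    return status;
  }
}
```

In real code the supplier would be something like `() -> previewManager.getStatus(previewId).previewStatus`, and `getData`, `getMetrics`, or `getLogs` would be called once the returned status is no longer RUNNING.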

SDK:

Preview Execution Isolation:

Requirement:

  1. Program runs executed for preview, datasets created during preview for preview purposes, and logs and metrics emitted during preview should be isolated from the regular Standalone execution, which is used to publish and run the pipeline.
  2. In Preview, a pipeline could have a transform with lookup datasets that reads from datasets in Standalone, so we want a way to share datasets between Preview and Standalone.
  3. In Preview, we want to skip writing metadata and lineage information, as they are unnecessary.

Preview Injector vs Standalone Injector:

| Service | Standalone (Yes/No) | Preview (Yes/No) |
| --- | --- | --- |
| userInterfaceService | Yes | No |
| trackerAppCreationService | Yes | No |
| router | Yes | No |
| streamService | Yes | No |
| exploreExecutorService | Yes | No |
| exploreClient | Yes | No |
| metadataService | Yes | No (since we use the remote dataset framework for datasets in user namespaces, they should have metadata by default) |
| serviceStore (set/get service instances) | Yes | No |
| appFabricServer | Yes | No |
| previewServer | No | Yes |
| datasetService | Yes | Yes |
| metricsQueryService | Yes | No (can call MetricStore query) |
| txService | Yes | No (can use Standalone's tx service) |
| externalAuthenticationServer (if security enabled) | Yes | No |
| logAppenderInitializer | Yes | Yes |
| kafkaClient (if audit enabled) | Yes | No |
| zkClient (if audit enabled) | Yes | No |
| authorizerInstantiator (started by default) | Yes | No |

 

AppFabricServer vs PreviewServer:

This is a subset of services started in app-fabric server.

| Services | AppFabricServer | PreviewServer |
| --- | --- | --- |
| notificationService | Yes | No |
| schedulerService | Yes | No |
| applicationLifecycleService | Yes | Yes |
| systemArtifactLoader | Yes | Yes |
| programRuntimeService | Yes | Yes |
| streamCoordinatorClient | Yes | Yes |
| programLifecycleService | Yes | Yes |
| pluginService | Yes | Yes |
| handlerHttpService | Yes | Yes (but only with the preview handler). The CDAP Router should route calls for preview here. |

 

 

PreviewDatasetFramework

Requirements:

1) The pipeline wants to read from a dataset source, write to a dataset sink, or use a lookup table in a transform. These datasets are in the CDAP Standalone space.

2) Pipeline run records, pipeline run metrics, program status, etc. are stored in system datasets in the Preview space.

3) Error dataset: It's not clear whether using an error dataset should cause an error dataset to be created in the CDAP Standalone space. I feel it might not need to be created in the Standalone space. In that case, if it is a dataset, it is the only user-level dataset that has to be created in the Preview space; we could instead have an in-memory implementation for maintaining error records.
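To make the in-memory option for error records concrete, here is a rough sketch of what such a store could look like. Everything in it is an assumption for illustration: the class name `InMemoryErrorStore`, keying by stage name, and keeping only the most recent records per stage so that memory use stays bounded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical store: keeps at most maxPerStage of the most recent error records
// per stage, in memory only. Nothing is written to a dataset.
class InMemoryErrorStore {
  private final int maxPerStage;
  private final Map<String, Deque<Object>> errors = new HashMap<>();

  InMemoryErrorStore(int maxPerStage) {
    if (maxPerStage <= 0) {
      throw new IllegalArgumentException("maxPerStage must be positive");
    }
    this.maxPerStage = maxPerStage;
  }

  synchronized void add(String stageName, Object errorRecord) {
    // ArrayDeque does not accept null elements, so reject them with a clear message.
    Objects.requireNonNull(errorRecord, "errorRecord");
    Deque<Object> records = errors.computeIfAbsent(stageName, s -> new ArrayDeque<>());
    if (records.size() == maxPerStage) {
      // Drop the oldest record to make room for the new one.
      records.removeFirst();
    }
    records.addLast(errorRecord);
  }

  // Returns a copy of the records kept for the stage, oldest first.
  synchronized List<Object> get(String stageName) {
    Deque<Object> records = errors.get(stageName);
    return records == null ? new ArrayList<>() : new ArrayList<>(records);
  }
}
```

Whether a cap per stage is appropriate, and what its value should be, depends on how many error records a preview run is expected to surface; the design above does not specify this.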

 

Assumptions :

1) All Datasets in System Namespace will be using the "LocalDatasetFramework"

2) All Datasets in User's Namespaces will be using the "RemoteDatasetFramework"

 

PreviewDatasetFramework
... snippet
@Nullable
@Override
public <T extends Dataset> T getDataset(Id.DatasetInstance datasetInstanceId,
                                        @Nullable Map<String, String> arguments,
                                        @Nullable ClassLoader classLoader)
  throws DatasetManagementException, IOException {
  if (datasetInstanceId.getNamespace().equals(Id.Namespace.SYSTEM)) {
    return localDatasetFramework.getDataset(datasetInstanceId, arguments, classLoader);
  } else {
    return remoteDatasetFramework.getDataset(datasetInstanceId, arguments, classLoader);
  }
}

 

 

Adapting to Cluster 

Having a LocalDatasetFramework for the system namespace also helps when adapting preview to a cluster: the container's local directory can be used to store the system datasets, and the RemoteDatasetFramework of the CDAP master can be used for datasets in other namespaces.
