  1. Public interface to emit the preview data:

    import java.util.List;
    import java.util.Map;

    public interface PreviewEmitter {

      /**
       * Stores the given map of properties under the given key.
       * @param key the key under which the properties are stored
       * @param propertyValues the map of property values to be stored under the given key
       */
      void put(String key, Map<String, List<Object>> propertyValues);

      /**
       * Emits the property specified by name and value for the given key.
       * @param key the key under which the properties are stored
       * @param propertyName the name of the property
       * @param propertyValue the value associated with the property
       */
      void emit(String key, String propertyName, Object propertyValue);
    }
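    The intended semantics of put and emit can be illustrated with a minimal in-memory sketch. InMemoryPreviewEmitter and its get method are hypothetical names introduced here for illustration. The sketch assumes that put replaces everything stored under a key and that emit appends to the list of values for a (key, propertyName) pair; the proposal itself does not state either behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors the PreviewEmitter interface proposed above.
interface PreviewEmitter {
  void put(String key, Map<String, List<Object>> propertyValues);
  void emit(String key, String propertyName, Object propertyValue);
}

// Hypothetical in-memory implementation, for illustration only.
class InMemoryPreviewEmitter implements PreviewEmitter {
  // key -> propertyName -> values
  private final Map<String, Map<String, List<Object>>> data = new HashMap<>();

  @Override
  public synchronized void put(String key, Map<String, List<Object>> propertyValues) {
    // Assumption: put replaces whatever was previously stored under the key.
    Map<String, List<Object>> copy = new HashMap<>();
    for (Map.Entry<String, List<Object>> entry : propertyValues.entrySet()) {
      copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
    }
    data.put(key, copy);
  }

  @Override
  public synchronized void emit(String key, String propertyName, Object propertyValue) {
    // Assumption: emit appends to the list kept for (key, propertyName).
    data.computeIfAbsent(key, k -> new HashMap<>())
        .computeIfAbsent(propertyName, n -> new ArrayList<>())
        .add(propertyValue);
  }

  // Hypothetical read accessor; returns an empty map for an unknown key.
  synchronized Map<String, List<Object>> get(String key) {
    Map<String, List<Object>> props = data.get(key);
    return props == null ? Collections.<String, List<Object>>emptyMap() : props;
  }
}
```

    With this sketch, two emit calls for the same key and property name accumulate two values, while a later put for that key discards them in favor of the supplied map.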

    2. Preview Context API

    public interface PreviewContext {
      /**
       * Returns true if preview is enabled, false otherwise.
       */
      boolean isPreviewEnabled();

      /**
       * Returns the PreviewEmitter, which can be used to emit objects collected by key and field name.
       */
      PreviewEmitter getPreviewEmitter();
    }

    3. How will the application get access to the PreviewEmitter? Similar to Metrics and @UseDataSet, an instance of the PreviewEmitter will be injected by CDAP into the application.

    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
    
      private TransformRunner<Object, Object> transformRunner;
      // injected by CDAP
      @SuppressWarnings("unused")
      private Metrics mapperMetrics;
      
      private PreviewEmitter previewEmitter;
    
      @Override
      public void initialize(MapReduceTaskContext<Object, Object> context) throws Exception {
        // get source, transform, sink ids from program properties
        Map<String, String> properties = context.getSpecification().getProperties();
        if (Boolean.valueOf(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
          LogStageInjector.start();
        }
        transformRunner = new TransformRunner<>(context, mapperMetrics);
        if (context.isPreviewEnabled()) {
          previewEmitter = context.getPreviewEmitter();
        }
      }
    
      @Override
      public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
        try {
          transformRunner.transform(key, value, previewEmitter);
        } catch (Exception e) {
          throw Throwables.propagate(e);
        }
      }
    ...
    }
     
    TrackedTransform.java
     
    public TrackedTransform(Transformation<IN, OUT> transform, StageMetrics metrics,
                            @Nullable String metricInName, @Nullable String metricOutName, String stageName, @Nullable PreviewEmitter previewEmitter) {
      this.transform = transform;
      this.metrics = metrics;
      this.metricInName = metricInName;
      this.metricOutName = metricOutName;
      this.stageName = stageName;
      this.previewEmitter = previewEmitter;
    }
    
    
    @Override
    public void transform(IN input, Emitter<OUT> emitter) throws Exception {
      if (metricInName != null) {
        metrics.count(metricInName, 1);
      }
      if (previewEmitter != null) {
        // emit input data to the preview
        previewEmitter.emit(stageName, "inputData", input);
      }
      transform.transform(input, metricOutName == null ? emitter :
        new TrackedEmitter<>(emitter, metrics, metricOutName, stageName, previewEmitter));
    }
    
    
     
    ... TrackedEmitter.java
     
    public TrackedEmitter(Emitter<T> delegate, StageMetrics stageMetrics, String emitMetricName, String stageName,
                          PreviewEmitter previewEmitter) {
      this.delegate = delegate;
      this.stageMetrics = stageMetrics;
      this.emitMetricName = emitMetricName;
      this.stageName = stageName;
      this.previewEmitter = previewEmitter;
    }
    
    @Override
    public void emit(T value) {
      delegate.emit(value);
      stageMetrics.count(emitMetricName, 1);
      // previewEmitter is null when preview is disabled (TrackedTransform passes it through as @Nullable)
      if (previewEmitter != null) {
        // emit output data to the preview
        previewEmitter.emit(stageName, "outputData", value);
      }
    }

    @Override
    public void emitError(InvalidEntry<T> value) {
      delegate.emitError(value);
      stageMetrics.count("records.error", 1);
      if (previewEmitter != null) {
        // emit error data to the preview
        previewEmitter.emit(stageName, "errorData", value);
      }
    }

     

    The PreviewContext implementation will use the previewId to create a PreviewEmitter, which programs obtain by calling getPreviewEmitter. Programs can call isPreviewEnabled to check whether preview is enabled before emitting.
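
    One way to picture such an implementation is the sketch below. It is not the CDAP implementation: SketchPreviewContext, the shared store map, and the use of a plain String as the preview id are all assumptions made for illustration, as is the choice to throw when the emitter is requested while preview is disabled.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Mirror the interfaces proposed above.
interface PreviewEmitter {
  void put(String key, Map<String, List<Object>> propertyValues);
  void emit(String key, String propertyName, Object propertyValue);
}

interface PreviewContext {
  boolean isPreviewEnabled();
  PreviewEmitter getPreviewEmitter();
}

// Hypothetical context that scopes everything it emits to a single preview id.
class SketchPreviewContext implements PreviewContext {
  // previewId -> key -> propertyName -> values (assumed shared store)
  private final Map<String, Map<String, Map<String, List<Object>>>> store;
  private final String previewId; // null means preview is disabled (assumption)

  SketchPreviewContext(Map<String, Map<String, Map<String, List<Object>>>> store, String previewId) {
    this.store = store;
    this.previewId = previewId;
  }

  @Override
  public boolean isPreviewEnabled() {
    return previewId != null;
  }

  @Override
  public PreviewEmitter getPreviewEmitter() {
    if (!isPreviewEnabled()) {
      // Assumption: callers are expected to check isPreviewEnabled() first.
      throw new IllegalStateException("Preview is not enabled");
    }
    final Map<String, Map<String, List<Object>>> previewData =
        store.computeIfAbsent(previewId, id -> new ConcurrentHashMap<>());
    return new PreviewEmitter() {
      @Override
      public void put(String key, Map<String, List<Object>> propertyValues) {
        // Copy the caller's map so later changes to it do not leak into the store.
        Map<String, List<Object>> copy = new ConcurrentHashMap<>();
        propertyValues.forEach((name, values) -> copy.put(name, new CopyOnWriteArrayList<>(values)));
        previewData.put(key, copy);
      }

      @Override
      public void emit(String key, String propertyName, Object propertyValue) {
        previewData.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(propertyName, n -> new CopyOnWriteArrayList<>())
            .add(propertyValue);
      }
    };
  }
}
```

    Keying the store by preview id keeps data from concurrent previews apart, which is also the shape a later lookup by previewId would need.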

    4. How will CDAP get data from the preview?

    /**
     * Represents the state of the preview.
     */
    public class PreviewStatus {
      public enum Status {
        RUNNING,
        COMPLETED,
        DEPLOY_FAILED,
        RUNTIME_FAILED
      }

      Status previewStatus;
      @Nullable
      String failureMessage;
    }
     
    // This is an internal interface that REST handlers will use
    // to retrieve the preview information.
    public interface PreviewManager {

      /**
       * Get the state of the preview represented by previewId.
       */
      PreviewStatus getStatus(PreviewId previewId);

      /**
       * Get the data associated with the preview represented by previewId.
       */
      Map<String, Map<String, List<Object>>> getData(PreviewId previewId);

      /**
       * Get all metrics associated with the preview represented by previewId.
       */
      Collection<MetricTimeSeries> getMetrics(PreviewId previewId);

      /**
       * Get all logs associated with the preview represented by previewId.
       */
      List<LogEntry> getLogs(PreviewId previewId);
    }
     
    class PreviewId extends EntityId implements NamespaceId, ParentId<NamespaceId> {
      NamespaceId namespace;
      String preview;
    }
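
    A REST handler would presumably check the status before reading the data. The sketch below shows one such flow under stated assumptions: the types are trimmed stand-ins for the ones above (a plain String replaces PreviewId, and getMetrics and getLogs are left out so the example compiles without CDAP classes), and PreviewSummaries.countValuesPerKey is a hypothetical helper, not part of the proposal.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Trimmed stand-in for the PreviewStatus class proposed above.
final class PreviewStatus {
  enum Status { RUNNING, COMPLETED, DEPLOY_FAILED, RUNTIME_FAILED }

  final Status previewStatus;
  final String failureMessage; // null unless the preview failed

  PreviewStatus(Status previewStatus, String failureMessage) {
    this.previewStatus = previewStatus;
    this.failureMessage = failureMessage;
  }
}

// Trimmed stand-in for PreviewManager; String replaces PreviewId here.
interface PreviewManager {
  PreviewStatus getStatus(String previewId);
  Map<String, Map<String, List<Object>>> getData(String previewId);
}

// Hypothetical helper that a REST handler could call.
final class PreviewSummaries {
  private PreviewSummaries() { }

  // Returns, for each key (for example a stage name), the total number of
  // values emitted across all of its properties. Only completed previews
  // are summarized; any other status raises an exception.
  static Map<String, Integer> countValuesPerKey(PreviewManager manager, String previewId) {
    PreviewStatus status = manager.getStatus(previewId);
    if (status.previewStatus != PreviewStatus.Status.COMPLETED) {
      throw new IllegalStateException("Preview " + previewId + " is " + status.previewStatus
          + (status.failureMessage == null ? "" : ": " + status.failureMessage));
    }
    Map<String, Integer> counts = new TreeMap<>();
    for (Map.Entry<String, Map<String, List<Object>>> byKey : manager.getData(previewId).entrySet()) {
      int total = 0;
      for (List<Object> values : byKey.getValue().values()) {
        total += values.size();
      }
      counts.put(byKey.getKey(), total);
    }
    return counts;
  }
}
```

    The nested map returned by getData is keyed first by the emit key and then by property name, so a stage that emitted two "inputData" values and one "outputData" value is counted as three.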

...