Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
  1. Public interface to emit the preview data:

    Code Block
    languagejava
    public interface PreviewEmitter {
    	
    	/**
     	 * put the Map of properties corresponding to the given key.
     	 * @param key the key under which properties are stored
    	 * @param propertyValues the map of property values to be stored under the given key
     	 */
    	 void put(String key, Map<String, List<Object>> propertyValues);
     
    	/**
     	 * Emit the property specified by name and value for the given key. 
    	 * values will be grouped in a list for values emitted with same key and propertyName.
     	 * @param key the key under which properties are stored
         * @param propertyName the the name of the property
    	 * @param propertyValue the value associated with the property
     	 */
    	 void emit(String key, String propertyName, Object propertyValue);
    	
    } 

    2. Preview Context API

    Code Block
    public interface PreviewContext() {
    	/**
    	 * boolean flag to indicate if preview is enabled or not.
    	 */
    	boolean isPreviewEnabled();
     
    	/**
    	 * get PreviewEmitter, PreviewEmitter can be used to emit objects collected by key and field names.
    	 */
        PreviewEmitter getPreviewEmitter();
    }

    3. How the application will get access to the PreviewEmitter

    Code Block
    languagejava
    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
      private TransformRunner<Object, Object> transformRunner;  
      private PreviewEmitter previewEmitter;
    
      @Override
      public void initialize(MapReduceTaskContext<Object, Object> context) throws Exception {
        // get source, transform, sink ids from program properties
        Map<String, String> properties = context.getSpecification().getProperties();
        if (Boolean.valueOf(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
          LogStageInjector.start();
        }
        transformRunner = new TransformRunner<>(context, mapperMetrics);
    
      }
    
      @Override
      public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
          transformRunner.transform(key, value);
      }
    ...
    }
     
    TrackedTransform.java
     
    /**
     * A {@link Transformation} that delegates transform operations while emitting metrics
     * around how many records were input into the transform and output by it.
     *
     * @param <IN> Type of input object
     * @param <OUT> Type of output object
     */
    public class TrackedTransform<IN, OUT> implements Transformation<IN, OUT>, Destroyable {
    
    
    	private final PreviewContext previewContext;
    	private final String stageName;
     
    	public TrackedTransform(Transformation<IN, OUT> transform, StageMetrics metrics, PreviewContext previewContext, String stageName,
        	                    @Nullable String metricInName, @Nullable String metricOutName) {
    		...
      		this.previewContext = previewContext;
            this.stageName = stageName;
    		...
    	}
    
    	@Override
    	public void transform(IN input, Emitter<OUT> emitter) throws Exception {
      		if (metricInName != null) {
        		metrics.count(metricInName, 1);
      		}
      		if (previewContext.isPreviewEnabled()) {
    			// emitting input data to preview
    			previewContext.getEmitter().emit(stageName, "inputData", input); 
      		}
      		transform.transform(input, new TrackedEmitter<>(emitter, metrics, metricOutName, stageName, previewContext));
    	}
    }
    
    
     
    ... TrackedEmitter.java
    
    
    @Override
    public void emit(T value) {
      delegate.emit(value);
      stageMetrics.count(emitMetricName, 1);
      if (previewContext.isPreviewEnabled()) {
    	  //emitting output data for preview
    	  previewContext.getPreviewEmitter().emit(stageName, "outputData", value);
      }
    }
    
    @Override
    public void emitError(InvalidEntry<T> value) {
      delegate.emitError(value);
      stageMetrics.count("records.error", 1);
      if (previewContext.isPreviewEnabled()) {
    
    	  // emitting error data for preview
      	  previewContext.getPreviewEmitter().emit(stageName, "errorData", value);
      }
    }	

     

    PreviewContext implementation will use previewId to create a preview emitter which can be obtained using getPreviewEmitter by programs. Programs can use isPreviewEnabled to check if preview is enabled before emitting. 

    4 How will CDAP get data from the preview?

    Code Block
    /**
     * Represents the state of the preview.
     */
    public class PreviewStatus {
      public enum Status {
    	RUNNING,
    	COMPLETED,
    	DEPLOY_FAILED,
    	RUNTIME_FAILED	
      };
     
      Status previewStatus;
      @Nullable	
      String failureMessage;			
    }
     
    // This is internal interface which will be used by REST handlers
    // to retrieve the preview information.
    public interface PreviewManager {
     
        /**
    	 * Get the status of the preview represented by previewId.
         */
    	PreviewStatus getStatus(PreviewId previewId);
     
    	/**
    	 * Get the data associated with the preview represented by previewId.
    	 */
        Map<String, Map<String, List<Object>> getData(PreviewId previewId);
     
    	/**
    	 * Get all metrics associated with the preview represented by previewId.
    	 */
    	Collection<MetricTimeSeries> getMetrics(PreviewId previewId);
      
     	/**
    	 * Get all logs associated with the preview represented by previewId.
    	 */
    	List<LogEntry> getLogs(PreviewId previewId);
    }
     
    class PreviewId extends EntityId implements NamespaceId, ParentId<NamespaceId> {
    	NamespaceId namespace;
        String preview;
    }

...