Public interface to emit the preview data:
    import java.util.List;
    import java.util.Map;

    public interface PreviewEmitter {

      /**
       * Put the map of properties corresponding to the given key.
       * @param key the key under which properties are stored
       * @param propertyValues the map of property values to be stored under the given key
       */
      void put(String key, Map<String, List<Object>> propertyValues);

      /**
       * Emit the property specified by name and value for the given key.
       * @param key the key under which properties are stored
       * @param propertyName the name of the property
       * @param propertyValue the value associated with the property
       */
      void emit(String key, String propertyName, Object propertyValue);
    }
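To make the intended semantics of put and emit concrete, here is a minimal in-memory sketch. It is not the CDAP implementation: the wrapper class InMemoryPreviewSketch, the class InMemoryPreviewEmitter, and the snapshot() method are hypothetical names, and the sketch assumes that emit appends to the values already stored under a key and property name, while put replaces everything stored under the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical wrapper class; it only keeps this sketch self-contained.
final class InMemoryPreviewSketch {

  // Same shape as the PreviewEmitter interface proposed above.
  interface PreviewEmitter {
    void put(String key, Map<String, List<Object>> propertyValues);
    void emit(String key, String propertyName, Object propertyValue);
  }

  // Assumption: preview data is small enough to hold in memory.
  static final class InMemoryPreviewEmitter implements PreviewEmitter {
    // key -> property name -> emitted values, in insertion order
    private final Map<String, Map<String, List<Object>>> data = new LinkedHashMap<>();

    @Override
    public synchronized void put(String key, Map<String, List<Object>> propertyValues) {
      // Assumption: put replaces whatever was stored under the key.
      data.put(key, deepCopy(propertyValues));
    }

    @Override
    public synchronized void emit(String key, String propertyName, Object propertyValue) {
      // Assumption: repeated emits for the same key and property accumulate.
      data.computeIfAbsent(key, k -> new LinkedHashMap<>())
          .computeIfAbsent(propertyName, n -> new ArrayList<>())
          .add(propertyValue);
    }

    // Returns a copy so that callers cannot modify the stored data.
    synchronized Map<String, Map<String, List<Object>>> snapshot() {
      Map<String, Map<String, List<Object>>> copy = new LinkedHashMap<>();
      for (Map.Entry<String, Map<String, List<Object>>> entry : data.entrySet()) {
        copy.put(entry.getKey(), deepCopy(entry.getValue()));
      }
      return copy;
    }

    private static Map<String, List<Object>> deepCopy(Map<String, List<Object>> source) {
      Map<String, List<Object>> copy = new LinkedHashMap<>();
      for (Map.Entry<String, List<Object>> entry : source.entrySet()) {
        copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
      }
      return copy;
    }
  }

  public static void main(String[] args) {
    InMemoryPreviewEmitter emitter = new InMemoryPreviewEmitter();
    emitter.emit("MapReduce.map", "map.product", "Product A");
    emitter.emit("MapReduce.map", "map.product", "Product B");
    // prints {MapReduce.map={map.product=[Product A, Product B]}}
    System.out.println(emitter.snapshot());
  }
}
```

The nested map mirrors the Map<String, List<Object>> parameter of put, so data collected through either method can be returned in one structure.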
2. Preview Context API
    public interface PreviewContext {

      /**
       * Boolean flag indicating whether preview is enabled.
       */
      boolean isPreviewEnabled();

      /**
       * Get the PreviewEmitter, which can be used to emit objects collected by key and field name.
       */
      PreviewEmitter getPreviewEmitter();
    }
3. How will the application get access to the PreviewEmitter?
Similar to Metrics and @UseDataSet, an instance of PreviewEmitter will be injected by CDAP into the application.
    // Example: a MapReduce program emitting preview data through the injected emitter
    public static class MyMapReduce extends AbstractMapReduce {

      // injected by CDAP
      private PreviewEmitter emitter;

      @Override
      public void initialize() throws Exception {
        emitter.emit("MyMapReduce.initialize", "logical.start.time",
                     String.valueOf(getContext().getLogicalStartTime()));
        emitter.emit("MyMapReduce.initialize", "actual.start.time",
                     String.valueOf(System.currentTimeMillis()));
      }

      public class MyMapper extends Mapper<byte[], Text, Text, Text> {
        @Override
        public void map(byte[] key, Text value, Context context) throws IOException {
          if (value.toString().startsWith("Product")) {
            emitter.emit("MapReduce.map", "map.product", value.toString());
          }
        }
      }
    }

    // Example: the ETL mapper obtains the emitter from the context when preview is enabled
    public static class ETLMapper extends Mapper
      implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {

      private TransformRunner<Object, Object> transformRunner;
      // injected by CDAP
      @SuppressWarnings("unused")
      private Metrics mapperMetrics;
      private PreviewEmitter previewEmitter;

      @Override
      public void initialize(MapReduceTaskContext<Object, Object> context) throws Exception {
        // get source, transform, sink ids from program properties
        Map<String, String> properties = context.getSpecification().getProperties();
        if (Boolean.valueOf(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
          LogStageInjector.start();
        }
        transformRunner = new TransformRunner<>(context, mapperMetrics);
        if (context.isPreviewEnabled()) {
          previewEmitter = context.getPreviewEmitter();
        }
      }

      @Override
      public void map(Object key, Object value, Mapper.Context context)
        throws IOException, InterruptedException {
        try {
          transformRunner.transform(key, value, previewEmitter);
        } catch (Exception e) {
          Throwables.propagate(e);
        }
      }
      ...
    }

TrackedTransform.java

    public TrackedTransform(Transformation<IN, OUT> transform, StageMetrics metrics,
                            @Nullable String metricInName, @Nullable String metricOutName,
                            String stageName, @Nullable PreviewEmitter previewEmitter) {
      this.transform = transform;
      this.metrics = metrics;
      this.metricInName = metricInName;
      this.metricOutName = metricOutName;
      this.stageName = stageName;
      this.previewEmitter = previewEmitter;
    }

    @Override
    public void transform(IN input, Emitter<OUT> emitter) throws Exception {
      if (metricInName != null) {
        metrics.count(metricInName, 1);
      }
      if (previewEmitter != null) {
        // emitting input data to preview
        previewEmitter.emit(stageName, "inputData", input);
      }
      transform.transform(input, metricOutName == null ? emitter :
        new TrackedEmitter<>(emitter, metrics, metricOutName, stageName, previewEmitter));
    }
    ...
TrackedEmitter.java

    public TrackedEmitter(Emitter<T> delegate, StageMetrics stageMetrics, String emitMetricName,
                          String stageName, @Nullable PreviewEmitter previewEmitter) {
      this.delegate = delegate;
      this.stageMetrics = stageMetrics;
      this.emitMetricName = emitMetricName;
      this.stageName = stageName;
      this.previewEmitter = previewEmitter;
    }

    @Override
    public void emit(T value) {
      delegate.emit(value);
      stageMetrics.count(emitMetricName, 1);
      // emit output data for preview; previewEmitter is null when preview is disabled
      if (previewEmitter != null) {
        previewEmitter.emit(stageName, "outputData", value);
      }
    }

    @Override
    public void emitError(InvalidEntry<T> value) {
      delegate.emitError(value);
      stageMetrics.count("records.error", 1);
      // emit error data for preview; previewEmitter is null when preview is disabled
      if (previewEmitter != null) {
        previewEmitter.emit(stageName, "errorData", value);
      }
    }
The PreviewContext implementation will use the previewId to create a PreviewEmitter, which programs obtain by calling getPreviewEmitter(). Programs can call isPreviewEnabled() to check whether preview is enabled before emitting.
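One way such a context could be put together is sketched below. Everything outside the two interfaces is an assumption made for illustration: the names PreviewContextSketch and DefaultPreviewContext, the use of a plain String for the preview id, the shared in-memory store, and the choice to hand out a no-op emitter when preview is disabled (the examples on this page use a null emitter instead).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical wrapper class; it only keeps this sketch self-contained.
final class PreviewContextSketch {

  interface PreviewEmitter {
    void put(String key, Map<String, List<Object>> propertyValues);
    void emit(String key, String propertyName, Object propertyValue);
  }

  interface PreviewContext {
    boolean isPreviewEnabled();
    PreviewEmitter getPreviewEmitter();
  }

  static final class DefaultPreviewContext implements PreviewContext {
    // Assumption: when preview is disabled, callers get an emitter that discards everything.
    private static final PreviewEmitter NO_OP = new PreviewEmitter() {
      @Override public void put(String key, Map<String, List<Object>> propertyValues) { }
      @Override public void emit(String key, String propertyName, Object propertyValue) { }
    };

    // Assumption: a null preview id means the program is not running as a preview.
    private final String previewId;
    // Hypothetical store: preview id -> key -> property name -> values.
    private final Map<String, Map<String, Map<String, List<Object>>>> store;

    DefaultPreviewContext(String previewId,
                          Map<String, Map<String, Map<String, List<Object>>>> store) {
      this.previewId = previewId;
      this.store = store;
    }

    @Override
    public boolean isPreviewEnabled() {
      return previewId != null;
    }

    @Override
    public PreviewEmitter getPreviewEmitter() {
      if (previewId == null) {
        return NO_OP;
      }
      // All data emitted through this emitter is scoped to the preview id.
      final Map<String, Map<String, List<Object>>> perPreview =
          store.computeIfAbsent(previewId, id -> new ConcurrentHashMap<>());
      return new PreviewEmitter() {
        @Override
        public void put(String key, Map<String, List<Object>> propertyValues) {
          Map<String, List<Object>> copy = new ConcurrentHashMap<>();
          propertyValues.forEach(
              (name, values) -> copy.put(name, new CopyOnWriteArrayList<>(values)));
          perPreview.put(key, copy);
        }

        @Override
        public void emit(String key, String propertyName, Object propertyValue) {
          perPreview.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
              .computeIfAbsent(propertyName, n -> new CopyOnWriteArrayList<>())
              .add(propertyValue);
        }
      };
    }
  }

  public static void main(String[] args) {
    Map<String, Map<String, Map<String, List<Object>>>> store = new ConcurrentHashMap<>();
    PreviewContext context = new DefaultPreviewContext("preview-1", store);
    if (context.isPreviewEnabled()) {
      context.getPreviewEmitter().emit("stage", "inputData", "record");
    }
    System.out.println(store);
  }
}
```

With a no-op emitter, code that forgets the isPreviewEnabled() check still runs safely; the check remains useful for skipping work that is only needed to build preview values.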
4. How will CDAP get data from the preview?
    /**
     * Represents the state of the preview.
     */
    public class PreviewStatus {
      public enum Status {
        RUNNING, COMPLETED, DEPLOY_FAILED, RUNTIME_FAILED
      }

      Status previewStatus;
      @Nullable
      String failureMessage;
    }

    // This is an internal interface which will be used by REST handlers
    // to retrieve the preview information.
    public interface PreviewManager {

      /**
       * Get the state of the preview represented by previewId.
       */
      PreviewStatus getStatus(PreviewId previewId);

      /**
       * Get the data associated with the preview represented by previewId.
       */
      Map<String, Map<String, List<Object>>> getData(PreviewId previewId);

      /**
       * Get all metrics associated with the preview represented by previewId.
       */
      Collection<MetricTimeSeries> getMetrics(PreviewId previewId);

      /**
       * Get all logs associated with the preview represented by previewId.
       */
      List<LogEntry> getLogs(PreviewId previewId);
    }

    class PreviewId extends EntityId implements NamespaceId, ParentId<NamespaceId> {
      NamespaceId namespace;
      String preview;
    }
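The following sketch illustrates how a REST handler might read status and data through such a manager. It is a simplified stand-in rather than the proposed implementation: it uses a String in place of PreviewId, omits getMetrics and getLogs because they depend on CDAP types, and the names PreviewManagerSketch, InMemoryPreviewManager, start, record, and finish are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical wrapper class; it only keeps this sketch self-contained.
final class PreviewManagerSketch {

  // Immutable variant of the PreviewStatus class above.
  static final class PreviewStatus {
    enum Status { RUNNING, COMPLETED, DEPLOY_FAILED, RUNTIME_FAILED }

    final Status status;
    final String failureMessage; // null unless the preview failed

    PreviewStatus(Status status, String failureMessage) {
      this.status = status;
      this.failureMessage = failureMessage;
    }
  }

  // Assumption: a String preview id stands in for PreviewId.
  static final class InMemoryPreviewManager {
    private final Map<String, PreviewStatus> statuses = new ConcurrentHashMap<>();
    private final Map<String, Map<String, Map<String, List<Object>>>> data =
        new ConcurrentHashMap<>();

    // Hypothetical lifecycle hook: called when a preview run starts.
    void start(String previewId) {
      data.put(previewId, new ConcurrentHashMap<>());
      statuses.put(previewId, new PreviewStatus(PreviewStatus.Status.RUNNING, null));
    }

    // Hypothetical hook: called for every value emitted during the preview run.
    void record(String previewId, String key, String propertyName, Object value) {
      data.computeIfAbsent(previewId, id -> new ConcurrentHashMap<>())
          .computeIfAbsent(key, k -> new ConcurrentHashMap<>())
          .computeIfAbsent(propertyName, n -> new CopyOnWriteArrayList<>())
          .add(value);
    }

    // Hypothetical lifecycle hook: called when the run completes or fails.
    void finish(String previewId, PreviewStatus.Status status, String failureMessage) {
      statuses.put(previewId, new PreviewStatus(status, failureMessage));
    }

    PreviewStatus getStatus(String previewId) {
      PreviewStatus status = statuses.get(previewId);
      if (status == null) {
        throw new IllegalArgumentException("Unknown preview: " + previewId);
      }
      return status;
    }

    Map<String, Map<String, List<Object>>> getData(String previewId) {
      Map<String, Map<String, List<Object>>> perPreview = data.get(previewId);
      if (perPreview == null) {
        return Collections.emptyMap();
      }
      // Read-only view of the outer map; handlers should treat the contents as read-only.
      return Collections.unmodifiableMap(perPreview);
    }
  }

  public static void main(String[] args) {
    InMemoryPreviewManager manager = new InMemoryPreviewManager();
    manager.start("preview-1");
    manager.record("preview-1", "stage", "outputData", "record");
    manager.finish("preview-1", PreviewStatus.Status.COMPLETED, null);
    System.out.println(manager.getStatus("preview-1").status + " " + manager.getData("preview-1"));
  }
}
```

Keeping the failure message next to the status lets a handler answer a status request with a single lookup, which matches the DEPLOY_FAILED and RUNTIME_FAILED states in the enum.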
...