Public interface to emit the preview data:
Code Block language java
import java.util.List;
import java.util.Map;

public interface PreviewEmitter {

  /**
   * Put the map of properties corresponding to the given key.
   *
   * @param key the key under which properties are stored
   * @param propertyValues the map of property values to be stored under the given key
   */
  void put(String key, Map<String, List<Object>> propertyValues);

  /**
   * Emit the property specified by name and value for the given key.
   * Values emitted with the same key and propertyName are grouped in a list.
   *
   * @param key the key under which properties are stored
   * @param propertyName the name of the property
   * @param propertyValue the value associated with the property
   */
  void emit(String key, String propertyName, Object propertyValue);
}
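The grouping rule in the `emit` Javadoc can be illustrated with a small in-memory implementation. This is only a sketch: `InMemoryPreviewEmitter`, its `get` accessor, and the choice that `put` replaces whatever was stored under the key are assumptions made for illustration, not part of the design.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Same shape as the PreviewEmitter interface above, repeated so the sketch compiles alone.
interface PreviewEmitter {
  void put(String key, Map<String, List<Object>> propertyValues);
  void emit(String key, String propertyName, Object propertyValue);
}

// Hypothetical in-memory implementation, for illustration only.
class InMemoryPreviewEmitter implements PreviewEmitter {
  // key -> propertyName -> values, in insertion order
  private final Map<String, Map<String, List<Object>>> data = new LinkedHashMap<>();

  @Override
  public synchronized void put(String key, Map<String, List<Object>> propertyValues) {
    // Assumption: put replaces anything previously stored under the key.
    Map<String, List<Object>> copy = new LinkedHashMap<>();
    for (Map.Entry<String, List<Object>> entry : propertyValues.entrySet()) {
      copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
    }
    data.put(key, copy);
  }

  @Override
  public synchronized void emit(String key, String propertyName, Object propertyValue) {
    // Values emitted with the same key and propertyName end up in the same list.
    data.computeIfAbsent(key, k -> new LinkedHashMap<>())
        .computeIfAbsent(propertyName, n -> new ArrayList<>())
        .add(propertyValue);
  }

  // Hypothetical read accessor; the design exposes reads through PreviewManager instead.
  synchronized Map<String, List<Object>> get(String key) {
    Map<String, List<Object>> properties = data.get(key);
    return properties == null ? Collections.<String, List<Object>>emptyMap() : properties;
  }
}

class PreviewEmitterSketch {
  public static void main(String[] args) {
    InMemoryPreviewEmitter emitter = new InMemoryPreviewEmitter();
    emitter.emit("stage1", "inputData", "a");
    emitter.emit("stage1", "inputData", "b");
    emitter.emit("stage1", "outputData", "A");
    System.out.println(emitter.get("stage1")); // {inputData=[a, b], outputData=[A]}
  }
}
```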
2. Preview Context API
Code Block
public interface PreviewContext {

  /**
   * Boolean flag indicating whether preview is enabled.
   */
  boolean isPreviewEnabled();

  /**
   * Get the PreviewEmitter, which can be used to emit objects collected by key and field name.
   */
  PreviewEmitter getPreviewEmitter();
}
3. How the application will get access to the PreviewEmitter
Code Block language java
public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
  private TransformRunner<Object, Object> transformRunner;
  private PreviewEmitter previewEmitter;

  @Override
  public void initialize(MapReduceTaskContext<Object, Object> context) throws Exception {
    // get source, transform, sink ids from program properties
    Map<String, String> properties = context.getSpecification().getProperties();
    if (Boolean.valueOf(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
      LogStageInjector.start();
    }
    transformRunner = new TransformRunner<>(context, mapperMetrics);
  }

  @Override
  public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
    transformRunner.transform(key, value);
  }
  ...
}

// TrackedTransform.java
/**
 * A {@link Transformation} that delegates transform operations while emitting metrics
 * around how many records were input into the transform and output by it.
 *
 * @param <IN> Type of input object
 * @param <OUT> Type of output object
 */
public class TrackedTransform<IN, OUT> implements Transformation<IN, OUT>, Destroyable {
  private final PreviewContext previewContext;
  private final String stageName;

  public TrackedTransform(Transformation<IN, OUT> transform, StageMetrics metrics,
                          PreviewContext previewContext, String stageName,
                          @Nullable String metricInName, @Nullable String metricOutName) {
    ...
    this.previewContext = previewContext;
    this.stageName = stageName;
    ...
  }

  @Override
  public void transform(IN input, Emitter<OUT> emitter) throws Exception {
    if (metricInName != null) {
      metrics.count(metricInName, 1);
    }
    if (previewContext.isPreviewEnabled()) {
      // emit input data for preview
      previewContext.getPreviewEmitter().emit(stageName, "inputData", input);
    }
    transform.transform(input, new TrackedEmitter<>(emitter, metrics, metricOutName, stageName, previewContext));
  }
}
...

// TrackedEmitter.java
@Override
public void emit(T value) {
  delegate.emit(value);
  stageMetrics.count(emitMetricName, 1);
  if (previewContext.isPreviewEnabled()) {
    // emit output data for preview
    previewContext.getPreviewEmitter().emit(stageName, "outputData", value);
  }
}

@Override
public void emitError(InvalidEntry<T> value) {
  delegate.emitError(value);
  stageMetrics.count("records.error", 1);
  if (previewContext.isPreviewEnabled()) {
    // emit error data for preview
    previewContext.getPreviewEmitter().emit(stageName, "errorData", value);
  }
}
The PreviewContext implementation will use the previewId to create a preview emitter, which programs obtain by calling getPreviewEmitter. Programs can call isPreviewEnabled to check whether preview is enabled before emitting.
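One way this could look is sketched below, assuming a context that is constructed with a preview id and an enabled flag. `SimplePreviewContext`, `RecordingEmitter`, the string form of the preview id, and the `recordInput` helper are all hypothetical names introduced for illustration; the guard in `recordInput` mirrors the check used in TrackedTransform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Interfaces repeated from the sections above so the sketch compiles alone.
interface PreviewEmitter {
  void put(String key, Map<String, List<Object>> propertyValues);
  void emit(String key, String propertyName, Object propertyValue);
}

interface PreviewContext {
  boolean isPreviewEnabled();
  PreviewEmitter getPreviewEmitter();
}

// Hypothetical emitter that tags every record with the preview id it belongs to.
class RecordingEmitter implements PreviewEmitter {
  private final String previewId;
  final List<String> records = new ArrayList<>();

  RecordingEmitter(String previewId) {
    this.previewId = previewId;
  }

  @Override
  public void put(String key, Map<String, List<Object>> propertyValues) {
    // Assumption for this sketch: put is recorded as one emit per value.
    for (Map.Entry<String, List<Object>> entry : propertyValues.entrySet()) {
      for (Object value : entry.getValue()) {
        emit(key, entry.getKey(), value);
      }
    }
  }

  @Override
  public void emit(String key, String propertyName, Object propertyValue) {
    records.add(previewId + "/" + key + "/" + propertyName + "=" + propertyValue);
  }
}

// Hypothetical context: the emitter is created from the preview id at construction time.
class SimplePreviewContext implements PreviewContext {
  private final boolean enabled;
  final RecordingEmitter emitter;

  SimplePreviewContext(String previewId, boolean enabled) {
    this.enabled = enabled;
    this.emitter = new RecordingEmitter(previewId);
  }

  @Override
  public boolean isPreviewEnabled() {
    return enabled;
  }

  @Override
  public PreviewEmitter getPreviewEmitter() {
    return emitter;
  }
}

class PreviewContextSketch {
  // Emits only when preview is enabled, as the tracked transform and emitter do.
  static void recordInput(PreviewContext context, String stageName, Object input) {
    if (context.isPreviewEnabled()) {
      context.getPreviewEmitter().emit(stageName, "inputData", input);
    }
  }

  public static void main(String[] args) {
    SimplePreviewContext on = new SimplePreviewContext("preview-1", true);
    SimplePreviewContext off = new SimplePreviewContext("preview-2", false);
    recordInput(on, "parser", "row");
    recordInput(off, "parser", "row");
    System.out.println(on.emitter.records);  // [preview-1/parser/inputData=row]
    System.out.println(off.emitter.records); // []
  }
}
```

Checking the flag before calling the emitter keeps regular, non-preview runs from paying for data collection they do not need.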
4. How will CDAP get data from the preview?
Code Block
/**
 * Represents the state of the preview.
 */
public class PreviewStatus {
  public enum Status {
    RUNNING,
    COMPLETED,
    DEPLOY_FAILED,
    RUNTIME_FAILED
  }

  Status previewStatus;

  @Nullable
  String failureMessage;
}

// This is an internal interface which will be used by REST handlers
// to retrieve the preview information.
public interface PreviewManager {

  /**
   * Get the status of the preview represented by previewId.
   */
  PreviewStatus getStatus(PreviewId previewId);

  /**
   * Get the data associated with the preview represented by previewId.
   */
  Map<String, Map<String, List<Object>>> getData(PreviewId previewId);

  /**
   * Get all metrics associated with the preview represented by previewId.
   */
  Collection<MetricTimeSeries> getMetrics(PreviewId previewId);

  /**
   * Get all logs associated with the preview represented by previewId.
   */
  List<LogEntry> getLogs(PreviewId previewId);
}

class PreviewId extends EntityId implements NamespaceId, ParentId<NamespaceId> {
  NamespaceId namespace;
  String preview;
}
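To make the shape of the `getData` result concrete, the sketch below walks a map with the same nesting and counts the records held for each stage and property. It assumes the outer key is the stage name and the inner key is the property name ("inputData", "outputData", "errorData"), which matches how the tracked transform and emitter call `emit`; the `PreviewDataSummary` class, its `countRecords` helper, and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PreviewDataSummary {

  // Hypothetical helper: flattens stage -> property -> values into "stage.property" -> count.
  static Map<String, Integer> countRecords(Map<String, Map<String, List<Object>>> data) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (Map.Entry<String, Map<String, List<Object>>> stage : data.entrySet()) {
      for (Map.Entry<String, List<Object>> property : stage.getValue().entrySet()) {
        counts.put(stage.getKey() + "." + property.getKey(), property.getValue().size());
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // Made-up sample data in the same shape as the getData return type.
    Map<String, List<Object>> csvParser = new LinkedHashMap<>();
    csvParser.put("inputData", Arrays.<Object>asList("1,alice", "2,bob", "bad-row"));
    csvParser.put("outputData", Arrays.<Object>asList("alice", "bob"));
    csvParser.put("errorData", Arrays.<Object>asList("bad-row"));

    Map<String, Map<String, List<Object>>> data = new LinkedHashMap<>();
    data.put("csvParser", csvParser);

    System.out.println(countRecords(data));
    // {csvParser.inputData=3, csvParser.outputData=2, csvParser.errorData=1}
  }
}
```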
...