...
Code Block |
---|
// Input class will encapsulate one of the following three things: // 1. Dataset - name, args, splits // 2. Stream - name, startTime, endTime, decoderType, bodyformatSpec // 3. InputFormatProvider - name, inputFormatProvider class // StreamBatchReadable will be deprecated. // It will have the following APIs exposed to the user: /** * Defines input to a program, such as MapReduce. */ public abstract class Input { /** * Returns an Input defined by a dataset. * * @param datasetName the name of the input dataset */ public static Input ofDataset(String datasetName) { return ofDataset(datasetName, Collections.<String, String>emptyMap()); } /** * Returns an Input defined by a dataset. * * @param datasetName the name of the input dataset * @param arguments the arguments to use when instantiating the dataset */ public static Input ofDataset(String datasetName, Map<String, String> arguments) { return ofDataset(datasetName, arguments, null); } /** * Returns an Input defined by a dataset. * * @param datasetName the name of the input dataset * @param splits the data selection splits */ public static Input ofDataset(String datasetName, Iterable<Split> splits) { return ofDataset(datasetName, Collections.<String, String>emptyMap(), splits); } /** * Returns an Input defined by a dataset. * * @param datasetName the name of the input dataset * @param arguments the arguments to use when instantiating the dataset * @param splits the data selection splits */ public static Input ofDataset(String datasetName, Map<String, String> arguments, Iterable<Split> splits) { return new DatasetInput(datasetName, arguments, splits); } /** * Returns an Input defined by an InputFormatProvider. * * @param inputName the name of the input */ public static Input of(String inputName, InputFormatProvider inputFormatProvider) { return new InputFormatProviderInput(inputName, inputFormatProvider); } /** * Returns an Input defined by a stream. * * @param streamBatchReadable specifies the stream to be used as input */ private static Input ofStream(StreamBatchReadable streamBatchReadable) { return new StreamInput(streamBatchReadable); } /** * Returns an Input defined with the given stream name with all time range. * * @param streamName Name of the stream. */ public static Input ofStream(String streamName) { return ofStream(new StreamBatchReadable(streamName, 0, Long.MAX_VALUE)); } /** * Returns an Input defined by a stream with the given properties. * * @param streamName Name of the stream. * @param startTime Start timestamp in milliseconds. * @param endTime End timestamp in milliseconds. */ public static Input ofStream(String streamName, long startTime, long endTime) { return ofStream(new StreamBatchReadable(streamName, startTime, endTime)); } /** * Returns an Input defined by a stream with the given properties. * * @param streamName Name of the stream * @param startTime Start timestamp in milliseconds (inclusive) of stream events provided to the job * @param endTime End timestamp in milliseconds (exclusive) of stream events provided to the job * @param decoderType The {@link StreamEventDecoder} class for decoding {@link StreamEvent} */ public static Input ofStream(String streamName, long startTime, long endTime, Class<? extends StreamEventDecoder> decoderType) { return ofStream(new StreamBatchReadable(streamName, startTime, endTime, decoderType)); } /** * Returns an Input defined by a stream with the given properties. * * @param streamName Name of the stream * @param startTime Start timestamp in milliseconds (inclusive) of stream events provided to the job * @param endTime End timestamp in milliseconds (exclusive) of stream events provided to the job * @param bodyFormatSpec The {@link FormatSpecification} class for decoding {@link StreamEvent} */ @Beta public static Input ofStream(String streamName, long startTime, long endTime, FormatSpecification bodyFormatSpec) { return ofStream(new StreamBatchReadable(streamName, startTime, endTime, bodyFormatSpec)); } } |
Code Block |
---|
// The new APIs on MapReduceContext will simply be: void addInput(Input input); void addInput(Input input, Class<?> mapperClass); |
...