
...

  •  User stories documented (Ali)
  •  User stories reviewed (Nitin)
  •  Design documented (Ali)
  •  Design reviewed (Albert/Terence/Andreas)
  •  Feature merged (Ali)
  •  Examples and guides (Ali)
  •  Integration tests (Ali) 
  •  Documentation for feature (Ali)
  •  Blog post

...

Code Block
// Input class will encapsulate one of the following three things:
//   1. Dataset - name, args, splits
//   2. Stream - name, startTime, endTime, decoderType, bodyFormatSpec
//   3. InputFormatProvider - name, inputFormatProvider class

// StreamBatchReadable will be deprecated.
// The Input class will expose the following APIs to the user:

/**
 * Defines input to a program, such as MapReduce.
 */
public abstract class Input {
  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   */
  public static Input ofDataset(String datasetName);

  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   * @param arguments the arguments to use when instantiating the dataset
   */
  public static Input ofDataset(String datasetName, Map<String, String> arguments);

  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   * @param splits the data selection splits
   */
  public static Input ofDataset(String datasetName, Iterable<Split> splits);

  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   * @param arguments the arguments to use when instantiating the dataset
   * @param splits the data selection splits
   */
  public static Input ofDataset(String datasetName, Map<String, String> arguments, Iterable<Split> splits);

  /**
   * Returns an Input defined by an InputFormatProvider.
   *
   * @param inputName the name of the input
   * @param inputFormatProvider the {@link InputFormatProvider} that defines the input
   */
  public static Input of(String inputName, InputFormatProvider inputFormatProvider);

  /**
   * Returns an Input defined by a stream.
   *
   * @param streamBatchReadable specifies the stream to be used as input
   */
  private static Input ofStream(StreamBatchReadable streamBatchReadable);

  /**
   * Returns an Input defined by the given stream, covering the stream's entire time range.
   *
   * @param streamName Name of the stream.
   */
  public static Input ofStream(String streamName);

  /**
   * Returns an Input defined by a stream with the given properties.
   *
   * @param streamName Name of the stream.
   * @param startTime Start timestamp in milliseconds (inclusive).
   * @param endTime End timestamp in milliseconds (exclusive).
   */
  public static Input ofStream(String streamName, long startTime, long endTime);

  /**
   * Returns an Input defined by a stream with the given properties.
   *
   * @param streamName Name of the stream
   * @param startTime Start timestamp in milliseconds (inclusive) of stream events provided to the job
   * @param endTime End timestamp in milliseconds (exclusive) of stream events provided to the job
   * @param decoderType The {@link StreamEventDecoder} class used to decode each {@link StreamEvent}
   */
  public static Input ofStream(String streamName, long startTime,
                               long endTime, Class<? extends StreamEventDecoder> decoderType);

  /**
   * Returns an Input defined by a stream with the given properties.
   *
   * @param streamName Name of the stream
   * @param startTime Start timestamp in milliseconds (inclusive) of stream events provided to the job
   * @param endTime End timestamp in milliseconds (exclusive) of stream events provided to the job
   * @param bodyFormatSpec The {@link FormatSpecification} describing the format of each {@link StreamEvent} body
   */
  @Beta
  public static Input ofStream(String streamName, long startTime,
                               long endTime, FormatSpecification bodyFormatSpec);
}
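A minimal, self-contained Java sketch of the encapsulation described above, for illustration only. The names SketchInput, DatasetInput, StreamInput, ProviderInput and SketchInputFormatProvider are hypothetical; splits, decoders and format specifications are left out; and modeling the "entire time range" of a stream as [0, Long.MAX_VALUE) is an assumption, not confirmed CDAP behavior. The point is only that each factory method returns exactly one of the three kinds of input.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for InputFormatProvider (assumed shape: class name + configuration).
interface SketchInputFormatProvider {
  String getInputFormatClassName();
  Map<String, String> getInputFormatConfiguration();
}

// Hypothetical, simplified model of the proposed Input class: every factory
// method returns one of three variants (dataset, stream, InputFormatProvider).
abstract class SketchInput {
  final String name;

  SketchInput(String name) {
    this.name = name;
  }

  static SketchInput ofDataset(String datasetName) {
    return ofDataset(datasetName, Collections.<String, String>emptyMap());
  }

  static SketchInput ofDataset(String datasetName, Map<String, String> arguments) {
    return new DatasetInput(datasetName, arguments);
  }

  static SketchInput ofStream(String streamName) {
    // Assumption: the stream's entire time range is modeled as [0, Long.MAX_VALUE).
    return ofStream(streamName, 0L, Long.MAX_VALUE);
  }

  static SketchInput ofStream(String streamName, long startTime, long endTime) {
    return new StreamInput(streamName, startTime, endTime);
  }

  static SketchInput of(String inputName, SketchInputFormatProvider provider) {
    return new ProviderInput(inputName, provider);
  }
}

final class DatasetInput extends SketchInput {
  final Map<String, String> arguments;

  DatasetInput(String name, Map<String, String> arguments) {
    super(name);
    // Defensive copy so later changes to the caller's map are not seen here.
    this.arguments = Collections.unmodifiableMap(new HashMap<>(arguments));
  }
}

final class StreamInput extends SketchInput {
  final long startTime; // milliseconds, inclusive
  final long endTime;   // milliseconds, exclusive

  StreamInput(String name, long startTime, long endTime) {
    super(name);
    if (startTime > endTime) {
      throw new IllegalArgumentException("startTime must not be after endTime");
    }
    this.startTime = startTime;
    this.endTime = endTime;
  }

  // True if an event with this timestamp would be provided to the job.
  boolean includes(long timestamp) {
    return timestamp >= startTime && timestamp < endTime;
  }
}

final class ProviderInput extends SketchInput {
  final SketchInputFormatProvider provider;

  ProviderInput(String name, SketchInputFormatProvider provider) {
    super(name);
    this.provider = provider;
  }
}
```

For example, SketchInput.ofStream("purchases", 1000L, 2000L) includes an event at timestamp 1000 but not one at 2000, matching the inclusive start and exclusive end documented on the stream factory methods.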

 

Code Block
// The new APIs on MapReduceContext will simply be:

/**
 * Adds the specified {@link Input} to the inputs of this MapReduce job.
 * @param input the input to be used
 */
void addInput(Input input);

/**
 * Adds the specified {@link Input} to the inputs of this MapReduce job, to be processed by the given mapper class.
 * @param input the input to be used
 * @param mapperCls the mapper class to be used for the input
 */
void addInput(Input input, Class<?> mapperCls);
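A hypothetical sketch of how a context could record several inputs, each optionally bound to its own mapper class, as the two addInput overloads suggest. To stay self-contained it identifies inputs by name rather than by Input, and the names SketchMapReduceContext, InputEntry, PurchaseMapper and ClickMapper are made up. Rejecting a duplicate input name is an assumed design choice, not something this design states.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical placeholder mapper classes, used only to show per-input mappers.
final class PurchaseMapper {}
final class ClickMapper {}

// Hypothetical sketch of a context that records multiple inputs; not the CDAP implementation.
// Assumption: inputs are identified by name here instead of by the Input class.
final class SketchMapReduceContext {

  // One registered input and the mapper class that should read it.
  static final class InputEntry {
    final String inputName;
    final Class<?> mapperCls; // null means "use the job-level mapper"

    InputEntry(String inputName, Class<?> mapperCls) {
      this.inputName = inputName;
      this.mapperCls = mapperCls;
    }
  }

  private final List<InputEntry> inputs = new ArrayList<>();

  // Counterpart of addInput(Input): no input-specific mapper.
  void addInput(String inputName) {
    addInput(inputName, null);
  }

  // Counterpart of addInput(Input, Class<?>): bind the input to its own mapper.
  void addInput(String inputName, Class<?> mapperCls) {
    // Assumption: adding the same input name twice is treated as an error.
    for (InputEntry entry : inputs) {
      if (entry.inputName.equals(inputName)) {
        throw new IllegalArgumentException("Input already added: " + inputName);
      }
    }
    inputs.add(new InputEntry(inputName, mapperCls));
  }

  List<InputEntry> getInputs() {
    return Collections.unmodifiableList(inputs);
  }
}
```

Keeping the mapper per input rather than per job would let a single MapReduce program read, for example, a dataset and a stream whose records have different types.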

Approach for CDAP-3980 (wip)

...