Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 17 Next »

Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

Users can use the command line tool DistCp to copy files between clusters with same or different filesystems. Currently we have hydrator plugins that support batch file copying within filesystems such as local HDFS, FTP, and Windows Share. However, these plugins all have restrictions. For example, we don't have corresponding file batch sink plugins despite having file batch source plugins. Therefore the files emitted from the source can't be written to filesystems while preserving the file/ directory structures. Action plugins such as SFTP copy only supports copying within the SFTP server, which restricts users from moving files out of the server. We wish to add some Hydrator plugins that can help users perform whole file copies between arbitrary filesystems/ databases in the CDAP UI. 

Goals

According to this user request, our new plugin ideally should have the following features: 

  1. Should support file copying between the following file systems:
    1.  Local directories
    2. HDFS
    3. Amazon S3
    4. FTP
  2. Should support failover. It should start where it left during restarts or issues.
  3. We should have UI, where we can see progress
  4. We should have metrics for each process on how many files copied, size, time.
  5. Checks network bandwidth and displays estimated completion time.
  6. Maintains the timestamp of each file as is from the source.
  7. Specify Path filters through UI on the fly.
  8. File permission configurations.

User Stories 

  • As a cluster administrator, I want to migrate all my files and preserve file structures when upgrading to a newer cluster. 
  • As a data analyst, I want to retrieve files that contain data from some remote ftp location and store them in my cluster that runs the HDFS filesystem.  
  • As a cluster administrator, I'm only interested in files with specific file names and wish to copy them to some other location.
  • As a pipeline developer, I want to organize files by path and filenames and put them into different destinations.

Design

Our high level pipeline design can be split into 2 major parts: a file reader source and a file writer sink.

  • File Reader Source should contain the following configurations:
    1. The address of the source cluster/ server.
    2. The type of filesystem we wish to copy from.
    3. The base path/ directory of all the files we wish to copy.
    4. A regular expression filter that chooses which files we wish to copy.
    5. Additional configurations such as username and password.
  • File Reader Sink should contain the following configurations:
    1. The address of the target cluster/ server.
    2. The type of filesystem we wish to copy into.
    3. The base path/ directory we wish to copy files into.
    4. Additional configurations such as username and password.

 

 

The emitted records from the File Reader Source should contain a relative path (will be appended to the base path in the sink configuration), and the file body itself.

Approach

Approach #1

Concerns: Because file copying jobs often involve large amount of files and data, a simple file reader/writer would not work due to limited memory on each node. We have to pass the files in small chunks along the pipeline and assemble the files at the sink. We have to create a new InputFormat class and a new OutputFormat class with the following functionalities.

  • InputFormat
    1. Be able to disassemble files into smaller chunks.
    2. The chunks should be labeled
    3. Each record emitted from the source will have the following schema:
      1. The name of the file that owns the chunk. Stored as a String.

      2. The byte offset of the chunk. Stored as a Long.
      3. The content of the chunk. Stored as a ByteBuffer.
  • OutputFormat

The data copying pipeline will have the following functionalities: Instead of having the source read data and pass it through the pipeline to the sink, we only pass down the file metadata from the source (path, name, timestamp, size....). After collecting metadata for every file to be copied, the sink will perform file read from source directories and file write to target location. This approach allows us to copy files at file-level-granularity (All operations depend on and only on file metadata).  

 

FileCopySource
 
public abstract class AbstractFileCopySource extends ReferenceBatchSource<String, FileAttribute, StructuredRecord> {
  private final AbstractFileCopySourceConfig config;

  public AbstractFileCopySource(AbstractFileCopySourceConfig config) {
    super(config);
    this.config = config;
  }

  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    /**
     * Loads configurations from UI and check if they are valid
     * Possibly initiate connections to databases and check if the
     * provided credentials are valid.
     *
     */
  }

  public void initialize(BatchSourceContext context) {
    /**
     * Initialize a record transformer that transforms file metadata
     * into StructuredRecords.
     *
     */
  }

  public void prepareRun(BatchSourceContext context) {
    /**
     * Connect to the source databases. Set up input format to read the necessary
     * file metadata.
     *
     */
  }

  public void transform(KeyValue<String, FileAttribute> input, Emitter<StructuredRecord> emitter) {
    /**
     * Convert file metadata to StructuredRecord and emit.
     */
  }

  public abstract class AbstractFileCopySourceConfig extends ReferencePluginConfig{
    public AbstractFileCopySourceConfig(String name) {
      super(name);
    }

    /**
     * Additional configurations here for specific databases.
     */
  }
}
 
 

To support the source plugin, we need an InputFormat that reads file metadata from file source.

sourceinputformat
 
public class MetadataInputFormat extends FileInputFormat<String, FileAttribute> {
  public MetadataInputFormat() {

  }

  @Override
  public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new MetadataRecordReader();
  }

  /**
   * Returns key that contains file path.
   * Returns value that contains file attributes.
   */
  public class MetadataRecordReader extends RecordReader<String, FileAttribute> {

    public MetadataRecordReader() {
      super();
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return false;
    }

    @Override
    public String getCurrentKey() throws IOException, InterruptedException {
      return null;
    }

    @Override
    public FileAttribute getCurrentValue() throws IOException, InterruptedException {
      return null;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public void close() {

    }
  }
}
 

 

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Deprecated Programmatic APIs

New REST APIs

PathMethodDescriptionResponse CodeResponse
/v3/apps/<app-id>GETReturns the application spec for a given application

200 - On success

404 - When application is not available

500 - Any internal errors

 

     

Deprecated REST API

PathMethodDescription
/v3/apps/<app-id>GETReturns the application spec for a given application

CLI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

System behavior (if applicable - document impact on downstream [ YARN, HBase etc ] component failures) and how does the design take care of these aspect

Test Scenarios

Test IDTest DescriptionExpected Results
   
   
   
   

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work

  • No labels