Datasets Design 3.5.0


Goals

  1. Dataset access annotations: @ReadOnly, @WriteOnly, @ReadWrite, @NoAccess
  2. Dataset admin APIs: "Updatable"
  3. Schema as a system property
  4. Better transaction control for programs: timeout, long tx, no tx
  5. Read-only transactions (not exposed to programs yet)

Checklist

  • User stories documented (Andreas/Terence)
  • User stories reviewed (Nitin)
  • Design documented (Andreas/Terence)
  • Design reviewed (?)
  • Feature merged (Andreas/Terence)
  • Examples and guides (Andreas/Terence)
  • Integration tests (Ali) 
  • Documentation for feature (Andreas)
  • Blog post 

User Stories

  1. As a CDAP admin, I want to restrict access to datasets based on the access type (read or write)
  2. As a developer, I want to develop a custom dataset and indicate the access type of its methods. 
  3. As a CDAP admin, I want to see lineage and audit logs that distinguish between read and write access.
  4. As a developer, I want to update the schema of a dataset. I expect that if the new schema is not compatible with the existing schema, the update will fail.
  5. As a developer, I want to implement a compatibility check that allows me to reject an incompatible update of dataset properties. This check is a method that has access to both the existing and the new properties. 
  6. As a developer, I want to be able to update the properties of a dataset as part of redeploying my application. If the update is incompatible, I expect the redeploy to fail.
  7. As a developer, if I deploy an application or data pipeline that attempts to create a dataset, I expect the deploy to fail if the dataset already exists and has incompatible properties.
  8. As a developer, I want to create a dataset with a schema, without having to configure the schema for every subsystem that is involved (for example, Hadoop input format, Hive, etc.) 
  9. As a developer, I need to perform some data maintenance in a custom workflow action, a worker, a flow, or a service handler. I need a way to control the transaction timeout, or have no timeout. 
  10. As a developer, I want a way to access datasets and execute transactional code asynchronously, especially in long-running programs (worker, flowlet, service).
  11. As a developer, I want to access a non-transactional dataset (for example, a file set), without starting a transaction.
  12. As a CDAP system developer, I want to reduce the footprint of CDAP services on the transaction system, by using read-only transactions for lookups. 

Design

1. Updatable Datasets

In the current dataset framework (pre-3.5.0), there is a bit of confusion about dataset update and upgrade. We will use the following terminology:

  • Update means reconfiguring an existing dataset with new properties, for example, changing the TTL or adding a field to the schema. 
  • Upgrade means migration of an existing dataset after a new version of the dataset type (code) was deployed. Currently this is only supported after an upgrade of CDAP, as part of running the upgrade tool.

In the current code base, both update and upgrade are handled by the same method in DatasetAdmin, namely upgrade(). This method, however, does not have access to the previously existing dataset properties or spec. In the case of an update, it is therefore hard for it to decide whether the new properties are compatible with the existing dataset, and what actions it needs to perform for the update. We will therefore introduce a new method update(), which receives the existing dataset spec as a parameter. For modularity, we will not add this method to the DatasetAdmin interface, but instead introduce a new interface Updatable that can be implemented by dataset admins that need to perform actions when the properties change. This will be the case for most, if not all, system-defined dataset types. 

/**
 * Interface implemented by DatasetAdmins that have a way to update the dataset after reconfiguration.
 * It is optional for a DatasetAdmin to implement this interface; if not implemented, the dataset system
 * will assume that the dataset does not need any actions to be performed on update (other than updating its spec).
 */
public interface Updatable {

  /**
   * Updates the dataset instance after it has been reconfigured. This method will be called during
   * dataset update, on a {@link DatasetAdmin} that was created using the new dataset spec, before that
   * new spec is saved. That is, if this method fails, then the update fails and no dataset metadata has
   * been changed.
   *
   * @param oldSpec the specification of the dataset before reconfiguration
   */
  void update(DatasetSpecification oldSpec) throws IOException;
}
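
For illustration, the admin of a file-based dataset might implement this roughly as follows (a sketch: the "base.path" property and the moveData() helper are hypothetical):

public class MyFileSetAdmin implements DatasetAdmin, Updatable {

  private final DatasetSpecification spec; // the new (reconfigured) spec
  ...

  @Override
  public void update(DatasetSpecification oldSpec) throws IOException {
    // apply only the physical changes implied by the reconfiguration,
    // for example relocating the data if the base path has changed
    String oldPath = oldSpec.getProperty("base.path");
    String newPath = spec.getProperty("base.path");
    if (newPath != null && !newPath.equals(oldPath)) {
      moveData(oldPath, newPath); // hypothetical helper
    }
  }
}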

Because update() is implemented by the DatasetAdmin, which can only be instantiated from a dataset spec, we also need a way to create that spec from the new dataset properties. The current code does this by calling DatasetDefinition.configure(), in exactly the same way as when the dataset is first created. We will change that and introduce a method reconfigure() that dataset definitions can optionally implement. This method has access to the previous dataset spec, so that it can validate the compatibility of the new properties and reject the update if that validation fails. 

/**
 * Interface implemented by DatasetDefinitions that have a way to reconfigure the dataset properties.
 * It is optional for a dataset type to implement this interface; if not implemented, the dataset system
 * will assume that no compatibility check is required and call
 * {@link DatasetDefinition#configure(String, DatasetProperties)} instead to obtain the new dataset specification.
 */
public interface Reconfigurable {

  /**
   * Validates the new properties, including a compatibility check with the existing spec, and returns a
   * new dataset specification for the dataset instance.
   *
   * @param instanceName the name of the dataset instance that is being reconfigured
   * @param newProperties the updated dataset properties, to be validated by this method
   * @param currentSpec the current specification of the dataset
   * @return a new dataset specification, created from the new properties
   * @throws IncompatibleUpdateException if the new properties are not compatible with the existing dataset
   */
  DatasetSpecification reconfigure(String instanceName,
                                   DatasetProperties newProperties,
                                   DatasetSpecification currentSpec)
    throws IncompatibleUpdateException;
}
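
For illustration, a definition for a table with a TTL might implement the compatibility check roughly like this (a sketch: the "ttl" property key and the rule that a TTL must not shrink are assumptions made for the example):

public class MyTableDefinition extends AbstractDatasetDefinition<MyTable, DatasetAdmin>
  implements Reconfigurable {

  ...

  @Override
  public DatasetSpecification reconfigure(String instanceName,
                                          DatasetProperties newProperties,
                                          DatasetSpecification currentSpec)
    throws IncompatibleUpdateException {
    String oldTtl = currentSpec.getProperty("ttl");
    String newTtl = newProperties.getProperties().get("ttl");
    if (oldTtl != null && (newTtl == null || Long.parseLong(newTtl) < Long.parseLong(oldTtl))) {
      throw new IncompatibleUpdateException(
        String.format("TTL cannot be reduced (existing: %s, new: %s)", oldTtl, newTtl));
    }
    // the new properties are compatible: build the new spec the same way as for a create
    return configure(instanceName, newProperties);
  }
}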

With these two new interfaces, the flow of creating a dataset vs. updating a dataset is very symmetric:

 

      Create                                  Update

  1.  spec = def.configure(props)             spec = def.reconfigure(name, newProps, oldSpec)
  2.  disableExplore(datasetName)             disableExplore(datasetName)
  3.  admin = def.getDatasetAdmin(spec)       admin = def.getDatasetAdmin(spec)
  4.  admin.create()                          admin.update(oldSpec)
  5.  datasetMDS.save(spec)                   datasetMDS.save(spec)
  6.  enableExplore(datasetName)              enableExplore(datasetName)
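
In pseudo-code, the dataset service drives the update roughly as follows (a sketch using the same shorthands as the table above; error handling omitted):

DatasetSpecification oldSpec = datasetMDS.get(name);
DatasetSpecification spec = (def instanceof Reconfigurable)
  ? ((Reconfigurable) def).reconfigure(name, newProps, oldSpec)  // may throw IncompatibleUpdateException
  : def.configure(name, newProps);
disableExplore(name);
DatasetAdmin admin = def.getDatasetAdmin(spec);
if (admin instanceof Updatable) {
  ((Updatable) admin).update(oldSpec);  // if this throws, the new spec is never saved
}
datasetMDS.save(spec);
enableExplore(name);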

There are three ways to update a dataset (instance): 

  1. use the REST API: PUT .../datasets/{instance-name}/properties
  2. use the CLI: set dataset instance properties "{ ... }" 
  3. redeploy the app that created the dataset, with a new createDataset() that passes in updated properties (see the sketch below). If these properties are not compatible (that is, reconfigure() fails), or the update of the physical storage fails (that is, update() fails), then the dataset update fails and so does the redeploy of the app. The current system property dataset.unchecked.upgrade becomes obsolete, because on app deployment, we will always upgrade the dataset code and update the dataset's properties. This is now feasible, because a dataset definition can reject an incompatible change, and a dataset admin can fail the update if it detects that the code is incompatible.  
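
For example, redeploying this application would trigger an update of the "events" dataset if it already exists with different properties (a sketch: the application, the dataset name, and the property key are made up for illustration):

public class MyApp extends AbstractApplication {

  @Override
  public void configure() {
    // same dataset name as in the previous version of the app, but with a
    // changed TTL: on redeploy, this triggers reconfigure() and update()
    // instead of create(); the property key "ttl" is an assumption
    createDataset("events", Table.class, DatasetProperties.builder()
      .add("ttl", "86400")
      .build());
  }
}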

The existing method DatasetAdmin.upgrade() will now only be used to upgrade to a newer version of CDAP. Note that of all existing dataset types in CDAP, only HBaseTable actually implements upgrade(), and that is idempotent if applied to a table without change of properties. 

Dataset upgrade poses another interesting question: currently, a dataset type (that is, the code that implements the dataset) cannot be upgraded at all, unless the property "dataset.unchecked.upgrade" is set to true in cdap-site.xml, in which case dataset upgrade happens without any checks. However, that is a global setting, and enabling it gives up all control over dataset upgrade. To improve this, let's look at the two major use cases for dataset upgrade:

  1. An application bundles a dataset type, and when a new version of the application is deployed, the expectation is that its dataset types are also upgraded. Therefore, dataset upgrade should happen as part of application deployment in the same implicit way as creation of the dataset type upon initial deployment.  
  2. A dataset module was deployed without an application, and the developer/admin wishes to upgrade it. This is an explicit request to upgrade, and we should allow it. The tricky part here is that there may be two modules that contain a dataset type of the same name. In that case, upgrading one module would upgrade a dataset type created by the other module, and there is high risk that these two definitions are completely unrelated and incompatible; in fact it is likely that the two modules just happened to use the same name for their different dataset types. Note that this is a problem not limited to upgrade: it applies equally to initial deployment of a dataset module. We need to disallow overlapping (that is, conflicting) dataset types between different modules. 

Therefore we will pursue the following strategy: 

  • An attempt to deploy or upgrade a dataset module will fail with "409 Conflict" if any of its dataset types already exist as part of a different module;
  • Upgrading a dataset module will result in removal of the types that were previously in the module and are not present in the new version;
  • To allow consolidation of multiple dataset modules into one, we will have a "force=true" flag to override the conflict check (see the example below); 
  • With that, we do not need the global unchecked upgrade setting any more. 
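
For example, consolidating two modules into one could then be done with the existing module deployment call plus the new flag (the path is abbreviated in the same way as the REST API shown above):

PUT .../data/modules/{module-name}?force=true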

2. Authorization, Lineage and Usage Tracking on Dataset Operations

In the current dataset framework (pre-3.5.0), Authorization, Lineage and Usage (ALU) are only enforced/tracked at the dataset instance level, which is an all-or-nothing approach. This restricts the capabilities of datasets in secure environments, and CDAP is not able to capture complete lineage/usage information. Starting with 3.5.0, we would like to support ALU per data operation.

We will introduce new annotations in cdap-api for datasets to annotate constructors/methods:

  • @ReadOnly - Method only performs read operations. Read access on the dataset instance is needed.
  • @WriteOnly - Method only performs write operations. Write access on the dataset instance is needed.
  • @ReadWrite - Method performs both read and write operations. Both read and write access on the dataset instance are needed.
  • @NoAccess - Method doesn't perform any read/write operations.

These annotations allow dataset developers to annotate their dataset methods, giving them fine-grained control over how their datasets can be used in different contexts.
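
For example, the developer of a custom dataset would annotate its methods like this (a hypothetical dataset, for illustration only):

public class UniqueCountTable extends AbstractDataset {

  @WriteOnly
  public void add(String value) { ... }

  @ReadOnly
  public long getCount(String value) { ... }

  @ReadWrite
  public void addIfAbsent(String value) { ... }
}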

Implementation

Basis

A new internal class, DatasetRuntimeContext, will be introduced to record the dataset call stack in order to perform ALU operations. This class needs to be in the cdap-api module, since it must be callable from any dataset, including custom datasets. The class looks like this:

public abstract class DatasetRuntimeContext {
  private static final ThreadLocal<DatasetRuntimeContext> CONTEXT_THREAD_LOCAL = new InheritableThreadLocal<>();
 
  /**
   * Return the context from the thread local
   */
  public static DatasetRuntimeContext getContext() {
    ...
  }
 
  /**
   * Sets the context in the thread local. This method can only be called from the
   * CDAP system. The call stack will be checked to make sure this call is initiated
   * from a class that is loaded by the system classloader.
   */
  public static void setContext(DatasetRuntimeContext context) {
    ...
  }
 
  /**
   * Dataset methods should call this method first, with the annotation that represents
   * the type of operations this method is going to perform.
   */
  public abstract void onMethodEntry(Class<? extends Annotation> annotation);
 
  /**
   * Dataset methods should call this method before the method exit. The main purpose of this
   * method is to pop the call stack.
   */
  public abstract void onMethodExit();
 
  /**
   * A security manager to help acquire the call stack.
   */
  private static final class CallerClassSecurityManager extends SecurityManager {
    private static final CallerClassSecurityManager INSTANCE = new CallerClassSecurityManager();
    static Class[] getCallerClasses() {
      return INSTANCE.getClassContext();
    }
  }
}

The DatasetFramework will set the context before calling the DatasetDefinition.getDataset method. In its constructor, a Dataset is expected to get hold of the DatasetRuntimeContext instance, store it in a field, and use it in each method. This is the pattern for using the context:

public class KeyValueTable extends AbstractDataset {
 
  private static final byte[] KEY_COLUMN = { 'c' };  // the single column in which values are stored

  private final Table table;
  private final DatasetRuntimeContext runtimeContext;

  public KeyValueTable(String instanceName, Table table) {
    super(instanceName, table);
    this.runtimeContext = DatasetRuntimeContext.getContext();
    runtimeContext.onMethodEntry(NoAccess.class);
    try {
      this.table = table;
    } finally {
      runtimeContext.onMethodExit();
    }
  }


  @Nullable
  public byte[] read(String key) {
    runtimeContext.onMethodEntry(ReadOnly.class);
    try {
      return read(Bytes.toBytes(key)); // delegates to the byte[] overload (not shown)
    } finally {
      runtimeContext.onMethodExit();
    }
  }


  public void write(byte[] key, byte[] value) {
    runtimeContext.onMethodEntry(WriteOnly.class);
    try {
      this.table.put(key, KEY_COLUMN, value);
    } finally {
      runtimeContext.onMethodExit();
    }
  }
}

Code Generation

Since calls to onMethodEntry and onMethodExit are required in every method of a dataset class, it is unrealistic to require dataset developers to insert them themselves (error-prone and untrusted). Since every custom dataset is loaded through a custom classloader (ProgramClassLoader), we can instead rewrite the bytecode during classloading to insert calls to those two methods.
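
The rewriting could be done with ASM, for example with an AdviceAdapter (a rough sketch: the class below and the internal name of DatasetRuntimeContext are assumptions, the annotation to push would be determined per method using the defaulting rules in the next section, and a real rewriter must also emit the equivalent of try/finally so that onMethodExit runs when an exception from a callee propagates through the method):

import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;
import org.objectweb.asm.Type;
import org.objectweb.asm.commons.AdviceAdapter;

public final class DatasetMethodRewriter extends ClassVisitor {

  // JVM internal name of DatasetRuntimeContext; the package is an assumption
  private static final String CONTEXT = "co/cask/cdap/api/data/DatasetRuntimeContext";

  public DatasetMethodRewriter(ClassVisitor delegate) {
    super(Opcodes.ASM5, delegate);
  }

  @Override
  public MethodVisitor visitMethod(int access, String name, String desc,
                                   String signature, String[] exceptions) {
    MethodVisitor mv = super.visitMethod(access, name, desc, signature, exceptions);
    if (mv == null) {
      return null;
    }
    return new AdviceAdapter(Opcodes.ASM5, mv, access, name, desc) {
      @Override
      protected void onMethodEnter() {
        // emits: DatasetRuntimeContext.getContext().onMethodEntry(ReadOnly.class);
        // (@ReadOnly is a placeholder; the actual annotation is chosen per method)
        mv.visitMethodInsn(Opcodes.INVOKESTATIC, CONTEXT, "getContext",
                           "()L" + CONTEXT + ";", false);
        mv.visitLdcInsn(Type.getObjectType("co/cask/cdap/api/annotation/ReadOnly"));
        mv.visitMethodInsn(Opcodes.INVOKEVIRTUAL, CONTEXT, "onMethodEntry",
                           "(Ljava/lang/Class;)V", false);
      }

      @Override
      protected void onMethodExit(int opcode) {
        // emits: DatasetRuntimeContext.getContext().onMethodExit();
        mv.visitMethodInsn(Opcodes.INVOKESTATIC, CONTEXT, "getContext",
                           "()L" + CONTEXT + ";", false);
        mv.visitMethodInsn(Opcodes.INVOKEVIRTUAL, CONTEXT, "onMethodExit",
                           "()V", false);
      }
    };
  }
}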

Call Stack

A method can have different entry points:

  1. Non-private constructor called from DatasetDefinition.
  2. Non-private method called from program.
  3. Non-private method called from another Dataset (embedded Dataset case).
  4. Constructor/method called from another constructor/method from the same (sub)class.

In all of the above cases, we can generalize constructor/method calls to a Dataset into two cases:

  1. The method is the first entry point for dataset operation (point 1 and 2 above).
    • ALU operations are straightforward: they are based on the annotation. For example, if the method is annotated with @ReadOnly, consult the AuthorizationEnforcer for the READ action on the current dataset for the current Principal.
      • If a constructor is not annotated, default to @NoAccess
      • If a method is not annotated, default to @ReadWrite

  2. The method is not the first entry point and there is already an operation scope defined (point 3 and 4 above).
    • Through the DatasetRuntimeContext, the call stack is tracked
    • If the current method's annotation is not the same as, or a proper subset of, the immediate parent's, the onMethodEntry call will fail with an exception
      • @NoAccess is a proper subset of all others
      • @ReadOnly is a proper subset of @ReadWrite
      • @WriteOnly is a proper subset of @ReadWrite
      • @ReadWrite is not a proper subset of any other.
    • For unannotated constructor/method, it will default to the same annotation as the immediate parent.
    • ALU operations are performed based on the annotation.

Through the onMethodEntry and onMethodExit calls, we push and pop the call stack, recording the annotation of each method.
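
The compatibility rule above can be expressed as a simple check inside onMethodEntry (a sketch of the logic only, not the actual implementation):

private static boolean isAllowed(Class<? extends Annotation> current,
                                 Class<? extends Annotation> parent) {
  // the same annotation as the immediate parent, or a proper subset of it
  return current == parent
      || current == NoAccess.class
      || (parent == ReadWrite.class
          && (current == ReadOnly.class || current == WriteOnly.class));
}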
