This document provides a sample implementation of HBaseDDLExecutor. The HBaseDDLExecutorImpl class performs DDL operations on both the local cluster and a peer cluster.
  1. Implementation of HBaseDDLExecutor.

    /*
     * Copyright © 2017 Cask Data, Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.example.hbase.ddlexecutor;
    
    import co.cask.cdap.spi.hbase.ColumnFamilyDescriptor;
    import co.cask.cdap.spi.hbase.CoprocessorDescriptor;
    import co.cask.cdap.spi.hbase.HBaseDDLExecutor;
    import co.cask.cdap.spi.hbase.HBaseDDLExecutorContext;
    import co.cask.cdap.spi.hbase.TableDescriptor;
    import com.google.common.base.Preconditions;
    import com.google.common.base.Stopwatch;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.NamespaceNotFoundException;
    import org.apache.hadoop.hbase.TableExistsException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.TableNotDisabledException;
    import org.apache.hadoop.hbase.TableNotEnabledException;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.StringWriter;
    import java.io.UnsupportedEncodingException;
    import java.net.URLEncoder;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Sample implementation of {@link HBaseDDLExecutor} for HBase version 1.0.0-cdh5.5.1
     */
    public class HBaseDDLExecutorImpl implements HBaseDDLExecutor {
      private static final Logger LOG = LoggerFactory.getLogger(HBaseDDLExecutorImpl.class);
      private HBaseAdmin admin;
      private HBaseAdmin peerAdmin;
    
      /**
       * Encode an HBase entity name to ASCII encoding using {@link URLEncoder}.
       *
       * @param entityName entity string to be encoded
       * @return encoded string
       */
      private String encodeHBaseEntity(String entityName) {
        try {
          return URLEncoder.encode(entityName, "ASCII");
        } catch (UnsupportedEncodingException e) {
          // this can never happen - we know that ASCII is a supported character set!
          throw new RuntimeException(e);
        }
      }
    
      @Override
      public void initialize(HBaseDDLExecutorContext context) {
        LOG.info("Initializing executor with properties {}", context.getProperties());
        try {
          Configuration conf = context.getConfiguration();
          this.admin = new HBaseAdmin(conf);
    
          Configuration peerConf = generatePeerConfig(context);
          this.peerAdmin = new HBaseAdmin(peerConf);
        } catch (IOException e) {
          throw new RuntimeException("Failed to create the HBaseAdmin", e);
        }
      }
    
      private boolean hasNamespace(String name) throws IOException {
        Preconditions.checkState(admin != null, "HBaseAdmin should not be null.");
        Preconditions.checkArgument(name != null, "Namespace should not be null.");
        try {
          admin.getNamespaceDescriptor(encodeHBaseEntity(name));
          return true;
        } catch (NamespaceNotFoundException e) {
          return false;
        }
      }
    
      @Override
      public boolean createNamespaceIfNotExists(String name) throws IOException {
        Preconditions.checkArgument(name != null, "Namespace should not be null.");
        if (hasNamespace(name)) {
          return false;
        }
        NamespaceDescriptor namespaceDescriptor =
          NamespaceDescriptor.create(encodeHBaseEntity(name)).build();
        admin.createNamespace(namespaceDescriptor);
        peerAdmin.createNamespace(namespaceDescriptor);
        return true;
      }
    
      @Override
      public void deleteNamespaceIfExists(String name) throws IOException {
        Preconditions.checkArgument(name != null, "Namespace should not be null.");
        if (hasNamespace(name)) {
          admin.deleteNamespace(encodeHBaseEntity(name));
          peerAdmin.deleteNamespace(encodeHBaseEntity(name));
        }
      }
    
      @Override
      public void createTableIfNotExists(TableDescriptor descriptor, byte[][] splitKeys) throws IOException {
        createTableIfNotExists(getHTableDescriptor(descriptor), splitKeys);
      }
    
      private void createTableIfNotExists(HTableDescriptor htd, byte[][] splitKeys) throws IOException {
        if (admin.tableExists(htd.getName())) {
          return;
        }
    
        try {
          admin.createTable(htd, splitKeys);
          peerAdmin.createTable(htd, splitKeys);
          LOG.info("Table created '{}'", Bytes.toString(htd.getName()));
        } catch (TableExistsException e) {
          // table may exist because someone else is creating it at the same
          // time. But it may not be available yet, and opening it might fail.
          LOG.info("Table '{}' already exists.", Bytes.toString(htd.getName()), e);
        }
    
        // Wait for table to materialize
        try {
          Stopwatch stopwatch = new Stopwatch();
          stopwatch.start();
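          // poll roughly every 500 ms (one tenth of the 5000 ms timeout)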
          long sleepTime = TimeUnit.MILLISECONDS.toNanos(5000L) / 10;
          sleepTime = sleepTime <= 0 ? 1 : sleepTime;
          do {
            if (admin.tableExists(htd.getName())) {
              LOG.debug("Table '{}' exists now. Assuming that another process concurrently created it.",
                        Bytes.toString(htd.getName()));
              return;
            } else {
              TimeUnit.NANOSECONDS.sleep(sleepTime);
            }
          } while (stopwatch.elapsedTime(TimeUnit.MILLISECONDS) < 5000L);
        } catch (InterruptedException e) {
          LOG.warn("Sleeping thread interrupted.");
          // restore the interrupt status so that callers can observe it
          Thread.currentThread().interrupt();
        }
        LOG.error("Table '{}' does not exist after waiting {} ms. Giving up.", Bytes.toString(htd.getName()), 5000L);
      }
    
      @Override
      public void enableTableIfDisabled(String namespace, String name) throws IOException {
        Preconditions.checkArgument(namespace != null, "Namespace should not be null");
        Preconditions.checkArgument(name != null, "Table name should not be null.");
    
        try {
          admin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
          peerAdmin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
        } catch (TableNotDisabledException e) {
          LOG.debug("Attempt to enable already enabled table {} in the namespace {}.", name, namespace);
        }
      }
    
      @Override
      public void disableTableIfEnabled(String namespace, String name) throws IOException {
        Preconditions.checkArgument(namespace != null, "Namespace should not be null");
        Preconditions.checkArgument(name != null, "Table name should not be null.");
    
        try {
          admin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
          peerAdmin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
        } catch (TableNotEnabledException e) {
          LOG.debug("Attempt to disable already disabled table {} in the namespace {}.", name, namespace);
        }
      }
    
      @Override
      public void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException {
        Preconditions.checkArgument(namespace != null, "Namespace should not be null");
        Preconditions.checkArgument(name != null, "Table name should not be null.");
        Preconditions.checkArgument(descriptor != null, "Descriptor should not be null.");
    
        HTableDescriptor htd = getHTableDescriptor(descriptor);
        admin.modifyTable(htd.getTableName(), htd);
        peerAdmin.modifyTable(htd.getTableName(), htd);
      }
    
      @Override
      public void truncateTable(String namespace, String name) throws IOException {
        Preconditions.checkArgument(namespace != null, "Namespace should not be null");
        Preconditions.checkArgument(name != null, "Table name should not be null.");
    
        HTableDescriptor descriptor = admin.getTableDescriptor(TableName.valueOf(namespace, encodeHBaseEntity(name)));
        disableTableIfEnabled(namespace, name);
        deleteTableIfExists(namespace, name);
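        // note: the table is recreated below without split keys, so the original region boundaries are not preserved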
        createTableIfNotExists(descriptor, null);
      }
    
      @Override
      public void deleteTableIfExists(String namespace, String name) throws IOException {
        Preconditions.checkArgument(namespace != null, "Namespace should not be null");
        Preconditions.checkArgument(name != null, "Table name should not be null.");
    
        admin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
        peerAdmin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
      }
    
      @Override
      public void grantPermissions(String namespace, String name, Map<String, String> permissions) throws IOException {
        // no-op in this sample implementation
      }
    
      @Override
      public void close() throws IOException {
        if (admin != null) {
          admin.close();
        }
        if (peerAdmin != null) {
          peerAdmin.close();
        }
      }
    
      /**
       * Converts the {@link ColumnFamilyDescriptor} to the {@link HColumnDescriptor} for admin operations.
       * @param ns the namespace for the table
       * @param tableName the name of the table
       * @param descriptor descriptor of the column family
       * @return the instance of HColumnDescriptor
       */
      private static HColumnDescriptor getHColumnDescriptor(String ns, String tableName,
                                                            ColumnFamilyDescriptor descriptor) {
        HColumnDescriptor hFamily = new HColumnDescriptor(descriptor.getName());
        hFamily.setMaxVersions(descriptor.getMaxVersions());
        hFamily.setCompressionType(Compression.Algorithm.valueOf(descriptor.getCompressionType().name()));
        hFamily.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(
          descriptor.getBloomType().name()));
        for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
          hFamily.setValue(property.getKey(), property.getValue());
        }
        LOG.info("Setting replication scope to global for ns {}, table {}, cf {}", ns, tableName, descriptor.getName());
        hFamily.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
        return hFamily;
      }
    
      /**
       * Converts the {@link TableDescriptor} into corresponding {@link HTableDescriptor} for admin operations.
       * @param descriptor the table descriptor instance
       * @return the instance of HTableDescriptor
       */
      private static HTableDescriptor getHTableDescriptor(TableDescriptor descriptor) {
        TableName tableName = TableName.valueOf(descriptor.getNamespace(), descriptor.getName());
        HTableDescriptor htd = new HTableDescriptor(tableName);
        for (Map.Entry<String, ColumnFamilyDescriptor> family : descriptor.getFamilies().entrySet()) {
          htd.addFamily(getHColumnDescriptor(descriptor.getNamespace(), descriptor.getName(), family.getValue()));
        }
    
        for (Map.Entry<String, CoprocessorDescriptor> coprocessor : descriptor.getCoprocessors().entrySet()) {
          CoprocessorDescriptor cpd = coprocessor.getValue();
          try {
            htd.addCoprocessor(cpd.getClassName(), new Path(cpd.getPath()), cpd.getPriority(), cpd.getProperties());
          } catch (IOException e) {
            LOG.error("Error adding coprocessor.", e);
          }
        }
    
        for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
          htd.setValue(property.getKey(), property.getValue());
        }
        return htd;
      }
    
      /**
       * Generate the peer configuration used to perform DDL operations on the remote cluster via the HBase admin client.
       * @param context instance of {@link HBaseDDLExecutorContext} with which the DDL executor is initialized
       * @return the {@link Configuration} to be used for DDL operations on the remote cluster
       */
      private static Configuration generatePeerConfig(HBaseDDLExecutorContext context) {
        Configuration peerConf = new Configuration();
        peerConf.clear();
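
        // The context properties are expected to carry the peer cluster's connection settings
        // (for example hbase.zookeeper.quorum); the exact keys depend on the deployment.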
    
        for (Map.Entry<String, String> entry : context.getProperties().entrySet()) {
          peerConf.set(entry.getKey(), entry.getValue());
        }
    
        StringWriter sw = new StringWriter();
        try {
          Configuration.dumpConfiguration(peerConf, sw);
          LOG.debug("PeerConfig - {}", sw);
        } catch (IOException e) {
          LOG.error("Error dumping config.", e);
        }
        return peerConf;
      }
    }
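
A minimal usage sketch (hypothetical, not part of the SPI) shows how the executor is driven; it assumes context is the HBaseDDLExecutorContext that CDAP passes to initialize, and descriptor is a TableDescriptor built elsewhere for the target table.

    HBaseDDLExecutorImpl executor = new HBaseDDLExecutorImpl();
    try {
      executor.initialize(context);                       // context is supplied by CDAP
      executor.createNamespaceIfNotExists("demo");        // created on both the local and peer clusters
      executor.createTableIfNotExists(descriptor, null);  // null means no pre-split regions
    } finally {
      executor.close();                                   // closes both HBaseAdmin instances
    }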
    
    
  2. Corresponding pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>co.cask.cdap</groupId>
      <artifactId>HBaseDDLExecutorExtension</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <name>HBase DDL executor</name>
      <properties>
        <cdap.version>4.1.0-SNAPSHOT</cdap.version>
        <slf4j.version>1.7.5</slf4j.version>
      </properties>
    
      <repositories>
        <repository>
          <id>sonatype</id>
          <url>https://oss.sonatype.org/content/groups/public</url>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <url>https://repository.apache.org/content/repositories/snapshots</url>
        </repository>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
      </repositories>
    
      <dependencies>
        <dependency>
          <groupId>co.cask.cdap</groupId>
          <artifactId>cdap-hbase-spi</artifactId>
          <version>${cdap.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>1.0.0-cdh5.5.1</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>log4j-over-slf4j</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>jcl-over-slf4j</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
      </dependencies>
    
    </project>
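
Building the extension with mvn clean package produces target/HBaseDDLExecutorExtension-1.0-SNAPSHOT.jar. Where this jar must be installed so that CDAP picks it up depends on the installation; consult the CDAP replication documentation for the extension directory to use.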

 

 
