// 代码 (Code):
import org.apache.kudu.ColumnSchema;import org.apache.kudu.Schema;import org.apache.kudu.Type;import org.apache.kudu.client.*;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.util.ArrayList;import java.util.List;public class TestKudu {// 声明全局变量方便后续增删改查操作private KuduClient kuduClient;// kudu Master地址private String kuduMaster;// kudu table nameprivate String kuduTableName;// TODO 初始化方法用于kudu集群建立连接@Beforepublic void init() {// 指定待操作的table名kuduTableName = "student1";// 指定kudu集群master地址kuduMaster = "cdh1.macro.com:7051,cdh2.macro.com:7051,cdh3.macro.com:7051";KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMaster);// 设置超时时间kuduClientBuilder.defaultSocketReadTimeoutMs(6000);// 通过builder中的build方法创建kuduclientkuduClient = kuduClientBuilder.build();}@Testpublic void createTable() throws KuduException {// TODO 判断表是否存在if (!kuduClient.tableExists(kuduTableName)) {// 指定表的schema信息ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();// 添加字段的schema信息columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());Schema schema = new Schema(columnSchemas);// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)CreateTableOptions cto = new CreateTableOptions();List<String> hashKeys = new ArrayList<String>(1);hashKeys.add("id");int numBuckets = 8;cto.addHashPartitions(hashKeys, numBuckets);// 建表kuduClient.createTable(kuduTableName, schema, cto);System.out.println("Created table " + kuduTableName);}}@Testpublic void insertData() throws KuduException {// 向表加载数据需要一个kuduSession对象KuduSession kuduSession = kuduClient.newSession();// 
设置提交数据为自动flushkuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加// 打开本次操作的表名KuduTable kuduTable = kuduClient.openTable(kuduTableName);for (int i = 1; i <= 10; i++) {// 需要使用kuduTable来构建Operation的子类实例对象,此处是insertInsert insert = kuduTable.newInsert();PartialRow row = insert.getRow();row.addInt("id", i);row.addString("name", "zhangsan-" + i);row.addInt("sex", i % 2);row.addInt("age", 20 + i);kuduSession.apply(insert);}}@Testpublic void queryData() throws KuduException {// 构建一个查询的扫描器KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(kuduTableName));// 创建集合 用于存储扫描字段的信息List<String> columnsList = new ArrayList<String>();columnsList.add("id");columnsList.add("name");columnsList.add("sex");columnsList.add("age");kuduScannerBuilder.setProjectedColumnNames(columnsList);// 调用build方法执行数据的少秒得到返回结果KuduScanner kuduScanner = kuduScannerBuilder.build();// 遍历while (kuduScanner.hasMoreRows()) { //判断扫描器中是否有更多结果RowResultIterator rowResults = kuduScanner.nextRows();// 取出每一行while (rowResults.hasNext()) {// 判断是否有下一行RowResult row = rowResults.next();// 取出每一行int id = row.getInt("id");String name = row.getString("name");int sex = row.getInt("sex");int age = row.getInt("age");System.out.print("id:" + id);System.out.print(" name:" + name);System.out.print(" sex:" + sex);System.out.println(" age:" + age);}}}@Testpublic void updateData() throws KuduException {// 向表加载数据需要一个kuduSession对象KuduSession kuduSession = kuduClient.newSession();// 设置提交数据为自动flushkuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加// 打开本次操作的表名KuduTable kuduTable = kuduClient.openTable(kuduTableName);Upsert upsert = kuduTable.newUpsert(); // kuduTable.newUpsert 插入和更新 如果指定数据存在,就更新,不存在就插入PartialRow row = upsert.getRow();row.addInt("id", 1);row.addString("name", "李四");row.addInt("sex", 1);row.addInt("age", 50);kuduSession.apply(upsert);}@Testpublic void deleteDate() throws KuduException {// 
向表加载数据需要一个kuduSession对象KuduSession kuduSession = kuduClient.newSession();// 设置提交数据为自动flushkuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加// 打开本次操作的表名KuduTable kuduTable = kuduClient.openTable(kuduTableName);Delete delete = kuduTable.newDelete(); // 构建update对象,用于删除数据PartialRow row = delete.getRow();row.addInt("id", 1);kuduSession.apply(delete);}@Testpublic void deleteTable() throws KuduException {if (kuduClient.tableExists(kuduTableName)) {kuduClient.deleteTable(kuduTableName);}}@Afterpublic void close() throws KuduException {// 如果客户端未关闭执行close操作if (kuduClient != null) {kuduClient.close();}}/** kudu 分区方式:为了提高可扩展性,kudu表被划分为table的单元,并分布在许多的tablet servers* 上。行总是属于单个tablet。将行分配给tablet的方法由在表创建期间设置的表的分区决定的。* 提供了三种分区方式。*/// TODO Range Partitioning (范围分区)// 范围分区可以根据存入数据的数据量,均衡到各个机器上,防止机器出现负载不均衡现象@Testpublic void testRangePartitions() throws KuduException {// TODO 判断表是否存在if (!kuduClient.tableExists("t_range_partition")) {// 指定表的schema信息ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();// 添加字段的schema信息columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());Schema schema = new Schema(columnSchemas);// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)CreateTableOptions cto = new CreateTableOptions();// 指定用于分区的字段List<String> partitionList = new ArrayList<String>(1);partitionList.add("id");cto.setRangePartitionColumns(partitionList);// 指定分区策略int count = 0;for (int i = 0; i < 5; i++) {PartialRow lower = schema.newPartialRow();//分区上界lower.addInt("id", count);count += 10;PartialRow upper = schema.newPartialRow();// range分区下界upper.addInt("id",count);cto.addRangePartition(lower, upper);// 添加}// 
建表kuduClient.createTable("t_range_partition", schema, cto);System.out.println("Created table " + "t_range_partition");}}// TODO Hash Partitioning(hash分区)/* hash分区通过hash值将行分配到许多的buckets(存储捅)之一:hash分区是一种有效的策略,当不需要对表进行有序访问时。hash分区对于在tablet之间随机分布这些功能时有效的,这有助于减轻热点和tablet大小不均匀*/@Testpublic void testHashPartitions() throws KuduException {// TODO 判断表是否存在if (!kuduClient.tableExists("t_hash_partition")) {// 指定表的schema信息ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();// 添加字段的schema信息columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());Schema schema = new Schema(columnSchemas);// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)CreateTableOptions cto = new CreateTableOptions();// 指定用于分区的字段List<String> hashKeys = new ArrayList<String>(1);hashKeys.add("id");int numBuckets = 6;cto.addHashPartitions(hashKeys, numBuckets);// 建表kuduClient.createTable("t_hash_partition", schema, cto);System.out.println("Created table " + kuduTableName);}}// TODO Multilevel Partitioning (多级分区) hash + range/* kudu允许一个表在单个表上组合多级分区。当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点*/@Testpublic void testMultilevelPartitions() throws KuduException {// TODO 判断表是否存在if (!kuduClient.tableExists("t_multilevel_partition")) {// 指定表的schema信息ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();// 添加字段的schema信息columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", 
Type.INT32).nullable(true).build());Schema schema = new Schema(columnSchemas);// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)CreateTableOptions cto = new CreateTableOptions();// 指定用于分区的字段List<String> partitionList = new ArrayList<String>(1);partitionList.add("id");// hash partitioncto.addHashPartitions(partitionList, 6);cto.setRangePartitionColumns(partitionList);// 指定分区策略int count = 0;for (int i = 0; i < 5; i++) {PartialRow lower = schema.newPartialRow();//分区上界lower.addInt("id", count);count += 10;PartialRow upper = schema.newPartialRow();// range分区下界upper.addInt("id",count);cto.addRangePartition(lower, upper);// 添加}// 建表kuduClient.createTable("t_multilevel_partition", schema, cto);System.out.println("Created table " + "t_multilevel_partition");}}}
