代码:
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class TestKudu {
// 声明全局变量方便后续增删改查操作
private KuduClient kuduClient;
// kudu Master地址
private String kuduMaster;
// kudu table name
private String kuduTableName;
// TODO 初始化方法用于kudu集群建立连接
@Before
public void init() {
// 指定待操作的table名
kuduTableName = "student1";
// 指定kudu集群master地址
kuduMaster = "cdh1.macro.com:7051,cdh2.macro.com:7051,cdh3.macro.com:7051";
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMaster);
// 设置超时时间
kuduClientBuilder.defaultSocketReadTimeoutMs(6000);
// 通过builder中的build方法创建kuduclient
kuduClient = kuduClientBuilder.build();
}
@Test
public void createTable() throws KuduException {
// TODO 判断表是否存在
if (!kuduClient.tableExists(kuduTableName)) {
// 指定表的schema信息
ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
// 添加字段的schema信息
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
Schema schema = new Schema(columnSchemas);
// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
CreateTableOptions cto = new CreateTableOptions();
List<String> hashKeys = new ArrayList<String>(1);
hashKeys.add("id");
int numBuckets = 8;
cto.addHashPartitions(hashKeys, numBuckets);
// 建表
kuduClient.createTable(kuduTableName, schema, cto);
System.out.println("Created table " + kuduTableName);
}
}
@Test
public void insertData() throws KuduException {
// 向表加载数据需要一个kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
// 设置提交数据为自动flush
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加
// 打开本次操作的表名
KuduTable kuduTable = kuduClient.openTable(kuduTableName);
for (int i = 1; i <= 10; i++) {
// 需要使用kuduTable来构建Operation的子类实例对象,此处是insert
Insert insert = kuduTable.newInsert();
PartialRow row = insert.getRow();
row.addInt("id", i);
row.addString("name", "zhangsan-" + i);
row.addInt("sex", i % 2);
row.addInt("age", 20 + i);
kuduSession.apply(insert);
}
}
@Test
public void queryData() throws KuduException {
// 构建一个查询的扫描器
KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(kuduTableName));
// 创建集合 用于存储扫描字段的信息
List<String> columnsList = new ArrayList<String>();
columnsList.add("id");
columnsList.add("name");
columnsList.add("sex");
columnsList.add("age");
kuduScannerBuilder.setProjectedColumnNames(columnsList);
// 调用build方法执行数据的少秒得到返回结果
KuduScanner kuduScanner = kuduScannerBuilder.build();
// 遍历
while (kuduScanner.hasMoreRows()) { //判断扫描器中是否有更多结果
RowResultIterator rowResults = kuduScanner.nextRows();// 取出每一行
while (rowResults.hasNext()) {// 判断是否有下一行
RowResult row = rowResults.next();// 取出每一行
int id = row.getInt("id");
String name = row.getString("name");
int sex = row.getInt("sex");
int age = row.getInt("age");
System.out.print("id:" + id);
System.out.print(" name:" + name);
System.out.print(" sex:" + sex);
System.out.println(" age:" + age);
}
}
}
@Test
public void updateData() throws KuduException {
// 向表加载数据需要一个kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
// 设置提交数据为自动flush
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加
// 打开本次操作的表名
KuduTable kuduTable = kuduClient.openTable(kuduTableName);
Upsert upsert = kuduTable.newUpsert(); // kuduTable.newUpsert 插入和更新 如果指定数据存在,就更新,不存在就插入
PartialRow row = upsert.getRow();
row.addInt("id", 1);
row.addString("name", "李四");
row.addInt("sex", 1);
row.addInt("age", 50);
kuduSession.apply(upsert);
}
@Test
public void deleteDate() throws KuduException {
// 向表加载数据需要一个kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
// 设置提交数据为自动flush
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加
// 打开本次操作的表名
KuduTable kuduTable = kuduClient.openTable(kuduTableName);
Delete delete = kuduTable.newDelete(); // 构建update对象,用于删除数据
PartialRow row = delete.getRow();
row.addInt("id", 1);
kuduSession.apply(delete);
}
@Test
public void deleteTable() throws KuduException {
if (kuduClient.tableExists(kuduTableName)) {
kuduClient.deleteTable(kuduTableName);
}
}
@After
public void close() throws KuduException {
// 如果客户端未关闭执行close操作
if (kuduClient != null) {
kuduClient.close();
}
}
/*
* kudu 分区方式:为了提高可扩展性,kudu表被划分为table的单元,并分布在许多的tablet servers
* 上。行总是属于单个tablet。将行分配给tablet的方法由在表创建期间设置的表的分区决定的。
* 提供了三种分区方式。
*/
// TODO Range Partitioning (范围分区)
// 范围分区可以根据存入数据的数据量,均衡到各个机器上,防止机器出现负载不均衡现象
@Test
public void testRangePartitions() throws KuduException {
// TODO 判断表是否存在
if (!kuduClient.tableExists("t_range_partition")) {
// 指定表的schema信息
ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
// 添加字段的schema信息
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
Schema schema = new Schema(columnSchemas);
// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
CreateTableOptions cto = new CreateTableOptions();
// 指定用于分区的字段
List<String> partitionList = new ArrayList<String>(1);
partitionList.add("id");
cto.setRangePartitionColumns(partitionList);
// 指定分区策略
int count = 0;
for (int i = 0; i < 5; i++) {
PartialRow lower = schema.newPartialRow();//分区上界
lower.addInt("id", count);
count += 10;
PartialRow upper = schema.newPartialRow();// range分区下界
upper.addInt("id",count);
cto.addRangePartition(lower, upper);// 添加
}
// 建表
kuduClient.createTable("t_range_partition", schema, cto);
System.out.println("Created table " + "t_range_partition");
}
}
// TODO Hash Partitioning(hash分区)
/* hash分区通过hash值将行分配到许多的buckets(存储捅)之一:
hash分区是一种有效的策略,当不需要对表进行有序访问时。hash分区对于
在tablet之间随机分布这些功能时有效的,这有助于减轻热点和tablet大小不均匀
*/
@Test
public void testHashPartitions() throws KuduException {
// TODO 判断表是否存在
if (!kuduClient.tableExists("t_hash_partition")) {
// 指定表的schema信息
ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
// 添加字段的schema信息
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
Schema schema = new Schema(columnSchemas);
// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
CreateTableOptions cto = new CreateTableOptions();
// 指定用于分区的字段
List<String> hashKeys = new ArrayList<String>(1);
hashKeys.add("id");
int numBuckets = 6;
cto.addHashPartitions(hashKeys, numBuckets);
// 建表
kuduClient.createTable("t_hash_partition", schema, cto);
System.out.println("Created table " + kuduTableName);
}
}
// TODO Multilevel Partitioning (多级分区) hash + range
/* kudu允许一个表在单个表上组合多级分区。
当正确使用时,多级分区可以保留各个分区类型的优点,
同时减少每个分区的缺点*/
@Test
public void testMultilevelPartitions() throws KuduException {
// TODO 判断表是否存在
if (!kuduClient.tableExists("t_multilevel_partition")) {
// 指定表的schema信息
ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
// 添加字段的schema信息
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
Schema schema = new Schema(columnSchemas);
// 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
CreateTableOptions cto = new CreateTableOptions();
// 指定用于分区的字段
List<String> partitionList = new ArrayList<String>(1);
partitionList.add("id");
// hash partition
cto.addHashPartitions(partitionList, 6);
cto.setRangePartitionColumns(partitionList);
// 指定分区策略
int count = 0;
for (int i = 0; i < 5; i++) {
PartialRow lower = schema.newPartialRow();//分区上界
lower.addInt("id", count);
count += 10;
PartialRow upper = schema.newPartialRow();// range分区下界
upper.addInt("id",count);
cto.addRangePartition(lower, upper);// 添加
}
// 建表
kuduClient.createTable("t_multilevel_partition", schema, cto);
System.out.println("Created table " + "t_multilevel_partition");
}
}
}