代码:

    1. import org.apache.kudu.ColumnSchema;
    2. import org.apache.kudu.Schema;
    3. import org.apache.kudu.Type;
    4. import org.apache.kudu.client.*;
    5. import org.junit.After;
    6. import org.junit.Before;
    7. import org.junit.Test;
    8. import java.util.ArrayList;
    9. import java.util.List;
    10. public class TestKudu {
    11. // 声明全局变量方便后续增删改查操作
    12. private KuduClient kuduClient;
    13. // kudu Master地址
    14. private String kuduMaster;
    15. // kudu table name
    16. private String kuduTableName;
    17. // TODO 初始化方法用于kudu集群建立连接
    18. @Before
    19. public void init() {
    20. // 指定待操作的table名
    21. kuduTableName = "student1";
    22. // 指定kudu集群master地址
    23. kuduMaster = "cdh1.macro.com:7051,cdh2.macro.com:7051,cdh3.macro.com:7051";
    24. KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMaster);
    25. // 设置超时时间
    26. kuduClientBuilder.defaultSocketReadTimeoutMs(6000);
    27. // 通过builder中的build方法创建kuduclient
    28. kuduClient = kuduClientBuilder.build();
    29. }
    30. @Test
    31. public void createTable() throws KuduException {
    32. // TODO 判断表是否存在
    33. if (!kuduClient.tableExists(kuduTableName)) {
    34. // 指定表的schema信息
    35. ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
    36. // 添加字段的schema信息
    37. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
    38. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
    39. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
    40. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
    41. Schema schema = new Schema(columnSchemas);
    42. // 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
    43. CreateTableOptions cto = new CreateTableOptions();
    44. List<String> hashKeys = new ArrayList<String>(1);
    45. hashKeys.add("id");
    46. int numBuckets = 8;
    47. cto.addHashPartitions(hashKeys, numBuckets);
    48. // 建表
    49. kuduClient.createTable(kuduTableName, schema, cto);
    50. System.out.println("Created table " + kuduTableName);
    51. }
    52. }
    53. @Test
    54. public void insertData() throws KuduException {
    55. // 向表加载数据需要一个kuduSession对象
    56. KuduSession kuduSession = kuduClient.newSession();
    57. // 设置提交数据为自动flush
    58. kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加
    59. // 打开本次操作的表名
    60. KuduTable kuduTable = kuduClient.openTable(kuduTableName);
    61. for (int i = 1; i <= 10; i++) {
    62. // 需要使用kuduTable来构建Operation的子类实例对象,此处是insert
    63. Insert insert = kuduTable.newInsert();
    64. PartialRow row = insert.getRow();
    65. row.addInt("id", i);
    66. row.addString("name", "zhangsan-" + i);
    67. row.addInt("sex", i % 2);
    68. row.addInt("age", 20 + i);
    69. kuduSession.apply(insert);
    70. }
    71. }
    72. @Test
    73. public void queryData() throws KuduException {
    74. // 构建一个查询的扫描器
    75. KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(kuduTableName));
    76. // 创建集合 用于存储扫描字段的信息
    77. List<String> columnsList = new ArrayList<String>();
    78. columnsList.add("id");
    79. columnsList.add("name");
    80. columnsList.add("sex");
    81. columnsList.add("age");
    82. kuduScannerBuilder.setProjectedColumnNames(columnsList);
    83. // 调用build方法执行数据的少秒得到返回结果
    84. KuduScanner kuduScanner = kuduScannerBuilder.build();
    85. // 遍历
    86. while (kuduScanner.hasMoreRows()) { //判断扫描器中是否有更多结果
    87. RowResultIterator rowResults = kuduScanner.nextRows();// 取出每一行
    88. while (rowResults.hasNext()) {// 判断是否有下一行
    89. RowResult row = rowResults.next();// 取出每一行
    90. int id = row.getInt("id");
    91. String name = row.getString("name");
    92. int sex = row.getInt("sex");
    93. int age = row.getInt("age");
    94. System.out.print("id:" + id);
    95. System.out.print(" name:" + name);
    96. System.out.print(" sex:" + sex);
    97. System.out.println(" age:" + age);
    98. }
    99. }
    100. }
    101. @Test
    102. public void updateData() throws KuduException {
    103. // 向表加载数据需要一个kuduSession对象
    104. KuduSession kuduSession = kuduClient.newSession();
    105. // 设置提交数据为自动flush
    106. kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加
    107. // 打开本次操作的表名
    108. KuduTable kuduTable = kuduClient.openTable(kuduTableName);
    109. Upsert upsert = kuduTable.newUpsert(); // kuduTable.newUpsert 插入和更新 如果指定数据存在,就更新,不存在就插入
    110. PartialRow row = upsert.getRow();
    111. row.addInt("id", 1);
    112. row.addString("name", "李四");
    113. row.addInt("sex", 1);
    114. row.addInt("age", 50);
    115. kuduSession.apply(upsert);
    116. }
    117. @Test
    118. public void deleteDate() throws KuduException {
    119. // 向表加载数据需要一个kuduSession对象
    120. KuduSession kuduSession = kuduClient.newSession();
    121. // 设置提交数据为自动flush
    122. kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);// 设置数据更新类型为同步增加
    123. // 打开本次操作的表名
    124. KuduTable kuduTable = kuduClient.openTable(kuduTableName);
    125. Delete delete = kuduTable.newDelete(); // 构建update对象,用于删除数据
    126. PartialRow row = delete.getRow();
    127. row.addInt("id", 1);
    128. kuduSession.apply(delete);
    129. }
    130. @Test
    131. public void deleteTable() throws KuduException {
    132. if (kuduClient.tableExists(kuduTableName)) {
    133. kuduClient.deleteTable(kuduTableName);
    134. }
    135. }
    136. @After
    137. public void close() throws KuduException {
    138. // 如果客户端未关闭执行close操作
    139. if (kuduClient != null) {
    140. kuduClient.close();
    141. }
    142. }
    143. /*
    144. * kudu 分区方式:为了提高可扩展性,kudu表被划分为table的单元,并分布在许多的tablet servers
    145. * 上。行总是属于单个tablet。将行分配给tablet的方法由在表创建期间设置的表的分区决定的。
    146. * 提供了三种分区方式。
    147. */
    148. // TODO Range Partitioning (范围分区)
    149. // 范围分区可以根据存入数据的数据量,均衡到各个机器上,防止机器出现负载不均衡现象
    150. @Test
    151. public void testRangePartitions() throws KuduException {
    152. // TODO 判断表是否存在
    153. if (!kuduClient.tableExists("t_range_partition")) {
    154. // 指定表的schema信息
    155. ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
    156. // 添加字段的schema信息
    157. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
    158. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
    159. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
    160. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
    161. Schema schema = new Schema(columnSchemas);
    162. // 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
    163. CreateTableOptions cto = new CreateTableOptions();
    164. // 指定用于分区的字段
    165. List<String> partitionList = new ArrayList<String>(1);
    166. partitionList.add("id");
    167. cto.setRangePartitionColumns(partitionList);
    168. // 指定分区策略
    169. int count = 0;
    170. for (int i = 0; i < 5; i++) {
    171. PartialRow lower = schema.newPartialRow();//分区上界
    172. lower.addInt("id", count);
    173. count += 10;
    174. PartialRow upper = schema.newPartialRow();// range分区下界
    175. upper.addInt("id",count);
    176. cto.addRangePartition(lower, upper);// 添加
    177. }
    178. // 建表
    179. kuduClient.createTable("t_range_partition", schema, cto);
    180. System.out.println("Created table " + "t_range_partition");
    181. }
    182. }
    183. // TODO Hash Partitioning(hash分区)
    184. /* hash分区通过hash值将行分配到许多的buckets(存储捅)之一:
    185. hash分区是一种有效的策略,当不需要对表进行有序访问时。hash分区对于
    186. 在tablet之间随机分布这些功能时有效的,这有助于减轻热点和tablet大小不均匀
    187. */
    188. @Test
    189. public void testHashPartitions() throws KuduException {
    190. // TODO 判断表是否存在
    191. if (!kuduClient.tableExists("t_hash_partition")) {
    192. // 指定表的schema信息
    193. ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
    194. // 添加字段的schema信息
    195. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
    196. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
    197. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
    198. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
    199. Schema schema = new Schema(columnSchemas);
    200. // 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
    201. CreateTableOptions cto = new CreateTableOptions();
    202. // 指定用于分区的字段
    203. List<String> hashKeys = new ArrayList<String>(1);
    204. hashKeys.add("id");
    205. int numBuckets = 6;
    206. cto.addHashPartitions(hashKeys, numBuckets);
    207. // 建表
    208. kuduClient.createTable("t_hash_partition", schema, cto);
    209. System.out.println("Created table " + kuduTableName);
    210. }
    211. }
    212. // TODO Multilevel Partitioning (多级分区) hash + range
    213. /* kudu允许一个表在单个表上组合多级分区。
    214. 当正确使用时,多级分区可以保留各个分区类型的优点,
    215. 同时减少每个分区的缺点*/
    216. @Test
    217. public void testMultilevelPartitions() throws KuduException {
    218. // TODO 判断表是否存在
    219. if (!kuduClient.tableExists("t_multilevel_partition")) {
    220. // 指定表的schema信息
    221. ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
    222. // 添加字段的schema信息
    223. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
    224. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
    225. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).nullable(true).build());
    226. columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
    227. Schema schema = new Schema(columnSchemas);
    228. // 表属性信息(指定表分区规则,采用hash分区,根据id哈希指定的6个部分中)
    229. CreateTableOptions cto = new CreateTableOptions();
    230. // 指定用于分区的字段
    231. List<String> partitionList = new ArrayList<String>(1);
    232. partitionList.add("id");
    233. // hash partition
    234. cto.addHashPartitions(partitionList, 6);
    235. cto.setRangePartitionColumns(partitionList);
    236. // 指定分区策略
    237. int count = 0;
    238. for (int i = 0; i < 5; i++) {
    239. PartialRow lower = schema.newPartialRow();//分区上界
    240. lower.addInt("id", count);
    241. count += 10;
    242. PartialRow upper = schema.newPartialRow();// range分区下界
    243. upper.addInt("id",count);
    244. cto.addRangePartition(lower, upper);// 添加
    245. }
    246. // 建表
    247. kuduClient.createTable("t_multilevel_partition", schema, cto);
    248. System.out.println("Created table " + "t_multilevel_partition");
    249. }
    250. }
    251. }