0. 需求

模拟微博业务,其中包括发布微博、添加关注、取关用户、获取某个用户初始化页面数据、获取某个用户所有微博详情

1. 表结构

1.1 微博内容表

image.png

1.2 用户关系表

image.png

1.3 微博收件箱表

image.png

2. 常量类

  1. package com.jshawn.hbase.guli_weibo.constants;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.hbase.HBaseConfiguration;
  4. /**
  5. * @author JShawn 2021/6/20 16:47
  6. */
  7. public class Constants {
  8. /**
  9. * HBase的配置信息
  10. */
  11. public static final Configuration COFIGURATION = HBaseConfiguration.create();
  12. /**
  13. * 命名空间
  14. */
  15. public static final String NAMESPACE = "weibo";
  16. /**
  17. * 微博内容表
  18. */
  19. public static final String CONTENT_TABLE = "weibo:content";
  20. /**
  21. * 微博内容表-列族
  22. */
  23. public static final String CONTENT_TABLE_CF = "info";
  24. /**
  25. * 微博内容表-最大版本
  26. */
  27. public static final Integer CONTENT_TABLE_VERSIONS = 1;
  28. /**
  29. * 用户关系表
  30. */
  31. public static final String RELATION_TABLE = "weibo:relation";
  32. /**
  33. * 用户关系表-列族-所有关注
  34. */
  35. public static final String RELATION_TABLE_CF_ATTENDS = "attends";
  36. /**
  37. * 用户关系表-列族-所有粉丝
  38. */
  39. public static final String RELATION_TABLE_CF_FANS = "fans";
  40. /**
  41. * 用户关系表-最大版本
  42. */
  43. public static final Integer RELATION_TABLE_VERSIONS = 1;
  44. /**
  45. * 收件箱表(即用户初始化页面数据表)
  46. */
  47. public static final String INBOX_TABLE = "weibo:inbox";
  48. /**
  49. * 收件箱表-列族
  50. */
  51. public static final String INBOX_TABLE_CF = "info";
  52. /**
  53. * 收件箱表-最大版本
  54. */
  55. public static final Integer INBOX_TABLE_VERSIONS = 2;
  56. }

3. 工具类

  1. package com.jshawn.hbase.guli_weibo.utils;
  2. import com.jshawn.hbase.guli_weibo.constants.Constants;
  3. import org.apache.hadoop.hbase.*;
  4. import org.apache.hadoop.hbase.client.Admin;
  5. import org.apache.hadoop.hbase.client.Connection;
  6. import org.apache.hadoop.hbase.client.ConnectionFactory;
  7. import java.io.IOException;
  8. /**
  9. * @author JShawn 2021/6/20 16:41
  10. *
  11. * 1. 创建命名空间
  12. * 2. 判断表是否存在
  13. * 3. 创建表(三张表)
  14. */
  15. public class HBaseUtils {
  16. /**
  17. * 1. 创建命名空间
  18. * @param nameSpace 命名空间(相当于数据库)
  19. * @throws IOException
  20. */
  21. public static void createNameSpace(String nameSpace) throws IOException {
  22. // 1.获取Connnection对象
  23. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  24. // 2. 获取Admin对象
  25. Admin admin = connection.getAdmin();
  26. // 3. 构建命名空间描述器
  27. NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
  28. // 4. 创建命名空间
  29. admin.createNamespace(namespaceDescriptor);
  30. // 5. 关闭资源
  31. admin.close();
  32. connection.close();
  33. }
  34. /**
  35. * 2. 判断表是否存在
  36. * @param tableName 表名
  37. * @return
  38. */
  39. private static boolean isTableExists(String tableName) throws IOException {
  40. // 1.获取Connnection对象
  41. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  42. // 2. 获取Admin对象
  43. Admin admin = connection.getAdmin();
  44. // 3. 判断表是否存在
  45. boolean exists = admin.tableExists(TableName.valueOf(tableName));
  46. // 4. 关闭资源
  47. admin.close();
  48. connection.close();
  49. // 5. 返回结果
  50. return exists;
  51. }
  52. /**
  53. * 3. 创建表
  54. * @param tableName 表名
  55. * @param versions 最大版本
  56. * @param cfs 列族信息
  57. */
  58. public static void createTable(String tableName, int versions, String... cfs) throws IOException {
  59. // 1. 判断是否传入了列族信息
  60. if (cfs.length <= 0) {
  61. System.out.println("请设置列族信息!!!");
  62. return;
  63. }
  64. // 2. 判断表是否存在
  65. if (isTableExists(tableName)) {
  66. System.out.println("表【" + tableName + "】已存在!!!");
  67. return;
  68. }
  69. // 3. 获取connection对象
  70. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  71. // 4. 获取Admin对象
  72. Admin admin = connection.getAdmin();
  73. // 5. 创建表描述器
  74. HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
  75. // 6. 添加列族信息
  76. for (String cf : cfs) {
  77. HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
  78. // 7. 设置版本
  79. hColumnDescriptor.setMaxVersions(versions);
  80. hTableDescriptor.addFamily(hColumnDescriptor);
  81. }
  82. // 8. 创建表
  83. admin.createTable(hTableDescriptor);
  84. // 9. 关闭资源
  85. admin.close();
  86. connection.close();
  87. }
  88. }

4. 业务类

4.1 创建名称空间和表结构

  1. /**
  2. * 0. 创建命名空间和表结构
  3. */
  4. public static void init() {
  5. try {
  6. // 创建命名空间
  7. HBaseUtils.createNameSpace(Constants.NAMESPACE);
  8. // 创建微博内容表
  9. HBaseUtils.createTable(Constants.CONTENT_TABLE, Constants.CONTENT_TABLE_VERSIONS, Constants.CONTENT_TABLE_CF);
  10. // 创建用户关系表
  11. HBaseUtils.createTable(Constants.RELATION_TABLE, Constants.RELATION_TABLE_VERSIONS, Constants.RELATION_TABLE_CF_ATTENDS, Constants.RELATION_TABLE_CF_FANS);
  12. // 创建收件箱表
  13. HBaseUtils.createTable(Constants.INBOX_TABLE, Constants.INBOX_TABLE_VERSIONS, Constants.INBOX_TABLE_CF);
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }

4.2 发布微博

image.png

  1. /**
  2. * 1. 发布微博
  3. * @param uid 用户ID
  4. * @param content 微博内容
  5. * @throws IOException
  6. */
  7. public static void publishWeibo(String uid, String content) throws IOException {
  8. // 获取connection对象
  9. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  10. // 第一部分:操作微博内容表(增加一条微博内容)
  11. // 1. 获取微博内容表对象
  12. Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
  13. // 2. 获取当前时间戳
  14. long timestamp = System.currentTimeMillis();
  15. // 3. 获取RowKey
  16. String rowKey = uid + "_" + timestamp;
  17. // 4. 创建Put对象
  18. Put contentPut = new Put(Bytes.toBytes(rowKey));
  19. // 5. 给Put对象赋值
  20. contentPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF), Bytes.toBytes("content"), Bytes.toBytes(content));
  21. // 6. 执行插入操作
  22. contentTable.put(contentPut);
  23. // 第二部分:操作微博收件箱表
  24. // 获取发布微博的用户有哪些粉丝
  25. // 1. 获取用户关系表对象
  26. Table relationTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
  27. // 2. 获取当前发布微博人的fans列族数据
  28. Get get = new Get(Bytes.toBytes(uid));
  29. get.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS));
  30. Result result = relationTable.get(get);
  31. // 3. 创建一个集合,用于存放微博内容表的Put对象
  32. ArrayList<Put> inboxPuts = new ArrayList<>();
  33. //4. 遍历粉丝
  34. for (Cell cell : result.rawCells()) {
  35. // 5. 构建微博收件箱表的Put对象
  36. Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
  37. // 6. 给收件箱表的Put对象赋值
  38. inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF),Bytes.toBytes(uid),Bytes.toBytes(rowKey));
  39. // 7. 将收件箱表的Put对象存入集合
  40. inboxPuts.add(inboxPut);
  41. }
  42. // 8. 判断是否有粉丝
  43. if (!inboxPuts.isEmpty()) {
  44. // 获取收件箱表对象
  45. Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
  46. // 执行收件箱表数据批量插入操作
  47. inboxTable.put(inboxPuts);
  48. // 关闭收件箱表
  49. inboxTable.close();
  50. }
  51. // 9. 关闭资源
  52. contentTable.close();
  53. relationTable.close();
  54. connection.close();
  55. }

4.3 添加关注

image.png

  1. /**
  2. * 2. 关注用户
  3. * @param uid 主用户ID
  4. * @param attends 被关注人ID
  5. */
  6. public static void addAttends(String uid, String... attends) throws IOException {
  7. // 校验是否添加了待关注的人
  8. if (attends.length <= 0) {
  9. System.out.println("请添加关注人!!!");
  10. return;
  11. }
  12. // 获取Connection对象
  13. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  14. // 第一部分:操作用户关系表
  15. // 1. 获取用户关系表对象
  16. Table relationTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
  17. // 2. 创建一个集合用于存放用户关系表的Put对象(主用户attend列族添加1个,被关注人fans列族添加attends.length个)
  18. LinkedList<Put> puts = new LinkedList<>();
  19. // 3. 创建主用户的Put对象
  20. Put uidPut = new Put(Bytes.toBytes(uid));
  21. // 4. 循环创建被关注者的Put对象
  22. for (String attend : attends) {
  23. //5. 给主用户的Put对象赋值
  24. uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF_ATTENDS),Bytes.toBytes(attend),Bytes.toBytes(attend));
  25. //6. 创建被关注者的Put对象
  26. Put put = new Put(Bytes.toBytes(attend));
  27. // 7. 给被关注者的Put对象赋值
  28. put.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS), Bytes.toBytes(uid), Bytes.toBytes(uid));
  29. // 8. 将被关注者的Put对象放入集合
  30. puts.add(put);
  31. }
  32. // 9. 将主用户的Put对象放入集合
  33. puts.add(uidPut);
  34. // 10. 执行用户关系表的插入数据操作
  35. relationTable.put(puts);
  36. // 第二部分:操作收件箱表
  37. // 1. 获取微博内容表对象
  38. Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
  39. // 2. 创建微博收件箱表的Put对象
  40. Put inboxPut = new Put(Bytes.toBytes(uid));
  41. // 3. 循环attends,获取每个被关注者的近期发布的微博
  42. for (String attend : attends) {
  43. // 4. 获取当前被关注者的近期发布的微博(scan)->集合ResultScanner
  44. // 思路一(此处采用):先获取出被关注者的所有微博,然后再取前三条(由于发布微博时是rowkey是增加了时间戳,因此HBase中就是顺序存,取出也是顺序的)
  45. // 由于微博内容表的rowkey设计是uid_时间戳
  46. // 因此 startRow 可以写成uid_,这样就比该用户所有数据rowkey都小
  47. // stopRow 可以写出uid_|,这样就比该用户所有rowkey都大,因为|在unicode编码中数据是最大的
  48. // 获取到所有微博后再取前三条
  49. //
  50. // 思路二:发布微博的时候rowkey不添加时间戳,而是(比如时间戳有13位)用13位数的最大值(13个9)-当前时间戳
  51. // 这时候存入就是倒序存入的了
  52. // 然后在这里取值直接就取到前三条即可
  53. //
  54. //由于要修改代码,这里选取思路一
  55. Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));
  56. ResultScanner resultScanner = contentTable.getScanner(scan);
  57. // 定义一个时间戳(然后使用++保证时间戳不一样,否则在这里批量写入以后,HBase服务端会自动加时间戳,
  58. // 下次再取的时候由于时间戳一样只能取到一条,所有需要手动加一个时间戳)
  59. long timeStamp = System.currentTimeMillis();
  60. // 5. 对获取的值进行遍历
  61. for (Result result : resultScanner) {
  62. // 6. 给收件箱表的Put对象赋值
  63. inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(attend), timeStamp++, result.getRow());
  64. }
  65. }
  66. // 7. 判断当前的Put对象是不为空
  67. if (!inboxPut.isEmpty()) {
  68. // 不为空,获取微博收件箱表对象
  69. Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
  70. // 插入数据
  71. inboxTable.put(inboxPut);
  72. // 关闭收件箱表对象连接
  73. inboxTable.close();
  74. }
  75. // 关闭资源
  76. relationTable.close();
  77. contentTable.close();
  78. connection.close();
  79. }

4.4 取关用户

  1. /**
  2. * 3. 取关用户
  3. * @param uid 主用户ID
  4. * @param dels 被关注人ID
  5. */
  6. public static void deleteAttend(String uid,String ...dels) throws IOException {
  7. if (dels.length <= 0) {
  8. System.out.println("请添加取关的用户!!!");
  9. return;
  10. }
  11. // 获取Connection对象
  12. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  13. // 第一部分:操作用户关系表
  14. // 1. 获取用户关系表对象
  15. Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
  16. // 2. 创建一个集合,用于存放用户关系表的Delete对象
  17. LinkedList<Delete> relaDeletes = new LinkedList<>();
  18. // 3. 创建主用户的Delete对象
  19. Delete uidDelete = new Delete(Bytes.toBytes(uid));
  20. // 4. 循环创建被取关者的Delete对象
  21. for (String del : dels) {
  22. // 5. 给主用户的Delete对象赋值
  23. // 注意:这里要使用带s的api,防止多次关注(一般不会,会在前端做校验)
  24. uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF_ATTENDS),Bytes.toBytes(del));
  25. // 6. 创建被取关者的Delete对象,删除粉丝
  26. Delete delete = new Delete(Bytes.toBytes(del));
  27. // 7. 给被取关者的Delete对象赋值(赋值到列,因为不是删rowkey,只是删被取关的人,不是删全部)
  28. delete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS), Bytes.toBytes(uid));
  29. // 8. 将被取关者的Delete对象添加至集合
  30. relaDeletes.add(delete);
  31. }
  32. // 9. 将操作者的Delete对象添加至结合
  33. relaDeletes.add(uidDelete);
  34. //10. 执行用户关系表的批量删除操作
  35. relaTable.delete(relaDeletes);
  36. // 第二部分:操作收件箱表
  37. // 1. 获取收件箱表对象
  38. Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
  39. // 2. 创建操作者的Delete对象
  40. Delete inboxDelete = new Delete(Bytes.toBytes(uid));
  41. // 3. 循环给操作者的Delete对象赋值
  42. for (String del : dels) {
  43. inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del));
  44. }
  45. // 4. 执行收件箱表的删除操作
  46. inboxTable.delete(inboxDelete);
  47. // 关闭资源
  48. relaTable.close();
  49. inboxTable.close();
  50. connection.close();
  51. }

4.5 获取某个用户的初始化页面数据

  1. /**
  2. * 4. 获取某个用户的初始化页面数据
  3. * @param uid 用户ID
  4. */
  5. public static void getInit(String uid) throws IOException {
  6. // 1. 获取Connection对象
  7. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  8. // 2. 获取收件箱表对象
  9. Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
  10. // 3. 获取微博内容表对象
  11. Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
  12. // 4. 创建收件箱表Get对象,并获取数据(设置最大版本)
  13. Get inboxGet = new Get(Bytes.toBytes(uid));
  14. inboxGet.setMaxVersions();
  15. Result result = inboxTable.get(inboxGet);
  16. // 5. 遍历获取的数据
  17. for (Cell cell : result.rawCells()) {
  18. // 6. 构建微博内容表Get对象
  19. Get contentGet = new Get(CellUtil.cloneValue(cell));
  20. // 7. 获取该Get对象的数据内容
  21. Result contentReult = contentTable.get(contentGet);
  22. // 8. 解析内容并打印
  23. for (Cell contentCell : contentReult.rawCells()) {
  24. System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(contentCell)) +
  25. ",CF:" + Bytes.toString(CellUtil.cloneFamily(contentCell)) +
  26. ",CN:" + Bytes.toString(CellUtil.cloneQualifier(contentCell)) +
  27. ",Value:" + Bytes.toString(CellUtil.cloneValue(contentCell)));
  28. }
  29. }
  30. // 9. 关闭资源
  31. inboxTable.close();
  32. contentTable.close();
  33. connection.close();
  34. }

4.6 获取某个用户所有微博详情

  1. /**
  2. * 5. 获取某个用户所有微博详情
  3. * @param uid 用户ID
  4. */
  5. public static void getWeibo(String uid) throws IOException {
  6. // 1. 获取Connection对象
  7. Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
  8. // 2. 获取微博内容表对象
  9. Table table = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
  10. // 3. 构建Scan对象
  11. // 思路一:同添加关注时的思路一
  12. // startRow:uid_
  13. // stopRow:uid|
  14. // 思路二(此处采用):过滤器
  15. Scan scan = new Scan();
  16. // 构建过滤器
  17. // 表示过滤出 RowKey 中有uid_"-"的字串的数据
  18. RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_"));
  19. scan.setFilter(rowFilter);
  20. // 4. 获取数据
  21. ResultScanner resultScanner = table.getScanner(scan);
  22. // 5. 解析数据并打印
  23. for (Result result : resultScanner) {
  24. for (Cell cell : result.rawCells()) {
  25. System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
  26. ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
  27. ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
  28. ",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  29. }
  30. }
  31. // 6. 关闭资源
  32. table.close();
  33. connection.close();
  34. }

5. 测试类

  1. package com.jshawn.hbase.guli_weibo.test;
  2. import com.jshawn.hbase.guli_weibo.constants.Constants;
  3. import com.jshawn.hbase.guli_weibo.dao.HBaseDao;
  4. import com.jshawn.hbase.guli_weibo.utils.HBaseUtils;
  5. import java.io.IOException;
  6. /**
  7. * 谷粒微博测试类
  8. * @author JShawn 2021/6/20 22:35
  9. */
  10. public class TestWeibo {
  11. public static void main(String[] args) throws IOException, InterruptedException {
  12. // 创建命名空间和表结构
  13. HBaseDao.init();
  14. // 1001发布微博
  15. HBaseDao.publishWeibo("1001","赶紧下课吧");
  16. // 1002关注1001和1003
  17. HBaseDao.addAttends("1002","1001","1003");
  18. // 获取1002初始化页面
  19. HBaseDao.getInit("1002");
  20. System.out.println("***************************111*******************************");
  21. // 1003发布3条微博,同时1001发布2条微博
  22. HBaseDao.publishWeibo("1003","谁说的赶紧下课!!!");
  23. Thread.sleep(10);
  24. HBaseDao.publishWeibo("1001","我没说话!!!");
  25. Thread.sleep(10);
  26. HBaseDao.publishWeibo("1003","那谁说的!!!");
  27. Thread.sleep(10);
  28. HBaseDao.publishWeibo("1001","反正我没说!!!");
  29. Thread.sleep(10);
  30. HBaseDao.publishWeibo("1003","你们爱咋咋地!!!");
  31. // 获取1002初始化页面
  32. HBaseDao.getInit("1002");
  33. System.out.println("***************************222*******************************");
  34. // 1002取关1003
  35. HBaseDao.deleteAttend("1002","1003");
  36. // 获取1002初始化页面
  37. HBaseDao.getInit("1002");
  38. System.out.println("***************************333*******************************");
  39. // 1002再次关注1003
  40. HBaseDao.addAttends("1002","1003");
  41. // 获取1002初始化页面
  42. HBaseDao.getInit("1002");
  43. System.out.println("***************************444*******************************");
  44. // 获取1001微博详情
  45. HBaseDao.getWeibo("1001");
  46. }
  47. }