0. 需求
模拟微博业务,其中包括发布微博、添加关注、取关用户、获取某个用户初始化页面数据、获取某个用户所有微博详情
1. 表结构
1.1 微博内容表
1.2 用户关系表
1.3 微博收件箱表
2. 常量类
package com.jshawn.hbase.guli_weibo.constants;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;/*** @author JShawn 2021/6/20 16:47*/public class Constants {/*** HBase的配置信息*/public static final Configuration COFIGURATION = HBaseConfiguration.create();/*** 命名空间*/public static final String NAMESPACE = "weibo";/*** 微博内容表*/public static final String CONTENT_TABLE = "weibo:content";/*** 微博内容表-列族*/public static final String CONTENT_TABLE_CF = "info";/*** 微博内容表-最大版本*/public static final Integer CONTENT_TABLE_VERSIONS = 1;/*** 用户关系表*/public static final String RELATION_TABLE = "weibo:relation";/*** 用户关系表-列族-所有关注*/public static final String RELATION_TABLE_CF_ATTENDS = "attends";/*** 用户关系表-列族-所有粉丝*/public static final String RELATION_TABLE_CF_FANS = "fans";/*** 用户关系表-最大版本*/public static final Integer RELATION_TABLE_VERSIONS = 1;/*** 收件箱表(即用户初始化页面数据表)*/public static final String INBOX_TABLE = "weibo:inbox";/*** 收件箱表-列族*/public static final String INBOX_TABLE_CF = "info";/*** 收件箱表-最大版本*/public static final Integer INBOX_TABLE_VERSIONS = 2;}
3. 工具类
package com.jshawn.hbase.guli_weibo.utils;import com.jshawn.hbase.guli_weibo.constants.Constants;import org.apache.hadoop.hbase.*;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;/*** @author JShawn 2021/6/20 16:41** 1. 创建命名空间* 2. 判断表是否存在* 3. 创建表(三张表)*/public class HBaseUtils {/*** 1. 创建命名空间* @param nameSpace 命名空间(相当于数据库)* @throws IOException*/public static void createNameSpace(String nameSpace) throws IOException {// 1.获取Connnection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 2. 获取Admin对象Admin admin = connection.getAdmin();// 3. 构建命名空间描述器NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();// 4. 创建命名空间admin.createNamespace(namespaceDescriptor);// 5. 关闭资源admin.close();connection.close();}/*** 2. 判断表是否存在* @param tableName 表名* @return*/private static boolean isTableExists(String tableName) throws IOException {// 1.获取Connnection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 2. 获取Admin对象Admin admin = connection.getAdmin();// 3. 判断表是否存在boolean exists = admin.tableExists(TableName.valueOf(tableName));// 4. 关闭资源admin.close();connection.close();// 5. 返回结果return exists;}/*** 3. 创建表* @param tableName 表名* @param versions 最大版本* @param cfs 列族信息*/public static void createTable(String tableName, int versions, String... cfs) throws IOException {// 1. 判断是否传入了列族信息if (cfs.length <= 0) {System.out.println("请设置列族信息!!!");return;}// 2. 判断表是否存在if (isTableExists(tableName)) {System.out.println("表【" + tableName + "】已存在!!!");return;}// 3. 获取connection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 4. 获取Admin对象Admin admin = connection.getAdmin();// 5. 创建表描述器HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));// 6. 添加列族信息for (String cf : cfs) {HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);// 7. 设置版本hColumnDescriptor.setMaxVersions(versions);hTableDescriptor.addFamily(hColumnDescriptor);}// 8. 创建表admin.createTable(hTableDescriptor);// 9. 关闭资源admin.close();connection.close();}}
4. 业务类
4.1 创建名称空间和表结构
/*** 0. 创建命名空间和表结构*/public static void init() {try {// 创建命名空间HBaseUtils.createNameSpace(Constants.NAMESPACE);// 创建微博内容表HBaseUtils.createTable(Constants.CONTENT_TABLE, Constants.CONTENT_TABLE_VERSIONS, Constants.CONTENT_TABLE_CF);// 创建用户关系表HBaseUtils.createTable(Constants.RELATION_TABLE, Constants.RELATION_TABLE_VERSIONS, Constants.RELATION_TABLE_CF_ATTENDS, Constants.RELATION_TABLE_CF_FANS);// 创建收件箱表HBaseUtils.createTable(Constants.INBOX_TABLE, Constants.INBOX_TABLE_VERSIONS, Constants.INBOX_TABLE_CF);} catch (IOException e) {e.printStackTrace();}}
4.2 发布微博

/*** 1. 发布微博* @param uid 用户ID* @param content 微博内容* @throws IOException*/public static void publishWeibo(String uid, String content) throws IOException {// 获取connection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 第一部分:操作微博内容表(增加一条微博内容)// 1. 获取微博内容表对象Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));// 2. 获取当前时间戳long timestamp = System.currentTimeMillis();// 3. 获取RowKeyString rowKey = uid + "_" + timestamp;// 4. 创建Put对象Put contentPut = new Put(Bytes.toBytes(rowKey));// 5. 给Put对象赋值contentPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF), Bytes.toBytes("content"), Bytes.toBytes(content));// 6. 执行插入操作contentTable.put(contentPut);// 第二部分:操作微博收件箱表// 获取发布微博的用户有哪些粉丝// 1. 获取用户关系表对象Table relationTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));// 2. 获取当前发布微博人的fans列族数据Get get = new Get(Bytes.toBytes(uid));get.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS));Result result = relationTable.get(get);// 3. 创建一个集合,用于存放微博内容表的Put对象ArrayList<Put> inboxPuts = new ArrayList<>();//4. 遍历粉丝for (Cell cell : result.rawCells()) {// 5. 构建微博收件箱表的Put对象Put inboxPut = new Put(CellUtil.cloneQualifier(cell));// 6. 给收件箱表的Put对象赋值inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF),Bytes.toBytes(uid),Bytes.toBytes(rowKey));// 7. 将收件箱表的Put对象存入集合inboxPuts.add(inboxPut);}// 8. 判断是否有粉丝if (!inboxPuts.isEmpty()) {// 获取收件箱表对象Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));// 执行收件箱表数据批量插入操作inboxTable.put(inboxPuts);// 关闭收件箱表inboxTable.close();}// 9. 关闭资源contentTable.close();relationTable.close();connection.close();}
4.3 添加关注

/*** 2. 关注用户* @param uid 主用户ID* @param attends 被关注人ID*/public static void addAttends(String uid, String... attends) throws IOException {// 校验是否添加了待关注的人if (attends.length <= 0) {System.out.println("请添加关注人!!!");return;}// 获取Connection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 第一部分:操作用户关系表// 1. 获取用户关系表对象Table relationTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));// 2. 创建一个集合用于存放用户关系表的Put对象(主用户attend列族添加1个,被关注人fans列族添加attends.length个)LinkedList<Put> puts = new LinkedList<>();// 3. 创建主用户的Put对象Put uidPut = new Put(Bytes.toBytes(uid));// 4. 循环创建被关注者的Put对象for (String attend : attends) {//5. 给主用户的Put对象赋值uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF_ATTENDS),Bytes.toBytes(attend),Bytes.toBytes(attend));//6. 创建被关注者的Put对象Put put = new Put(Bytes.toBytes(attend));// 7. 给被关注者的Put对象赋值put.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS), Bytes.toBytes(uid), Bytes.toBytes(uid));// 8. 将被关注者的Put对象放入集合puts.add(put);}// 9. 将主用户的Put对象放入集合puts.add(uidPut);// 10. 执行用户关系表的插入数据操作relationTable.put(puts);// 第二部分:操作收件箱表// 1. 获取微博内容表对象Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));// 2. 创建微博收件箱表的Put对象Put inboxPut = new Put(Bytes.toBytes(uid));// 3. 循环attends,获取每个被关注者的近期发布的微博for (String attend : attends) {// 4. 获取当前被关注者的近期发布的微博(scan)->集合ResultScanner// 思路一(此处采用):先获取出被关注者的所有微博,然后再取前三条(由于发布微博时是rowkey是增加了时间戳,因此HBase中就是顺序存,取出也是顺序的)// 由于微博内容表的rowkey设计是uid_时间戳// 因此 startRow 可以写成uid_,这样就比该用户所有数据rowkey都小// stopRow 可以写出uid_|,这样就比该用户所有rowkey都大,因为|在unicode编码中数据是最大的// 获取到所有微博后再取前三条//// 思路二:发布微博的时候rowkey不添加时间戳,而是(比如时间戳有13位)用13位数的最大值(13个9)-当前时间戳// 这时候存入就是倒序存入的了// 然后在这里取值直接就取到前三条即可////由于要修改代码,这里选取思路一Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));ResultScanner resultScanner = contentTable.getScanner(scan);// 定义一个时间戳(然后使用++保证时间戳不一样,否则在这里批量写入以后,HBase服务端会自动加时间戳,// 下次再取的时候由于时间戳一样只能取到一条,所有需要手动加一个时间戳)long timeStamp = System.currentTimeMillis();// 5. 对获取的值进行遍历for (Result result : resultScanner) {// 6. 给收件箱表的Put对象赋值inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(attend), timeStamp++, result.getRow());}}// 7. 判断当前的Put对象是不为空if (!inboxPut.isEmpty()) {// 不为空,获取微博收件箱表对象Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));// 插入数据inboxTable.put(inboxPut);// 关闭收件箱表对象连接inboxTable.close();}// 关闭资源relationTable.close();contentTable.close();connection.close();}
4.4 取关用户
/*** 3. 取关用户* @param uid 主用户ID* @param dels 被关注人ID*/public static void deleteAttend(String uid,String ...dels) throws IOException {if (dels.length <= 0) {System.out.println("请添加取关的用户!!!");return;}// 获取Connection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 第一部分:操作用户关系表// 1. 获取用户关系表对象Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));// 2. 创建一个集合,用于存放用户关系表的Delete对象LinkedList<Delete> relaDeletes = new LinkedList<>();// 3. 创建主用户的Delete对象Delete uidDelete = new Delete(Bytes.toBytes(uid));// 4. 循环创建被取关者的Delete对象for (String del : dels) {// 5. 给主用户的Delete对象赋值// 注意:这里要使用带s的api,防止多次关注(一般不会,会在前端做校验)uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF_ATTENDS),Bytes.toBytes(del));// 6. 创建被取关者的Delete对象,删除粉丝Delete delete = new Delete(Bytes.toBytes(del));// 7. 给被取关者的Delete对象赋值(赋值到列,因为不是删rowkey,只是删被取关的人,不是删全部)delete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS), Bytes.toBytes(uid));// 8. 将被取关者的Delete对象添加至集合relaDeletes.add(delete);}// 9. 将操作者的Delete对象添加至结合relaDeletes.add(uidDelete);//10. 执行用户关系表的批量删除操作relaTable.delete(relaDeletes);// 第二部分:操作收件箱表// 1. 获取收件箱表对象Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));// 2. 创建操作者的Delete对象Delete inboxDelete = new Delete(Bytes.toBytes(uid));// 3. 循环给操作者的Delete对象赋值for (String del : dels) {inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del));}// 4. 执行收件箱表的删除操作inboxTable.delete(inboxDelete);// 关闭资源relaTable.close();inboxTable.close();connection.close();}
4.5 获取某个用户的初始化页面数据
/*** 4. 获取某个用户的初始化页面数据* @param uid 用户ID*/public static void getInit(String uid) throws IOException {// 1. 获取Connection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 2. 获取收件箱表对象Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));// 3. 获取微博内容表对象Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));// 4. 创建收件箱表Get对象,并获取数据(设置最大版本)Get inboxGet = new Get(Bytes.toBytes(uid));inboxGet.setMaxVersions();Result result = inboxTable.get(inboxGet);// 5. 遍历获取的数据for (Cell cell : result.rawCells()) {// 6. 构建微博内容表Get对象Get contentGet = new Get(CellUtil.cloneValue(cell));// 7. 获取该Get对象的数据内容Result contentReult = contentTable.get(contentGet);// 8. 解析内容并打印for (Cell contentCell : contentReult.rawCells()) {System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(contentCell)) +",CF:" + Bytes.toString(CellUtil.cloneFamily(contentCell)) +",CN:" + Bytes.toString(CellUtil.cloneQualifier(contentCell)) +",Value:" + Bytes.toString(CellUtil.cloneValue(contentCell)));}}// 9. 关闭资源inboxTable.close();contentTable.close();connection.close();}
4.6 获取某个用户所有微博详情
/*** 5. 获取某个用户所有微博详情* @param uid 用户ID*/public static void getWeibo(String uid) throws IOException {// 1. 获取Connection对象Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);// 2. 获取微博内容表对象Table table = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));// 3. 构建Scan对象// 思路一:同添加关注时的思路一// startRow:uid_// stopRow:uid|// 思路二(此处采用):过滤器Scan scan = new Scan();// 构建过滤器// 表示过滤出 RowKey 中有uid_"-"的字串的数据RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_"));scan.setFilter(rowFilter);// 4. 获取数据ResultScanner resultScanner = table.getScanner(scan);// 5. 解析数据并打印for (Result result : resultScanner) {for (Cell cell : result.rawCells()) {System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));}}// 6. 关闭资源table.close();connection.close();}
5. 测试类
package com.jshawn.hbase.guli_weibo.test;import com.jshawn.hbase.guli_weibo.constants.Constants;import com.jshawn.hbase.guli_weibo.dao.HBaseDao;import com.jshawn.hbase.guli_weibo.utils.HBaseUtils;import java.io.IOException;/*** 谷粒微博测试类* @author JShawn 2021/6/20 22:35*/public class TestWeibo {public static void main(String[] args) throws IOException, InterruptedException {// 创建命名空间和表结构HBaseDao.init();// 1001发布微博HBaseDao.publishWeibo("1001","赶紧下课吧");// 1002关注1001和1003HBaseDao.addAttends("1002","1001","1003");// 获取1002初始化页面HBaseDao.getInit("1002");System.out.println("***************************111*******************************");// 1003发布3条微博,同时1001发布2条微博HBaseDao.publishWeibo("1003","谁说的赶紧下课!!!");Thread.sleep(10);HBaseDao.publishWeibo("1001","我没说话!!!");Thread.sleep(10);HBaseDao.publishWeibo("1003","那谁说的!!!");Thread.sleep(10);HBaseDao.publishWeibo("1001","反正我没说!!!");Thread.sleep(10);HBaseDao.publishWeibo("1003","你们爱咋咋地!!!");// 获取1002初始化页面HBaseDao.getInit("1002");System.out.println("***************************222*******************************");// 1002取关1003HBaseDao.deleteAttend("1002","1003");// 获取1002初始化页面HBaseDao.getInit("1002");System.out.println("***************************333*******************************");// 1002再次关注1003HBaseDao.addAttends("1002","1003");// 获取1002初始化页面HBaseDao.getInit("1002");System.out.println("***************************444*******************************");// 获取1001微博详情HBaseDao.getWeibo("1001");}}
