0. 需求
模拟微博业务,其中包括发布微博、添加关注、取关用户、获取某个用户初始化页面数据、获取某个用户所有微博详情
1. 表结构
1.1 微博内容表
1.2 用户关系表
1.3 微博收件箱表
2. 常量类
package com.jshawn.hbase.guli_weibo.constants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
/**
* @author JShawn 2021/6/20 16:47
*/
public class Constants {
/**
* HBase的配置信息
*/
public static final Configuration COFIGURATION = HBaseConfiguration.create();
/**
* 命名空间
*/
public static final String NAMESPACE = "weibo";
/**
* 微博内容表
*/
public static final String CONTENT_TABLE = "weibo:content";
/**
* 微博内容表-列族
*/
public static final String CONTENT_TABLE_CF = "info";
/**
* 微博内容表-最大版本
*/
public static final Integer CONTENT_TABLE_VERSIONS = 1;
/**
* 用户关系表
*/
public static final String RELATION_TABLE = "weibo:relation";
/**
* 用户关系表-列族-所有关注
*/
public static final String RELATION_TABLE_CF_ATTENDS = "attends";
/**
* 用户关系表-列族-所有粉丝
*/
public static final String RELATION_TABLE_CF_FANS = "fans";
/**
* 用户关系表-最大版本
*/
public static final Integer RELATION_TABLE_VERSIONS = 1;
/**
* 收件箱表(即用户初始化页面数据表)
*/
public static final String INBOX_TABLE = "weibo:inbox";
/**
* 收件箱表-列族
*/
public static final String INBOX_TABLE_CF = "info";
/**
* 收件箱表-最大版本
*/
public static final Integer INBOX_TABLE_VERSIONS = 2;
}
3. 工具类
package com.jshawn.hbase.guli_weibo.utils;
import com.jshawn.hbase.guli_weibo.constants.Constants;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
* @author JShawn 2021/6/20 16:41
*
* 1. 创建命名空间
* 2. 判断表是否存在
* 3. 创建表(三张表)
*/
public class HBaseUtils {
/**
* 1. 创建命名空间
* @param nameSpace 命名空间(相当于数据库)
* @throws IOException
*/
public static void createNameSpace(String nameSpace) throws IOException {
// 1.获取Connnection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 2. 获取Admin对象
Admin admin = connection.getAdmin();
// 3. 构建命名空间描述器
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
// 4. 创建命名空间
admin.createNamespace(namespaceDescriptor);
// 5. 关闭资源
admin.close();
connection.close();
}
/**
* 2. 判断表是否存在
* @param tableName 表名
* @return
*/
private static boolean isTableExists(String tableName) throws IOException {
// 1.获取Connnection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 2. 获取Admin对象
Admin admin = connection.getAdmin();
// 3. 判断表是否存在
boolean exists = admin.tableExists(TableName.valueOf(tableName));
// 4. 关闭资源
admin.close();
connection.close();
// 5. 返回结果
return exists;
}
/**
* 3. 创建表
* @param tableName 表名
* @param versions 最大版本
* @param cfs 列族信息
*/
public static void createTable(String tableName, int versions, String... cfs) throws IOException {
// 1. 判断是否传入了列族信息
if (cfs.length <= 0) {
System.out.println("请设置列族信息!!!");
return;
}
// 2. 判断表是否存在
if (isTableExists(tableName)) {
System.out.println("表【" + tableName + "】已存在!!!");
return;
}
// 3. 获取connection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 4. 获取Admin对象
Admin admin = connection.getAdmin();
// 5. 创建表描述器
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
// 6. 添加列族信息
for (String cf : cfs) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
// 7. 设置版本
hColumnDescriptor.setMaxVersions(versions);
hTableDescriptor.addFamily(hColumnDescriptor);
}
// 8. 创建表
admin.createTable(hTableDescriptor);
// 9. 关闭资源
admin.close();
connection.close();
}
}
4. 业务类
4.1 创建名称空间和表结构
/**
* 0. 创建命名空间和表结构
*/
public static void init() {
try {
// 创建命名空间
HBaseUtils.createNameSpace(Constants.NAMESPACE);
// 创建微博内容表
HBaseUtils.createTable(Constants.CONTENT_TABLE, Constants.CONTENT_TABLE_VERSIONS, Constants.CONTENT_TABLE_CF);
// 创建用户关系表
HBaseUtils.createTable(Constants.RELATION_TABLE, Constants.RELATION_TABLE_VERSIONS, Constants.RELATION_TABLE_CF_ATTENDS, Constants.RELATION_TABLE_CF_FANS);
// 创建收件箱表
HBaseUtils.createTable(Constants.INBOX_TABLE, Constants.INBOX_TABLE_VERSIONS, Constants.INBOX_TABLE_CF);
} catch (IOException e) {
e.printStackTrace();
}
}
4.2 发布微博
/**
* 1. 发布微博
* @param uid 用户ID
* @param content 微博内容
* @throws IOException
*/
public static void publishWeibo(String uid, String content) throws IOException {
// 获取connection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 第一部分:操作微博内容表(增加一条微博内容)
// 1. 获取微博内容表对象
Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
// 2. 获取当前时间戳
long timestamp = System.currentTimeMillis();
// 3. 获取RowKey
String rowKey = uid + "_" + timestamp;
// 4. 创建Put对象
Put contentPut = new Put(Bytes.toBytes(rowKey));
// 5. 给Put对象赋值
contentPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF), Bytes.toBytes("content"), Bytes.toBytes(content));
// 6. 执行插入操作
contentTable.put(contentPut);
// 第二部分:操作微博收件箱表
// 获取发布微博的用户有哪些粉丝
// 1. 获取用户关系表对象
Table relationTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
// 2. 获取当前发布微博人的fans列族数据
Get get = new Get(Bytes.toBytes(uid));
get.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS));
Result result = relationTable.get(get);
// 3. 创建一个集合,用于存放微博内容表的Put对象
ArrayList<Put> inboxPuts = new ArrayList<>();
//4. 遍历粉丝
for (Cell cell : result.rawCells()) {
// 5. 构建微博收件箱表的Put对象
Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
// 6. 给收件箱表的Put对象赋值
inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF),Bytes.toBytes(uid),Bytes.toBytes(rowKey));
// 7. 将收件箱表的Put对象存入集合
inboxPuts.add(inboxPut);
}
// 8. 判断是否有粉丝
if (!inboxPuts.isEmpty()) {
// 获取收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
// 执行收件箱表数据批量插入操作
inboxTable.put(inboxPuts);
// 关闭收件箱表
inboxTable.close();
}
// 9. 关闭资源
contentTable.close();
relationTable.close();
connection.close();
}
4.3 添加关注
/**
* 2. 关注用户
* @param uid 主用户ID
* @param attends 被关注人ID
*/
public static void addAttends(String uid, String... attends) throws IOException {
// 校验是否添加了待关注的人
if (attends.length <= 0) {
System.out.println("请添加关注人!!!");
return;
}
// 获取Connection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 第一部分:操作用户关系表
// 1. 获取用户关系表对象
Table relationTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
// 2. 创建一个集合用于存放用户关系表的Put对象(主用户attend列族添加1个,被关注人fans列族添加attends.length个)
LinkedList<Put> puts = new LinkedList<>();
// 3. 创建主用户的Put对象
Put uidPut = new Put(Bytes.toBytes(uid));
// 4. 循环创建被关注者的Put对象
for (String attend : attends) {
//5. 给主用户的Put对象赋值
uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF_ATTENDS),Bytes.toBytes(attend),Bytes.toBytes(attend));
//6. 创建被关注者的Put对象
Put put = new Put(Bytes.toBytes(attend));
// 7. 给被关注者的Put对象赋值
put.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS), Bytes.toBytes(uid), Bytes.toBytes(uid));
// 8. 将被关注者的Put对象放入集合
puts.add(put);
}
// 9. 将主用户的Put对象放入集合
puts.add(uidPut);
// 10. 执行用户关系表的插入数据操作
relationTable.put(puts);
// 第二部分:操作收件箱表
// 1. 获取微博内容表对象
Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
// 2. 创建微博收件箱表的Put对象
Put inboxPut = new Put(Bytes.toBytes(uid));
// 3. 循环attends,获取每个被关注者的近期发布的微博
for (String attend : attends) {
// 4. 获取当前被关注者的近期发布的微博(scan)->集合ResultScanner
// 思路一(此处采用):先获取出被关注者的所有微博,然后再取前三条(由于发布微博时是rowkey是增加了时间戳,因此HBase中就是顺序存,取出也是顺序的)
// 由于微博内容表的rowkey设计是uid_时间戳
// 因此 startRow 可以写成uid_,这样就比该用户所有数据rowkey都小
// stopRow 可以写出uid_|,这样就比该用户所有rowkey都大,因为|在unicode编码中数据是最大的
// 获取到所有微博后再取前三条
//
// 思路二:发布微博的时候rowkey不添加时间戳,而是(比如时间戳有13位)用13位数的最大值(13个9)-当前时间戳
// 这时候存入就是倒序存入的了
// 然后在这里取值直接就取到前三条即可
//
//由于要修改代码,这里选取思路一
Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));
ResultScanner resultScanner = contentTable.getScanner(scan);
// 定义一个时间戳(然后使用++保证时间戳不一样,否则在这里批量写入以后,HBase服务端会自动加时间戳,
// 下次再取的时候由于时间戳一样只能取到一条,所有需要手动加一个时间戳)
long timeStamp = System.currentTimeMillis();
// 5. 对获取的值进行遍历
for (Result result : resultScanner) {
// 6. 给收件箱表的Put对象赋值
inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(attend), timeStamp++, result.getRow());
}
}
// 7. 判断当前的Put对象是不为空
if (!inboxPut.isEmpty()) {
// 不为空,获取微博收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
// 插入数据
inboxTable.put(inboxPut);
// 关闭收件箱表对象连接
inboxTable.close();
}
// 关闭资源
relationTable.close();
contentTable.close();
connection.close();
}
4.4 取关用户
/**
* 3. 取关用户
* @param uid 主用户ID
* @param dels 被关注人ID
*/
public static void deleteAttend(String uid,String ...dels) throws IOException {
if (dels.length <= 0) {
System.out.println("请添加取关的用户!!!");
return;
}
// 获取Connection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 第一部分:操作用户关系表
// 1. 获取用户关系表对象
Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
// 2. 创建一个集合,用于存放用户关系表的Delete对象
LinkedList<Delete> relaDeletes = new LinkedList<>();
// 3. 创建主用户的Delete对象
Delete uidDelete = new Delete(Bytes.toBytes(uid));
// 4. 循环创建被取关者的Delete对象
for (String del : dels) {
// 5. 给主用户的Delete对象赋值
// 注意:这里要使用带s的api,防止多次关注(一般不会,会在前端做校验)
uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF_ATTENDS),Bytes.toBytes(del));
// 6. 创建被取关者的Delete对象,删除粉丝
Delete delete = new Delete(Bytes.toBytes(del));
// 7. 给被取关者的Delete对象赋值(赋值到列,因为不是删rowkey,只是删被取关的人,不是删全部)
delete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF_FANS), Bytes.toBytes(uid));
// 8. 将被取关者的Delete对象添加至集合
relaDeletes.add(delete);
}
// 9. 将操作者的Delete对象添加至结合
relaDeletes.add(uidDelete);
//10. 执行用户关系表的批量删除操作
relaTable.delete(relaDeletes);
// 第二部分:操作收件箱表
// 1. 获取收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
// 2. 创建操作者的Delete对象
Delete inboxDelete = new Delete(Bytes.toBytes(uid));
// 3. 循环给操作者的Delete对象赋值
for (String del : dels) {
inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del));
}
// 4. 执行收件箱表的删除操作
inboxTable.delete(inboxDelete);
// 关闭资源
relaTable.close();
inboxTable.close();
connection.close();
}
4.5 获取某个用户的初始化页面数据
/**
* 4. 获取某个用户的初始化页面数据
* @param uid 用户ID
*/
public static void getInit(String uid) throws IOException {
// 1. 获取Connection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 2. 获取收件箱表对象
Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
// 3. 获取微博内容表对象
Table contentTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
// 4. 创建收件箱表Get对象,并获取数据(设置最大版本)
Get inboxGet = new Get(Bytes.toBytes(uid));
inboxGet.setMaxVersions();
Result result = inboxTable.get(inboxGet);
// 5. 遍历获取的数据
for (Cell cell : result.rawCells()) {
// 6. 构建微博内容表Get对象
Get contentGet = new Get(CellUtil.cloneValue(cell));
// 7. 获取该Get对象的数据内容
Result contentReult = contentTable.get(contentGet);
// 8. 解析内容并打印
for (Cell contentCell : contentReult.rawCells()) {
System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(contentCell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(contentCell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(contentCell)) +
",Value:" + Bytes.toString(CellUtil.cloneValue(contentCell)));
}
}
// 9. 关闭资源
inboxTable.close();
contentTable.close();
connection.close();
}
4.6 获取某个用户所有微博详情
/**
* 5. 获取某个用户所有微博详情
* @param uid 用户ID
*/
public static void getWeibo(String uid) throws IOException {
// 1. 获取Connection对象
Connection connection = ConnectionFactory.createConnection(Constants.COFIGURATION);
// 2. 获取微博内容表对象
Table table = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
// 3. 构建Scan对象
// 思路一:同添加关注时的思路一
// startRow:uid_
// stopRow:uid|
// 思路二(此处采用):过滤器
Scan scan = new Scan();
// 构建过滤器
// 表示过滤出 RowKey 中有uid_"-"的字串的数据
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_"));
scan.setFilter(rowFilter);
// 4. 获取数据
ResultScanner resultScanner = table.getScanner(scan);
// 5. 解析数据并打印
for (Result result : resultScanner) {
for (Cell cell : result.rawCells()) {
System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
// 6. 关闭资源
table.close();
connection.close();
}
5. 测试类
package com.jshawn.hbase.guli_weibo.test;
import com.jshawn.hbase.guli_weibo.constants.Constants;
import com.jshawn.hbase.guli_weibo.dao.HBaseDao;
import com.jshawn.hbase.guli_weibo.utils.HBaseUtils;
import java.io.IOException;
/**
* 谷粒微博测试类
* @author JShawn 2021/6/20 22:35
*/
public class TestWeibo {
public static void main(String[] args) throws IOException, InterruptedException {
// 创建命名空间和表结构
HBaseDao.init();
// 1001发布微博
HBaseDao.publishWeibo("1001","赶紧下课吧");
// 1002关注1001和1003
HBaseDao.addAttends("1002","1001","1003");
// 获取1002初始化页面
HBaseDao.getInit("1002");
System.out.println("***************************111*******************************");
// 1003发布3条微博,同时1001发布2条微博
HBaseDao.publishWeibo("1003","谁说的赶紧下课!!!");
Thread.sleep(10);
HBaseDao.publishWeibo("1001","我没说话!!!");
Thread.sleep(10);
HBaseDao.publishWeibo("1003","那谁说的!!!");
Thread.sleep(10);
HBaseDao.publishWeibo("1001","反正我没说!!!");
Thread.sleep(10);
HBaseDao.publishWeibo("1003","你们爱咋咋地!!!");
// 获取1002初始化页面
HBaseDao.getInit("1002");
System.out.println("***************************222*******************************");
// 1002取关1003
HBaseDao.deleteAttend("1002","1003");
// 获取1002初始化页面
HBaseDao.getInit("1002");
System.out.println("***************************333*******************************");
// 1002再次关注1003
HBaseDao.addAttends("1002","1003");
// 获取1002初始化页面
HBaseDao.getInit("1002");
System.out.println("***************************444*******************************");
// 获取1001微博详情
HBaseDao.getWeibo("1001");
}
}