客户端的使用包括两部分:交互式命令行(hbase shell)的常用命令,以及各语言 API 客户端的用法
命令行
进入HBASE_HOME/bin目录,执行./hbase shell ,进入命令行客户端
输入help可以看到命令分组
部分命令清单
general操作
ddl操作(data definition language)
创建表
create 'table1', 'tab1_id', 'tab1_add', 'tab1_info'
列出所有的表
list
获得表的描述
describe "table1"
删除一个列族(需依次执行 disable、alter、enable)
disable 'table1'
alter 'table1', {NAME=>'tab1_add', METHOD=>'delete'}
enable 'table1'
查看表是否存在
exists 'table2'
删除一个表
disable 'table1'
drop 'table1'
dml操作(data manipulation language)
插入记录
put 'member' , 'scutshuxue' , 'info:age' , '24'
put 'member' , 'scutshuxue' , 'info:birthday' , '1987-06-17'
put 'member' , 'scutshuxue' , 'info:company' , 'alibaba'
put 'member' , 'scutshuxue' , 'address:contry' , 'china'
put 'member' , 'scutshuxue' , 'address:province' , 'zhejiang'
put 'member' , 'scutshuxue' , 'address:city' , 'hangzhou'
全表扫描 scan
scan 'member'
查询数据 get
获取一行数据
get 'member' , 'rowkey1'
获得某行,某列族的所有数据
get 'member' , 'rowkey1','cf1'
获得某行、某列族中某一列的数据(列需写成 '列族:列名' 的形式;分开写成多个参数会被当作多个列)
get 'member' , 'rowkey1','cf1:column1'
更新一条记录 put
put 'member', 'scutshuxue', 'info:age', 99
删除delete、deleteall
删除scutshuxue行info列族中age字段的值
delete 'member', 'scutshuxue', 'info:age'
删除scutshuxue行
deleteall 'member', 'scutshuxue'
查询表中有多少行
count 'member'
- 给 'xiaoming' 这个 id(行键)增加 'info:age' 字段,并使用 counter 实现递增
incr 'member', 'xiaoming', 'info:age'
- 整个表清空
truncate 'member'
API客户端
JAVA 客户端
pom依赖
<!-- Maven dependencies for the Java demo below; all three HBase artifacts are pinned to 1.3.1. -->
<!-- NOTE(review): presumably this artifact supplies the org.apache.hadoop.hbase.client.* classes the demo imports - confirm. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<!-- NOTE(review): nothing in the demo code visibly uses server-side classes; confirm this artifact is actually required. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.3.1</version>
</dependency>
<!-- Logging libraries. NOTE(review): the demo code logs through org.slf4j, which is not declared here; -->
<!-- presumably it arrives transitively - confirm. -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
代码
/**
 * Plain data holder for one student record used by the HBase demo.
 *
 * <p>No accessors are declared here; the getters, setters and {@code toString()} that
 * {@code HBaseDemo} calls are presumably generated by the {@code @Data} annotation
 * (its import is not visible in this snippet - confirm it is Lombok's).
 */
@Data
public class Student {

    /** Student identifier; the demo stores it as the HBase row key. */
    private Integer id;

    /** Student name; the demo stores it in the "name" column. */
    private String name;

    /** Student age, kept as text; the demo stores it in the "age" column. */
    private String age;
}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.guoyu.hbase.entry.Student;
/**
 * Demonstrates basic HBase client operations: create/drop a table, put, get, scan and
 * delete rows that hold {@link Student} records.
 *
 * <p>Every helper opens its own {@link Connection} through {@link #initHBase()} and closes
 * it (together with the {@link Admin}, {@link Table} or {@link ResultScanner} it used)
 * before returning. Opening a connection per call keeps the demo simple; a real
 * application would normally create one connection and share it.
 */
public class HBaseDemo {
    private static final Logger logger = LoggerFactory.getLogger(HBaseDemo.class);

    public static void main(String[] args) throws IOException {
        // Example: create a table with two column families.
        // boolean flag = createTable("dbtest:user_table", new String[] {"information", "contact"});
        // if (flag) {
        //     System.out.println("创建成功");
        // }

        // Example: insert or update a row.
        // Student stu = new Student();
        // stu.setId(1);
        // stu.setName("xiaoxin");
        // stu.setAge("18");
        // insertData("dbtest:student", stu);
        // System.out.println("插入成功");

        Student stu = getDataByRowKey("dbtest:student", "1");
        System.out.println(stu.toString());
        String name = getCellData("dbtest:student", "1", "info", "name");
        System.out.println(name);
        logger.info("student查询成功");

        List<Student> students = getAllData("dbtest:student");
        students.forEach(System.out::println);

        // Example: delete one row.
        // deleteByRowKey("dbtest:student", "student-1");
        // logger.info("删除成功");

        deleteTable("dbtest:user_table");
        logger.info("删除表成功");
    }

    /**
     * Opens a new HBase connection using the ZooKeeper and master settings below.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws IOException if the connection cannot be created
     */
    public static Connection initHBase() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hadoop2,hadoop3,hadoop4");
        conf.set("hbase.master", "hadoop2:60000");
        // Pass conf explicitly: the no-argument overload builds its own default
        // configuration, which would silently discard the settings made above.
        return ConnectionFactory.createConnection(conf);
    }

    /**
     * Creates a table with the given column families unless it already exists.
     *
     * @param tableName table name, optionally prefixed with "namespace:"
     * @param cols      names of the column families to create
     * @return {@code true} if the table was created, {@code false} if it already existed
     * @throws IOException if the HBase request fails
     */
    public static boolean createTable(String tableName, String[] cols) throws IOException {
        TableName table = TableName.valueOf(tableName);
        try (Connection connection = initHBase(); Admin admin = connection.getAdmin()) {
            if (admin.tableExists(table)) {
                System.out.println("表已存在!");
                return false;
            }
            // Build the descriptor from the parsed TableName rather than the raw string.
            HTableDescriptor descriptor = new HTableDescriptor(table);
            for (String col : cols) {
                descriptor.addFamily(new HColumnDescriptor(col));
            }
            admin.createTable(descriptor);
            return true;
        }
    }

    /**
     * Inserts a student, or overwrites it when the row already exists.
     * The row key is the student's id; name and age go to info:name and info:age.
     *
     * @param tableName target table
     * @param stu       student to store; name and age must not be null
     * @throws IOException if the HBase request fails
     */
    public static void insertData(String tableName, Student stu) throws IOException {
        // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes() which depends
        // on the platform default charset.
        byte[] family = Bytes.toBytes("info");
        Put put = new Put(Bytes.toBytes(String.valueOf(stu.getId())));
        put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(stu.getName()));
        put.addColumn(family, Bytes.toBytes("age"), Bytes.toBytes(stu.getAge()));
        try (Connection connection = initHBase();
             Table tab = connection.getTable(TableName.valueOf(tableName))) {
            tab.put(put);
        }
        System.out.println("插入成功");
    }

    /**
     * Reads one row and maps it to a {@link Student}.
     *
     * @param tableName table to read
     * @param rowKey    row key; must be parseable as an integer because it becomes the id
     * @return a student whose id is the row key; name and age stay null when the row or
     *         the corresponding columns do not exist
     * @throws IOException           if the HBase request fails
     * @throws NumberFormatException if {@code rowKey} is not an integer
     */
    public static Student getDataByRowKey(String tableName, String rowKey) throws IOException {
        Student stu = new Student();
        stu.setId(Integer.valueOf(rowKey));
        try (Connection connection = initHBase();
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            fillStudent(stu, result);
        }
        return stu;
    }

    /**
     * Reads the value of a single cell.
     *
     * @param tableName table to read
     * @param rowKey    row key
     * @param family    column family
     * @param col       column qualifier
     * @return the cell value as a string, "查询结果不存在" when the cell is absent, or
     *         "出现异常" when the lookup failed (the failure is logged)
     */
    public static String getCellData(String tableName, String rowKey, String family, String col) {
        try (Connection connection = initHBase();
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            byte[] familyBytes = Bytes.toBytes(family);
            byte[] qualifierBytes = Bytes.toBytes(col);
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(familyBytes, qualifierBytes);
            byte[] value = table.get(get).getValue(familyBytes, qualifierBytes);
            // A null value means the cell does not exist. The previous check on
            // get.isCheckExistenceOnly() never detected that, so the "not found"
            // message could not be returned.
            if (value == null) {
                return "查询结果不存在";
            }
            return Bytes.toString(value);
        } catch (Exception e) {
            logger.error("Failed to read cell {}:{} of row {} in table {}",
                    family, col, rowKey, tableName, e);
        }
        return "出现异常";
    }

    /**
     * Scans the whole table and maps every row to a {@link Student}.
     *
     * @param tableName table to scan
     * @return the students read so far; on failure the error is logged and the rows
     *         collected before the failure are returned
     */
    public static List<Student> getAllData(String tableName) {
        List<Student> list = new ArrayList<Student>();
        try (Connection connection = initHBase();
             Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner results = table.getScanner(new Scan())) {
            for (Result result : results) {
                String row = Bytes.toString(result.getRow());
                System.out.println("用户名:" + row);
                Student stu = new Student();
                // The row key doubles as the student id, so it must be numeric.
                stu.setId(Integer.valueOf(row));
                fillStudent(stu, result);
                list.add(stu);
            }
        } catch (Exception e) {
            logger.error("Failed to scan table {}", tableName, e);
        }
        return list;
    }

    /**
     * Deletes an entire row (all of its cells).
     *
     * @param tableName table containing the row
     * @param rowKey    key of the row to delete
     * @throws IOException if the HBase request fails
     */
    public static void deleteByRowKey(String tableName, String rowKey) throws IOException {
        try (Connection connection = initHBase();
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
        }
    }

    /**
     * Disables and drops a table. Failures are logged, not rethrown.
     *
     * @param tableName table to drop; a table that does not exist is skipped
     */
    public static void deleteTable(String tableName) {
        TableName table = TableName.valueOf(tableName);
        try (Connection connection = initHBase(); Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(table)) {
                logger.warn("Table {} does not exist, nothing to delete", tableName);
                return;
            }
            // Only an enabled table can be disabled; skip the step when it already is.
            if (admin.isTableEnabled(table)) {
                admin.disableTable(table);
            }
            admin.deleteTable(table);
        } catch (IOException e) {
            logger.error("Failed to delete table {}", tableName, e);
        }
    }

    /** Copies the "name" and "age" cells of a result (matched by qualifier) into the student. */
    private static void fillStudent(Student stu, Result result) {
        // An empty result may have no backing cell array at all, so guard before iterating.
        if (result == null || result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            String colName = Bytes.toString(
                    cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(
                    cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            if (colName.equals("name")) {
                stu.setName(value);
            } else if (colName.equals("age")) {
                stu.setAge(value);
            }
        }
    }
}
scala客户端
将生产环境 HBase 的 conf/hbase-site.xml 文件拷贝到 IDEA 工程的 src/main/resources 目录下,使其位于 classpath 中
import java.io.IOException
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.SubstringComparator
import org.apache.hadoop.hbase.util.Bytes
import parquet.org.slf4j.LoggerFactory
/**
 * Helper routines for basic HBase operations (create/drop table, put, get, scan,
 * filtered scan) plus a small `main` that shows how to call them.
 *
 * The helpers take an already-open `Connection` or `Table`; the caller owns those
 * objects and is responsible for closing them.
 */
object Operator_Hbase {
  // A val, so the logger is looked up once instead of on every LOG reference.
  val LOG = LoggerFactory.getLogger(getClass)

  /**
   * Builds the HBase client configuration.
   *
   * @return configuration with hbase-site.xml from the classpath and the ZooKeeper client port set
   */
  def getHbaseConf: Configuration = {
    val conf: Configuration = HBaseConfiguration.create
    // addResource(String) looks the name up on the classpath, so it must be a resource
    // name; a relative file-system path such as ".\main\resources\hbase-site.xml" is not found.
    conf.addResource("hbase-site.xml")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // Optional explicit overrides, for when hbase-site.xml is not available:
    // conf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
    // conf.set("hbase.master", "master:60000")
    conf
  }

  /**
   * Creates a table with the given column families unless it already exists.
   * The AggregateImplementation coprocessor is registered on the new table.
   *
   * @param hbaseconn     open connection (not closed here)
   * @param tableName     name of the table to create
   * @param columnFamilys names of the column families
   */
  @throws(classOf[MasterNotRunningException])
  @throws(classOf[ZooKeeperConnectionException])
  @throws(classOf[IOException])
  def createTable(hbaseconn: Connection, tableName: String, columnFamilys: Array[String]): Unit = {
    val admin: Admin = hbaseconn.getAdmin
    // try/finally so the Admin is released even when a request throws.
    try {
      val myTableName: TableName = TableName.valueOf(tableName)
      if (admin.tableExists(myTableName)) {
        LOG.info(tableName + "Table exists!")
      } else {
        val tableDesc = new HTableDescriptor(myTableName)
        tableDesc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation")
        columnFamilys.foreach(family => tableDesc.addFamily(new HColumnDescriptor(family)))
        admin.createTable(tableDesc)
        LOG.info(tableName + "create table success!")
      }
    } finally {
      admin.close()
    }
  }

  /**
   * Writes one cell.
   *
   * @param table        target table
   * @param rowKey       row key
   * @param columnFamily column family
   * @param quorm        column qualifier
   * @param value        cell value; null is stored as an empty string
   */
  def addRow(table: Table, rowKey: String, columnFamily: String, quorm: String, value: String): Unit = {
    val cellValue = if (value == null) "" else value
    val rowPut = new Put(Bytes.toBytes(rowKey))
    // Bytes.toBytes always encodes as UTF-8; String.getBytes depends on the platform charset.
    rowPut.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(quorm), Bytes.toBytes(cellValue))
    table.put(rowPut)
  }

  /**
   * Fetches one row and prints every cell (family, qualifier, timestamp, row key, value).
   *
   * @param table  table to read
   * @param rowKey row key
   * @return the raw result, which is empty when the row does not exist
   */
  def getRow(table: Table, rowKey: String): Result = {
    val result: Result = table.get(new Get(Bytes.toBytes(rowKey)))
    // rawCells() may be null for an empty result, so fall back to an empty array.
    val cells = Option(result.rawCells()).getOrElse(Array.empty[Cell])
    for (cell <- cells) {
      println("Famiily:" + Bytes.toString(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength))
      println("Qualifier:" + Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength))
      println("TimeStamp:" + cell.getTimestamp)
      println("rowkey:" + Bytes.toString(cell.getRowArray, cell.getRowOffset, cell.getRowLength))
      println("Value:" + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
    }
    result
  }

  /**
   * Writes a batch of puts. Failures are logged, not rethrown.
   *
   * @param table target table
   * @param list  puts to apply
   */
  def addDataBatch(table: Table, list: java.util.List[Put]): Unit = {
    try {
      table.put(list)
    } catch {
      // RetriesExhaustedWithDetailsException is an IOException and was logged the
      // same way, so one handler covers both.
      case e: IOException => LOG.error(e.getMessage)
    }
  }

  /**
   * Opens a scanner over the whole table.
   *
   * @param table table to scan
   * @return the scanner (the caller must close it), or null when opening it failed
   */
  def queryAll(table: Table): ResultScanner = {
    try {
      table.getScanner(new Scan())
    } catch {
      case e: IOException =>
        LOG.error(e.toString)
        null
    }
  }

  /**
   * Scans for rows whose `queryColumn` value contains `value` as a substring.
   *
   * Column names may be written as "family:qualifier". A name without ':' keeps the
   * previous behaviour of using the same text as both family and qualifier.
   *
   * @param table       table to scan
   * @param queryColumn column the filter is applied to
   * @param value       substring to look for
   * @param columns     columns to return
   * @return the scanner (the caller must close it), or null when an argument is null
   *         or the scan could not be opened
   */
  def queryBySingleColumn(table: Table, queryColumn: String, value: String, columns: Array[String]): ResultScanner = {
    if (columns == null || queryColumn == null || value == null) {
      null
    } else {
      try {
        val (filterFamily, filterQualifier) = splitColumn(queryColumn)
        val filter = new SingleColumnValueFilter(
          filterFamily, filterQualifier, CompareOp.EQUAL, new SubstringComparator(value))
        val scan = new Scan()
        for (columnName <- columns) {
          val (family, qualifier) = splitColumn(columnName)
          scan.addColumn(family, qualifier)
        }
        scan.setFilter(filter)
        table.getScanner(scan)
      } catch {
        case e: Exception =>
          LOG.error(e.toString)
          null
      }
    }
  }

  /**
   * Disables and drops a table. Failures are logged, not rethrown.
   *
   * @param hbaseconn open connection (not closed here)
   * @param tableName name of the table to drop
   */
  def dropTable(hbaseconn: Connection, tableName: String): Unit = {
    try {
      // The Admin interface already offers disableTable/deleteTable, so no cast to
      // the concrete HBaseAdmin class is needed.
      val admin: Admin = hbaseconn.getAdmin
      try {
        val name = TableName.valueOf(tableName)
        admin.disableTable(name)
        admin.deleteTable(name)
      } finally {
        admin.close()
      }
    } catch {
      // MasterNotRunningException and ZooKeeperConnectionException are IOExceptions
      // and were logged identically, so one handler covers all three.
      case e: IOException => LOG.error(e.toString)
    }
  }

  /** Splits "family:qualifier" into its byte parts; without ':' the name is used for both. */
  private def splitColumn(column: String): (Array[Byte], Array[Byte]) = {
    val separator = column.indexOf(':')
    if (separator < 0) {
      val bytes = Bytes.toBytes(column)
      (bytes, bytes)
    } else {
      (Bytes.toBytes(column.substring(0, separator)), Bytes.toBytes(column.substring(separator + 1)))
    }
  }

  /** Demo entry point: opens a connection and the "test" table, then closes both. */
  def main(args: Array[String]): Unit = {
    val conf: Configuration = getHbaseConf
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table: Table = conn.getTable(TableName.valueOf("test"))
      try {
        // Column families used by the createTable example below.
        val familyColumn: Array[String] = Array[String]("info1", "info2")
        // Create the table:
        // createTable(conn, "test", familyColumn)
        val s_uuid: String = UUID.randomUUID().toString
        // Write one cell:
        // addRow(table, s_uuid, "info", "column1A", s_uuid + "_1A")
        // Read one row:
        // getRow(table, "9ec78ac4-6042-4c34-8862-f5aca3e")
        // Drop the table:
        // dropTable(conn, "test")
      } finally {
        table.close()
      }
    } catch {
      case e: Exception =>
        // Print a short label for the well-known failure types before the stack trace.
        e match {
          case _: MasterNotRunningException => System.out.println("MasterNotRunningException")
          case _: ZooKeeperConnectionException => System.out.println("ZooKeeperConnectionException")
          case _: IOException => System.out.println("IOException")
          case _ => ()
        }
        e.printStackTrace()
    } finally {
      // The connection was previously never closed.
      conn.close()
    }
  }
}
python客户端
首先服务端要启动 thrift 服务:hbase-daemon.sh start thrift,默认端口是 9090
客户端安装依赖包
sudo pip install thrift
sudo pip install hbase-thrift
编写代码
# coding=utf-8
from thrift.transport import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import ColumnDescriptor
from hbase.ttypes import Mutation
class HBaseClient(object):
    """Small convenience wrapper around the HBase Thrift ``Hbase.Client``.

    The connection is opened in ``__init__``. Call ``close()`` (or use the
    instance as a context manager) to release it; ``__del__`` closes it as a
    last resort.
    """

    def __init__(self, ip, port=9090):
        """Connect to the Thrift server.

        :param ip: host name or address of the Thrift server
        :param port: port of the Thrift server
        """
        # Buffered transport over a plain socket to the server.
        self.__transport = TBufferedTransport(TSocket.TSocket(ip, port))
        # Binary wire protocol on top of the transport.
        protocol = TBinaryProtocol.TBinaryProtocol(self.__transport)
        self.__client = Hbase.Client(protocol)
        self.__transport.open()

    def close(self):
        """Close the transport if it was created and is still open."""
        # getattr guards against a partially initialised instance, e.g. when
        # __init__ raised before the transport attribute was assigned.
        transport = getattr(self, "_HBaseClient__transport", None)
        if transport is not None and transport.isOpen():
            transport.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        self.close()

    def get_tables(self):
        """Return the names of all tables.

        :return: list of table names
        """
        return self.__client.getTableNames()

    def create_table(self, table, *columns):
        """Create a table.

        :param table: table name
        :param columns: column family names
        """
        # Build a real list: the Thrift call is handed a list of descriptors,
        # and on Python 3 map() would only produce a lazy iterator.
        column_families = [ColumnDescriptor(name=col) for col in columns]
        self.__client.createTable(table, column_families)

    def put(self, table, row, columns):
        """Write several cells of one row.

        :param table: table name
        :param row: row key
        :param columns: dict mapping column name to the value to store
        """
        # A comprehension replaces the Python-2-only ``lambda (k, v): ...``.
        mutations = [Mutation(column=name, value=value)
                     for name, value in columns.items()]
        self.__client.mutateRow(table, row, mutations)

    def delete(self, table, row, column):
        """Delete a column of a row via the server's ``deleteAll`` call.

        :param table: table name
        :param row: row key
        :param column: column name
        """
        self.__client.deleteAll(table, row, column)

    def scan(self, table, start_row="", columns=None):
        """Iterate over the rows of a table.

        :param table: table name
        :param start_row: row key to start scanning from
        :param columns: columns to fetch, passed to the server unchanged
        :return: generator yielding one dict (column name -> value) per row
        """
        scanner = self.__client.scannerOpen(table, start_row, columns)
        try:
            while True:
                rows = self.__client.scannerGet(scanner)
                # An empty result marks the end of the scan.
                if not rows:
                    break
                yield dict((name, cell.value)
                           for name, cell in rows[0].columns.items())
        finally:
            # Release the server-side scanner even if the caller stops early.
            self.__client.scannerClose(scanner)
if __name__ == '__main__':
    # Demo: write three student rows, delete one cell, then print every row.
    client = HBaseClient("10.211.55.7")
    # Create the table first if it does not exist yet:
    # client.create_table('student', 'name', 'course')
    client.put("student", "1",
               {"name:": "Jack",
                "course:art": "88",
                "course:math": "12"})
    client.put("student", "2",
               {"name:": "Tom", "course:art": "90",
                "course:math": "100"})
    client.put("student", "3",
               {"name:": "Jerry"})
    client.delete('student', '1', 'course:math')
    for v in client.scan('student'):
        # Function-call form prints the same on Python 2 and also parses on Python 3.
        print(v)
其他参考链接:
[https://www.cnblogs.com/royfans/p/7199271.html](https://www.cnblogs.com/royfans/p/7199271.html)<br /> [http://shzhangji.com/cnblogs/2018/04/22/connect-hbase-with-python-and-thrift/](http://shzhangji.com/cnblogs/2018/04/22/connect-hbase-with-python-and-thrift/)