AirFlow 中使用的 HiveServer2Hook 底层使用的 pyhive,所以先来看下 pyhive 的使用:
1. PyHive连接Hive
from pyhive import hive
conn = hive.connect(host='1.1.1.1',
port=2000,
auth="LDAP",
database='test',
username='yumingmin',
password='yumingmin')
cursor = conn.cursor()
cursor.execute('select * from test.students limit 10')
2. 使用HiveServer2Hook
首先常见一个 connection,注意 authMechanism
不可以使用 PLAIN,这个和 impyla 是不一样的。
$ airflow connections add 'hive_db' --conn-uri 'hiveserver2://yumingmin:yumingmin@1.1.1.1:20000/test?authMechanism=LDAP'
执行代码:
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
hh = HiveServer2Hook(hiveserver2_conn_id="impala_db")
sql = "SELECT * FROM cszc.sop_phone_recom_num_m2small_aftn LIMIT 1"
print(len(hh.get_records(sql)))