AirFlow 中使用的 HiveServer2Hook 底层使用的 pyhive,所以先来看下 pyhive 的使用:
1. PyHive连接Hive
from pyhive import hiveconn = hive.connect(host='1.1.1.1',port=2000,auth="LDAP",database='test',username='yumingmin',password='yumingmin')cursor = conn.cursor()cursor.execute('select * from test.students limit 10')
2. 使用HiveServer2Hook
首先常见一个 connection,注意 authMechanism 不可以使用 PLAIN,这个和 impyla 是不一样的。
$ airflow connections add 'hive_db' --conn-uri 'hiveserver2://yumingmin:yumingmin@1.1.1.1:20000/test?authMechanism=LDAP'
执行代码:
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hookhh = HiveServer2Hook(hiveserver2_conn_id="impala_db")sql = "SELECT * FROM cszc.sop_phone_recom_num_m2small_aftn LIMIT 1"print(len(hh.get_records(sql)))
