AirFlow 中使用的 HiveServer2Hook 底层使用的 pyhive,所以先来看下 pyhive 的使用:

1. PyHive连接Hive

  1. from pyhive import hive
  2. conn = hive.connect(host='1.1.1.1',
  3. port=2000,
  4. auth="LDAP",
  5. database='test',
  6. username='yumingmin',
  7. password='yumingmin')
  8. cursor = conn.cursor()
  9. cursor.execute('select * from test.students limit 10')

2. 使用HiveServer2Hook

首先常见一个 connection,注意 authMechanism 不可以使用 PLAIN,这个和 impyla 是不一样的。

  1. $ airflow connections add 'hive_db' --conn-uri 'hiveserver2://yumingmin:yumingmin@1.1.1.1:20000/test?authMechanism=LDAP'

执行代码:

  1. from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
  2. hh = HiveServer2Hook(hiveserver2_conn_id="impala_db")
  3. sql = "SELECT * FROM cszc.sop_phone_recom_num_m2small_aftn LIMIT 1"
  4. print(len(hh.get_records(sql)))