# coding: utf8
import contextlib
import os
import re
import socket
import subprocess
import time

import pandas
import unicodecsv as csv

from collections import OrderedDict
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Any, Dict, List, Optional, Union

from impala.dbapi import connect
from impala.error import ProgrammingError

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.hooks.dbapi import DbApiHook
from airflow.utils.operator_helpers import AIRFLOW_VAR_NAME_FORMAT_MAPPING
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook


def get_context_from_env_var() -> Dict[Any, Any]:
    """
    Extract context from env variables, e.g. dag_id, task_id and execution_date,
    so that they can be used inside BashOperator and PythonOperator.

    :return: The context of interest.
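
    A minimal usage sketch (the env var value is an illustrative assumption; keys
    follow AIRFLOW_VAR_NAME_FORMAT_MAPPING, e.g. ``airflow.ctx.dag_id``):

    >>> os.environ['AIRFLOW_CTX_DAG_ID'] = 'example_dag'
    >>> get_context_from_env_var()['airflow.ctx.dag_id']
    'example_dag'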
    26. """
    27. return {
    28. format_map['default']: os.environ.get(format_map['env_var_format'], '')
    29. for format_map in AIRFLOW_VAR_NAME_FORMAT_MAPPING.values()
    30. }


class ImpalaHook(DbApiHook):
    """
    Wrapper around the impyla library.

    Notes:

    * the default ``auth_mechanism`` is PLAIN; to override it you
      can specify it in the ``extra`` of your connection in the UI
    * the default for ``run_set_variable_statements`` is true; if you
      are using Impala you may need to set it to false in the
      ``extra`` of your connection in the UI

    :param impala_conn_id: Reference to the Impala connection id.
    :type impala_conn_id: str
    :param schema: Impala database name.
    :type schema: Optional[str]
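
    A minimal usage sketch (connection id and query are illustrative assumptions;
    'impala_default' is simply the default connection id):

    >>> hook = ImpalaHook(impala_conn_id='impala_default')
    >>> isinstance(hook.get_records("SHOW DATABASES"), list)
    True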
    44. """
    45. conn_name_attr = 'impala_conn_id'
    46. default_conn_name = 'impala_default'
    47. conn_type = 'impala'
    48. hook_name = 'Impala Thrift'
    49. supports_autocommit = False

    def get_conn(self, schema: Optional[str] = None) -> Any:
        """Returns an Impala connection object."""
        db = self.get_connection(self.impala_conn_id)
        auth_mechanism = db.extra_dejson.get('auth_mechanism', 'PLAIN')
        return connect(
            host=db.host,
            port=db.port,
            auth_mechanism=auth_mechanism,
            user=db.login or None,
            password=db.password,
            database=db.schema or schema or 'default',
        )

    def _get_results(
        self,
        hql: Union[str, List[str]],
        schema: str = 'default',
        fetch_size: Optional[int] = None,
        impala_conf: Optional[Dict[Any, Any]] = None,
    ) -> Any:
        if isinstance(hql, str):
            hql = [hql_ for hql_ in hql.split(";") if hql_ != ""]
        previous_description = None
        with contextlib.closing(self.get_conn(schema)) as conn, contextlib.closing(conn.cursor()) as cur:
            cur.arraysize = fetch_size or 1000

            # not all query services (e.g. impala AIRFLOW-4434) support the set command
            db = self.get_connection(self.impala_conn_id)  # type: ignore
            if db.extra_dejson.get('run_set_variable_statements', True):
                env_context = get_context_from_env_var()
                if impala_conf:
                    env_context.update(impala_conf)
                # push the collected context and impala_conf to the session as SET statements
                for k, v in env_context.items():
                    cur.execute(f"SET {k}={v}")

            for statement in hql:
                self.log.info(statement)
                cur.execute(statement)
                # we only yield results for statements that return rows
                lowered_statement = statement.lower().strip()
                if (
                    lowered_statement.startswith('select')
                    or lowered_statement.startswith('with')
                    or lowered_statement.startswith('show')
                ):
                    description = cur.description
                    if previous_description and previous_description != description:
                        message = '''The statements are producing different descriptions:
                                     Current: {}
                                     Previous: {}'''.format(
                            repr(description), repr(previous_description)
                        )
                        raise ValueError(message)
                    elif not previous_description:
                        previous_description = description
                        yield description
                    try:
                        # DB API 2 raises when no results are returned
                        # we're silencing here as some statements in the list
                        # may be `SET` or DDL
                        yield from cur
                    except ProgrammingError:
                        self.log.debug("get_results returned no records")

    def get_results(
        self,
        hql: Union[str, List[str]],
        schema: str = 'default',
        fetch_size: Optional[int] = None,
        impala_conf: Optional[Dict[Any, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Get results of the provided hql in target schema.

        :param hql: hql to be executed.
        :type hql: str or list
        :param schema: target schema, default to 'default'.
        :type schema: str
        :param fetch_size: max size of result to fetch.
        :type fetch_size: int
        :param impala_conf: impala_conf to execute along with the hql.
        :type impala_conf: dict
        :return: results of hql execution, dict with data (list of results) and header
        :rtype: dict
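
        A minimal usage sketch (the query is an illustrative assumption):

        >>> hh = ImpalaHook()
        >>> res = hh.get_results("SELECT 1 AS one")
        >>> [col[0] for col in res['header']]
        ['one']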
    130. """
    131. results_iter = self._get_results(hql, schema, fetch_size=fetch_size, impala_conf=impala_conf)
    132. header = next(results_iter)
    133. results = {'data': list(results_iter), 'header': header}
    134. return results

    def to_csv(
        self,
        hql: Union[str, List[str]],
        csv_filepath: str,
        schema: str = 'default',
        delimiter: str = ',',
        lineterminator: str = '\r\n',
        output_header: bool = True,
        fetch_size: int = 1000,
        impala_conf: Optional[Dict[Any, Any]] = None,
    ) -> None:
        """
        Execute hql in target schema and write results to a csv file.

        :param hql: hql to be executed.
        :type hql: str or list
        :param csv_filepath: filepath of csv to write results into.
        :type csv_filepath: str
        :param schema: target schema, default to 'default'.
        :type schema: str
        :param delimiter: delimiter of the csv file, default to ','.
        :type delimiter: str
        :param lineterminator: lineterminator of the csv file.
        :type lineterminator: str
        :param output_header: whether to write the column names as a header row, default to True.
        :type output_header: bool
        :param fetch_size: number of rows fetched per batch and used as the logging interval,
            default to 1000.
        :type fetch_size: int
        :param impala_conf: impala_conf to execute along with the hql.
        :type impala_conf: dict
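
        A minimal usage sketch (query and output path are illustrative assumptions):

        >>> hh = ImpalaHook()
        >>> hh.to_csv("SELECT * FROM airflow.static_babynames LIMIT 100",
        ...           csv_filepath="/tmp/static_babynames.csv")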
    164. """
    165. results_iter = self._get_results(hql, schema, fetch_size=fetch_size, impala_conf=impala_conf)
    166. header = next(results_iter)
    167. message = None
    168. i = 0
    169. with open(csv_filepath, 'wb') as file:
    170. writer = csv.writer(file, delimiter=delimiter, lineterminator=lineterminator, encoding='utf-8')
    171. try:
    172. if output_header:
    173. self.log.debug('Cursor description is %s', header)
    174. writer.writerow([c[0] for c in header])
    175. for i, row in enumerate(results_iter, 1):
    176. writer.writerow(row)
    177. if i % fetch_size == 0:
    178. self.log.info("Written %s rows so far.", i)
    179. except ValueError as exception:
    180. message = str(exception)
    181. if message:
    182. # need to clean up the file first
    183. os.remove(csv_filepath)
    184. raise ValueError(message)
    185. self.log.info("Done. Loaded a total of %s rows.", i)

    def get_records(
        self,
        hql: Union[str, List[str]],
        schema: str = 'default',
        impala_conf: Optional[Dict[Any, Any]] = None,
    ) -> Any:
        """
        Get a set of records from an Impala query.

        :param hql: hql to be executed.
        :type hql: str or list
        :param schema: target schema, default to 'default'.
        :type schema: str
        :param impala_conf: impala_conf to execute along with the hql.
        :type impala_conf: dict
        :return: result of impala execution
        :rtype: list

        >>> hh = ImpalaHook()
        >>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
        >>> len(hh.get_records(sql))
        100
        """
        return self.get_results(hql, schema=schema, impala_conf=impala_conf)['data']

    def get_pandas_df(
        self,
        hql: Union[str, List[str]],
        schema: str = 'default',
        impala_conf: Optional[Dict[Any, Any]] = None,
        **kwargs,
    ) -> pandas.DataFrame:
        """
        Get a pandas DataFrame from an Impala query.

        :param hql: hql to be executed.
        :type hql: str or list
        :param schema: target schema, default to 'default'.
        :type schema: str
        :param impala_conf: impala_conf to execute along with the hql.
        :type impala_conf: dict
        :param kwargs: (optional) passed into pandas.DataFrame constructor
        :type kwargs: dict
        :return: result of impala execution
        :rtype: pandas.DataFrame

        >>> hh = ImpalaHook()
        >>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
        >>> df = hh.get_pandas_df(sql)
        >>> len(df.index)
        100
        """
        res = self.get_results(hql, schema=schema, impala_conf=impala_conf)
        # build the frame with column names up front so empty result sets are handled as well
        df = pandas.DataFrame(res['data'], columns=[c[0] for c in res['header']], **kwargs)
        return df

    def run_hql(
        self,
        hql: Union[str, List[str]],
        schema: str = 'default',
        impala_conf: Optional[Dict[Any, Any]] = None,
    ) -> None:
        """Execute hql (a statement or a list of statements) in the target schema without fetching results."""
        if isinstance(hql, str):
            hql = [hql_ for hql_ in hql.split(";") if hql_ != ""]
        with contextlib.closing(self.get_conn(schema)) as conn, contextlib.closing(conn.cursor()) as cur:
            # not all query services (e.g. impala AIRFLOW-4434) support the set command
            db = self.get_connection(self.impala_conn_id)  # type: ignore
            if db.extra_dejson.get('run_set_variable_statements', True):
                env_context = get_context_from_env_var()
                if impala_conf:
                    env_context.update(impala_conf)
                # push the collected context and impala_conf to the session as SET statements
                for k, v in env_context.items():
                    cur.execute(f"SET {k}={v}")
            for statement in hql:
                self.log.info("RUN HQL [%s (...)]", statement if len(statement) < 1000 else statement[:1000])
                cur.execute(statement)

    def test_hql(
        self,
        hql: str,
        schema: str = 'default',
        impala_conf: Optional[Dict[Any, Any]] = None,
    ) -> None:
        """Test an hql statement using the Impala cursor and EXPLAIN."""
        create, insert, select = [], [], []
        for query in hql.split(';'):
            query_original = query
            query = query.lower().strip()
            if query.startswith('create table'):
                create.append(query_original)
            elif query.startswith('insert'):
                insert.append(query_original)
            elif query.startswith('select'):
                select.append(query_original)

        for query_set in [create, insert, select]:
            for query in query_set:
                query = 'explain ' + query
                try:
                    # self.get_results(hql, schema=schema, impala_conf=impala_conf)['data']
                    self.run_hql(query, schema=schema, impala_conf=impala_conf)
                except (AirflowException, ProgrammingError) as e:
                    error_lines = str(e).split('\n')
                    message = error_lines[-2] if len(error_lines) > 1 else error_lines[-1]
                    self.log.info(message)
                    error_loc = re.search(r'(\d+):(\d+)', message)
                    if error_loc and error_loc.group(1).isdigit():
                        lst = int(error_loc.group(1))
                        begin = max(lst - 2, 0)
                        end = min(lst + 3, len(query.split('\n')))
                        context = '\n'.join(query.split('\n')[begin:end])
                        self.log.info("Context :\n %s", context)
                else:
                    self.log.info("SUCCESS")

    def load_df(
        self,
        df: pandas.DataFrame,
        table: str,
        hdfspath: str = "/external/user/yumingmin",
        field_dict: Optional[Dict[Any, Any]] = None,
        delimiter: str = ',',
        encoding: str = 'utf8',
        pandas_kwargs: Any = None,
        **kwargs: Any,
    ) -> None:
        """
        Loads a pandas DataFrame into Impala.

        Impala data types will be inferred if not passed but column names will
        not be sanitized.

        :param df: DataFrame to load into an Impala table
        :type df: pandas.DataFrame
        :param table: target Impala table, use dot notation to target a
            specific database
        :type table: str
        :param hdfspath: HDFS directory used as the external table location
        :type hdfspath: str
        :param field_dict: mapping from column name to Impala data type.
            Note that it must be OrderedDict so as to keep columns' order.
        :type field_dict: collections.OrderedDict
        :param delimiter: field delimiter in the file
        :type delimiter: str
        :param encoding: str encoding to use when writing DataFrame to file
        :type encoding: str
        :param pandas_kwargs: passed to DataFrame.to_csv
        :type pandas_kwargs: dict
        :param kwargs: passed to self.load_file
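
        A minimal usage sketch (table name and HDFS path are illustrative assumptions):

        >>> import pandas as pd
        >>> hh = ImpalaHook()
        >>> data = pd.DataFrame({'name': ['a', 'b'], 'num': [1, 2]})
        >>> hh.load_df(df=data, table='tmp.static_babynames', hdfspath='/tmp/staging')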
    320. """
    321. def _infer_field_types_from_df(df: pandas.DataFrame) -> Dict[Any, Any]:
    322. dtype_kind_hive_type = {
    323. 'b': 'BOOLEAN', # boolean
    324. 'i': 'BIGINT', # signed integer
    325. 'u': 'BIGINT', # unsigned integer
    326. 'f': 'DOUBLE', # floating-point
    327. 'c': 'STRING', # complex floating-point
    328. 'M': 'TIMESTAMP', # datetime
    329. 'O': 'STRING', # object
    330. 'S': 'STRING', # (byte-)string
    331. 'U': 'STRING', # Unicode
    332. 'V': 'STRING', # void
    333. }
    334. order_type = OrderedDict()
    335. for col, dtype in df.dtypes.iteritems():
    336. order_type[col] = dtype_kind_hive_type[dtype.kind]
    337. return order_type

        if pandas_kwargs is None:
            pandas_kwargs = {}

        with TemporaryDirectory(prefix='airflow_hiveop_') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, mode="w") as f:
                if field_dict is None:
                    field_dict = _infer_field_types_from_df(df)

                df.to_csv(
                    path_or_buf=f,
                    sep=delimiter,
                    header=False,
                    index=False,
                    encoding=encoding,
                    chunksize=10000,
                    date_format="%Y-%m-%d %H:%M:%S",
                    **pandas_kwargs,
                )
                f.flush()

                return self.load_file(
                    filepath=f.name,
                    table=table,
                    hdfspath=hdfspath,
                    delimiter=delimiter,
                    field_dict=field_dict,
                    recreate=True,
                    tblproperties={"EXTERNAL": "TRUE"},
                    **kwargs,
                )

    def load_file(
        self,
        filepath: str,
        table: str,
        hdfspath: str = "/external/user/yumingmin",
        delimiter: str = ",",
        field_dict: Optional[Dict[Any, Any]] = None,
        create: bool = True,
        overwrite: bool = True,
        partition: Optional[Dict[str, Any]] = None,
        recreate: bool = False,
        tblproperties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Loads a local file into Impala (HDFS).

        Note that the table generated in Impala uses ``STORED AS textfile``
        which isn't the most efficient serialization format. If a
        large amount of data is loaded and/or if the table gets
        queried considerably, you may want to use this operator only to
        stage the data into a temporary table before loading it into its
        final destination using an ``ImpalaOperator``.

        :param filepath: local filepath of the file to load
        :type filepath: str
        :param hdfspath: HDFS directory the file is uploaded to; also used as the
            external table location
        :type hdfspath: str
        :param table: target Impala table, use dot notation to target a
            specific database
        :type table: str
        :param delimiter: field delimiter in the file
        :type delimiter: str
        :param field_dict: A dictionary of the fields name in the file
            as keys and their Impala types as values.
            Note that it must be OrderedDict so as to keep columns' order.
        :type field_dict: collections.OrderedDict
        :param create: whether to create the table if it doesn't exist
        :type create: bool
        :param overwrite: whether to overwrite the data in table or partition
        :type overwrite: bool
        :param partition: target partition as a dict of partition columns
            and values
        :type partition: dict
        :param recreate: whether to drop and recreate the table at every
            execution
        :type recreate: bool
        :param tblproperties: TBLPROPERTIES of the impala table being created
        :type tblproperties: dict
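
        A minimal usage sketch (paths, table and field types are illustrative assumptions):

        >>> hh = ImpalaHook()
        >>> hh.load_file(
        ...     filepath='/tmp/static_babynames.csv',
        ...     table='tmp.static_babynames',
        ...     hdfspath='/tmp/staging',
        ...     field_dict=OrderedDict([('name', 'STRING'), ('num', 'BIGINT')]),
        ... )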
    411. """
    412. hql = ''
    413. if recreate:
    414. hql += f"\nDROP TABLE IF EXISTS {table};"
    415. if create or recreate:
    416. if field_dict is None:
    417. raise ValueError("Must provide a field dict when creating a table")
    418. fields = " " + ",\n ".join(f"`{k.strip('`')}` {v}" for k, v in field_dict.items())
    419. hql += f"\nCREATE TABLE IF NOT EXISTS {table} (\n{fields})\n"
    420. if partition:
    421. pfields = ",\n ".join(p + " STRING" for p in partition)
    422. hql += f"PARTITIONED BY ({pfields})\n"
    423. hql += "ROW FORMAT DELIMITED\n"
    424. hql += f"FIELDS TERMINATED BY '{delimiter}'\n"
    425. hql += f"STORED AS textfile LOCATION '{hdfspath}'\n"
    426. if tblproperties is not None:
    427. tprops = ", ".join(f"'{k}'='{v}'" for k, v in tblproperties.items())
    428. hql += f"TBLPROPERTIES({tprops})\n"
    429. hql += f"; \nCOMPUTE STATS {table}"
    430. self.log.info(hql)
    431. self.run_hql(hql)
    432. # Upload a file using WebHDFSHook
    433. WebHDFSHook(proxy_user="yumingmin").load_file(source=filepath, destination=hdfspath, overwrite=overwrite)

    def kill(self) -> None:
        """Kill the running query subprocess, if one was started."""
        if hasattr(self, 'sub_process'):
            if self.sub_process.poll() is None:
                self.log.info("Killing the Impala job")
                self.sub_process.terminate()
                time.sleep(60)
                self.sub_process.kill()


class ImpalaMetastoreHook(ImpalaHook):
    """Metastore-style helpers (partitions, tables, databases) implemented on top of Impala SQL."""

    def check_for_partition(
        self, db: Optional[str] = None, table: Optional[str] = None, partition: Optional[str] = None
    ) -> bool:
        """
        Checks whether a partition exists.

        :param db: Name of impala database @table belongs to
        :type db: str
        :param table: Name of impala table @partition belongs to
        :type table: str
        :param partition: Expression that matches the partitions to check for
            (eg `a = 'b' AND c = 'd'`)
        :type partition: str
        :rtype: bool

        >>> hh = ImpalaMetastoreHook()
        >>> t = 'static_babynames_partitioned'
        >>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
        True
        """
        if '.' in table:
            db, table = table.split('.')[:2]
        if not self.table_exists(db=db, table=table):
            raise Exception(f"{db}.{table} does not exist!")

        partition_names = self.get_table_partiton_name(db=db, table=table)
        if len(partition_names) == 0:
            raise Exception(f"{db}.{table} is not a partitioned table!")
        elif partition.split("=")[0] not in partition_names:
            # rebuild the filter against the real partition column, quoting the value if needed
            value = partition.split("=")[1]
            partition = "%s=%s" % (partition_names[0], value) if "'" in value else "%s='%s'" % (partition_names[0], value)

        hql = f"SELECT COUNT(1) AS rows_num FROM {db}.{table} WHERE 1=1 AND {partition}"
        rows_num = self.get_pandas_df(hql)["rows_num"].values[0]
        return bool(rows_num > 0)

    def get_table_partiton_name(self, table: str, db: str = 'default') -> list:
        """Return the partition column name(s) of the table, or an empty list if it is not partitioned."""
        if '.' in table:
            db, table = table.split('.')[:2]
        if not self.table_exists(db=db, table=table):
            raise Exception(f"{db}.{table} does not exist!")

        hql = f"SHOW CREATE TABLE {db}.{table}"
        ddl = self.get_pandas_df(hql)["result"].values[0]
        if ddl.lower().find("partitioned by") == -1:
            self.log.info(f"{db}.{table} is not a partitioned table!")
            return []
        else:
            # note: this only matches STRING-typed partition columns in the DDL
            partition_name = re.findall(r'partitioned by.\(\s+(.*) string.\)', ddl.lower(), re.S)
            return partition_name

    def get_table(self, table: str, db: str = 'default') -> Any:
        """Get table metadata (currently just the column names) as a dict keyed by table name."""
        if '.' in table:
            db, table = table.split('.')[:2]
        hql = f"SELECT * FROM {db}.{table} LIMIT 1"
        tbl_metastore = {}
        try:
            df = self.get_pandas_df(hql)
            tbl_metastore[table] = {}
            tbl_metastore[table]["columns"] = df.columns
            return tbl_metastore
        except Exception as e:
            self.log.error(e)
            return tbl_metastore

    def get_tables(self, db: str, pattern: str = '*') -> Any:
        """Get metadata for all tables in ``db`` matching ``pattern``."""
        hql = f"USE {db}; \nSHOW TABLES LIKE '{pattern}'"
        try:
            df = self.get_pandas_df(hql)
        except Exception as e:
            self.log.error(e)
            return {}

        if len(df) == 0:
            return {}
        else:
            tbls_metastore = {}
            for tb in df.name.tolist():
                try:
                    tbls_metastore[tb] = self.get_table(db=db, table=tb)[tb]
                except Exception:
                    tbls_metastore[tb] = {}
            return tbls_metastore

    def get_databases(self, db: str = 'default', pattern: str = '*') -> Any:
        """Get the list of database names matching ``pattern``."""
        hql = f"\nSHOW DATABASES LIKE '{pattern}'"
        try:
            df = self.get_pandas_df(hql)
            return df.name.tolist()
        except Exception as e:
            self.log.error(e)
            return []

    def get_partitions(
        self,
        db: str,
        table: str,
        partition_filter: Optional[Dict[Any, Any]] = None,
    ) -> List[Any]:
        """
        Returns a list of all partitions in a table. Works only
        for tables with less than 32767 (java short max val) partitions.
        For a subpartitioned table, the number might easily exceed this.

        >>> hh = ImpalaMetastoreHook()
        >>> t = 'static_babynames_partitioned'
        >>> parts = hh.get_partitions(db='airflow', table=t)
        >>> len(parts)
        1
        >>> parts
        [{'ds': ['2015-01-01']}]
        """
        if '.' in table:
            db, table = table.split('.')[:2]
        partition = self.get_table_partiton_name(db=db, table=table)
        if not partition:
            raise Exception(f"{db}.{table} is not a partitioned table!")

        hql = f"SELECT DISTINCT {partition[0]} AS value FROM {db}.{table} WHERE 1=1"
        if partition_filter:
            if "part_le" in partition_filter:
                hql += f" AND {partition[0]} <= '{partition_filter['part_le']}'"
            elif "part_ge" in partition_filter:
                hql += f" AND {partition[0]} >= '{partition_filter['part_ge']}'"
            elif "part_notin" in partition_filter:
                cond = str(partition_filter['part_notin']).strip('[').strip(']')
                hql += f" AND {partition[0]} NOT IN ({cond})"

        df_parts = self.get_pandas_df(hql)
        return [{partition[0]: df_parts['value'].tolist()}]

    def max_partition(self, db: str, table: str) -> Any:
        """
        Returns the maximum value of the partition column of a table.
        If only one partition key exists in the table, that key is used.

        :param db: schema name.
        :type db: str
        :param table: table name.
        :type table: str

        >>> hh = ImpalaMetastoreHook()
        >>> t = 'static_babynames_partitioned'
        >>> hh.max_partition(db='airflow', table=t)
        '2015-01-01'
        """
        if '.' in table:
            db, table = table.split('.')[:2]
        partition = self.get_table_partiton_name(db=db, table=table)
        if not partition:
            raise Exception(f"{db}.{table} is not a partitioned table!")

        hql = f"SELECT MAX({partition[0]}) AS value FROM {db}.{table}"
        max_part = self.get_pandas_df(hql)["value"].values[0]
        self.log.info(f"{db}.{table} max partition is: {max_part}")
        return max_part

    def table_exists(self, table: str, db: str = 'default') -> bool:
        """
        Check if a table exists.

        >>> hh = ImpalaMetastoreHook()
        >>> hh.table_exists(db='airflow', table='static_babynames')
        True
        >>> hh.table_exists(db='airflow', table='does_not_exist')
        False
        """
        if '.' in table:
            db, table = table.split('.')[:2]
        try:
            return bool(self.get_table(table, db))
        except Exception:
            return False

    def drop_partitions(self, table, part_vals, delete_data=False, db='default'):
        """
        Drop partitions from the given table matching the part_vals input.

        :param table: table name.
        :type table: str
        :param part_vals: list of partition values to drop.
        :type part_vals: list
        :param delete_data: Setting to control if underlying data have to be deleted
            in addition to dropping partitions.
        :type delete_data: bool
        :param db: Name of impala schema (database) @table belongs to
        :type db: str

        >>> hh = ImpalaMetastoreHook()
        >>> hh.drop_partitions(db='airflow', table='static_babynames',
        ...                    part_vals=['2020-05-01'])
        """
        partition = self.get_table_partiton_name(db=db, table=table)
        if not partition:
            raise Exception(f"{db}.{table} is not a partitioned table!")
        for pval in part_vals:
            hql = f"ALTER TABLE {db}.{table} DROP IF EXISTS PARTITION ({partition[0]}='{pval}')"
            self.run_hql(hql)

    def add_partitions(self, table, part_vals, delete_data=False, db='default'):
        """
        Add partitions to the given table for the part_vals input.

        :param table: table name.
        :type table: str
        :param part_vals: list of partition values to add.
        :type part_vals: list
        :param delete_data: Setting to control if underlying data have to be deleted
            in addition to dropping partitions.
        :type delete_data: bool
        :param db: Name of impala schema (database) @table belongs to
        :type db: str

        >>> hh = ImpalaMetastoreHook()
        >>> hh.add_partitions(db='airflow', table='static_babynames',
        ...                   part_vals=['2020-05-01'])
        """
        if '.' in table:
            db, table = table.split('.')[:2]
        partition = self.get_table_partiton_name(db=db, table=table)
        if not partition:
            raise Exception(f"{db}.{table} is not a partitioned table!")
        self.drop_partitions(db=db, table=table, part_vals=part_vals)
        for pval in part_vals:
            hql = f"ALTER TABLE {db}.{table} ADD PARTITION ({partition[0]}='{pval}')"
            self.run_hql(hql)

    def refresh_table(self, db: str, table: str) -> None:
        """Invalidate the Impala metadata cache for the given table."""
        if '.' in table:
            db, table = table.split('.')[:2]
        if not self.table_exists(db=db, table=table):
            raise AirflowException(f"{db}.{table} does not exist!")
        hql = f"INVALIDATE METADATA {db}.{table}"
        self.run_hql(hql)


# TO-DO:
#   * Improve the load_df functionality