展开集合数据

  -- Source table over the Kafka topic 'wdp.stanlee.data_audit_log'; each record is decoded as JSON.
  -- `action` bundles five arrays (read / write / create / alter / drop). Every array element has the
  -- same layout: storageEngine, category, dbName, tableName, columns.
  -- NOTE(review): presumably each array lists the tables touched by that kind of operation — confirm
  -- against the producer of this topic.
  CREATE TABLE data_audit_log (
      `user`      STRING,
      `eventTime` TIMESTAMP,
      `source`    STRING,
      `domain`    STRING,
      `engine`    STRING,
      `action`    ROW<
          `read` ARRAY<ROW<
              `storageEngine` STRING,
              `category`      STRING,
              `dbName`        STRING,
              `tableName`     STRING,
              `columns`       ARRAY<STRING>
          >>,
          `write` ARRAY<ROW<
              `storageEngine` STRING,
              `category`      STRING,
              `dbName`        STRING,
              `tableName`     STRING,
              `columns`       ARRAY<STRING>
          >>,
          `create` ARRAY<ROW<
              `storageEngine` STRING,
              `category`      STRING,
              `dbName`        STRING,
              `tableName`     STRING,
              `columns`       ARRAY<STRING>
          >>,
          `alter` ARRAY<ROW<
              `storageEngine` STRING,
              `category`      STRING,
              `dbName`        STRING,
              `tableName`     STRING,
              `columns`       ARRAY<STRING>
          >>,
          `drop` ARRAY<ROW<
              `storageEngine` STRING,
              `category`      STRING,
              `dbName`        STRING,
              `tableName`     STRING,
              `columns`       ARRAY<STRING>
          >>
      >
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'wdp.stanlee.data_audit_log',
      'properties.bootstrap.servers' = '10.1.128.48:9092,10.1.128.63:9092,10.1.128.31:9092,10.1.128.30:9092,10.1.128.32:9092,10.1.128.47:9092',
      'properties.group.id' = 'dg-datahub',
      -- Start consuming from the earliest available offset of the topic.
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
  );
  -- Expand the `action`.`alter` array of every audit record into one output row per array element.
  -- Output columns: user, eventTime, source, domain, engine, storage_engine, category, db_name, table_name.
  SELECT
      l.`user`,
      l.`eventTime`,
      l.`source`,
      l.`domain`,
      l.`engine`,
      t.`storageEngine` AS storage_engine,
      t.`category`,
      t.`dbName`        AS db_name,
      t.`tableName`     AS table_name
  FROM (
      -- Project the nested array under its own name: `alter` is a field of `action`, not a
      -- top-level column, so it cannot be referenced bare by the UNNEST below.
      SELECT
          `user`,
          `eventTime`,
          `source`,
          `domain`,
          `engine`,
          `action`.`alter` AS altered_tables
      FROM data_audit_log
      -- Keep only records that carry at least one `alter` entry.
      WHERE CARDINALITY(`action`.`alter`) > 0
  ) AS l
  -- The alias list names the five fields of each array element; `columns` is unnested but not selected.
  CROSS JOIN UNNEST(l.altered_tables) AS t (`storageEngine`, `category`, `dbName`, `tableName`, `columns`)