展开集合数据
CREATE TABLE data_audit_log ( `user` STRING, `eventTime` TIMESTAMP, `source` STRING, `domain` STRING, `engine` STRING, `action` ROW< `read` ARRAY<ROW< `storageEngine` STRING, `category` STRING, `dbName` STRING, `tableName` STRING, `columns` ARRAY<STRING> >>, `write` ARRAY<ROW< `storageEngine` STRING, `category` STRING, `dbName` STRING, `tableName` STRING, `columns` ARRAY<STRING> >>, `create` ARRAY<ROW< `storageEngine` STRING, `category` STRING, `dbName` STRING, `tableName` STRING, `columns` ARRAY<STRING> >>, `alter` ARRAY<ROW< `storageEngine` STRING, `category` STRING, `dbName` STRING, `tableName` STRING, `columns` ARRAY<STRING> >>, `drop` ARRAY<ROW< `storageEngine` STRING, `category` STRING, `dbName` STRING, `tableName` STRING, `columns` ARRAY<STRING> >> >) WITH ( 'connector' = 'kafka', 'topic' = 'wdp.stanlee.data_audit_log', 'properties.bootstrap.servers' = '10.1.128.48:9092,10.1.128.63:9092,10.1.128.31:9092,10.1.128.30:9092,10.1.128.32:9092,10.1.128.47:9092', 'properties.group.id' = 'dg-datahub', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json');select user, eventTime, source, domain, engine, storageEngine as storage_engine, category, dbName as db_name, tableName as table_name from ( select * from data_audit_log where CARDINALITY(`action`.`alter`) > 0 ) CROSS JOIN UNNEST(`alter`) AS t (storageEngine, category, dbName, tableName, columns)