-- Unnest (flatten) nested collection data from the audit log
-- Source table: data-access audit events consumed from Kafka as JSON.
-- Each event records who (`user`) did what (`action`), when (`eventTime`),
-- and through which `source` / `domain` / `engine`.
-- `action` groups the touched tables by operation kind (read / write /
-- create / alter / drop); every kind is an array of the same per-table ROW:
-- (storageEngine, category, dbName, tableName, columns).
CREATE TABLE data_audit_log (
    `user`      STRING,
    -- NOTE(review): the JSON format parses TIMESTAMP strings according to
    -- 'json.timestamp-format.standard' (default 'SQL'); confirm that the
    -- producer emits that layout rather than ISO-8601.
    `eventTime` TIMESTAMP,
    `source`    STRING,
    `domain`    STRING,
    `engine`    STRING,
    `action`    ROW<
        -- tables read by the event
        `read` ARRAY<ROW<
            `storageEngine` STRING,
            `category`      STRING,
            `dbName`        STRING,
            `tableName`     STRING,
            `columns`       ARRAY<STRING>
        >>,
        -- tables written by the event
        `write` ARRAY<ROW<
            `storageEngine` STRING,
            `category`      STRING,
            `dbName`        STRING,
            `tableName`     STRING,
            `columns`       ARRAY<STRING>
        >>,
        -- tables created by the event
        `create` ARRAY<ROW<
            `storageEngine` STRING,
            `category`      STRING,
            `dbName`        STRING,
            `tableName`     STRING,
            `columns`       ARRAY<STRING>
        >>,
        -- tables altered by the event
        `alter` ARRAY<ROW<
            `storageEngine` STRING,
            `category`      STRING,
            `dbName`        STRING,
            `tableName`     STRING,
            `columns`       ARRAY<STRING>
        >>,
        -- tables dropped by the event
        `drop` ARRAY<ROW<
            `storageEngine` STRING,
            `category`      STRING,
            `dbName`        STRING,
            `tableName`     STRING,
            `columns`       ARRAY<STRING>
        >>
    >
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'wdp.stanlee.data_audit_log',
    'properties.bootstrap.servers' = '10.1.128.48:9092,10.1.128.63:9092,10.1.128.31:9092,10.1.128.30:9092,10.1.128.32:9092,10.1.128.47:9092',
    'properties.group.id'          = 'dg-datahub',
    -- 'earliest-offset' ignores committed group offsets: a fresh job
    -- (not restored from a savepoint) re-reads the whole topic.
    'scan.startup.mode'            = 'earliest-offset',
    'format'                       = 'json'
);
-- One output row per table touched by an `alter` action.
-- Event-level attributes are repeated for every altered table in the event.
-- Identifiers such as `user`, `source`, `domain` and `category` are Flink/Calcite
-- keywords, so they must be backtick-quoted. An unquoted `user` would resolve to
-- the built-in USER function (the session user) instead of the column.
SELECT
    d.`user`,
    d.`eventTime`,
    d.`source`,
    d.`domain`,
    d.`engine`,
    t.`storageEngine` AS storage_engine,
    t.`category`,
    t.`dbName`        AS db_name,
    t.`tableName`     AS table_name
FROM data_audit_log AS d
-- The array to expand is the nested field action.alter, not a top-level column.
-- Each ROW element is spread into the named columns listed after the alias.
CROSS JOIN UNNEST(d.`action`.`alter`) AS t (`storageEngine`, `category`, `dbName`, `tableName`, `columns`)
-- Skip events with no (or a NULL) alter list, as the original query intended.
WHERE CARDINALITY(d.`action`.`alter`) > 0;