+——+——————————+————-+————-+———+——————————+——————-+
| key| value| topic|partition|offset| timestamp|timestampType|
+——+——————————+————-+————-+———+——————————+——————-+
|null|[7B 27 61 6C 65 7…|my-stream| 0| 0|1969-12-31 23:59:…| 0|
|null|[7B 27 61 6C 65 7…|my-stream| 0| 1|1969-12-31 23:59:…| 0|
- value 是二进制的,需要转成string
test_value_string = test_value.selectExpr(“CAST(value AS STRING)”)
- kafka通常用json格式存储value,具体看情况是不是
- 处理json格式的value,有两条路
3.1 用 schema 格式化,其结果是去掉所有的key,将value以逗号分隔后组成一行
3.2 不用schema 格式化, 就是 1)中转为String的样子,还是json格式
用 schema 格式化
schema = StructType([
StructField("key1", StringType(), True),
StructField("key2", StringType(), True)
])
result = test_value_string.select(from_json(col('values'), schema).alias("values"))
query = result.writeStream.format('console').start()
query.awaitTermination(60)
query.stop()
# 结果如下:
+------+
|values|
+------+
|[1, 1]|
|[4, 4]|
|[3, 3]|
不用 schema 格式化
query = test_value_string.writeStream.format('console').start()
query.awaitTermination(60)
query.stop()
# 结果如下:
+------------------------------+
| values |
+------------------------------+
|{"key1":"1", "key2":"1"}|
|{"key1":"3", "key2":"6"}|
|{"key1":"a", "key2":"c"}|
- 有时候,value并不是完全的json格式,可能在json格式前有一些字符,这个时候就要用正则去除掉,而substring + locate的嵌套函数方式好像是不支持的,知道嵌套函数方式怎么写的同学可以告诉下我。
```python
string后的value
+———————————————————————+ | values | +———————————————————————+ |fdsgfasgdsga{“key1”:”1”, “key2”:”1”}| +———————————————————————+
正则提取
value_string = value_string.select(regexp_extract(value_string.value,r’({.*})’,1).alias(“values”))
结果
+———————————————+ | values | +———————————————+ |{“key1”:”1”, “key2”:”1”}| +———————————————+ ```