+——+——————————+————-+————-+———+——————————+——————-+
| key| value| topic|partition|offset| timestamp|timestampType|
+——+——————————+————-+————-+———+——————————+——————-+
|null|[7B 27 61 6C 65 7…|my-stream| 0| 0|1969-12-31 23:59:…| 0|
|null|[7B 27 61 6C 65 7…|my-stream| 0| 1|1969-12-31 23:59:…| 0|

  1. value 是二进制的,需要转成string

test_value_string = test_value.selectExpr(“CAST(value AS STRING)”)

  1. kafka通常用json格式存储value,具体看情况是不是
  2. 处理json格式的value,有两条路

3.1 用 schema 格式化,其结果是去掉所有的key,将value以逗号分隔后组成一行
3.2 不用schema 格式化, 就是 1)中转为String的样子,还是json格式

用 schema 格式化

  1. schema = StructType([
  2. StructField("key1", StringType(), True),
  3. StructField("key2", StringType(), True)
  4. ])
  5. result = test_value_string.select(from_json(col('values'), schema).alias("values"))
  6. query = result.writeStream.format('console').start()
  7. query.awaitTermination(60)
  8. query.stop()
  9. # 结果如下:
  10. +------+
  11. |values|
  12. +------+
  13. |[1, 1]|
  14. |[4, 4]|
  15. |[3, 3]|

不用 schema 格式化

  1. query = test_value_string.writeStream.format('console').start()
  2. query.awaitTermination(60)
  3. query.stop()
  4. # 结果如下:
  5. +------------------------------+
  6. | values |
  7. +------------------------------+
  8. |{"key1":"1", "key2":"1"}|
  9. |{"key1":"3", "key2":"6"}|
  10. |{"key1":"a", "key2":"c"}|
  1. 有时候,value并不是完全的json格式,可能在json格式前有一些字符,这个时候就要用正则去除掉,而substring + locate的嵌套函数方式好像是不支持的,知道嵌套函数方式怎么写的同学可以告诉下我。 ```python

    string后的value

    +———————————————————————+ | values | +———————————————————————+ |fdsgfasgdsga{“key1”:”1”, “key2”:”1”}| +———————————————————————+

正则提取

value_string = value_string.select(regexp_extract(value_string.value,r’({.*})’,1).alias(“values”))

结果

+———————————————+ | values | +———————————————+ |{“key1”:”1”, “key2”:”1”}| +———————————————+ ```