本文所有功能需使用python的pandas库和sqlalchemy库,请先学习本项目中的SQLite格式说明再学习本文。

Hash值

Hash简介

hash是把一组复杂数据映射为一个数值的算法,常见用途是用于判断两个文件是否是同一个文件。映射算法有很多种,如MD5、CRC32、SHA256等。同样,hash值可以用于判断数据库中的两条记录是否相等。
一般而言,一条记录由这些类型的值组成:整数、实数、布尔值(是/否)、字符串、日期时间。通过某种方式转换后,它们可以统一转换为字符串格式,从而生成hash值。

Python原生函数生成hash值

hash()函数

python内置hash()函数用于计算对象的hash值。出于对安全性的考虑,python内置的算法保证了,即使是同一设备同一版本的python,每次运行时,对同一个python对象的hash值计算也不相同,因此这种hash值不能进行持久化存储。

pickle库

python官网:pickle库
pickle.load()pickle.dump()可序列化任何python对象为Bytes,这个Bytes对象可以利用本文“跨平台跨语言的通用算法”所述方法转为hash字符串。但由于具体的序列化值取决于平台和python版本,算法不完全可重复,也不适合进行持久化存储。

Pandas生成Hash值

若不考虑跨编程语言,只使用python的pandas库进行数据处理,可以用pandas封装好的函数hash_pandas_object。该函数只能对DataFrame的每一行生成uint64类型的hash值,若数据的存储载体不支持uint64类型(比如SQLite就不支持),则需转换为16位长度的HEX字符串。

  1. from pandas.util import hash_pandas_object
  2. # 注意这里的第二个参数index必须为False
  3. df["hash"] = hash_pandas_object(data_init, False)
  4. # 转换为HEX字符串,形如a274890d45ee21cf,若位数不足则在左侧补0
  5. df["hash"] = df["hash"].apply("{:0>16x}".format)

跨平台跨语言的通用算法

python官网:hashlib库
hashlib库支持MD5,SHA256等常见的平台无关的hash算法,每种算法都接收Bytes并生成固定长度的HEX数字作为hash。其他编程语言也支持这些常见的hash算法。
记录很容易转换成相对复杂的字符串,而字符串通过utf-8编码可转换为Bytes。因此只要使不同编程语言用同一种算法将一条记录转为字符串,再用utf-8编码转为Bytes,最终转为hash,就可以保证不同平台不同语言计算出的hash值相同。

  1. import hashlib
  2. hashlib.md5("一段文本".encode("utf-8)).hexdigest()

不同算法生成的hash值不同的影响

记录本来就有ID等唯一标识符,hash值只用于校验该记录是否需要更新,而非记录的唯一标识符。若由于工作交接等原因,将数据库交给其他负责人另作维护,则新负责人首次获得新数据时,程序会认为数据库中所有已存在数据都需要更新(不考虑更新后数据的新算法hash值正好等于原数据的旧算法hash值的情况),这一次运行程序的耗时较长,但是再次运行程序时,hash值又能够在新环境下重新正确匹配。
若需避免bug,可在交接前用不含有WHERE的UPDATE语句把所有记录的hash改为空字符。如

  1. with engine.connect() as cursor:
  2. cursor.execute(update(Test1).values({"hash":""}))

举例说明集合运算

数据的集合运算必须要利用唯一的标识符ID和数据的hash值,并采用SQLite格式存储数据。
假设数据库中已有如下数据:

  1. id,name,上次使用,hash
  2. 4,name1,2022-02-03 15:00:00.000000,2b34e5d8a0a244a4
  3. 5,name2,2022-02-01 16:00:00.000000,80d1238a20dac888
  4. 6,name3,2022-01-03 09:00:23.000000,e738cc067e9e7637

现获得一组新数据,已转换为DataFrame。要求如下:
向数据库中插入新出现的ID,并检查数据库中已有的ID,若对应记录的值有变化,则更新数据库中的记录。若数据库中有新数据未提及的记录,则依然予以保留。

datanew = pd.DataFrame(
    {
        "id": [4, 5, 6, 7],
        "name": ["name1", "name2", "name33", "name4"],
        "上次使用": pd.to_datetime(
            [
                "2022-02-03 15:00:00",
                "2022-02-01 16:00:00",
                "2022-01-03 10:00:23",
                "2022-05-03 12:20:03",
            ],
            format="%Y-%m-%d %H:%M:%S",
        ),
    }
)
datanew["hash"] = hash_pandas_object(datanew, False).apply("{:0>16x}".format)

首先,从数据库中读取已有数据的id和hash

old_idhash = pd.read_sql_query(select(Test1.id, Test1.hash), engine)

然后计算data_insert,即需要新增的记录。
pandas官网:merge

# 对于左表(datanew)中有,而右表(old_idhash)中没有的数据,hash_old会自动为NaN
# 用ID的并集和hash表征求ID的差集
data_insert_idhash = pd.merge(
    datanew[["id", "hash"]], old_idhash, "left", "id", suffixes=("_new", "_old")
)
# 由于hash列是not nullable的,所以hash_old为NaN表明该ID对应的数据是新数据
有NaN的行 = data_insert_idhash.isna().any(1)
data_insert_idhash = data_insert_idhash[有NaN的行]
#这里的右表只有一个id列,这样相当于用inner join求ID的交集
data_insert = pd.merge(datanew, data_insert_idhash["id"], "inner", "id")

接着计算data_merge,即需要更新的记录。

# 用outer join求并集
data_update_idhash = pd.merge(
    datanew[["id", "hash"]], old_idhash, "outer", "id", suffixes=("_new", "_old")
)
# 若hash中只有一个有值,说明这条记录要么是新记录,要么是本次新数据未提及的老数据
# 二者都不可能参与更新,因此排除
# 用ID的并集和hash表征求ID的交集
data_update_idhash = data_update_idhash.dropna(how="any")
# 找出两个表ID并集的记录中,hash不同的记录
data_update_idhash = data_update_idhash[
    data_update_idhash["hash_old"] != data_update_idhash["hash_new"]
]
#这里的右表只有一个id列,这样相当于用inner join求ID的交集
data_update = pd.merge(datanew, data_update_idhash["id"], "inner", "id")