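This article describes how to use Neo4j's APOC library to import MySQL table data into a Neo4j graph database.

Introduction

What is APOC?

APOC is a library of user-defined procedures and functions (about 450 of them) that helps with many different tasks in areas such as data integration, graph algorithms, and data conversion.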

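Installation

How do you enable APOC support in Neo4j?

Download the APOC Full library JAR and save it to the plugins directory under the Neo4j installation directory. Use the release that matches your Neo4j version; the link below points to 4.3.0.0, while the file uploaded in the next step is a 3.5.0.11 build.
Download URL: https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/4.3.0.0

Upload the file to the target directory on the server with SCP: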
scp .\apoc-3.5.0.11-all.jar root@192.168.1.104:/data2/neo4j/plugins/

To use APOC's JDBC features, the MySQL JDBC driver JAR also needs to be placed in the plugins directory.

Restart Neo4j

docker-compose restart neo4j

Usage

Using APOC

Check that APOC loaded correctly: CALL apoc.help("apoc");

JDBC connection URLs (the second one is used in the queries that follow):

jdbc:mysql://192.168.1.104:3306/test1?user=root&password=XnxDF1RnyxJuswcn

jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn
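The connection URLs above carry the user name and password as query parameters, so a password containing characters such as & or @ would break the URL if pasted in as-is. As a minimal sketch, the URL could be assembled in Python with percent-encoded values. The helper name build_jdbc_url and the sample password are hypothetical, and the sketch assumes the MySQL driver decodes percent-encoded parameter values, which is worth checking against the driver documentation.

```python
from urllib.parse import urlencode


def build_jdbc_url(host: str, port: int, database: str, user: str, password: str) -> str:
    """Assemble a MySQL JDBC URL with percent-encoded credentials.

    Hypothetical helper for illustration; not part of APOC or the MySQL driver.
    """
    # urlencode escapes characters such as '&', '=' and '@' in the values
    query = urlencode({"user": user, "password": password})
    return f"jdbc:mysql://{host}:{port}/{database}?{query}"


if __name__ == "__main__":
    # Placeholder credentials, for illustration only
    print(build_jdbc_url("172.17.0.1", 3306, "ebpc", "root", "p@ss&word"))
```

A password made only of letters and digits, like the one used in this article, passes through unchanged.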

// SQL query: preview the first 10 rows of a table
WITH "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn" AS url
CALL apoc.load.jdbc(url, "t_eth_address") YIELD row RETURN row LIMIT 10;

// Multi-threaded batch import
CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc(
    "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
    "select * from t_eth_address limit 10000") YIELD row',
  'CREATE (a:Address) SET a += row',
  {batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches, total

Convert a decimal string to a float with exact arithmetic:

RETURN apoc.number.exact.toFloat('0.000000000000031337', 32) AS output;
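The string passed to apoc.number.exact.toFloat has 18 decimal places, which matches the value / 1000000000000000000 scaling used in the import queries further down. The following is a minimal Python sketch of that scaling with exact decimal arithmetic. It assumes (the source does not say so) that value holds an integer amount in the smallest unit, and to_scaled_decimal is a hypothetical name chosen for illustration.

```python
from decimal import Decimal, localcontext

SCALE = 10 ** 18  # the divisor used in the SQL: 1000000000000000000


def to_scaled_decimal(raw_value: int) -> Decimal:
    """Divide an integer amount by 10^18 without binary floating point."""
    with localcontext() as ctx:
        # Assumed precision: generous enough that 256-bit integers divide exactly
        ctx.prec = 100
        return Decimal(raw_value) / Decimal(SCALE)


if __name__ == "__main__":
    exact = to_scaled_decimal(31337)
    # Fixed-point formatting avoids scientific notation
    print(format(exact, "f"))  # prints 0.000000000000031337
```

Keeping the amount as a decimal (or as a string) until the last step avoids the rounding that can appear when a large integer is divided as a binary float.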

Query path data and use APOC to package the result into lists of nodes and relationships:

MATCH (a:Address {address:'0x000000000004d7463d0f9c77383600bc82d612f5'})
OPTIONAL MATCH p = ()-[:TRANSFER*0..1]-(a)
WITH nodes(p) AS v, relationships(p) AS e
WHERE all(rel IN e WHERE rel.bn > 10000)
WITH apoc.coll.toSet(apoc.coll.flatten(collect(e))) AS es,
     apoc.coll.toSet(apoc.coll.flatten(collect(v))) AS vs
RETURN es, vs

Data import tests:

// Multi-threaded batch import
CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc(
    "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
    "select *, CONVERT(value / 1000000000000000000, DECIMAL(32, 18)) as vv
     from t_eth_transaction1 where id < 1000000") yield row',
  'CREATE (a:Address) SET a += row',
  {batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches, total

First attempt at creating the TRANSFER relationships. It fails for two reasons: the extra colon in Address:{...} is a syntax error, and CREATE only accepts a directed relationship:

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc(
    "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
    "select hash,value,`from`,`to`,block_number,(value / 1000000000000000000) as v from t_eth_transaction1 where id < 2000") yield row',
  'MATCH (f:Address:{address:row.from}),(t:Address:{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.v,value:row.value}]-(t)',
  {batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches, total

Second attempt, selecting by block_number. It still has the Address:{...} syntax error and the undirected CREATE, and it also writes the conversion as a property of row (row.apoc.number.exact.toFloat(...)) instead of calling the function directly:

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc(
    "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
    "select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number < 1100000") yield row',
  'MATCH (f:Address:{address:row.from}),(t:Address:{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.apoc.number.exact.toFloat(row.v,32),value:row.value}]-(t)',
  {batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches, total

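Count the TRANSFER relationships in the block range (the undirected pattern matches each relationship once per direction):

MATCH ()-[x:TRANSFER]-() WHERE x.bn > 1000000 AND x.bn < 1100000 RETURN count(x);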

Third attempt. The relationship is now directed (->), but the Address:{...} syntax error and the row.apoc.number.exact.toFloat(...) call remain:

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc(
    "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
    "select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number <= 1100000") yield row',
  'MATCH (f:Address:{address:row.from}),(t:Address:{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.apoc.number.exact.toFloat(row.v,32),value:row.value}]->(t)',
  {batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches, total

Why does it still not work?

Fourth attempt. The label syntax is fixed (Address{...}) and from is backtick-quoted, but v is still written as row.apoc.number.exact.toFloat(...):

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc(
    "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
    "select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number <= 1002000") yield row',
  'MATCH (f:Address{address:row.`from`}),(t:Address{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.apoc.number.exact.toFloat(row.v,32),value:row.value}]->(t)',
  {batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches, total

Finally, it works. The function is now called directly as apoc.number.exact.toFloat(row.v,32):

CALL apoc.periodic.iterate(
  'CALL apoc.load.jdbc(
    "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
    "select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number <= 1002000 and block_number > 1000020") yield row',
  'MATCH (f:Address{address:row.`from`}),(t:Address{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:apoc.number.exact.toFloat(row.v,32),value:row.value}]->(t)',
  {batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches, total
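The working query restricts the import to a window of block numbers (block_number > 1000020 and block_number <= 1002000). If a larger range is imported over several runs, the windows need to be adjacent and non-overlapping so that no transaction is loaded twice. Below is a minimal Python sketch of generating such windows and the matching SQL. The helper names block_windows and window_sql and the window size of 500 are hypothetical, chosen only for illustration.

```python
from typing import Iterator, Tuple


def block_windows(start: int, end: int, size: int) -> Iterator[Tuple[int, int]]:
    """Yield (low, high) pairs covering block numbers in (start, end].

    Each pair is meant for a filter of the form
    "block_number > low and block_number <= high", so consecutive
    windows share a boundary without overlapping.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    low = start
    while low < end:
        high = min(low + size, end)
        yield low, high
        low = high


def window_sql(low: int, high: int) -> str:
    """Build the SQL for one window, mirroring the query in this article."""
    return (
        "select *,(value / 1000000000000000000) as v "
        "from t_eth_transaction1 "
        f"where block_number <= {high} and block_number > {low}"
    )


if __name__ == "__main__":
    # Split the range used in the article into windows of 500 blocks
    for low, high in block_windows(1000020, 1002000, 500):
        print(window_sql(low, high))
```

Each generated SQL string can be pasted into the apoc.load.jdbc call in place of the hand-written range.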

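Performance testing phase:
Tested through the browser, using the default configuration.
Reference: Kingsoft Docs (金山文档) Excel sheet

Upgrade the configuration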