本文介绍如何通过 Neo4j 的 APOC 完成 MySQL 的表数据导入 Neo4j 图数据库中。
介绍
APOC 是什么?
APOC是一个定义的存储过程和函数库,由许多 (约450个) 过程和函数组成,可帮助完成数据集成、图形算法或数据转换等领域的许多不同任务。
安装
Neo4j 如何开启 APOC 支持?
下载 APOC Full 库文件并保存到 Neo4j 安装目录的 plugins 目录下
下载地址:https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/4.3.0.0
通过 SCP 上传文件到服务器指定目录:scp .\apoc-3.5.0.11-all.jar root@192.168.1.104:/data2/neo4j/plugins/
为了能够使用 APOC 的 JDBC 功能,还需要导入 MySQL 的驱动包
重启 Neo4j
docker-compose restart neo4j
使用
APOC 的使用
查看是否正常加载: CALL apoc.help("apoc");
**jdbc:mysql://192.168.1.104:3306/test1?user=root&password=XnxDF1RnyxJuswcn**
jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn
# sql querywith "jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn" as urlCALL apoc.load.jdbc(url,"t_eth_address") YIELD row return row limit 10;# 多线程 批处理导入CALL apoc.periodic.iterate('CALL apoc.load.jdbc("jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn","select * from t_eth_address limit 10000")','CREATE (a:Address) SET a += row',{batchSize:1000, parallel:true, iterateList:true}) YIELD batches, totalRETURN batches,total
RETURN apoc.number.exact.toFloat(‘0.000000000000031337’,32) as output;
通过 Apoc 查询 PATH 数据,封装成点和线列表:
match (a:Address{address:'0x000000000004d7463d0f9c77383600bc82d612f5'})optional match p= ()-[:TRANSFER*0..1]-(a)with nodes(p) as v,relationships(p) as ewhere all(rel in e where rel.bn >10000)with apoc.coll.toSet(apoc.coll.flatten(collect(e))) as es,apoc.coll.toSet(apoc.coll.flatten(collect(v))) as vsreturn es,vs
数据导入测试:
# 多线程 批处理导入
CALL apoc.periodic.iterate(
'CALL apoc.load.jdbc(
"jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
"select * ,CONVERT(value / 1000000000000000000, DECIMAL(32, 18)) as vv
from t_eth_transacion1 where id < 1000000") yield row',
'CREATE (a:Address) SET a += row',
{batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches,total
CALL apoc.periodic.iterate(
'CALL apoc.load.jdbc(
"jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
"select hash,value,`from`,`to`,block_number,(value / 1000000000000000000) as v from t_eth_transaction1 where id < 2000") yield row',
'MATCH (f:Address:{address:row.from}),(t:Address:{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.v,value:row.value}]-(t)',
{batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches,total
CALL apoc.periodic.iterate(
'CALL apoc.load.jdbc(
"jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
"select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number < 1100000") yield row',
'MATCH (f:Address:{address:row.from}),(t:Address:{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.apoc.number.exact.toFloat(row.v,32),value:row.value}]-(t)',
{batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches,total
match ()-[x:TRANSFER]-() where x.bn > 1000000 and x.bn < 1100000 return count(x);
CALL apoc.periodic.iterate(
'CALL apoc.load.jdbc(
"jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
"select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number <= 1100000") yield row',
'MATCH (f:Address:{address:row.from}),(t:Address:{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.apoc.number.exact.toFloat(row.v,32),value:row.value}]->(t)',
{batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches,total
# why 还是不行?~
CALL apoc.periodic.iterate(
'CALL apoc.load.jdbc(
"jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
"select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number <= 1002000") yield row',
'MATCH (f:Address{address:row.`from`}),(t:Address{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:row.apoc.number.exact.toFloat(row.v,32),value:row.value}]->(t)',
{batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches,total
终于大功告成了~
CALL apoc.periodic.iterate(
'CALL apoc.load.jdbc(
"jdbc:mysql://172.17.0.1:3306/ebpc?user=root&password=XnxDF1RnyxJuswcn",
"select *,(value / 1000000000000000000) as v from t_eth_transaction1 where block_number <= 1002000 and block_number > 1000020") yield row',
'MATCH (f:Address{address:row.`from`}),(t:Address{address:row.to}) CREATE (f)-[:TRANSFER{hash:row.hash,bn:row.block_number,v:apoc.number.exact.toFloat(row.v,32),value:row.value}]->(t)',
{batchSize:1000, parallel:true, iterateList:true}) YIELD batches, total
RETURN batches,total
开始性能测试阶段:
通过浏览器测试,使用默认的配置
参考 金山文档-Excel
进行配置升级
