Update to the approach:
    Pig has no built-in way to connect to MySQL directly, and since driving Sqoop from inside Pig is this awkward, a cleaner option is to wrap Sqoop and Pig in a shell script: the shell script runs Sqoop first, and the Pig script then consumes the MySQL data that Sqoop has already exported. That setup is much more reasonable.
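    For reference, a minimal sketch of that wrapper approach, reusing the Sqoop arguments and HDFS paths from the script below; the script filename ip_update.pig and the placeholder JDBC URL are illustrative, not taken from the original:

    #!/bin/bash
    # Hypothetical wrapper: run Sqoop first, then let Pig consume its output.
    set -e
    EXPORT_DIR=/user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip

    # 1. Export the existing IP table from MySQL to HDFS as Parquet.
    hdfs dfs -rm -r -f "$EXPORT_DIR"
    sqoop import \
      --connect "jdbc:mysql://<real ip>:<port>/<database name>?useSSL=false" \
      --username pqchen --password 0000000 \
      --table aliyunip \
      --target-dir "$EXPORT_DIR" \
      --as-parquetfile -m 1

    # 2. Run the Pig script, which can now LOAD the exported Parquet directly,
    #    with no embedded sh/sed tricks needed.
    pig -f ip_update.pig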

    —————
    After a lot of searching, I could not find any existing implementation that reads or writes MySQL directly from Pig; the only option is to go through Sqoop. Since I wanted to implement the whole job as a single Pig script, calling Sqoop from inside that Pig script turned out to be a hassle, so it is worth writing down.

    Task: incrementally add to and update the IP database.
    Workflow: in the Pig script, extract the log IPs from the Hive table and call Sqoop to export the existing IP database from MySQL, then join the two to separate IPs that are already known from new ones (a sketch of this join step follows the listing below). A Python script then looks up the regions of the new IPs, writes them into MySQL, and refreshes the regions of the existing IPs.

    Pig script:

    set mapreduce.reduce.memory.mb 4096;
    set default_parallel 20;
    -- Register the required jars; fill in the absolute paths yourself
    REGISTER /.../jar/elephant-bird-core-4.10.jar;
    REGISTER /.../jar/elephant-bird-hadoop-compat-4.10.jar;
    REGISTER /.../jar/elephant-bird-pig-4.10.jar;
    REGISTER /.../jar/json-simple-1.1.1.jar;
    REGISTER /.../jar/elasticsearch-hadoop-pig-7.10.1.jar;
    REGISTER /.../jar/commons-httpclient-3.1.jar;
    %declare Today `date -d "1 day ago" +%Y-%m-%d`
    %declare TodayIndex `date +%Y%m%d`
    -- Load yesterday's logs from the Hive table; the fields must match the table's column order and be listed in full (including the partition column ymdh used in the filter below)
    Parquet = LOAD 'log_info' USING org.apache.hive.hcatalog.pig.HCatLoader() AS (logtime:chararray,clientip:chararray,ip:chararray,authuser:chararray);
    Remote2LocalFull = FILTER Parquet BY ymdh<ToString(CurrentTime(),'yyyy-MM-dd') AND ymdh>'$Today';
    -- Get the distinct IPs that still need to be looked up
    GeoIP = FOREACH Remote2LocalFull GENERATE ip;
    DistinctGeoIP = DISTINCT GeoIP;
    rmf /user/hive/warehouse/abnormalLogin/aliyunGeoip/authguardIPWaitingForQuery;
    STORE DistinctGeoIP INTO '/user/hive/warehouse/abnormalLogin/aliyunGeoip/authguardIPWaitingForQuery' USING PigStorage();
    -- Export the already-queried IP data from MySQL to HDFS via Sqoop, in Parquet format
    -- This is the tricky part of calling Sqoop: it is convoluted because '--' is Pig's comment marker, so the Sqoop flags cannot be written literally
    %declare SqoopDir /tmp/pig_sqoop_order;
    rmf /user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip;
    sh echo 'sqoop import [AA]username pqchen [AA]password 0000000 [AA]connect jdbc:mysql://<real ip>:<port>/<database name>?useSSL=false [AA]table aliyunip [AA]target-dir /user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip [AA]as-parquetfile -m 1' > '$SqoopDir';
    -- Turn the [AA] placeholders back into the literal '--' that Sqoop expects
    sh sed -i 's/\[A/-/g' $SqoopDir;
    sh sed -i 's/A\]/-/g' $SqoopDir;
    -- Only needed if the Sqoop command uses --query with a [B] placeholder standing in for $CONDITIONS
    sh sed -i 's/\[B\]/\$CONDITIONS/g' $SqoopDir;
    sh /bin/sh $SqoopDir;
    -- Load the exported file in Pig; list all fields, aligned with the exported schema
    mysqlIP = LOAD '/user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip' USING org.apache.parquet.pig.ParquetLoader() AS (ip:chararray, isp:chararray,latitude:long, city:chararray, county:chararray, longitude:long, province:chararray, country:chararray, updatetime:long);
    -- Quick sanity check that the Sqoop export loaded correctly
    a = FOREACH mysqlIP GENERATE ip;
    b = LIMIT a 10;
    DUMP b;
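
    The listing above stops at a sanity-check dump. A minimal sketch of the join step described in the workflow, reusing the DistinctGeoIP and mysqlIP relations defined above (the output path for the new IPs is a placeholder, not from the original script), might look like this:

    -- Split the log IPs into already-known and new ones
    Joined = JOIN DistinctGeoIP BY ip LEFT OUTER, mysqlIP BY ip;
    KnownIP = FILTER Joined BY mysqlIP::ip IS NOT NULL;
    NewIP = FILTER Joined BY mysqlIP::ip IS NULL;
    NewIPOnly = FOREACH NewIP GENERATE DistinctGeoIP::ip AS ip;
    -- Placeholder path: the new IPs would then be handed to the Python lookup script
    STORE NewIPOnly INTO '/user/hive/warehouse/abnormalLogin/aliyunGeoip/newIPWaitingForQuery' USING PigStorage();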