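Update on the approach:
Pig has no built-in operation for connecting to MySQL. Since going through sqoop from inside Pig is this awkward, a more sensible design is to wrap sqoop and pig in a shell script. The shell script runs the sqoop export, and the Pig script then simply reads the MySQL data that sqoop exported.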
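Here is a minimal sketch of such a wrapper. It assumes bash, the sqoop, pig and hdfs command-line clients on PATH, and a Pig script saved as ip_update.pig (a hypothetical name). The connection string, the password-file path and the EXPORT_DIR parameter name are placeholders, not values from the original setup. By default it only prints the commands; set DRY_RUN=0 to run them.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper: sqoop exports the MySQL IP table, then pig reads the export.
# Connection values, the password file and ip_update.pig are placeholders.
set -euo pipefail

DRY_RUN="${DRY_RUN:-1}"   # 1 = only print commands (default), 0 = execute them
MYSQL_URL="${MYSQL_URL:-jdbc:mysql://REAL_IP:PORT/MYSQL_DATABASE_NAME?useSSL=false}"
MYSQL_USER="${MYSQL_USER:-pqchen}"
PASSWORD_FILE="${PASSWORD_FILE:-/user/pqchen/mysql.password}"   # hypothetical file holding the password
EXPORT_DIR="${EXPORT_DIR:-/user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip}"
PIG_SCRIPT="${PIG_SCRIPT:-ip_update.pig}"

# Print a command, and run it only when DRY_RUN=0.
run() {
  printf '%s\n' "$*"
  if [ "$DRY_RUN" = "0" ]; then
    "$@"
  fi
}

# sqoop refuses to import into an existing directory, so clear it first.
run hdfs dfs -rm -r -f "$EXPORT_DIR"

# In a shell script "--" is an ordinary argument: no placeholders or sed needed.
run sqoop import \
  --connect "$MYSQL_URL" \
  --username "$MYSQL_USER" \
  --password-file "$PASSWORD_FILE" \
  --table aliyunip \
  --target-dir "$EXPORT_DIR" \
  --as-parquetfile \
  -m 1

# Pass the export path to pig as a parameter named EXPORT_DIR.
run pig -param "EXPORT_DIR=$EXPORT_DIR" -f "$PIG_SCRIPT"
```

With the sqoop call moved into the shell script, the Pig script no longer needs the rmf/sh/sed steps. Its LOAD of the MySQL export can read the path passed in with -param. Using --password-file instead of --password also keeps the password off the command line.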
—————
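After searching for a long time, I found no implementation online that reads or writes MySQL directly from Pig, so the only option was to go through sqoop. I wanted the whole job to live in a single Pig script, and calling sqoop from inside Pig turned out to be tricky, so I'm writing it down here.
Task: incrementally add new entries to the IP database and update existing ones.
Workflow: in the Pig script, extract the IPs from the logs in a Hive table and call sqoop to export the IP database from MySQL. Join the two to separate already-known IPs from new ones. Then call a Python script that looks up the region of each new IP, writes it to MySQL, and updates the regions of the existing IPs.
Pig script: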
set mapreduce.reduce.memory.mb 4096;
set default_parallel 20;
-- Register the jars (fill in the absolute paths yourself)
REGISTER /.../jar/elephant-bird-core-4.10.jar;
REGISTER /.../jar/elephant-bird-hadoop-compat-4.10.jar;
REGISTER /.../jar/elephant-bird-pig-4.10.jar;
REGISTER /.../jar/json-simple-1.1.1.jar;
REGISTER /.../jar/elasticsearch-hadoop-pig-7.10.1.jar;
REGISTER /.../jar/commons-httpclient-3.1.jar;
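-- Note: Today actually holds yesterday's date (yyyy-MM-dd); TodayIndex holds today's date (yyyyMMdd)
%declare Today `date -d "1 day ago" +%Y-%m-%d`
%declare TodayIndex `date +%Y%m%d`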
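-- Load yesterday's logs from the Hive table. The AS schema must list every column of the table, in table order;
-- ymdh, used in the FILTER below, must be included as well.
Parquet = LOAD 'log_info' USING org.apache.hive.hcatalog.pig.HCatLoader() AS (logtime:chararray,clientip:chararray,ip:chararray,authuser:chararray);
-- ymdh is compared as a string: keep values from yesterday (inclusive) up to, but not including, today
Remote2LocalFull = FILTER Parquet BY ymdh < ToString(CurrentTime(),'yyyy-MM-dd') AND ymdh >= '$Today';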
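Because ymdh is compared as a chararray, the date window is lexicographic. The bash sketch below replays a strict lower bound (>) and an inclusive one (>=) on a few sample values. It assumes ymdh looks like 'yyyy-MM-dd' or 'yyyy-MM-dd HH' (the real format is not shown here) and a job run on 2024-05-02 (example dates only). LC_ALL=C makes bash compare bytes, which for ASCII strings gives the ordinary string ordering used for chararray comparison.

```shell
#!/usr/bin/env bash
# Replays the ymdh window of the FILTER on sample values.
# Assumptions: ymdh is 'yyyy-MM-dd' or 'yyyy-MM-dd HH', and the job runs on 2024-05-02.
set -euo pipefail
export LC_ALL=C   # byte-wise ordering for [[ < ]] and [[ > ]]

# Strict lower bound: lower < ymdh < upper
strict_keep()    { [[ "$3" > "$1" && "$3" < "$2" ]]; }
# Inclusive lower bound: lower <= ymdh < upper
inclusive_keep() { [[ ! "$3" < "$1" && "$3" < "$2" ]]; }

lower='2024-05-01'   # value of Today (i.e. yesterday) when run on 2024-05-02
upper='2024-05-02'   # ToString(CurrentTime(),'yyyy-MM-dd') on 2024-05-02

for v in '2024-04-30 23' '2024-05-01' '2024-05-01 00' '2024-05-01 23' '2024-05-02 00'; do
  s=drop; i=drop
  if strict_keep "$lower" "$upper" "$v"; then s=keep; fi
  if inclusive_keep "$lower" "$upper" "$v"; then i=keep; fi
  printf '%-14s  >: %s  >=: %s\n' "$v" "$s" "$i"
done
```

For an hourly value such as '2024-05-01 00', both bounds behave the same because a longer string sorts after its prefix. Only a value exactly equal to the date ('2024-05-01') is dropped by > and kept by >=.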
-- Distinct IPs from the logs, to be looked up
GeoIP = FOREACH Remote2LocalFull GENERATE ip;
DistinctGeoIP = DISTINCT GeoIP;
rmf /user/hive/warehouse/abnormalLogin/aliyunGeoip/authguardIPWaitingForQuery;
STORE DistinctGeoIP INTO '/user/hive/warehouse/abnormalLogin/aliyunGeoip/authguardIPWaitingForQuery' USING PigStorage;
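-- Export the already-resolved IPs from MySQL to HDFS as Parquet files
-- This is the key part of calling sqoop. It is convoluted because "--" starts a comment in Pig,
-- so each sqoop option is written with the placeholder [AA], and sed turns it back into "--" afterwards
-- SqoopDir is the file that will hold the generated sqoop command
%declare SqoopDir /tmp/pig_sqoop_order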
-- sqoop import fails if the target directory already exists, so delete it first
rmf /user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip;
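-- Write the sqoop command, with [AA] standing in for "--", into the SqoopDir file
sh echo 'sqoop import [AA]username pqchen [AA]password 0000000 [AA]connect jdbc:mysql://REAL_IP:PORT/MYSQL_DATABASE_NAME?useSSL=false [AA]table aliyunip [AA]target-dir /user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip [AA]as-parquetfile -m 1' > '$SqoopDir';
-- Restore the placeholders: "[A" -> "-" and "A]" -> "-" turn [AA] into "--";
-- [B] becomes the escaped CONDITIONS token that a free-form sqoop query needs (unused in this command)
sh sed -i 's/\\\\\[A/-/g' $SqoopDir;
sh sed -i 's/A\\\\\]/-/g' $SqoopDir;
sh sed -i 's/\\\\\[B\\\\\]/\\\\\$CONDITIONS/g' $SqoopDir;
-- Run the generated command
sh /bin/sh $SqoopDir;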
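To see what the echo and sed steps are meant to leave in the SqoopDir file, here is the same placeholder substitution written directly in bash, where the sed expressions need only ordinary shell quoting. A temporary file stands in for /tmp/pig_sqoop_order, and the sample command is shortened. It illustrates the substitution only and does not replace the Pig lines.

```shell
#!/usr/bin/env bash
# Placeholder trick in plain shell: "--" is written as "[AA]" so that Pig's
# comment parser never sees it, then sed restores the real characters.
set -euo pipefail

# "[A" -> "-" and "A]" -> "-" turn [AA] into --;
# [B] -> \$CONDITIONS, the token sqoop requires in a free-form --query.
expand_placeholders() {
  sed -e 's/\[A/-/g' -e 's/A]/-/g' -e 's/\[B]/\\$CONDITIONS/g'
}

cmd_file="$(mktemp)"   # plays the role of the SqoopDir file
trap 'rm -f "$cmd_file" "$cmd_file.new"' EXIT

echo 'sqoop import [AA]table aliyunip [AA]as-parquetfile -m 1' > "$cmd_file"
expand_placeholders < "$cmd_file" > "$cmd_file.new"
mv "$cmd_file.new" "$cmd_file"

cat "$cmd_file"   # prints: sqoop import --table aliyunip --as-parquetfile -m 1
# In the Pig script, the next step runs this file with /bin/sh.
```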
-- Load the exported Parquet files in Pig; list every column, in the same order as the MySQL table
mysqlIP = LOAD '/user/hive/warehouse/abnormalLogin/aliyunGeoip/aliyunGeoip' USING org.apache.parquet.pig.ParquetLoader AS (ip:chararray, isp:chararray,latitude:long, city:chararray, county:chararray, longitude:long, province:chararray, country:chararray, updatetime:long);
-- Sanity check: print 10 of the exported IPs
a = FOREACH mysqlIP GENERATE ip;
b = LIMIT a 10;
DUMP b;
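The script stops after loading both inputs. The split described in the workflow, IPs already in MySQL versus new IPs, is an intersection and a difference in set terms. In Pig it would typically be a LEFT OUTER JOIN of DistinctGeoIP with mysqlIP on ip, where rows whose mysqlIP side is null are the new IPs. The bash sketch below shows the same split on made-up sample IPs with sort and comm, purely to illustrate the logic. The file names and the split_ips helper are hypothetical.

```shell
#!/usr/bin/env bash
# Illustration only: split log IPs into "already in MySQL" and "new".
# The sample IPs, file names and split_ips helper are made up.
set -euo pipefail
export LC_ALL=C   # sort and comm must agree on the same ordering

# split_ips LOG_FILE MYSQL_FILE OUT_DIR
# writes OUT_DIR/existing_ips (in both inputs) and OUT_DIR/new_ips (log only)
split_ips() {
  local log=$1 mysql=$2 out=$3
  sort -u "$log" > "$out/log_sorted"
  sort -u "$mysql" > "$out/mysql_sorted"
  comm -12 "$out/log_sorted" "$out/mysql_sorted" > "$out/existing_ips"
  comm -23 "$out/log_sorted" "$out/mysql_sorted" > "$out/new_ips"
}

workdir="$(mktemp -d)"
trap 'rm -rf "$workdir"' EXIT

# Log IPs (duplicates allowed, like GeoIP before DISTINCT) and IPs already stored in MySQL
printf '%s\n' 10.0.0.2 10.0.0.1 10.0.0.9 10.0.0.2 > "$workdir/log_ips"
printf '%s\n' 10.0.0.1 10.0.0.5 10.0.0.2 > "$workdir/mysql_ips"

split_ips "$workdir/log_ips" "$workdir/mysql_ips" "$workdir"

echo "existing (regions to refresh): $(paste -sd, "$workdir/existing_ips")"
echo "new (regions to look up):      $(paste -sd, "$workdir/new_ips")"
```

Here 10.0.0.1 and 10.0.0.2 land in existing_ips, and only 10.0.0.9 lands in new_ips. 10.0.0.5 appears only in MySQL, so it is in neither output, just as it would be dropped by a left join driven by the log IPs.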