For the basic syntax, see https://www.w3cschool.cn/apache_pig/apache_pig_filter_operator.html
This post supplements common syntax details that the tutorial glosses over.
Installation is omitted; just follow the tutorial.
Enter the environment: pig -x mapreduce
It feels unfamiliar at first, but just remember that you are essentially writing SQL: decide what to do first and what to do next, then write it out step by step. The rest is detail handling.
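As a warm-up, here is a minimal end-to-end sketch of that step-by-step style (the path and schema are made up for illustration):

```
-- load, filter, project, store: one step at a time, much like SQL
raw    = LOAD '/pig_data/student_data.txt' USING PigStorage(',') AS (id:int, name:chararray, city:chararray);
kept   = FILTER raw BY id > 0;
names  = FOREACH kept GENERATE id, name;
STORE names INTO '/pig_out/names' USING PigStorage(',');
```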

Loading and Storing

Day-to-day Pig work loads almost everything from HDFS, so files of any format must first be put on HDFS.

Loading data (the LOAD operator)

The LOAD statement simply loads data into the specified relation in Pig. To verify that a LOAD statement did what you expect, use the diagnostic operators.
Syntax:

```
grunt> Relation_name = LOAD 'Input file path' USING function AS schema;
```

Where:

  • relation_name: the relation into which the data is stored.
  • Input file path: the HDFS directory holding the file (in MapReduce mode). The path must be given in single quotes.
  • function: one of the load functions that Apache Pig provides (BinStorage, JsonLoader, PigStorage, TextLoader).
  • schema: the schema of the data, defined as (column1 : data type, column2 : data type, column3 : data type); every field must be listed.

Note: data can also be loaded without specifying a schema. In that case the columns are addressed positionally as $0, $1, $2, and so on.
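A quick sketch of the schema-less case (hypothetical file path):

```
-- no AS clause: fields are untyped and referenced by position
raw = LOAD '/pig_data/student_data.txt' USING PigStorage(',');
first_two = FOREACH raw GENERATE $0, $1;
```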

Storing data (the STORE operator)

Syntax:

```
grunt> STORE Relation_name INTO 'required_directory_path' [USING function];
```

Storing to HDFS with PigStorage:

```
grunt> STORE student INTO 'hdfs://localhost:9000/pig_Output/' USING PigStorage(',');
```

Loading from / storing to Hive

This requires HCatLoader, which ships with Hive (HCatalog), so Hive must be installed first.

```
grunt> Parquet = LOAD 'test' USING org.apache.hive.hcatalog.pig.HCatLoader() AS (key: chararray);
```
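Storing back to a Hive table works symmetrically with HCatStorer; a minimal sketch, assuming a Hive table named test_out already exists and Pig was started with the HCatalog jars on the classpath (pig -useHCatalog):

```
grunt> STORE Parquet INTO 'test_out' USING org.apache.hive.hcatalog.pig.HCatStorer();
```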

Loading / storing JSON

This needs the json-simple-1.1.1.jar package; see http://pig.apache.org/docs/latest/func.html#jsonloadstore
The example below uses elephant-bird's JsonLoader, which returns each record as a single map (hence the $0#'key' lookups), so the elephant-bird jar must be registered as well. JsonStorage requires a relation with a declared schema, so store the projected relation:

```
grunt> REGISTER /home/jar/json-simple-1.1.1.jar
grunt> SpamIPGeo = LOAD '/user/hive/warehouse/tmp/spamipgeo.json' USING com.twitter.elephantbird.pig.load.JsonLoader();
grunt> SpamIPGeoTuple = FOREACH SpamIPGeo GENERATE (chararray)$0#'regionip' AS regionip, (chararray)$0#'country' AS country;
grunt> STORE SpamIPGeoTuple INTO '/user/hive/warehouse/tmp/SpamIPGeo' USING JsonStorage();
```

Loading / storing text

The PigStorage() function loads and stores data as structured text files. It takes the delimiter separating the entities of each tuple as a parameter; by default the delimiter is '\t'.

```
grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt' USING PigStorage(',') as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );
grunt> STORE student INTO '/user/hive/warehouse/tmp/student' USING PigStorage(',');
```
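For unstructured text, the built-in TextLoader reads each line as a single chararray field; a small sketch with a hypothetical path:

```
grunt> lines = LOAD '/pig_data/app.log' USING TextLoader() AS (line:chararray);
```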

Diagnostic operators

1. The DUMP operator

The DUMP operator runs a Pig Latin statement and prints the result on screen; it is normally used for debugging. Note that it prints the full data set, which easily floods the screen, so it is best to take the top k rows with the LIMIT operator first (see the sketch after the example).

```
grunt> Dump student;

Output:
(1,Rajiv,Reddy,9848022337,Hyderabad)
(2,siddarth,Battacharya,9848022338,Kolkata)
```
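A minimal sketch of that LIMIT-before-DUMP habit:

```
grunt> topk = LIMIT student 10;
grunt> Dump topk;
```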

2. The DESCRIBE operator

The DESCRIBE operator shows the schema of a relation.

```
grunt> describe student;

Output:
student: { id: int,firstname: chararray,lastname: chararray,phone: chararray,city: chararray }
```

Filtering

1. The FOREACH operator

The FOREACH operator generates specified data transformations based on the column data; it is the equivalent of SELECT.

Syntax:

```
grunt> Relation_name2 = FOREACH Relation_name1 GENERATE (required data);
```

Example:

```
grunt> foreach_data = FOREACH student GENERATE id,age,city;
grunt> Dump foreach_data;

Output:
(1,21,Hyderabad)
(2,22,Kolkata)
```

2. The FILTER operator

The FILTER operator selects the tuples you want from a relation based on a condition; it is the equivalent of WHERE.

Syntax:

```
grunt> Relation2_name = FILTER Relation1_name BY (condition);
```

Example:

```
grunt> filter_data = FILTER student BY city == 'Chennai';
grunt> Dump filter_data;

Output:
(6,Archana,Mishra,23,9848022335,Chennai)
(8,Bharathi,Nambiayar,24,9848022333,Chennai)
```
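Conditions can be combined with boolean operators, and chararray fields can be tested against regular expressions with matches; a sketch assuming the student_details schema used later in this post:

```
grunt> f2 = FILTER student_details BY (age > 21) AND (city matches 'Ch.*');
```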

3. The DISTINCT operator

The DISTINCT operator removes redundant (duplicate) tuples from a relation; uniqueness is over the entire row.

Syntax:

```
grunt> Relation_name2 = DISTINCT Relation_name1;
```

Example:

```
grunt> distinct_data = DISTINCT student;
grunt> Dump distinct_data;

Output:
(1,Rajiv,Reddy,9848022337,Hyderabad)
(2,siddarth,Battacharya,9848022338,Kolkata)
```
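Because DISTINCT is whole-row, deduplicating on a subset of columns means projecting first; a minimal sketch:

```
grunt> cities = FOREACH student GENERATE city;
grunt> uniq_cities = DISTINCT cities;
```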

Sorting

1. The ORDER BY operator

Syntax:

```
grunt> Relation_name2 = ORDER Relation_name1 BY column (ASC|DESC);
```

Example:

```
grunt> order_by_data = ORDER student_details BY age DESC;
grunt> Dump order_by_data;

Output:
(8,Bharathi,Nambiayar,24,9848022333,Chennai)
(7,Komal,Nayak,24,9848022334,trivendram)
```
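Sorting on several columns, with a direction per column, also works; a small sketch:

```
grunt> order_multi = ORDER student_details BY age DESC, city ASC;
```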

2. The LIMIT operator

Syntax:

```
grunt> Result = LIMIT Relation_name required_number_of_tuples;
```

Example:

```
grunt> limit_data = LIMIT student_details 2;
grunt> Dump limit_data;

Output:
(1,Rajiv,Reddy,21,9848022337,Hyderabad)
(2,siddarth,Battacharya,22,9848022338,Kolkata)
```
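Combining the two operators gives the usual top-k pattern; a minimal sketch:

```
grunt> oldest = ORDER student_details BY age DESC;
grunt> top3 = LIMIT oldest 3;
```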

The material so far is fairly simple; now for the more complicated parts.

Grouping and Joining

1. The GROUP operator

Grouping by a single column

Syntax:

```
grunt> Group_data = GROUP Relation_name BY age;
```

Example:

```
grunt> group_data = GROUP student_details by age;
grunt> Dump group_data;

Output:
(21,{(4,Preethi,Agarwal,21,9848022330,Pune),(1,Rajiv,Reddy,21,9848022337,Hyderabad)})
(22,{(3,Rajesh,Khanna,22,9848022339,Delhi),(2,siddarth,Battacharya,22,9848022338,Kolkata)})
```

The output shows the content of the relation group_data. Observe that the result schema has two columns:

  • one is age, the column we grouped by;
  • the other is a bag holding the group of tuples, i.e. the student records with that age.

Further operations (inside the FOREACH, the original columns must be referenced through the bag, e.g. student_details.age):

```
a = FOREACH group_data GENERATE group AS age, SUM(student_details.age) AS agesum, COUNT(student_details) AS cnt;
```

Grouping by multiple columns

```
grunt> group_multiple = GROUP student_details by (age, city);
grunt> Dump group_multiple;

Output:
((21,Pune),{(4,Preethi,Agarwal,21,9848022330,Pune)})
((21,Hyderabad),{(1,Rajiv,Reddy,21,9848022337,Hyderabad)})
```

Further operations (the grouping key is now a tuple, so its parts are reached as group.$0, group.$1):

```
a = FOREACH group_multiple GENERATE group.$0 AS age, group.$1 AS city, COUNT(student_details) AS cnt;
```

Grouping a relation by all columns:

```
group_all = GROUP student_details ALL;
```
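GROUP ... ALL puts every row into a single group, which is how a global aggregate such as a total row count is computed; a minimal sketch:

```
total = FOREACH group_all GENERATE COUNT(student_details) AS total_rows;
```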

2. The COGROUP operator

The COGROUP operator works the same way as the GROUP operator. The only difference between the two is that GROUP is normally used on one relation, while COGROUP is used in statements involving two or more relations.

Suppose the two relations are named student_details and employee_details:

```
grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);
grunt> employee_details = LOAD 'hdfs://localhost:9000/pig_data/employee_details.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, city:chararray);
```

Now group the records/tuples of student_details and employee_details by the key age:

```
grunt> cogroup_data = COGROUP student_details by age, employee_details by age;

Output:
(21,{(4,Preethi,Agarwal,21,9848022330,Pune), (1,Rajiv,Reddy,21,9848022337,Hyderabad)},
{ })
(22,{ (3,Rajesh,Khanna,22,9848022339,Delhi), (2,siddarth,Battacharya,22,9848022338,Kolkata) },
{ (6,Maggy,22,Chennai),(1,Robin,22,newyork) })
(23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)},
{(5,David,23,Bhuwaneshwar),(3,Maya,23,Tokyo),(2,BOB,23,Kolkata)})
(24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334,trivendram)},
{ })
(25,{ },
{(4,Sara,25,London)})
```

COGROUP groups the tuples from each relation by age, where each group corresponds to one particular age value.
For example, the first tuple of the result is grouped by age 21 and contains two bags:

  • the first bag holds all tuples from the first relation (student_details here) with age 21;
  • the second bag holds all tuples from the second relation (employee_details here) with age 21.

If a relation has no tuple with a given age value, it returns an empty bag.
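To turn a cogroup into per-key statistics, aggregate each bag separately; a minimal sketch over the cogroup_data relation above:

```
counts = FOREACH cogroup_data GENERATE group AS age, COUNT(student_details) AS students, COUNT(employee_details) AS employees;
```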

3. The JOIN operator

The JOIN operator combines records from two or more relations. To perform a join, we declare one field (or a group of fields) from each relation as the key. When the keys match, the two tuples match; otherwise the records are dropped. Joins can be of the following types:

  • Self-join
  • Inner-join
  • Outer-join: left join, right join, and full join
Self-join

The data must be loaded twice, under different relation names:

```
grunt> customers1 = LOAD '....customers.txt' USING PigStorage(',') as (id:int, name:chararray, age:int, address:chararray, salary:int);
grunt> customers2 = LOAD '....customers.txt' USING PigStorage(',') as (id:int, name:chararray, age:int, address:chararray, salary:int);
```

Syntax:

```
grunt> Relation3_name = JOIN Relation1_name BY key, Relation2_name BY key;
```

Example:

```
grunt> customers3 = JOIN customers1 BY id, customers2 BY id;
grunt> Dump customers3;

Output:
(1,Ramesh,32,Ahmedabad,2000,1,Ramesh,32,Ahmedabad,2000)
(2,Khilan,25,Delhi,1500,2,Khilan,25,Delhi,1500)
```

Inner Join

Inner join is used most frequently; it is also called an equijoin. An inner join returns rows when there is a match in both tables.

Syntax:

```
grunt> result = JOIN relation1 BY columnname, relation2 BY columnname;
```

Example:

```
grunt> coustomer_orders = JOIN customers BY id, orders BY customer_id;
grunt> Dump coustomer_orders;

Output:
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
```

Left Outer Join

A left outer join returns all rows from the left table, even when there is no match in the right relation.

Syntax:

```
grunt> Relation3_name = JOIN Relation1_name BY id LEFT OUTER, Relation2_name BY customer_id;
```

Example:

```
grunt> outer_left = JOIN customers BY id LEFT OUTER, orders BY customer_id;
grunt> Dump outer_left;

Output:
(1,Ramesh,32,Ahmedabad,2000,,,,)
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
```

Note: when filtering the non-matching records afterwards, do not test with != ''; use is not null (or is null).
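A minimal sketch of the anti-join pattern that note enables (customers with no orders), filtering on the join key from the right side, which carries the orders:: prefix after the join:

```
no_orders = FILTER outer_left BY orders::customer_id is null;
```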

Right Outer Join

```
grunt> outer_right = JOIN customers BY id RIGHT, orders BY customer_id;
```
Analogous to the above.

Full Outer Join

```
grunt> outer_full = JOIN customers BY id FULL OUTER, orders BY customer_id;
```
Analogous to the above.

Field names after JOIN and GROUP

After a join, field names are qualified with the relation name: a field a from before the join becomes table::a afterwards, so selecting the bare field name often fails. You can fall back to positional references:

```
b = FOREACH a GENERATE $0 as domain, $1 as regionip, $3 as country, $4 as city;
```

GROUP BY has a similar issue, which can be handled the same way:

```
b = FOREACH a GENERATE group.$0 AS ip, group.$2 AS country, COUNT(oneTable) AS attackCount;
```
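The fully qualified names can also be used directly; a sketch assuming the customers/orders join from the JOIN section above:

```
b = FOREACH coustomer_orders GENERATE customers::id AS id, orders::customer_id AS cid;
```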

Merging and Splitting

The UNION operator

Pig Latin's UNION operator merges the contents of two relations. To perform a UNION on two relations, their columns and domains must be identical.

Syntax:

```
grunt> Relation_name3 = UNION Relation_name1, Relation_name2;
```

Example:

```
grunt> student = UNION student1, student2;
```

The SPLIT operator

The SPLIT operator splits a relation into two or more relations.

Syntax:

```
grunt> SPLIT Relation1_name INTO Relation2_name IF (condition1), Relation3_name IF (condition2);
```

Example:

```
grunt> SPLIT student_details into student_details1 if age<23, student_details2 if (age>22 and age<=25);
grunt> Dump student_details1;
grunt> Dump student_details2;
```
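SPLIT also accepts an OTHERWISE branch that catches everything the listed conditions miss; a minimal sketch:

```
grunt> SPLIT student_details INTO young IF age < 23, rest OTHERWISE;
```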

Other

Passing parameters into a script

```
A = LOAD '$INPUT_DIR' AS (t0:long, msisdn:chararray, t2:chararray);
STORE D INTO '$OUTPUT_DIR';
```

```
pig -p INPUT_DIR=hdfs://mycluster/pig/in -p OUTPUT_DIR=hdfs://mycluster/pig/out ./schedule.pig
```
(Source: https://blog.csdn.net/aaronhadoop/article/details/44310633)

Passing multiple parameters on the command line works by repeating -p:

```
pig -p startdate=2011-03-21 -p enddate=2011-03-28 script.pig
```
Note that if a parameter value contains a space, this will fail.
(Source: https://blog.csdn.net/iteye_19679/article/details/82580903)
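Inside the script, %default can supply a fallback so the script still runs when a -p parameter is omitted; a small sketch with hypothetical values:

```
%default INPUT_DIR '/pig/in'
%default OUTPUT_DIR '/pig/out'
```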

Comments

  • Single line: --
  • Multi-line: /* */

Replacing nulls with the ternary operator

```
B = FOREACH A GENERATE ((col1 is null) ? -1 : col1);
```

Replacing an empty bag, where col1 is a bag with schema {(int)}:

```
C = FOREACH A GENERATE ((col1 is null or IsEmpty(col1)) ? {(0)} : col1);
```

The first field (first column) of a file is referenced as $0.

Declaring and using constants

```
%declare Today `date +%Y-%m-%d`
sh cat /home/dfs/cacstat/tmp/SpamTag/* > /home/dfs/cacstat/output/SpamTag/$Today.json;
```

Configuration settings

```
set mapreduce.map.memory.mb 4096;
set mapreduce.reduce.memory.mb 4096;
set default_parallel 20;
```

Whether paths take single quotes

With rmf the path must not be quoted; everywhere else it must be (e.g. LOAD, %declare hdfsDir '/user/hive/warehouse/').

Functions

distinct count functionality:

```
-- register the DataFu jar first, e.g. REGISTER /path/to/datafu-pig.jar (path depends on your environment)
define HyperLogLogPlusPlus datafu.pig.stats.HyperLogLogPlusPlus();
aGroup = GROUP b BY tag;
d = FOREACH aGroup GENERATE group AS tag, HyperLogLogPlusPlus(b.ip) AS ipcnt, HyperLogLogPlusPlus(b.age) AS agecnt;
```
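HyperLogLogPlusPlus gives an approximate distinct count; if an exact count is needed and the data is small enough, a nested DISTINCT inside FOREACH is the usual alternative (a sketch over the same aGroup relation):

```
d_exact = FOREACH aGroup {
    uniq_ip = DISTINCT b.ip;
    GENERATE group AS tag, COUNT(uniq_ip) AS ipcnt;
};
```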