Pig Latin is a dataflow language. Alias (relation) names follow the same rules as Java variable names. An alias can be reused, though this is not recommended: reusing a name effectively creates a new relation and discards the old one, as in:
A = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);

* Comments: -- starts a single-line comment; /* ... */ delimits a multi-line comment.

* Pig Latin keywords such as load and foreach are case-insensitive, but alias names and UDF names are case-sensitive: COUNT is a UDF, so it is not the same name as count.

* load
Loads data. By default, load resolves paths against the current user's home directory (/users/...). An absolute path can be given:

divs = load '/data/examples/NYSE_dividends';

A fully qualified URL also works:

divs = load 'hdfs://nn.acme.com/data/examples/NYSE_dividends';

The default field delimiter is TAB (\t); using specifies a different one:

divs = load 'NYSE_dividends' using PigStorage(',');

Note: the delimiter must be a single character.

using can also name a different load function:

divs = load 'NYSE_dividends' using HBaseStorage();

as defines a schema:

divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);

Globs can also be used to load every file under a directory, including the files in all of its subdirectories. The glob syntax is decided by the Hadoop file system; the patterns referenced here are those supported by Hadoop 0.20.
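For reference, Hadoop's glob patterns are ? (any single character), * (zero or more characters), [abc] and [a-z] (character classes), [^abc] (negated class), \c (escape for a metacharacter c), and {ab,cd} (alternation). A minimal sketch of a glob load, assuming a hypothetical layout with per-year files under a NYSE_dividends directory:

-- load only the 2009 files (this path layout is an assumption, not part of the original data set)
divs = load 'NYSE_dividends/2009-*' as (exchange, symbol, date, dividends);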
* as
Defines a schema. It appears with load (load ... as (ColumnName[:type])) and with foreach ... generate (ColumnName as newColumnName).

* store
Stores data; the default is using PigStorage with TAB as the delimiter:

store processed into '/data/examples/processed';

A full path can be given, as above, and using selects another storage function or another delimiter:

store processed into 'processed' using HBaseStorage();
store processed into 'processed' using PigStorage(',');

Note: the output is not a single file but a set of part files, one per reduce task.

* foreach ... generate
* projects every field (and can also be passed to a UDF); begin..end projects the range of fields from begin to end, inclusive:

prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
beginning = foreach prices generate ..open; -- produces exchange, symbol, date, open
middle = foreach prices generate open..close; -- produces open, high, low, close
end = foreach prices generate volume..; -- produces volume, adj_close

Normally the fields produced by foreach ... generate keep their original names and types, but fields produced by an expression do not; use as after the expression to name them:

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
sym = foreach divs generate symbol;
describe sym;
sym: {symbol: chararray}

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0;
describe in_cents;
in_cents: {dividend: double,double}

# looks a key up in a map; . projects a field out of a tuple:

bball = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';

A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;

. also projects fields out of the tuples inside a bag; the result is again a bag:

A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;

A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);

The following script will not run:

A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all the records for a given value of x
C = foreach B generate SUM(A.y + A.z);

because A.y and A.z are both bags, and + is not defined for bags. The correct version:

A = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B = group A1 by x;
C = foreach B generate SUM(A1.yz);

* Nesting other statements inside foreach

--distinct_symbols.pig
daily = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fields
grpd = group daily by exchange;
uniqcnt = foreach grpd {
    sym = daily.symbol;
    uniq_sym = distinct sym;
    generate group, COUNT(uniq_sym);
};

Note: only distinct, filter, limit, and order are supported inside a foreach block.

--double_distinct.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);
grpd = group divs all;
uniq = foreach grpd {
    exchanges = divs.exchange;
    uniq_exchanges = distinct exchanges;
    symbols = divs.symbol;
    uniq_symbols = distinct symbols;
    generate COUNT(uniq_exchanges), COUNT(uniq_symbols);
};
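Since order and limit are also legal inside foreach, the per-group top-N pattern can be written as a nested block. A minimal sketch, reusing the NYSE_dividends schema from above (the cutoff of 3 is arbitrary):

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
grpd = group divs by symbol;
top3 = foreach grpd {
    sorted = order divs by dividends desc; -- sort each symbol's bag by dividend
    top = limit sorted 3;                  -- keep the three largest
    generate group, flatten(top);
};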
* flatten
flatten removes a level of bag nesting:

--flatten.pig
players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
pos = foreach players generate name, flatten(position) as position;
bypos = group pos by position;

flatten on a null or empty bag produces no output record at all; to keep such rows, substitute a placeholder bag first:

--flatten_noempty.pig
players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
noempty = foreach players generate name,
    ((position is null or IsEmpty(position)) ? {('unknown')} : position) as position;
pos = foreach noempty generate name, flatten(position) as position;
bypos = group pos by position;

* filter
Logical expressions in Pig short-circuit, as in Java. Note: a comparison involving null (even null == null) yields null, which filter treats as false, so such records are dropped.

* filter with matches applies a regular expression (prefix matches with not to invert the match).
Pig regular expressions use the same syntax as Java's; see
http://docs.oracle.com/javase/6/docs/api/java/util/regex/Pattern.html

To match a metacharacter literally, escape it by writing \\ followed by its Unicode code:

.  =>  \\u002E
$  =>  \\u0024
^  =>  \\u005E
{  =>  \\u007B
[  =>  \\u005B
(  =>  \\u0028
|  =>  \\u007C
)  =>  \\u0029
*  =>  \\u002A
+  =>  \\u002B
?  =>  \\u003F
\  =>  \\u005C

The following example keeps the records whose symbol does not contain "CM.":

--filter_not_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches '.*CM\\u002E.*';
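Without not, matches keeps the matching records instead. Because matches tests the entire string (like Java's String.matches), a pattern with no leading .* is effectively anchored at the start; a sketch using the same schema:

--filter_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
startswithcm = filter divs by symbol matches 'CM.*'; -- symbols that start with CM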
* group
The result of group is conceptually a map whose key is the grouping key and whose value is the set of records carrying that key. Parentheses group on several fields at once, and group ... all puts every record into a single group (note: there is no by with all). Each grouped record has two fields: the first is named group (this name cannot be changed) and holds the key; the second is a bag, named after the grouped relation, with the same schema as the original relation.

--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd = group daily by (exchange, stock);
avg = foreach grpd generate group, AVG(daily.dividends);
describe grpd;
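Since the fields were loaded without types, they all default to bytearray; the describe output should look roughly like this (same format as the other describe examples in these notes):

grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,stock: bytearray,date: bytearray,dividends: bytearray}}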
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily all;
cnt = foreach grpd generate COUNT(daily);

* cogroup
cogroup groups several relations at once. Note: all records whose key is null end up in the same group, just as with group; this differs from join.

A = load 'input1' as (id:int, val:float);
B = load 'input2' as (id:int, val2:int);
C = cogroup A by id, B by id;
describe C;
C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}

* order by
Sorting on one field:

--order.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
bydate = order daily by date;

Sorting on several fields:

--order2key.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
bydatensymbol = order daily by date, symbol;

The desc keyword sorts a field in descending order; null sorts lower than every other value:

--orderdesc.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
byclose = order daily by close desc, open;
dump byclose; -- open still sorted in ascending order

* distinct
distinct removes duplicates of whole records only; it cannot deduplicate on particular fields, so project down to those fields first:

--distinct.pig
-- find a distinct list of ticker symbols for each exchange
-- This load will truncate the records, picking up just the first two fields.
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq = distinct daily;

* join / left outer join / right outer join
null does not match anything, not even another null.

--join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date), divs by (symbol, date);

--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date) left outer, divs by (symbol, date);

More than two relations can be joined in one statement, but only as an inner join:

A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;

A relation can also be joined with itself, but the data must be loaded twice:

--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
jnd = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;

The following does not work, because Pig cannot tell the two uses of divs1 apart:

--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
jnd = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;
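The divs1:: and divs2:: prefixes come from join's output schema: every field keeps the name of the relation it came from, disambiguated with ::. For the join2key example above, describe jnd would show roughly:

jnd: {daily::exchange: bytearray,daily::symbol: bytearray,daily::date: bytearray,daily::open: bytearray,daily::high: bytearray,daily::low: bytearray,daily::close: bytearray,daily::volume: bytearray,daily::adj_close: bytearray,divs::exchange: bytearray,divs::symbol: bytearray,divs::date: bytearray,divs::dividends: bytearray}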
* union
Analogous to SQL's union, with two differences: Pig's union does not remove duplicate rows, and it can combine relations with different schemas. If the two schemas are identical, the result has that schema; if one schema can be reached from the other by implicit casts, the result has the widened schema; otherwise the result has no schema.

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
C: {x: int,y: float}

A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.

To union two relations whose field names differ, use the onschema keyword:

A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}

* cross
The Cartesian product: with inputs of m rows and n rows, the output has m*n rows.

--thetajoin.pig
--I recommend running this one on a cluster too
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
crossed = cross daily, divs;
tjnd = filter crossed by daily::date < divs::date;
* limit

--limit.pig
divs = load 'NYSE_dividends';
first10 = limit divs 10;

Except after order by, Pig produces records in no fixed order, so the script above can return a different ten rows on each run.

* sample
Selects an approximate fraction of the data, which is useful for generating test data. The following keeps about 10% of the records:

--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;

* parallel
Sets the number of reduce tasks:

--parallel.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol parallel 10;

parallel applies only to the statement it is attached to. To give every statement in a script 10 reducers, use set default_parallel 10:

--defaultparallel.pig
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted = order average by avg desc;

When both are used, the value given with parallel overrides set default_parallel.

* UDFs
Registering a UDF:

--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate org.apache.pig.piggybank.evaluation.string.Reverse(symbol);

Defining an alias for a UDF:

--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);

A UDF whose constructor takes arguments:

--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate convert(dividends);

* Invoking static Java methods (less efficient than a regular UDF):

--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);

If the method takes an array argument, pass it a bag:

define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id: int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);

* multiquery
A script may store several results; Pig combines the work into as few MapReduce jobs as possible:

--multiquery.pig
players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
pwithba = foreach players generate name, team, position, bat#'batting_average' as batavg;
byteam = group pwithba by team;
avgbyteam = foreach byteam generate group, AVG(pwithba.batavg);
store avgbyteam into 'by_team';
flattenpos = foreach pwithba generate name, team, flatten(position) as position, batavg;
bypos = group flattenpos by position;
avgbypos = foreach bypos generate group, AVG(flattenpos.batavg);
store avgbypos into 'by_position';

* split
split routes records into several relations; a record can go to more than one branch (as here, where the date ranges overlap) or to none:

wlogs = load 'weblogs' as (pageid, url, timestamp);
split wlogs into apr03 if timestamp < '20110404',
                 apr02 if timestamp < '20110403' and timestamp > '20110401',
                 apr01 if timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';

* Setting the Pig environment
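A minimal sketch of the set command, assuming this heading refers to Pig's set statement for configuring how jobs run (the values are illustrative; job.name, default_parallel, debug, and job.priority are among the documented keys):

set job.name 'daily dividend analysis'; -- name shown in the Hadoop job tracker
set default_parallel 10;                -- default reducer count for every statement in the script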
* parameter
Passing parameters into a Pig script:

--daily.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
yesterday = filter daily by date == '$DATE';
grpd = group yesterday all;
minmax = foreach grpd generate MAX(yesterday.high), MIN(yesterday.low);

Pass each parameter with its own -p flag:

pig -p DATE=2009-12-17 daily.pig

Parameters can also be kept in a file, one per line, with comment lines starting with #.

Pig script:

wlogs = load 'clicks/$YEAR$MONTH01' as (url, pageid, timestamp);

Parameter file:

#Param file
YEAR=2009-
MONTH=12-
DAY=17
DATE=$YEAR$MONTH$DAY

Run:

pig -param_file daily.params daily.pig

Parameters can also be declared inside the script with %declare or %default; %default sets a default value that can be overridden when needed. Note: %declare and %default are not allowed in every position in a script.
%default parallel_factor 10;
wlogs = load 'clicks' as (url, pageid, timestamp);
grp = group wlogs by pageid parallel $parallel_factor;
cntd = foreach grp generate group, COUNT(wlogs);

* Macros
define ... returns defines a macro, which acts like a subroutine:

--macro.pig
-- Given daily input and a particular year, analyze how
-- stock prices changed on days dividends were paid out.
define dividend_analysis (daily, year, daily_symbol, daily_open, daily_close) returns analyzed {
    divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
    divsthisyear = filter divs by date matches '$year-.*';
    dailythisyear = filter $daily by date matches '$year-.*';
    jnd = join divsthisyear by symbol, dailythisyear by $daily_symbol;
    $analyzed = foreach jnd generate dailythisyear::$daily_symbol, $daily_close - $daily_open;
};
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');

* import
Importing a Pig file runs it as if its contents were pasted in place; the imported file must not define aliases of its own.

--main.pig
import '../examples/ch6/dividend_analysis.pig';
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');

The default search path is the current directory; set pig.import.search.path changes it:

set pig.import.search.path '/usr/local/pig,/grid/pig';
import 'acme/macros.pig';