配色: 字号:
MySQL、Teradata和PySpark代码互转表和数据转换代码
2021-05-25 | 阅:  转:  |  分享 
  
MySQL、Teradata和PySpark代码互转表和代码代码描述MySQLTeradataSQLPySpark添加/删除列1、添加列a
ltertable[`<架构名称>`.]`<表名>`addcolumn<字段名><类型>;2、删除列alterta
ble[`<架构名称>`.]`<表名>`dropcolumn<字段名>;1、添加列ALTERTABLE[<架构名称>
.]<表名>ADD<字段名><类型>;2、删除列ALTERTABLE[<架构名称>.]<表名>DROP<字段名>;1
、添加列=…=.withColumn(
''<字段名>'',sum([col]forcolin.colum
ns))2、删除列.drop(''<字段名>'')删除表DROPTABLE[`<架构名称>`.]`<表
名>`;DROPTABLE[<架构名称>.]<表名>;importsubprocess?subprocess.check_c
all(''rm-r<存储路径>/<表名>''),shell=True)创建表并插入查询数据CREATETABLE[`<架构名
称>`.]`<表名>`(<字段名1><类型1>[AUTO_INCREMENT],<字段名2><类型2>[AUTO_I
NCREMENT],<字段名3><类型3>[AUTO_INCREMENT],…<字段名n><类型3n>[AUTO_INCR
EMENT][,PRIMARYKEY(<主键字段名>)][,UNIQUE(<唯一值字段名1>,<唯一值字段名2>,<唯一
值字段名3>,…,<唯一值字段名m>)])[ENGINE={InnoDB|MYISAM|BDB}DEFAULTCHARSE
T={utf8|gbk}];?INSERTINTO[`<架构名称>`.]`<表名>`;?或者?CREATE
TABLE[`<架构名称>`.]`<表名>`;??CREATE[MULTISET]TABLE[<
架构名称>.]<表名>[,<参数1>,<参数2>,<参数3>,…<参数n>](<字段名1><类型1>[CHARACT
ERSET<字符集1>NOTCASESPECIFIC],<字段名2><类型2>[CHARACTERSET<字符集2
>NOTCASESPECIFIC],<字段名3><类型3>[CHARACTERSET<字符集3>NOTCASESP
ECIFIC],…<字段名n><类型n>[CHARACTERSET<字符集3>NOTCASESPECIFIC])[U
NIQUE][PRIMARYINDEX(<主键字段名>)];???INSERTINTO[<架构名称>.]<表名>ECT查询>;?或者?CREATE[MULTISET]TABLE[<架构名称>.]<表名>[,<参数1>,<参数2>,
<参数3>,…<参数n>](<字段名1><类型1>[CHARACTERSET<字符集1>NOTCASESPECIF
IC],<字段名2><类型2>[CHARACTERSET<字符集2>NOTCASESPECIFIC],<字段名3><
类型3>[CHARACTERSET<字符集3>NOTCASESPECIFIC],…<字段名n><类型3>[CHARA
CTERSET<字符集3>NOTCASESPECIFIC])[UNIQUE][PRIMARYINDEX(<主键字段
名>)]AS;=spark.sql("""<查询语句>""")me变量>.write.parquet(path=''<存储路径>/<表名>'',mode="overwrite")插入少量数据I
NSERTINTO[`<架构名称>`.]`<表名>`(<字段名1>,<字段名2>,<字段名3>,...,<字段名n>)VA
LUES(<值1>,<值2>,<值3>,…,<值n>);INSERTINTO[`<架构名称>`.]`<表名>`(<字段名1
>,<字段名2>,<字段名3>,...,<字段名n>)VALUES(<值n+1>,<值n+2>,<值n+3>,…,<值2n>)
;INSERTINTO[`<架构名称>`.]`<表名>`(<字段名1>,<字段名2>,<字段名3>,...,<字段名n>)
VALUES(<值2n+1>,<值2n+2>,<值2n+3>,…,<值3n>);…INSERTINTO[`<架构名称>`.]
`<表名>`(<字段名1>,<字段名2>,<字段名3>,...,<字段名n>)VALUES(<值mn+1>,<值mn+2>,
<值mn+3>,…,<值(m+1)n>);????INSERTINTO[<架构名称>.]<表名>(<字段名1>,<字段名2
>,<字段名3>,...,<字段名n>)VALUES(<值1>,<值2>,<值3>,…,<值n>);INSERTINTO[
<架构名称>.]<表名>(<字段名1>,<字段名2>,<字段名3>,...,<字段名n>)VALUES(<值n+1>,<值n+
2>,<值n+3>,…,<值2n>);INSERTINTO<表名>(<字段名1>,<字段名2>,<字段名3>,...,<字段
名n>)VALUES(<值2n+1>,<值2n+2>,<值2n+3>,…,<值3n>);…INSERTINTO[<架构名称
>.]<表名>(<字段名1>,<字段名2>,<字段名3>,...,<字段名n>)VALUES(<值mn+1>,<值mn+2>
,<值mn+3>,…,<值(m+1)n>);spark.createDataFrame([(<值1>,<值2>,<值3>,...,
<值n>),(<值n+1>,<值n+2>,<值n+3>,...,<值2n>),(<值2n+1>,<值2n+2>,<值2n+3>,.
..,<值3n>),...,(<值mn+1>,<值mn+2>,<值mn+3>,...,<值(m+1)n>)],[''<字段名1>'',
''<字段名2>'',''<字段名3>'',...,''<字段名n>'']).write.parquet(path=''<存储路径>/<表名>
''),mode="overwrite")限制查询返回行数SELECT<字段列表>FROM[`<架构名称1>`.]`<表名>`
{INNER/LEFT/RIGHT/FULL}JOIN[`<架构名称2>`.]`<维度表名1>`ON<表连接条件1>{
INNER/LEFT/RIGHT/FULL}JOIN[`<架构名称3>`.]`<度量表名1>`…ON<表连接条件2>[W
HERE<筛选条件>]LIMIT<限制返回行数>;SELECTTOP<限制返回行数><字段列表>FROM[<架构名
称1>.]<表名1>{INNER/LEFT/RIGHT/FULL}JOIN[<架构名称2>.]<维度表名1>ON<表连接
条件1>{INNER/LEFT/RIGHT/FULL}JOIN[<架构名称3>.]<度量表名1>…ON<表连接条件2>
[WHERE<筛选条件>];=spark.sql("""SELECTFROM[<架构名称
1>.]<表名1>""").createOrReplaceTempView("<表名1>")Frame变量2>=spark.sql("""SELECTFROM[<架构名称2>.]<维度表名1>""").cach
e().createOrReplaceTempView("<表名2>")
=spark.sql("""SELECTFROM[<架构名称3>.]<度量表名1>""").
createOrReplaceTempView("<表名3>")…=spark.sql("""SE
LECT<字段列表>FROM<表名1>JOIN<表名2>ON<表连接条件1>{INNER/LEFT/RIGHT/F
ULL}JOIN<表名3>ON<表连接条件2>[WHERE<筛选条件>]""").limit(<限制返回行数>)带表连
接的查询SELECT<字段列表>FROM[`<架构名称1>`.]`<表名1>`{INNER/LEFT/RIGHT/FUL
L}JOIN[`<架构名称2>`.]`<维度表名1>`ON<表连接条件1>{INNER/LEFT/RIGHT/FULL}
JOIN[`<架构名称3>`.]`<度量表名1>`…ON<表连接条件2>[WHERE<筛选条件>];SELECT<
字段列表>FROM[<架构名称1>.]<表名1>{INNER/LEFT/RIGHT/FULL}JOIN[<架构名称2>
.]<维度表名1>ON<表连接条件1>{INNER/LEFT/RIGHT/FULL}JOIN[<架构名称3>.]<度量表
名1>…ON<表连接条件2>[WHERE<筛选条件>];=spark.sql("""SEL
ECTFROM[<架构名称1>.]<表名1>""").createOrReplaceTempV
iew("<表名1>")=spark.sql("""SELECTFROM[<架构名称2>.
]<维度表名1>""").cache().createOrReplaceTempView("<表名2>
")=spark.sql("""SELECTFROM[<架构名称3>.]<度量表名1>""
").createOrReplaceTempView("<表名3>")…
=spark.sql("""SELECT<字段列表>FROM<表名1>JOIN<表名2>ON<表连接条件1>{I
NNER/LEFT/RIGHT/FULL}JOIN<表名3>ON<表连接条件2>[WHERE<筛选条件>]""")带表
连接更新表记录CREATETABLE[`<架构名称1>`.]`<表名1>`(<字段名1><类型1>[AUTO_INC
REMENT],<字段名2><类型2>[AUTO_INCREMENT],<字段名3><类型3>[AUTO_INCREMEN
T],…<字段名n><类型3n>[AUTO_INCREMENT][,PRIMARYKEY(<主键字段名>)][,UNIQ
UE(<唯一值字段名1>,<唯一值字段名2>,<唯一值字段名3>,…,<唯一值字段名m>)])[ENGINE={InnoD
B|MYISAM|BDB}DEFAULTCHARSET={utf8|gbk}];?INSERTINTO[`<架构名称1>`
.]`<表名1>`SELECT<主键字段>,<值不变字段1>,<值不变字段2>,<值不变字段3>,…<值不变字段n>
,<值改变字段1>,<值改变字段1>,<值改变字段2>,…<值改变字段n>,FROM[`<架构名称2>`.]`<表名2>`WH
ERE<筛选条件>;??UPDATE<别名1>FROM[`<架构名称1>`.]`<表名1>`AS<别名1>,[`<架构
名称3>`.`<表名3>`]SET<值改变字段1>=<改变值1><值改变字段2>=<改变值2><值改变字段3>=<改变
值3>…<值改变字段n>=<改变值n>?WHERE<表连接条件>[AND<筛选条件>];CREATE[MULTISET]
TABLE[<架构名称1>.]<表名1>[,<参数1>,<参数2>,<参数3>,…<参数n>](<字段名1><类型
1>[CHARACTERSET<字符集1>NOTCASESPECIFIC],<字段名2><类型2>[CHARACTE
RSET<字符集2>NOTCASESPECIFIC],<字段名3><类型3>[CHARACTERSET<字符集3>
NOTCASESPECIFIC],…<字段名n><类型3>[CHARACTERSET<字符集3>NOTCASESP
ECIFIC])[UNIQUE][PRIMARYINDEX(<主键字段名>)];?INSERTINTO[<架构名称1
>.]<表名1>SELECT<主键字段>,<值不变字段1>,<值不变字段2>,<值不变字段3>,…<值不变字段n>,
<值改变字段1>,<值改变字段1>,<值改变字段2>,…<值改变字段n>,FROM[<架构名称2>.]<表名2>WHERE
<筛选条件>;??UPDATE<别名1>FROM[<架构名称1>.]<表名1>AS<别名1>,[<架构名称3>.<表名3
>]SET<值改变字段1>=<改变值1><值改变字段2>=<改变值2><值改变字段3>=<改变值3>…<值改变字段n
>=<改变值n>?WHERE<表连接条件>[AND<筛选条件>];=spark.sql(""
"SELECTFROM<架构名称2>.<表名2>""").createOrReplaceTem
pView("<表名1>")=spark.sql("""SELECTFROM<架构名称3>
.<表名3>""").createOrReplaceTempView("<表名2>")me变量3>=spark.sql("""SELECT<别名1>.<主键字段>,<值不变字段1>,<值不变字段2>,<值
不变字段3>,…<值不变字段n>,if(<别名2>.<主键字段>isnull,<别名1>.<值改变字段1>,<改变值
1>)AS<值改变字段1>,if(<别名2>.<主键字段>isnull,<别名1>.<值改变字段2>,<改变值2>)
AS<值改变字段2>,if(<别名2>.<主键字段>isnull,<别名1>.<值改变字段3>,<改变值3>)AS<
值改变字段3>,…if(<别名2>.<主键字段>isnull,<别名1>.<值改变字段n>,<改变值n>)AS<值改变
字段n>FROM<表名1>AS<别名1>INNERJOIN<表名2>AS<别名2>ON<表连接条件>[WHE
RE<筛选条件>]""").write.parquet(path=''<存储路径>/<表名1>'',
mode="overwrite")查询分组排名数据SELECT<字段1>,<字段2>,<字段3>,…<字段n>FROM
(SELECT<字段1>,<字段2>,<字段3>,…<字段n>,ROW_NUMBER()OVER(PARTIT
IONBY<分组字段>ORDERBY<排序字段>[DESC])ASrn[WHERE<筛选条件>])tWHER
Ern=1SELECT<字段1>,<字段2>,<字段3>,…<字段n>FROM[<架构名称>.]<表名>QUAL
IFYROW_NUMBER()OVER(PARTITIONBY<分组字段>ORDERBY<排序字段>[DESC]
)=1[WHERE<筛选条件>];???=spark.sql("""SELECTFRO
M[<架构名称>.]<表名>""")=spark.sql("""SELECT<字段1>,<字
段2>,<字段3>,…<字段n>FROM(SELECT<字段1>,<字段2>,<字段3>,…<字段n>,R
OW_NUMBER()OVER(PARTITIONBY<分组字段>ORDERBY<排序字段>[DESC])A
SrnFROM<变量1>[WHERE<筛选条件>])tWHERErn=1""")字符串连接SELECTCO
NCAT(<字符串变量/字段/常量1>,<字符串变量/字段/常量2>)SELECT<字符串变量/字段/常量1>||<字符串变
量/字段/常量2>=spark.sql("""SELECTCONCAT(<字符串变量/字段/常量1
>,<字符串变量/字段/常量2>)""")查询分组里有数字SELECT<维度字段1>,<维度字段2>,<维度字段3>,…
<维度字段n>,<聚合函数1>(<度量字段1>)AS<别名1>,<聚合函数2>(<度量字段2>)AS<别名2>,?<
聚合函数3>(<度量字段3>)AS<别名3>,…<聚合函数m>(<度量字段m>)AS<别名m>FROM[<架构名称
>.]<表名>GROUPBY1,2,3,…,nSELECT<维度字段1>,<维度字段2>,<维度字段3>,…<维
度字段n>,<聚合函数1>(<度量字段1>)AS<别名1>,<聚合函数2>(<度量字段2>)AS<别名2>,?<聚合
函数3>(<度量字段3>)AS<别名3>,…<聚合函数m>(<度量字段m>)AS<别名m>FROM[<架构名称>.
]<表名>GROUPBY1,2,3,…,n=spark.sql("""SELECT<维度字
段1>,<维度字段2>,<维度字段3>,…<维度字段n>,<聚合函数1>(<度量字段1>)AS<别名1>,<聚合函
数2>(<度量字段2>)AS<别名2>,?<聚合函数3>(<度量字段3>)AS<别名3>,…<聚合函数m>(<度量字
段m>)AS<别名m>FROM[<架构名称>.]<表名>GROUPBY<维度字段1>,<维度字段2>,<维度字
段3>,…<维度字段n>""")?DECIMAL类型转换SELECT(CAST(<数值字段/变量/常量1>ASDECIM
AL(38,2))/CAST(<数值字段/变量/常量2>ASDECIMAL(38,2)));SELECT(CAST(<数
值字段/变量/常量1>ASDECIMAL(38,2))/CAST(<数值字段/变量/常量2>ASDECIMAL(38,
2)));<变量>=spark.sql("""SELECT<数值字段/变量/常量1>100/<数值字段/变量/常量
2>/100""")获取年月日和获取中国时区的当天日期SELECTYEAR(CURRENT_DATE()),MONTH(C
URRENT_DATE()),DAY(CURRENT_DATE()),CONVERT_TZ(create_time,@@se
ssion.time_zone,''+8:00'');SETtime_zone=''Asia/Shanghai'';selectnow
();SELECTEXTRACT(YEARFROMCURRENT_DATE),EXTRACT(MONTHFROMCUR
RENT_DATE),EXTRACT(DAYFROMCURRENT_DATE),CAST(CONVERT_TIMEZONE
(''Asia/Shanghai'',CAST(GETDATE()ASTIMESTAMP))ASDATE)<变量>=spa
rk.sql("""SELECTYEAR(CURRENT_DATE),MONTH(CURRENT_DATE),DAY(CUR
RENT_DATE),CAST(CONVERT_TIMEZONE(''Asia/Shanghai'',CAST(GETDATE()
ASTIMESTAMP))ASDATE)""")时间戳之间间隔天数计算SELECTTIMESTAMPDIFF(DAY,<
开始时间戳>,<结束时间戳>)SELECTEXTRACT(DAYFROM(<结束时间戳>-<开始时间戳>DAY(4)
TOSECOND))86400<变量>=spark.sql("""SELECTYEAR(CURRENT_DATE)
,MONTH(CURRENT_DATE),DAY(CURRENT_DATE)""")分区操作一、查看MySQL是否支持分区1、
MySQL5.6以及之前版本showvariableslike''%partition%'';2、MySQL5.7showpl
ugins;?二、分区表的分类与限制1、分区表分类RANGE分区:基于属于一个给定连续区间的列值,把多行分配给分区。?LIST分区
:类似于按RANGE分区,区别在于LIST分区是基于列值匹配一个离散值集合中的某个值来进行选择。?HASH分区:基于用户定义的表达
式的返回值来进行选择的分区,该表达式使用将要插入到表中的这些行的列值进行计算。这个函数可以包含MySQL中有效的、产生非负整数值
的任何表达式。?KEY分区:类似于按HASH分区,区别在于KEY分区只支持计算一列或多列,且MySQL服务器提供其自身的哈希函数。
必须有一列或多列包含整数值。?复合分区:在MySQL5.6版本中,只支持RANGE和LIST的子分区,且子分区的类型只能为HAS
H和KEY。?2、分区表限制1)分区键必须包含在表的所有主键、唯一键中。?2)MYSQL只能在使用分区函数的列本身进行比较时才能过
滤分区,而不能根据表达式的值去过滤分区,即使这个表达式就是分区函数也不行。?3)最大分区数:不使用NDB存储引擎的给定表的最大可
能分区数为8192(包括子分区)。如果当分区数很大,但是未达到8192时提示Goterror…fromstoragee
ngine:Outofresourceswhenopeningfile,可以通过增加open_files_limit系
统变量的值来解决问题,当然同时打开文件的数量也可能由操作系统限制。?4)不支持查询缓存:分区表不支持查询缓存,对于涉及分区表的查
询,它自动禁用。查询缓存无法启用此类查询。?5)分区的innodb表不支持外键。?6)服务器SQL_mode影响分区表的同步复制
。主机和从机上的不同SQL_mode可能会导致sql语句;这可能导致分区之间的数据分配给定主从位置不同,甚至可能导致插入主机上
成功的分区表在从库上失败。为了获得最佳效果,您应该始终在主机和从机上使用相同的服务器SQL模式。?7)ALTERTABLE…
ORDERBY:对分区表运行的ALTERTABLE…ORDERBY列语句只会导致每个分区中的行排序。?8)全文索引。
分区表不支持全文索引,即使是使用InnoDB或MyISAM存储引擎的分区表。9)分区表无法使用外键约束。10)Spatialc
olumns:具有空间数据类型(如POINT或GEOMETRY)的列不能在分区表中使用。11)临时表:临时表不能分区。12)s
ubpartition问题:subpartition必须使用HASH或KEY分区。只有RANGE和LIST分区可能被分区;H
ASH和KEY分区不能被子分区。13)分区表不支持mysqlcheck,myisamchk和myisampack。三、创建分区表
1、range分区行数据基于一个给定的连续区间的列值放入分区。CREATETABLE`test_11`(`id`int(
11)NOTNULL,`t`dateNOTNULL,PRIMARYKEY(`id`,`t`))ENGINE=I
nnoDBDEFAULTCHARSET=utf8PARTITIONBYRANGE(to_days(t))(PARTIT
IONp20170801VALUESLESSTHAN(736907)ENGINE=InnoDB,PARTITIO
Np20170901VALUESLESSTHAN(736938)ENGINE=InnoDB,PARTITION
pmaxVALUESLESSTHANmaxvalueENGINE=InnoDB);123456789然后插入4条数据
:insertintotest_11values(1,"20170722"),(2,"20170822"),(3,"201
70823"),(4,"20170824");1然后查看information下partitions对分区别信息的统计:selec
tPARTITION_NAMEas"分区",TABLE_ROWSas"行数"frominformation_sche
ma.partitionswheretable_schema="mysql_test"andtable_name="tes
t_11";+-----------+--------+|分区|行数|+-----------+--------+|p2
0170801|1||p20170901|3|+-----------+--------+2r
owsinset(0.00sec)12345678可以看出分区p20170801插入1行数据,p20170901插入的3行
数据。可以是用year、to_days、unix_timestamp等函数对相应的时间字段进行转换,然后分区。2、list分区和r
ange分区一样,只是list分区面向的是离散的值mysql>CREATETABLEh2(->c1INT,->c
2INT->)->PARTITIONBYLIST(c1)(->PARTITIONp0VALUESIN(
1,4,7),->PARTITIONp1VALUESIN(2,5,8)->);QueryOK,0ro
wsaffected(0.11sec)123456789与RANGE分区的情况不同,没有“catch-all”,如MAXVA
LUE;分区表达式的所有预期值应在PARTITION…VALUESIN(…)子句中涵盖。包含不匹配的分区列值的INSER
T语句失败并显示错误,如此示例所示:mysql>INSERTINTOh2VALUES(3,5);ERROR1525
(HY000):Tablehasnopartitionforvalue3123、hash分区根据用户自定义表达式的返
回值来进行分区,返回值不能为负数CREATETABLEt1(col1INT,col2CHAR(5),col3DAT
E)PARTITIONBYHASH(YEAR(col3))PARTITIONS4;123如果你插入col3的数值为’
2005-09-15’,那么根据以下计算来选择插入的分区:MOD(YEAR(''2005-09-01''),4)=MOD(200
5,4)=11234、key分区根据MySQL数据库提供的散列函数进行分区CREATETABLEk1(idINT
NOTNULL,nameVARCHAR(20),UNIQUEKEY(id))PARTITIONBYKEY()PAR
TITIONS2;1234567KEY仅列出零个或多个列名称。用作分区键的任何列必须包含表的主键的一部分或全部,如果该表具有一
个。如果没有列名称作为分区键,则使用表的主键(如果有)。如果没有主键,但是有一个唯一的键,那么唯一键用于分区键。但是,如果唯一键
列未定义为NOTNULL,则上一条语句将失败。与其他分区类型不同,KEY使用的分区不限于整数或空值。例如,以下CREATET
ABLE语句是有效的:CREATETABLEtm1(s1CHAR(32)PRIMARYKEY)PARTITIONB
YKEY(s1)PARTITIONS10;12345注意:对于key分区表,不能执行ALTERTABLEDROPPRIM
ARYKEY,因为这样做会生成错误ERROR1466(HY000):Fieldinlistoffieldsfo
rpartitionfunctionnotfoundintable.5、Column分区COLUMN分区是5.5开始
引入的分区功能,只有RANGECOLUMN和LISTCOLUMN这两种分区;支持整形、日期、字符串;RANGE和LIST的分区
方式非常的相似。COLUMNS和RANGE和LIST分区的区别1)针对日期字段的分区就不需要再使用函数进行转换了,例如针对date
字段进行分区不需要再使用YEAR()表达式进行转换。2)COLUMN分区支持多个字段作为分区键但是不支持表达式作为分区键。colu
mn支持的数据类型:1)所有的整型,float和decimal不支持2)日期类型:date和datetime,其他不支持3)字符类
型:CHAR,VARCHAR,BINARY和VARBINARY,blob和text不支持单列的columnrange分区m
ysql>showcreatetablelist_c;CREATETABLE`list_c`(`c1`int(
11)DEFAULTNULL,`c2`int(11)DEFAULTNULL)ENGINE=InnoDBDEFAUL
TCHARSET=latin1/!50500PARTITIONBYRANGECOLUMNS(c1)(PARTITIO
Np0VALUESLESSTHAN(5)ENGINE=InnoDB,PARTITIONp1VALUESLE
SSTHAN(10)ENGINE=InnoDB)/多列的columnrange分区mysql>showcrea
tetablelist_c;CREATETABLE`list_c`(`c1`int(11)DEFAULTNUL
L,`c2`int(11)DEFAULTNULL,`c3`char(20)DEFAULTNULL)ENGINE=
InnoDBDEFAULTCHARSET=latin1/!50500PARTITIONBYRANGECOLUMNS
(c1,c3)(PARTITIONp0VALUESLESSTHAN(5,''aaa'')ENGINE=InnoDB,
PARTITIONp1VALUESLESSTHAN(10,''bbb'')ENGINE=InnoDB)/单列的co
lumnlist分区mysql>showcreatetablelist_c;CREATETABLE`list_c`
(`c1`int(11)DEFAULTNULL,`c2`int(11)DEFAULTNULL,`c3`cha
r(20)DEFAULTNULL)ENGINE=InnoDBDEFAULTCHARSET=latin1/!50500
PARTITIONBYLISTCOLUMNS(c3)(PARTITIONp0VALUESIN(''aaa'')ENG
INE=InnoDB,PARTITIONp1VALUESIN(''bbb'')ENGINE=InnoDB)/
6、子分区(组合分区)在分区的基础上再进一步分区,有时成为复合分区;MySQL数据库允许在range和list的分区上进行HASH
和KEY的子分区。例如:CREATETABLEts(idINT,purchasedDATE)PARTITIONBY
RANGE(YEAR(purchased))SUBPARTITIONBYHASH(TO_DAYS(purchased
))SUBPARTITIONS2(PARTITIONp0VALUESLESSTHAN(1990),PARTI
TIONp1VALUESLESSTHAN(2000),PARTITIONp2VALUESLESSTHANMA
XVALUE);[root@mycat-3~]#ll/data/mysql_data_3306/mysql_test/ts
-rw-r-----1mysqlmysql8596Aug813:54/data/mysql_data_330
6/mysql_test/ts.frm-rw-r-----1mysqlmysql98304Aug813:54/d
ata/mysql_data_3306/mysql_test/ts#P#p0#SP#p0sp0.ibd-rw-r-----1m
ysqlmysql98304Aug813:54/data/mysql_data_3306/mysql_test/ts
#P#p0#SP#p0sp1.ibd-rw-r-----1mysqlmysql98304Aug813:54/da
ta/mysql_data_3306/mysql_test/ts#P#p1#SP#p1sp0.ibd-rw-r-----1my
sqlmysql98304Aug813:54/data/mysql_data_3306/mysql_test/ts#
P#p1#SP#p1sp1.ibd-rw-r-----1mysqlmysql98304Aug813:54/dat
a/mysql_data_3306/mysql_test/ts#P#p2#SP#p2sp0.ibd-rw-r-----1mys
qlmysql98304Aug813:54/data/mysql_data_3306/mysql_test/ts#P
#p2#SP#p2sp1.ibd1234567891011121314151617ts表根据purchased进行range分区,
然后又进行了一次hash分区,最后形成了32个分区,可以从物理文件证实此分区方式。可以通过subpartition语法来显示指定
子分区名称。注意:每个子分区的数量必须相同;如果一个分区表的任何子分区已经使用subpartition,那么必须表明所有的子分区名
称;每个subpartition子句必须包括子分区的一个名字;子分区的名字必须是一致的另外,对于MyISAM表可以使用index
directory和datadireactory来指定各个分区的数据和索引目录,但是对于innodb表来说,因为该存储引擎使用表
空间自动的进行数据和索引的管理,因此会忽略指定index和data的语法。四、普通表转换为分区表1、用altertable
table_namepartitionby命令重建分区表?altertablejxfp_data_bakPARTITIO
NBYKEY(SH)PARTITIONS8;?五、分区表操作CREATETABLEt1(idINT,year_
colINT)PARTITIONBYRANGE(year_col)(PARTITIONp0VALUESLESS
THAN(1991),PARTITIONp1VALUESLESSTHAN(1995),PARTITIONp2V
ALUESLESSTHAN(1999));?1、ADDPARTITION(新增分区)ALTERTABLEt1ADD
PARTITION(PARTITIONp3VALUESLESSTHAN(2002));?2、DROPPARTITI
ON(删除分区)ALTERTABLEt1DROPPARTITIONp0,p1;?3、TRUNCATEPARTITI
ON(截取分区)ALTERTABLEt1TRUNCATEPARTITIONp0;?ALTERTABLEt1TRUN
CATEPARTITIONp1,p3;?4、COALESCEPARTITION(合并分区)CREATETABLEt2
(nameVARCHAR(30),startedDATE)PARTITIONBYHASH(YEAR(started
))PARTITIONS6;?ALTERTABLEt2COALESCEPARTITION2;?5、REORGANIZ
EPARTITION(拆分/重组分区)1)拆分分区?ALTERTABLEtableALGORITHM=INPLACE,R
EORGANIZEPARTITION;?ALTERTABLEemployeesADDPARTITION(PARTIT
IONp5VALUESLESSTHAN(2010),PARTITIONp6VALUESLESSTHANMAX
VALUE);?2)重组分区?ALTERTABLEmembersREORGANIZEPARTITIONs0,s1INT
O(PARTITIONp0VALUESLESSTHAN(1970));ALTERTABLEtbl_nameRE
ORGANIZEPARTITIONpartition_listINTO(partition_definitions);AL
TERTABLEmembersREORGANIZEPARTITIONp0,p1,p2,p3INTO(PARTITI
ONm0VALUESLESSTHAN(1980),PARTITIONm1VALUESLESSTHAN(200
0));ALTERTABLEttADDPARTITION(PARTITIONnpVALUESIN(4,8));
ALTERTABLEttREORGANIZEPARTITIONp1,npINTO(PARTITIONp1VAL
UESIN(6,18),PARTITIONnpVALUESin(4,8,12));?6、ANALYZE、CH
ECKPARTITION(分析与检查分区)1)ANALYZE读取和存储分区中值的分布情况?ALTERTABLEt1ANA
LYZEPARTITIONp1,ANALYZEPARTITIONp2;?ALTERTABLEt1ANALYZEP
ARTITIONp1,p2;?2)CHECK检查分区是否存在错误?ALTERTABLEt1ANALYZEPARTIT
IONp1,CHECKPARTITIONp2;?7、REPAIR分区修复被破坏的分区?ALTERTABLEt1REP
AIRPARTITIONp0,p1;?8、OPTIMIZE该命令主要是用于回收空闲空间和分区的碎片整理。对分区执行该命令,相当
于依次对分区执行CHECKPARTITION,ANALYZEPARTITION,REPAIRPARTITION命令。?譬
如:?ALTERTABLEt1OPTIMIZEPARTITIONp0,p1;?9、REBUILD分区重建分区,它相当于
先删除分区中的数据,然后重新插入。这个主要是用于分区的碎片整理。?ALTERTABLEt1REBUILDPARTITION
p0,p1;?10、EXCHANGEPARTITION(分区交换)分区交换的语法如下:?ALTERTABLEptEXC
HANGEPARTITIONpWITHTABLEnt?其中,pt是分区表,p是pt的分区(注:也可以是子分区),nt是目
标表。?其实,分区交换的限制还是蛮多的:?1)nt不能为分区表?2)nt不能为临时表?3)nt和pt的结构必须一致?4)nt不存
在任何外键约束,即既不能是主键,也不能是外键。?5)nt中的数据不能位于p分区的范围之外。?具体可参考MySQL的官方文档?11、
迁移分区(DISCARD、IMPORT)ALTERTABLEt1DISCARDPARTITIONp2,p3TAB
LESPACE;?ALTERTABLEt1IMPORTPARTITIONp2,p3TABLESPACE;?实验环境:
(都是mysql5.7)源库:192.168.2.200mysql5.7.16zhangdb下的emp_2分区
表的目标库:192.168.2.100mysql5.7.18test下(将zhangdb的emp表,导入到目标库
的testschema下)--:在源数据库中创建测试分区表emp_2,然后导入数据MySQL[zhangdb]>CREATE
TABLEemp_2(idBIGINTunsignedNOTNULLAUTO_INCREMENT,xVARCHAR
(500)NOTNULL,yVARCHAR(500)NOTNULL,PRIMARYKEY(id))PARTITION
BYRANGECOLUMNS(id)(PARTITIONp1VALUESLESSTHAN(1000),PARTIT
IONp2VALUESLESSTHAN(2000),PARTITIONp3VALUESLESSTHAN(30
00));(接着创建存储过程,导入测试数据)DELIMITER//CREATEPROCEDUREinsert_batch(
)beginDECLAREnumINT;SETnum=1;WHILEnum<3000DOIF(num%10000=
0)THENCOMMIT;ENDIF;INSERTINTOemp_2VALUES(NULL,REPEAT(''X'',5
00),REPEAT(''Y'',500));SETnum=num+1;ENDWHILE;COMMIT;END//DELIM
ITER;mysql>selectTABLE_NAME,PARTITION_NAMEfrominformation_sc
hema.partitionswheretable_schema=''zhangdb'';+------------+------
----------+|TABLE_NAME|PARTITION_NAME|+------------+---------
-------+|emp|NULL||emp_2|p1
||emp_2|p2||emp_2|p3
|+------------+----------------+4rowsinset(0.00sec)mysql>s
electcount()fromemp_2partition(p1);+----------+|count()|
+----------+|999|+----------+1rowinset(0.00sec)mysql>
selectcount()fromemp_2partition(p2);+----------+|count()
|+----------+|1000|+----------+1rowinset(0.00sec)mysq
l>selectcount()fromemp_2partition(p3);+----------+|count(
)|+----------+|1000|+----------+1rowinset(0.00sec)从上
面可以看出,emp_2分区表已经创建完成,并且有3个子分区,每个分区都有一点数据。--:在目标数据库中,创建emp_2表的结构,不
要数据(要在源库,使用showcreatetableemp_2\G的方法查看创建该表的sql)MySQL[test]>
CREATETABLE`emp_2`(`id`bigint(20)unsignedNOTNULLAUTO_IN
CREMENT,`x`varchar(500)NOTNULL,`y`varchar(500)NOTNULL,PR
IMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=3000DEFAULTCHARS
ET=utf8mb4/!50500PARTITIONBYRANGECOLUMNS(id)(PARTITIONp1V
ALUESLESSTHAN(1000)ENGINE=InnoDB,PARTITIONp2VALUESLESS
THAN(2000)ENGINE=InnoDB,PARTITIONp3VALUESLESSTHAN(3000)
ENGINE=InnoDB)/;[root@localhosttest]#ll-rw-r-----1mysql
mysql98304May2515:58emp_2#P#p0.ibd-rw-r-----1mysqlmysql
98304May2515:58emp_2#P#p1.ibd-rw-r-----1mysqlmysql98304M
ay2515:58emp_2#P#p2.ibd注意:※约束条件、字符集等等也必须一致,建议使用showcreatetab
let1;来获取创建表的SQL,否则在新服务器上导入表空间的时候会提示1808错误。--:在目标数据库上,丢弃分区表的表空间M
ySQL[test]>altertableemp_2discardtablespace;QueryOK,0row
saffected(0.12sec)[root@localhosttest]#ll---这时候在看,刚才的3个分
区的idb文件都没有了-rw-r-----1mysqlmysql8604May2504:14emp_2.frm-
-:在源数据库上运行FLUSHTABLES…FOREXPORT锁定表并生成.cfg元数据文件,最后将cfg和ibd文件传
输到目标数据库中mysql>flushtablesemp_2forexport;QueryOK,0rowsaff
ected(0.00sec)[root@localhostzhangdb]#scpemp_2root@192.168
.2.100:/mysql/data/test/--将文件cp到目标数据库mysql>unlocktables;-
--最后将表的锁是否--:在目标数据库中对文件授权,然后导入表空间查看数据是否完整可用[root@localhosttest]#
chownmysql.mysqlemp_2#MySQL[test]>altertableemp_2import
tablespace;QueryOK,0rowsaffected(0.96sec)MySQL[test]>sele
ctcount()fromemp_2;+----------+|count()|+----------+|
2999|+----------+1rowinset(0.63sec)从上面的查看得知,分区表都已经导入到目标数据库中
了,另外,也可以将部分子分区导入到目标数据库中,(往往整个分区表会很大,可用只将需要用到的子分区导入到目标数据库中),将部分子分区
导入到目标数据库的方法是:1)在创建目标表的时候,只需要创建要导入的分区即可,如:只创建了p2p3两个分区CREATETA
BLE`emp_2`(`id`bigint(20)unsignedNOTNULLAUTO_INCREMENT,`
x`varchar(500)NOTNULL,`y`varchar(500)NOTNULL,PRIMARYKEY
(`id`))ENGINE=InnoDBAUTO_INCREMENT=3000DEFAULTCHARSET=utf8mb4
/!50500PARTITIONBYRANGECOLUMNS(id)(PARTITIONp2VALUESLES
STHAN(2000)ENGINE=InnoDB,PARTITIONp3VALUESLESSTHAN(300
0)ENGINE=InnoDB)/2)从源库cp到目标库的文件,当然也就是这俩的,就不需要其他分区的了,3)其他的操作方
法都一样了。?六、如何获取分区的相关信息1.通过SHOWCREATETABLE语句来查看分区表的分区子句譬如:mysql
>showcreatetablee/G?2.通过SHOWTABLESTATUS语句来查看表是否分区对应Creat
e_options字段譬如:mysql>showtablestatus/G?
1.row?Name:eEngine:InnoDBVer
sion:10Row_format:CompactRows:6Avg_row_length:10922Data_l
ength:65536Max_data_length:0Index_length:0Data_free:0Auto_
increment:NULLCreate_time:2015-12-0722:26:06Update_time:NUL
LCheck_time:NULLCollation:latin1_swedish_ciChecksum:NULLCr
eate_options:partitionedComment:?3.查看INFORMATION_SCHEMA.PARTI
TIONS表4.通过EXPLAINPARTITIONSSELECT语句查看对于具体的SELECT语句,会访问哪个分区。?
七、MySQL5.7对于partition表的改进HANDLERstatements:MySQL5.7.1分区表开始支持HAN
DLER语句;indexconditionpushdown:MySQL5.7.3分区表开始支持ICP;loaddata:My
SQL5.7开始使用缓存来实现性能提升,每个分区使用130KB缓冲区来实现这一点;Per-partition索引缓存:MySQL5
.7开始支持使用CACHEINDEX和LOADINDEXINTOCACHE语句对分区的MyISAM表支持索引缓存;FOR
EXPORT选项(FLUSHTABLES):MySQL5.7.4分区的InnoDB表开始支持FLUSHTABLES语句FOR
EXPORT选项;从MySQL5.7.2开始,子分区支持ANALYZE,CHECK,OPTIMIZE,REPAIR和TRUNC
ATE操作;Teradata的分区中常用的是按时间分区,如下例只要添加到createtable语句末尾就可以实现2013年全年一
天一个分区了(为了省事,可以一次分5-10年):?PARTITIONBYRANGE_N(?Rcd_DtBETWEEN
DATE''2013-01-01''ANDDATE''2013-12-31''?EACHINTERVAL''1''DAY,N
ORANGE?);?另外一个常用(但是不容易掌握)的是按字符串取值分区。在上述按时间分区中我们可以看到RANGE_N关键字。按
值分区采用CASE_N关键字,如下例所示:?PARTITIONBYCASE_N(?(CASEWHEN(my_field=
''A'')THEN(1)ELSE(0)END)=1,?(CASEWHEN(my_field=''B'')THEN(2
)ELSE(0)END)=2,?(CASEWHEN(my_field=''C'')THEN(3)ELSE(0)E
ND)=3,?NOCASEORUNKNOWN);?更进一步,其中如下面的语法元素:?my_field=''A''?可以修改为类似
于这样的形式:?SUBSTR(my_field,1,1)IN(''E'',''F'',''G'')?在现实中,因为访问数据从全表扫描变成
了分区扫描的原因,某些步骤可以达成10-100倍的性能提升。对于复杂的耗时较长的大作业,也总是能够缩短一半以上的运行时间。?1.数
据分区?在分布式程序中,通信的代价较大,通过对数据集在节点间的分区进行控制以获得较少的网络传输从而提升整体性能。如果给定的RDD只
需要被扫描一次,则完全没有必要对其预先进行处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。?尽管Spar
k无法显示控制每个键具体落在哪一个工作节点,但Spark可以确保同一组的键出现在同一个节点上。?以Join操作为例,如果未根据RD
D中的键重新分,那么在默认情况下,连接操作会将两个数据集中的所有键的哈希值求出来,将哈希值相同的记录通过网络传输到同一台机器上,然
后在那台机器上对所有键相同的记录进行连接操作。?2.partitionBy()算子frompysparkimportSpar
kContext,SparkConfif__name__==''__main__'':conf=SparkConf().s
etMaster("local").setAppName("word_count")sc=SparkContext(conf=c
onf)pair_rdd=sc.parallelize([(''a'',1),(''b'',10),(''c'',4),(''d'',7),(''
e'',9),(''f'',10)])rdd_1=pair_rdd.partitionBy(2,partitionFunc=lambd
ax:ord(x)%2).persist()print(rdd_1.glom().collect())?结果如下:rdd_1[
[(''b'',10),(''d'',7),(''f'',10)],[(''a'',1),(''c'',4),(''e'',9)]]
parittionBy()只能用于pairRDD,将pairRDD中的key传入到partitionFunc函数中。还需要注意的是
,如果不将partitonBy()操作的结果持久化,那么后面每次用到这个RDD时都会重复地对数据进行分区操作,那样的话,parti
tionBy()重新分区带来的好处将会被抵消。通过这个算子可以完成python中的自定义分区,则不要向scala语言中那样麻烦(s
park本身提供了HashPartitioner和RangePartitioner)。?3.影响分区方式的操作?以下算子会为生成
的结果RDD设好分区方式:?cogroup()、groupWith()、join()、leftOuterJoin()、rightO
uterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy(
)、sort()、mapValues(如果父RDD有分区方式),flatMapValues(如果父RDD有分区方式),filter
(如果父RDD有分区方式)。写入分区?<变量>=spark.sql("""<查询语句>""")<变量>.write.parqu
et(path=''<存储路径>/<表名>/{par_col}={par_val}''.format(par_col=''<分区列名>
'',par_val=''<分区值>''),mode="overwrite")?删除分区importsubprocess?subpr
ocess.check_call(''rm-r<存储路径>/<表名>/{par_col}={par_val}''.format(
par_col=''<分区列名>'',par_val=''<分区值>''),shell=True)PySpark代码基本结构#--co
ding:utf-8--frompyspark.sqlimportHiveContext,SparkSession#初
始化SparkContext,同时启用Hive支持,#将终端命令行的测试模式下输出字段的最大长度设置为100个字符spark=
SparkSession.builder.appName("name").config("spark.debug.maxToS
tringFields",100).enableHiveSupport().getOrCreate()#初始化HiveCont
exthive=HiveContext(spark.sparkContext)#启用SparkSQL的表连接支持spark.
conf.set("spark.sql.crossJoin.enabled","true")#读取parquet文件数据的代码
#Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,AWS中也使用#parquet文件
数据存储在AWSS3上#AWS使用S3作为数据存储的服务,S3全名是SimpleStorageService,也就是简
便的存储服务df1=spark.read.load(path='''',format=''parque
t'',header=True)#读取CSV文件数据的代码#这边以CSV文件作为手工交换文件的标准,#主要的原因是因为csv
的格式简单,它的数字类型数据是字符串存储的,所以它的精度也可以保证df2=spark.read.load(path=''v文件路径>'',format=''csv'',header=True)#读取Hive表或视图数据的代码df3=hive.sq
l("""selectfrom<数据库名>.<表名>""")#对于多次使用的小表数据集进行数据的内存缓存(第一条Spa
rk优化策略),#这样的话,之后pyspark代码在多次调用数据的时候Spark就不会重复地读取相同的文件数据了df4=sp
ark.read.load(path='''',format=''parquet'',header=Tru
e).cache()#将刚才得到的数据集命名,以便放入SparkSQL编写查询语句df1.createOrReplaceTem
pView("DF1")df2.createOrReplaceTempView("DF2")df3.createOrReplace
TempView("DF3")df4.createOrReplaceTempView("DF4")#创建SparkSQL数据集的
代码#如果数据量比较多,而且业务逻辑复杂的话,可以将数据临时缓存在存储服务/磁盘上,#从而避免之后pyspark代码在使用Sp
arkSQL调用这里的SparkSQL数据集的时候,#这里的SparkSQL数据集被重复运行计算逻辑,从而节约计算资源(第二条S
park优化策略)df5=spark.sql("""SELECT...fromDF1ASD1LEFTJOINDF2
ASD2ON...LEFTJOINDF4ASD4ON...WHERE...""").persist()#由于c
ount是Action算子,会触发spark-submit事件,让之前的persist()缓存操作即刻生效,#不使用count(
)操作,persist()缓存操作会在下一个Action算子处或程序结束处生效df5.count()df5.createOrRep
laceTempView("DF5")#创建SparkSQL数据集的代码df6=spark.sql("""SELECT..
.fromDF5ASD5LEFTJOINDF3ASD3ON...LEFTJOINDF4ASD4ON...
WHERE...""")#写入结果数据集到parquet文件df6.write.parquet(path=''文件路径2>'',mode="overwrite")#释放磁盘缓存df5.unpersist()#sparkContext停止
spark.stop()PySpark从MySQL导出数据到parquet文件frompyspark.sqlimportSp
arkSessionspark=SparkSession.Builder().getOrCreate()url="jdbc:m
ysql://[:<端口号>]?useTimezone=false&serverTimezone=UT
C"mysql_df=spark.read.jdbc(url=url,table="<查询语句>",properties
={"user":"<用户名>","password":"<密码>","database":"<选择数据库>"})mysq
l_df.write.parquet(path=''<存储路径>/<表名>'',mode="overwrite")spark.s
top()PySpark从Teradata导出数据到parquet文件frompyspark.sqlimportSparkS
essionspark=SparkSession.Builder().getOrCreate()url="jdbc:ter
adata://{ip}/"\"DATABASE={database},"\"DBS_PORT={dbs_port},"
\"LOGMECH={LDAP},"\"CHARSET={ASCII|UTF8},"\"COLUMN_NAME=ON,"
\"MAYBENULL=ON".format(ip="",database="se名称>",dbs_port="<端口号>")teradata_df=spark.read.jdbc(url=url,t
able="<查询语句>",properties={"user":"<用户名>","password":"<密码>",
"driver":"com.teradata.jdbc.TeraDriver"})teradata_df.write.parq
uet(path=''<存储路径>/<表名>'',mode="overwrite")spark.stop()PySpark从Pa
rquent文件写入Hive表frompyspark.sqlimportSparkSession#打开动态分区spark.
sql("sethive.exec.dynamic.partition.mode=nonstrict")spark.sql(
"sethive.exec.dynamic.partition=true")#使用普通的hive-sql写入分区表spark
.sql("""insertoverwritetableai.da_aipurchase_dailysale_hive
partition(saledate)selectproductid,propertyid,processcenter
id,saleplatform,sku,poa,salecount,saledatefromszy_aipurch
ase_tmp_szy_dailysaledistributebysaledate""")#或者使用每次重建分区表的方
式jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInt
o("ai.da_aipurchase_dailysale_hive")jdbcDF.write.saveAsTable("ai.
da_aipurchase_dailysale_hive",None,"append",partitionBy=''saled
ate'')#不写分区表,只是简单的导入到hive表jdbcDF.write.saveAsTable("ai.da_aipurc
hase_dailysale_for_ema_predict",None,"overwrite",None)PySpark读
取HiveSQL查询数据写入parquet文件frompyspark.sqlimportHiveContext,Spark
Sessionspark=SparkSession.builder.appName("<配置名称>").config("spa
rk.debug.maxToStringFields",100).enableHiveSupport().getOrCreate
()hive=HiveContext(spark.sparkContext)df=hive.sql(""">""")df.write.parquet("<存储路径>/<文件名>",mode=''overwrite'')PySpark读取p
arquet文件数据到DataFrame变量并获取所有字段名称和类型frompyspark.sqlimportSparkSe
ssionspark=SparkSession.Builder().getOrCreate()df=spark.read.
load(path=''<存储路径>/<表名>'',format=''parquet'')df.printSchema()forsc
indf.schema:print(sc.name)print(sc.dataType)print(sc.nullable)Py
Spark获取Dataframe的采样数据并保存到CSV文件frompyspark.sqlimportSparkSessio
nspark=SparkSession.Builder().getOrCreate()df=…df.sample(Fals
e,0.1,22345).repartition(1).write.csv(path=''<存储路径>/<文件名>'',header
=True,sep=",",mode=''overwrite'')PySpark将Dataframe数据保存为CSV文件from
pyspark.sqlimportSparkSessionspark=SparkSession.Builder().get
OrCreate()df=…df.repartition(1).write.csv(path=''<存储路径>/<表名>[/<分区字
段>=<分区值>].csv'',header=True,sep=",",mode=''overwrite'')spark.stop
()HiveQL从parquet文件创建Hive表DROPTABLEIFEXISTS`<库名>`.`<表名>`;CREAT
EEXTERNALTABLEIFNOTEXISTS`<库名>`.`<表名>`(`<字段1>`<类型1>,`<字
段2>`<类型2>,`<字段3>`<类型3>,...`<字段n>`<类型n>)ROWFORMATSERDE''or
g.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe''STORED
ASINPUTFORMAT''org.apache.hadoop.hive.ql.io.parquet.MapredParque
tInputFormat''OUTPUTFORMAT''org.apache.hadoop.hive.ql.io.parquet.M
apredParquetOutputFormat''LOCATION''<存储路径>/<表名>[/<分区字段>=<分区值>]'';Hi
veQL从Hive表创建Hive视图CREATEORREPLACEVIEW`<库名>`.`<表名>`AS句>;移动Parquet文件所在目录importsubprocesssubprocess.check_call(''mv<存储
路径1>/<表名>[/<分区字段>=<分区值>]<存储路径2>/<表名>[/<分区字段>=<分区值>]'',shell=True
)复制Parquet文件所在目录importsubprocesssubprocess.check_call("cp-r<存储
路径1>/<表名>[/<分区字段>=<分区值>]<存储路径2>/<表名>[/<分区字段>=<分区值>]",shell=True
)删除Parquet文件所在目录importsubprocesssubprocess.check_call(''rm-r<存储
路径1>/<表名>[/<分区字段>=<分区值>]'',shell=True)Teradata支持的数据类型MySQL支持的数据类型名
称类型说明INT整型4字节整数类型,范围约+/-21亿BIGINT长整型8字节整数类型,范围约+/-922亿亿REAL浮点型4字节
浮点数,范围约+/-1038DOUBLE浮点型8字节浮点数,范围约+/-10308DECIMAL(M,N)高精度小数由用户指定精度
的小数,例如,DECIMAL(20,10)表示一共20位,其中小数10位,通常用于财务计算CHAR(N)定长字符串存储指定长度的字
符串,例如,CHAR(100)总是存储100个字符的字符串VARCHAR(N)变长字符串存储可变长度的字符串,例如,VARCHAR
(100)可以存储0~100个字符的字符串BOOLEAN布尔类型存储True或者FalseDATE日期类型存储日期,例如,2018
-06-22TIME时间类型存储时间,例如,12:20:59DATETIME日期和时间类型存储日期+时间,例如,2018-06-2
212:20:59Hive支持的数据类型1.基本数据类型Hive数据类型Java数据类型长度例子TINYINTbyte1by
te有符号整数20SMALINTshort2byte有符号整数20INTint4byte有符号整数20BIGINTlong8
byte有符号整数20BOOLEANboolean布尔类型,true或者falseTRUEFALSEFLOATfloat单
精度浮点数3.14159DOUBLEdouble双精度浮点数3.14159STRINGstring字符系列。可以指定字符集。可以使
用单引号或者双引号。‘nowisthetime’“forallgoodmen”TIMESTAMP?时间类型?BINA
RY?字节数组?对于Hive的String类型相当于数据库的varchar类型,该类型是一个可变的字符串,不过它不能声
明其中最多能存储多少个字符,理论上它可以存储2GB的字符数。2.集合数据类型数据类型描述语法示例STRUCT和c语言中的
struct类似,都可以通过“点”符号访问元素内容。例如,如果某个列的数据类型是STRUCT{firstSTRING,la
stSTRING},那么第1个元素可以通过字段.first来引用。struct()MAPMAP是一组键-值对元组集合,使
用数组表示法可以访问数据。例如,如果某个列的数据类型是MAP,其中键->值对是’first’->’John’和’last’->’
Doe’,那么可以通过字段名[‘last’]获取最后一个元素map()ARRAY数组是一组具有相同类型和名称的变量的集合。这些变量
称为数组的元素,每个数组元素都有一个编号,编号从零开始。例如,数组值为[‘John’,‘Doe’],那么第2个元素可以通过数
组名[1]进行引用。Array()Hive有三种复杂数据类型ARRAY、MAP和STRUCT。ARRAY和MAP与
Java中的Array和Map类似,而STRUCT与C语言中的Struct类似,它封装了一个命名字段集合,复杂
数据类型允许任意层次的嵌套。Parquet文件存储格式Parquet仅仅是一种存储格式,它是语言、平台无关的,并且不需要和任何一种
数据处理框架绑定,目前能够和Parquet适配的组件包括下面这些,可以看出基本上通常使用的查询引擎和计算框架都已适配,并且可以很方
便的将其它序列化工具生成的数据转换成Parquet格式。查询引擎:Hive,Impala,Pig,Presto,Dril
l,Tajo,HAWQ,IBMBigSQL计算框架:MapReduce,Spark,Cascading,Crun
ch,Scalding,Kite数据模型:Avro,Thrift,ProtocolBuffers,POJOs项目组成
Parquet项目由以下几个子项目组成:https://github.com/apache/parquet-formatparqu
et-format项目由java实现,它定义了所有Parquet元数据对象,Parquet的元数据是使用ApacheThrift
进行序列化并存储在Parquet文件的尾部。https://github.com/apache/parquet-mrparquet
-format项目由java实现,它包括多个模块,包括实现了读写Parquet文件的功能,并且提供一些和其它组件适配的工具,例如H
adoopInput/OutputFormats、HiveSerde(目前Hive已经自带Parquet了)、Pigloa
ders等。https://github.com/Parquet/parquet-compatibilityparquet-com
patibility项目,包含不同编程语言之间(JAVA和C/C++)读写文件的测试代码。https://github.com/a
pache/parquet-cppparquet-cpp项目,它是用于用于读写Parquet文件的C++库。下图展示了Parque
t各个组件的层次以及从上到下交互的方式。数据存储层定义了Parquet的文件格式,其中元数据在parquet-format中定义,
包括Parquet原始类型定义、Page类型、编码类型、压缩类型等等。对象转换层完成其他对象模型与Parquet内部数据模型的映射
和转换,Parquet的编码方式使用的是stripingandassembly算法。对象模型层定义了如何读取Parquet文件
的内容,这一层转换包括Avro、Thrift、PB等序列化格式、Hiveserde等的适配。并且为了帮助大家理解和使用,Parq
uet提供了org.apache.parquet.example包实现了java对象和Parquet文件的转换。数据模型Parqu
et支持嵌套的数据模型,类似于ProtocolBuffers,每一个数据模型的schema包含多个字段,每一个字段又可以包含多个
字段,每一个字段有三个属性:重复数、数据类型和字段名,重复数可以是以下三种:required(出现1次),repeated(出现0
次或多次),optional(出现0次或1次)。每一个字段的数据类型可以分成两种:group(复杂类型)和primitive(基本
类型)。例如Dremel中提供的Document的schema示例,它的定义如下:messageDocument{requi
redint64DocId;optionalgroupLinks{repeatedint64Backward;
repeatedint64Forward;}repeatedgroupName{repeatedgroupLa
nguage{requiredstringCode;optionalstringCountry;}optiona
lstringUrl;}}可以把这个Schema转换成树状结构,根节点可以理解为repeated类型,如下图:?可以看出在S
chema中所有的基本类型字段都是叶子节点,在这个Schema中一共存在6个叶子节点,如果把这样的Schema转换成扁平式的关系模
型,就可以理解为该表包含六个列。Parquet中没有Map、Array这样的复杂数据结构,但是可以通过repeated和group
组合来实现这样的需求。在这个包含6个字段的表中有以下几个字段和每一条记录中它们可能出现的次数:DocId
int64只能出现一次Links.Backwardint64可能出现任意多次,但是如果出现0次则需要使用
NULL标识Links.Forwardint64同上Name.Language.Codestring同上
Name.Language.Countrystring同上Name.Urlstring同上由于在一个表中可能存在出现任意多
次的列,对于这些列需要标示出现多次或者等于NULL的情况,它是由Striping/Assembly算法实现的。Striping/A
ssembly算法上文介绍了Parquet的数据模型,在Document中存在多个非required列,由于Parquet一条记录
的数据分散的存储在不同的列中,如何组合不同的列值组成一条记录是由Striping/Assembly算法决定的,在该算法中列的每一个
值都包含三部分:value、repetitionlevel和definitionlevel。RepetitionLevels
为了支持repeated类型的节点,在写入的时候该值等于它和前面的值在哪一层节点是不共享的。在读取的时候根据该值可以推导出哪一层上
需要创建一个新的节点,例如对于这样的一个schema和两条记录。messagenested{repeatedgroupl
eve1{repeatedstringleve2;}}r1:[[a,b,c,],[d,e,f,g]]r2:[[h]
,[i,j]]计算repetitionlevel值的过程如下:value=a是一条记录的开始,和前面的值(已经没有值了)在根节
点(第0层)上是不共享的,所以repeatedlevel=0.value=b它和前面的值共享了level1这个节点,但是leve
l2这个节点上是不共享的,所以repeatedlevel=2.同理value=c,repeatedlevel=2.value
=d和前面的值共享了根节点(属于相同记录),但是在level1这个节点上是不共享的,所以repeatedlevel=1.valu
e=h和前面的值不属于同一条记录,也就是不共享任何节点,所以repeatedlevel=0.根据以上的分析每一个value需要记
录的repeatedlevel值如下:在读取的时候,顺序的读取每一个值,然后根据它的repeatedlevel创建对象,当读取
value=a时repeatedlevel=0,表示需要创建一个新的根节点(新记录),value=b时repeatedleve
l=2,表示需要创建一个新的level2节点,value=d时repeatedlevel=1,表示需要创建一个新的level1节
点,当所有列读取完成之后可以创建一条新的记录。本例中当读取文件构建每条记录的结果如下:可以看出repeatedlevel=0表示
一条记录的开始,并且repeatedlevel的值只是针对路径上的repeated类型的节点,因此在计算该值的时候可以忽略非re
peated类型的节点,在写入的时候将其理解为该节点和路径上的哪一个repeated节点是不共享的,读取的时候将其理解为需要在哪一
层创建一个新的repeated节点,这样的话每一列最大的repeatedlevel值就等于路径上的repeated节点的个数(不
包括根节点)。减小repeatedlevel的好处能够使得在存储使用更加紧凑的编码方式,节省存储空间。DefinitionLe
vels有了repeatedlevel我们就可以构造出一个记录了,为什么还需要definitionlevels呢?由于repe
ated和optional类型的存在,可能一条记录中某一列是没有值的,假设我们不记录这样的值就会导致本该属于下一条记录的值被当做当
前记录的一部分,从而造成数据的错误,因此对于这种情况需要一个占位符标示这种情况。definitionlevel的值仅仅对于空值是
有效的,表示在该值的路径上第几层开始是未定义的,对于非空的值它是没有意义的,因为非空值在叶子节点是定义的,所有的父节点也肯定是定义
的,因此它总是等于该列最大的definitionlevels。例如下面的schema。messageExampleDefini
tionLevel{optionalgroupa{optionalgroupb{optionalstring
c;}}}它包含一个列a.b.c,这个列的的每一个节点都是optional类型的,当c被定义时a和b肯定都是已定义的,当c未
定义时我们就需要标示出在从哪一层开始时未定义的,如下面的值:由于definitionlevel只需要考虑未定义的值,而对于rep
eated类型的节点,只要父节点是已定义的,该节点就必须定义(例如Document中的DocId,每一条记录都该列都必须有值,同样
对于Language节点,只要它定义了Code必须有值),所以计算definitionlevel的值时可以忽略路径上的requi
red节点,这样可以减小definitionlevel的最大值,优化存储。一个完整的例子本节我们使用Dremel论文中给的Doc
ument示例和给定的两个值r1和r2展示计算repeatedlevel和definitionlevel的过程,这里把未定义的
值记录为NULL,使用R表示repeatedlevel,D表示definitionlevel。首先看DocuId这一列,对于r
1,DocId=10,由于它是记录的开始并且是已定义的,所以R=0,D=0,同样r2中的DocId=20,R=0,D=0。对于Li
nks.Forward这一列,在r1中,它是未定义的但是Links是已定义的,并且是该记录中的第一个值,所以R=0,D=1,在r1
中该列有两个值,value1=10,R=0(记录中该列的第一个值),D=2(该列的最大definitionlevel)。对于Na
me.Url这一列,r1中它有三个值,分别为url1=’http://A‘,它是r1中该列的第一个值并且是定义的,所以R=0,D=
2;value2=’http://b/http://B‘,和上一个值value1在Name这一层是不相同的,所以R=1,D=2;v
alue3=NULL,和上一个值value2在Name这一层是不相同的,所以R=1,但它是未定义的,而Name这一层是定义的,所以
D=1。r2中该列只有一个值value3=’http://c/http://C‘,R=0,D=2.最后看一下Name.Langua
ge.Code这一列,r1中有4个值,value1=’en-us’,它是r1中的第一个值并且是已定义的,所以R=0,D=2(由于C
ode是required类型,这一列repeatedlevel的最大值等于2);value2=’en’,它和value1在Lan
guage这个节点是不共享的,所以R=2,D=2;value3=NULL,它是未定义的,但是它和前一个值在Name这个节点是不共享
的,在Name这个节点是已定义的,所以R=1,D=1;value4=’en-gb’,它和前一个值在Name这一层不共享,所以R=1
,D=2。在r2中该列有一个值,它是未定义的,但是Name这一层是已定义的,所以R=0,D=1.?Parquet文件格式Parqu
et文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。在HD
FS文件系统和Parquet文件中存在如下几个概念。HDFS块(Block):它是HDFS上的最小的副本单位,HDFS会把一个Bl
ock存储在本地的一个文件并且维护分散在不同的机器上的多个副本,通常情况下一个Block的大小为256M、512M等。HDFS文件
(File):一个HDFS的文件,包括数据和元数据,数据分散存储在多个Block中。行组(RowGroup):按照行将数据物理上
划分为多个单元,每一个行组包含一定的行数,在一个HDFS文件中至少存储一个行组,Parquet读写的时候会将整个行组缓存在内存中,
所以如果每一个行组的大小是由内存大的小决定的,例如记录占用空间比较小的Schema可以在每一个行组中存储更多的行。列块(Colum
nChunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同
的列块可能使用不同的算法进行压缩。页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不
同的编码方式。文件格式通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mappe
r任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。Parquet文件的格式如下图所示上图展示了一个Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的MagicCode,用于校验它是否是一个Parquet文件,Footerlength了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页,但是在后面的版本中增加。在执行MR任务的时候可能存在多个Mapper任务的输入是同一个Parquet文件的情况,每一个Mapper通过InputSplit标示处理的文件范围,如果多个InputSplit跨越了一个RowGroup,Parquet能够保证一个RowGroup只会被一个Mapper任务处理。映射下推(ProjectPushDown)说到列式存储的优势,映射下推是最突出的,它意味着在获取表中原始数据时只需要扫描查询中需要的列,由于每一列的所有值都是连续存储的,所以分区取出每一列的所有值就可以实现TableScan算子,而避免扫描整个表文件内容。在Parquet中原生就支持映射下推,执行查询的时候可以通过Configuration传递需要读取的列的信息,这些列必须是Schema的子集,映射每次会扫描一个RowGroup的数据,然后一次性得将该RowGroup里所有需要的列的CloumnChunk都读取到内存中,每次读取一个RowGroup的数据能够大大降低随机读的次数,除此之外,Parquet在读取的时候会考虑列是否连续,如果某些需要的列是存储位置是连续的,那么一次读操作就可以把多个列的数据读取到内存。谓词下推(PredicatePushDown)在数据库之类的查询系统中最常用的优化手段就是谓词下推了,通过将一些过滤条件尽可能的在最底层执行可以减少每一层交互的数据量,从而提升性能,例如”selectcount(1)fromAJoinBonA.id=B.idwhereA.a>10andB.b<100”SQL查询中,在处理Join操作之前需要首先对A和B执行TableScan操作,然后再进行Join,再执行过滤,最后计算聚合函数返回,但是如果把过滤条件A.a>10和B.b<100分别移到A表的TableScan和B表的TableScan的时候执行,可以大大降低Join操作的输入数据。无论是行式存储还是列式存储,都可以在将过滤条件在读取一条记录之后执行以判断该记录是否需要返回给调用者,在Parquet做了更进一步的优化,优化的方法时对每一个RowGroup的每一个ColumnChunk在存储的时候都计算对应的统计信息,包括该ColumnChunk的最大值、最小值和空值个数。通过这些统计值和该列的过滤条件可以判断该RowGroup是否需要扫描。另外Parquet未来还会增加诸如BloomFilter和Index等优化数据,更加有效的完成谓词下推。在使用Parquet的时候可以通过如下两种策略提升查询性能:1、类似于关系数据库的主键,对需要频繁过滤的列设置为有序的,这样在导入数据的时候会根据该列的顺序存储数据,这样可以最大化的利用最大值、最小值实现谓词下推。2、减小行组大小和页大小,这样增加跳过整个行组的可能性,但是此时需要权衡由于压缩和编码效率下降带来的I/O负载。性能相比传统的行式存储,Hadoop生态圈近年来也涌现出诸如RC、ORC、Parquet的列式存储格式,它们的性能优势主要体现在两个方面:1、更高的压缩比,由于相同类型的数据更容易针对不同类型的列使用高效的编码和压缩方式。2、更小的I/O操作,由于映射下推和谓词下推的使用,可以减少一大部分不必要的数据扫描,尤其是表结构比较庞大的时候更加明显,由此也能够带来更好的查询性能上图是展示了使用不同格式存储TPC-H和TPC-DS数据集中两个表数据的文件大小对比,可以看出Parquet较之于其他的二进制文件存储格式能够更有效的利用存储空间,而新版本的Parquet(2.0版本)使用了更加高效的页存储方式,进一步的提升存储空间上图展示了Twitter在Impala中使用不同格式文件执行TPC-DS基准测试的结果,测试结果可以看出Parquet较之于其他的行式存储格式有较明显的性能提升。上图展示了criteo公司在Hive中使用ORC和Parquet两种列式存储格式执行TPC-DS基准测试的结果,测试结果可以看出在数据存储方面,两种存储格式在都是用snappy压缩的情况下量中存储格式占用的空间相差并不大,查询的结果显示Parquet格式稍好于ORC格式,两者在功能上也都有优缺点,Parquet原生支持嵌套式数据结构,而ORC对此支持的较差,这种复杂的Schema查询也相对较差;而Parquet不支持数据的修改和ACID,但是ORC对此提供支持,但是在OLAP环境下很少会对单条数据修改,更多的则是批量导入。项目发展自从2012年由Twitter和Cloudera共同研发Parquet开始,该项目一直处于高速发展之中,并且在项目之初就将其贡献给开源社区,2013年,Criteo公司加入开发并且向Hive社区提交了向hive集成Parquet的patch(HIVE-5783),在Hive0.13版本之后正式加入了Parquet的支持;之后越来越多的查询引擎对此进行支持,也进一步带动了Parquet的发展。目前Parquet正处于向2.0版本迈进的阶段,在新的版本中实现了新的Page存储格式,针对不同的类型优化编码算法,另外丰富了支持的原始类型,增加了Decimal、Timestamp等类型的支持,增加更加丰富的统计信息,例如BloonFilter,能够尽可能得将谓词下推在元数据层完成。总结本文介绍了一种支持嵌套数据模型对的列式存储系统Parquet,作为大数据系统中OLAP查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式。通过数据编码和压缩,以及映射下推和谓词下推功能,Parquet的性能也较之其它文件格式有所提升,可以预见,随着数据模型的丰富和Adhoc查询的需求,Parquet将会被更广泛的使用。
献花(0)
+1
(本文系zymITsky首藏)