导言本文主要介绍如何快速的通过Spark访问Iceberg table。 Spark通过DataSource和DataFrame API访问Iceberg table,或者进行Catalog相关的操作。由于Spark Data Source V2 API还在持续的演进和修改中,所以Iceberg在不同的Spark版本中的使用方式有所不同。 版本对比
Spark 2.4配置Hive MetaStoreIceberg内部支持Hive和Hadoop两种catalog:
后文以Hive catalog为主做介绍。Hive catalog需要Hive MetaStore的支持。注意其有多种配置方式,其中内嵌的Derby数据库仅仅用于实验和学习,不能用于生产环境。 Spark<SPARK_HOME>/conf/spark-defaults.conf需要加入如下配置,使Iceberg能够访问Hive MetaStore: spark.hadoop.hive.metastore.uris thrift://<HiveMetaStore>:9083 spark.hadoop.hive.metastore.warehouse.dir hdfs://<NameNode>:8020/path 部署
git clone https://github.com/apache/incubator-iceberg.git cd incubator-iceberg # master branch supports Spark 2.4.4 ./gradlew assemble spark-shell --jars <iceberg-git-working-directory>/spark-runtime/build/libs/iceberg-spark-runtime-<version>.jar 读Iceberg table通过DataFrameSpark 2.4只能读写已经存在的Iceberg table。在后续的操作前,需要先通过Iceberg API来创建table。 读取是通过
Iceberg会判断path中是否含有'/'。如果是,则认为是一个用路径表示Hadoop table;否则,会去Hive catalog中寻找。 利用time travel回溯某一个snapshot的数据在读取时,通过option指定 // Time travel to October 26, 1986 at 01:21:00 spark.read .format('iceberg') .option('as-of-timestamp', '499162860000') .load('db.table')
// Time travel to snapshot with ID 10963874102873L spark.read .format('iceberg') .option('snapshot-id', 10963874102873L) .load('db.table')
在DataFrame基础上使用SQL SELECT在DataFrame的基础上,创建local temporary view后,也可以通过SQL SELECT来读取Iceberg table的内容:
写Iceberg tableSpark 2.4可以通过 // Append df.write .format('iceberg') .mode('append') .save('db.table')
// Overwrite df.write .format('iceberg') .mode('overwrite') .save('db.table') 有如下几点需要注意:
访问Iceberg table的元数据Iceberg支持通过
结果如下: +-------------------------+---------------------+---------------------+---------------------+ | made_current_at | snapshot_id | parent_id | is_current_ancestor | +-------------------------+---------------------+---------------------+---------------------+ | 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true | | 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true | | 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false | | 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true | | 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true | | 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true | +-------------------------+---------------------+---------------------+---------------------+ 又如:
可以进一步将history和snapshot按照snapshot id做join,来查找snapshot id对应的application id: spark.read .format('iceberg') .load('db.table.history') .createOrReplaceTempView('history') spark.read .format('iceberg') .load('db.table.snapshots') .createOrReplaceTempView('snapshots')
结果如下: -------------------------+-----------+----------------+---------------------+----------------------------------+ | made_current_at | operation | snapshot_id | is_current_ancestor | summary[spark.app.id] | +-------------------------+-----------+----------------+---------------------+----------------------------------+ | 2019-02-08 03:29:51.215 | append | 57897183625154 | true | application_1520379288616_155055 | | 2019-02-09 16:24:30.13 | delete | 29641004024753 | false | application_1520379288616_151109 | | 2019-02-09 16:32:47.336 | append | 57897183625154 | true | application_1520379288616_155055 | | 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true | application_1520379288616_152431 | +-------------------------+-----------+----------------+---------------------+----------------------------------+ Spark 3.0Iceberg在Spark 3.0中,作为V2 Data Source,除了上述Spark 2.4所有的访问能力外,还可以通过V2 Data Source专属的DataFrame API访问;同时,受益于external catalog的支持,Spark SQL的DDL功能也可以操作Iceberg table,并且DML语句支持也更加丰富。 配置external catalog在<SPARK_HOME>/conf/spark-defaults.conf加入如下配置:
通过V2 Data Source专属DataFrame API访问df.writeTo('catalog-name.db.table') .overwritePartitions() 通过Spark SQL访问相较于Spark 2.4,Spark 3.0可以省去DataFrameReader和创建local temporary view的步骤,直接通过Spark SQL进行操作:
我们作为社区中 总结本文作为Iceberg的快速入门,介绍了如何通过Spark访问Iceberg table,以及不同Spark版本的支持情况:
随着Iceberg自身功能的完善(如向量化读取,merge on read等),以及上下游对接和生态的丰富,Iceberg作为优秀的表格式抽象,在大数据领域必然会有更好的发展。 |
|
来自: 阿明哥哥资料区 > 《53.大数据.机器学习.思维》