分享

pyspark入门教程

 爱吃鱼的俊懒猫 2021-07-07

目录
一、windows下配置pyspark环境
1.1 jdk下载安装
1.2 Scala下载安装
1.3 spark下载安装
1.4 Hadoop下载安装
1.5 pyspark下载安装
1.6 anaconda下载安装
1.7 测试环境是否搭建成功
二、pyspark原理简介
三、pyspark使用语法
3.1 RDD的基本操作
3.2 DataFrame的基本操作
3.3 pyspark.sql.functions中的方法简介
3.4 窗口函数的使用

Pyspark学习笔记
一、windows下配置pyspark环境
在python中使用pyspark并不是单纯的导入pyspark包就可以实现的。需要由不同的环境共同搭建spark环境,才可以在python中使用pyspark。
搭建pyspark所需环境:
python3,jdk,spark,Scala,Hadoop(可选)

1.1 jdk下载安装
下载地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
打开Windows中的环境变量:
创建JAVA_HOME:C:\Program Files\Java\jdk1.8.0_181
创建CLASSPATH:.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar
在Path添加:%JAVA_HOME%\bin;
测试是否安装成功:打开cmd命令行,输入java -version
1.2 Scala下载安装
下载地址:https://downloads./scala/2.12.8/scala-2.12.8.msi
下载后进行安装
创建SCALA_HOME: C:\Program Files (x86)\scala
Path添加:;%SCALA_HOME%\bin; %JAVA_HOME%\bin;;%HADOOP_HOME%\bin
测试是否安装成功:打开cmd命令行,输入scala -version
1.3 spark下载安装
下载地址:http://mirror./apache/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz
也可以选择下载指定版本:http://spark./downloads.html
在这里插入图片描述

下载好之后解压放在随便一个目录下即可,但是目录名不可以有空格。
环境变量:
创建SPARK_HOME:D:\spark-2.2.0-bin-hadoop2.7
Path添加:%SPARK_HOME%\bin
测试是否安装成功:打开cmd命令行,输入spark-shell
spark-shell时报错:error not found:value sqlContext。参考:https://www./2017/04/19/20170419-spark-error-01/
或者是路径中有空格所致。
1.4 Hadoop下载安装
如果你需要去hdfs取数据的话,就应该先装hadoop。对于spark和hadoop的关系,可以移步这篇博客:Spark是否会替代Hadoop?
https://blog.csdn.net/yjcyyl062c/article/details/84772829
下载地址:
http://www./dyn/closer.cgi/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz
在这里插入图片描述

解压到指定目录即可。
环境变量:
创建HADOOP_HOME:D:\hadoop-2.7.7
Path添加:%HADOOP_HOME%\bin
测试是否安装成功:打开cmd命令行,输入hadoop
hadoop测试时报错:Error: JAVA_HOME is incorrectly set。参考:https://blog.csdn.net/qq_24125575/article/details/76186309
1.5 pyspark下载安装
python下安装pyspark,可以先去官网上将pyspark下载之后,再进行安装。避免超时
下载地址:https://pypi.tuna./packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz
下载之后使用pip install pyspark-2.4.5.tar.gz即可安装。
1.6 anaconda下载安装
1.进入Anaconda官网下载:https://www./distribution/
2.直接双击运行安装包即可。
3.一直下一步。
4.测试是否安装成功:命令行下输入conda --version
1.7 测试环境是否搭建成功
测试整体环境是否搭建完成:
新建py文件并包含下面的测试代码:

from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)

注意:建议在代码之前加上下面这两行代码,可以自动寻找spark的安装位置,是在py文件的最上端加入。

import findspark
findspark.init()

在本地电脑的jupyter notebook上进行spark操作时如果报jvm的错误,说明在jupyter中没有配置好spark的环境,可以考虑使用下面这种方式来加载spark的环境。在jupyter中新建一个python3文件,输入以下代码块

import findspark
findspark.init()
import os
import sys
spark_name = os.environ.get('SPARK_HOME',None)
if not spark_name:
    raise ValueErrorError('spark环境没有配置好')
sys.path.insert(0,os.path.join(spark_name,'python'))
sys.path.insert(0,os.path.join(spark_name,'D:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip'))

#(py4j-0.10.6-src.zip位于D:spark-2.3.0-bin-hadoop2.7中python文件夹中lib文件夹内,请根据自己的版本更改)
然后运行,就可以在jupyter中愉快的使用spark啦。

mac下配置此环境可以参考下面这篇文章:
https://blog.csdn.net/wapecheng/article/details/108071538

二、pyspark原理简介
pyspark的实现机制可以用下面这张图来表示
在这里插入图片描述

在python driver端,SparkContext利用Py4J启动一个JVM并产生一个JavaSparkContext。Py4J只使用在driver端,用于本地python与java SparkContext objects的通信。大量数据的传输使用的是另一个机制。
RDD在python下的转换会被映射成java环境下PythonRDD。在远端worker机器上,PythonRDD对象启动一些子进程并通过pipes与这些子进程通信,以此send用户代码和数据。

三、pyspark使用语法
在Spark2.0之前, SparkContext 是所有 Spark 功能的结构, 驱动器(driver) 通过SparkContext 连接到集群 (通过resource manager), 因为在2.0之前, RDD就是Spark的基础。如果需要建立SparkContext,则需要SparkConf,通过Conf来配置SparkContext的内容。
在这里插入图片描述

其中:
setAppName(), 是你的程序在集群上的名字
setMaster(), 你的Spark运行的模式 'local’表示本地模式
而在Spark2.0之后,Spark Session也是Spark 的一个入口, 为了引入dataframe和dataset的API, 同时保留了原来SparkContext的functionality, 如果想要使用 HIVE,SQL,Streaming的API, 就需要Spark Session作为入口。

spark = SparkSession.builder.appName('testSQL')                    .config('spark.some.config.option','some-value')                    .getOrCreate()

写一个小小的例子,创建一个SparkContext对象并统计一个文件中的行数。

import findspark
findspark.init()
from pyspark import SparkContext
logFile = r"C:\Users\Administrator\Desktop\README.md"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).count()
print(logData)

3.1 RDD的基本操作
Spark的核心是RDD(Resilient Distributed Dataset)即弹性分布式数据集,属于一种分布式的内存系统的数据集应用。Spark主要优势就是来自RDD本身的特性,RDD能与其他系统兼容,可以导入外部存储系统的数据集,例如,HDFS、HBase或者其他Hadoop数据源。它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。你可以对这些RDD应用多个操作来完成某项任务
要对这些RDD进行操作,有两种方法:
· Transformation
· Action
转换 - 这些操作应用于RDD以创建新的RDD。Filter,groupBy和map是转换的示例。
操作 - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序。

使用sparkcontext读取文件:

  • data_rdd = sc.textFiles('xxxxxxx.txt’) # 读入文件内容,返回的东西是rdd
  • path_data_rdd = sc.wholeTextFile(('xxxxxxx.txt’)) # 不仅读入文件内容,还会读入文件的路径path

基本操作及示例:

  • count() 返回RDD中的元素个数

    from pyspark import SparkContext
    sc = SparkContext("local", "count app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"
         ])
    counts = words.count()
    print("Number of elements in RDD -> %i" % counts)
    

在这里插入图片描述

  • collect() 返回RDD中的所有元素,即转换为python数据类型

    from pyspark import SparkContext
    sc = SparkContext("local", "collect app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"
         ])
    coll = words.collect()
    print("Elements in RDD -> %s" % coll)
    
    

在这里插入图片描述

  • foreach(func)
    仅返回满足foreach内函数条件的元素。在下面的示例中,我们在foreach中调用print函数,该函数打印RDD中的所有元素
    from pyspark import SparkContext
    sc = SparkContext("local", "ForEach app")
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    )
    def f(x): print(x)
    fore = words.foreach(f)
     
    

在这里插入图片描述

  • filter(f) 返回一个包含元素的新RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含’'spark’的字符串。
    from pyspark import SparkContext
    sc = SparkContext("local", "Filter app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"]
    )
    words_filter = words.filter(lambda x: 'spark' in x)
    filtered = words_filter.collect()
    print("Fitered RDD -> %s" % (filtered))
    

在这里插入图片描述

  • map(f, preservesPartitioning = False)
    通过将该函数应用于RDD中的每个元素来返回新的RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值1

    from pyspark import SparkContext
    sc = SparkContext("local", "Map app")
    words = sc.parallelize(
        ["scala",
         "java",
         "hadoop",
         "spark",
         "akka",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark"]
    )
    words_map = words.map(lambda x: (x, 1))
    mapping = words_map.collect()
    print("Key value pair -> %s" % (mapping))
    

在这里插入图片描述

  • reduce(f) 执行指定的可交换和关联二元操作后,将返回RDD中的元素。

    在下面的示例中,我们从运算符导入add包并将其应用于’num’以执行简单的加法运算。说白了和Python的reduce一样:假如有一组整数[x1,x2,x3],利用reduce执行加法操作add,对第一个元素执行add后,结果为sum=x1,然后再将sum和x2执行add,sum=x1+x2,最后再将x2和sum执行add,此时sum=x1+x2+x3。

    from pyspark import SparkContext
    from operator import add
    sc = SparkContext("local", "Reduce app")
    nums = sc.parallelize([1, 2, 3, 4, 5])
    adding = nums.reduce(add)
    print("Adding all the elements -> %i" % (adding))
    

在这里插入图片描述

  • join(other, numPartitions = None) 它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值。
    from pyspark import SparkContext
     sc = SparkContext("local", "Join app")
     x = sc.parallelize([("spark", 1), ("hadoop", 4)])
     y = sc.parallelize([("spark", 2), ("hadoop", 5)])
     joined = x.join(y)
     final = joined.collect()
     print( "Join RDD -> %s" % (final))
    

在这里插入图片描述

下面换一组数据进行操作。

    import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
sc = SparkContext()
intRDD = sc.parallelize([3,1,2,5,5])
stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple'])

创建一个int型数据与一个string型的数据。

  • distinct() 去重操作
    print (intRDD.distinct().collect())
    

在这里插入图片描述

  • randomSplit() randomSplit
    运算将整个集合以随机数的方式按照比例分为多个RDD,比如按照0.4和0.6的比例将intRDD分为两个RDD,并输出

    sRDD = intRDD.randomSplit([0.4,0.6])
    print (len(sRDD))
    print (sRDD[0].collect())
    print (sRDD[1].collect())
    

在这里插入图片描述

  • groupBy() groupBy运算可以按照传入匿名函数的规则,将数据分为多个Array。比如下面的代码将intRDD分为偶数和奇数
       result = intRDD.groupBy(lambda x : x % 2).collect()
       print (sorted([(x, sorted(y)) for (x, y) in result]))
    

在这里插入图片描述

  • union() 进行并集运算
    intRDD1 = sc.parallelize([3,1,2,5,5])
    intRDD2 = sc.parallelize([5,6])
    intRDD3 = sc.parallelize([2,7])
    print (intRDD1.union(intRDD2).union(intRDD3).collect())
    

在这里插入图片描述

  • Intersection() 进行交集运算
    print (intRDD1.intersection(intRDD2).collect())
    

在这里插入图片描述

  • subtract() 进行差集运算

    print (intRDD1.subtract(intRDD2).collect())
    ```![在这里插入图片描述](https://img-blog./20200720200512153.png)
    
    
  • cartesian 进行笛卡尔乘积运算

print (intRDD1.cartesian(intRDD2).collect())

由于两个RDD分别有5个元素和2个元素,所以返回结果有10各元素:
在这里插入图片描述

Rdd基本动作运算

读取元素,可以使用下列命令读取RDD内的元素,这是Actions运算,所以会马上执行

#取第一条数据
print (intRDD.first())
#取前两条数据
print (intRDD.take(2))
#升序排列,并取前3条数据
print (intRDD.takeOrdered(3))
#降序排列,并取前3条数据
print (intRDD.takeOrdered(3,lambda x:-x))

在这里插入图片描述

统计功能,可以将RDD内的元素进行统计运算

#统计  会返回一些常见的统计指标的值
print (intRDD.stats())
#最小值
print (intRDD.min())
#最大值
print (intRDD.max())
#标准差
print (intRDD.stdev())
#计数
print (intRDD.count())
#求和
print (intRDD.sum())
#平均
print (intRDD.mean()

在这里插入图片描述

  • RDD Key-Value基本“转换”运算

重新进行数据的定义:

kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])

我们用元素类型为tuple元组的数组初始化我们的RDD,这里,每个tuple的第一个值将作为键,而第二个元素将作为值。
可以使用keys和values函数分别得到RDD的键数组和值数组:

print (kvRDD1.keys().collect())
print (kvRDD1.values().collect())

在这里插入图片描述

筛选元素,可以按照键进行元素筛选,也可以通过值进行元素筛选,和之前的一样,使用filter函数,这里要注意的是,虽然RDD中是以键值对形式存在,但是本质上还是一个二元组,二元组的第一个值代表键,第二个值代表值,所以按照如下的代码既可以按照键进行筛选,我们筛选键的数值小于5的元组:

print (kvRDD1.filter(lambda x:x[0] < 5).collect())

在这里插入图片描述

值运算,我们可以使用mapValues()方法处理value值,下面的代码将value值进行了平方处理

print (kvRDD1.mapValues(lambda x:x**2).collect())

在这里插入图片描述

可以使用sortByKey按照key进行排序,传入参数的默认值为true,是按照从小到大排序,也可以传入参数false,表示从大到小排序

print (kvRDD1.sortByKey().collect())
print (kvRDD1.sortByKey(True).collect())
print (kvRDD1.sortByKey(False).collect())

在这里插入图片描述

使用reduceByKey函数可以对具有相同key值的数据进行合并。比如下面的代码,由于RDD中存在(3,4)和(3,6)两条key值均为3的数据,他们将被合为一条数据。

print (kvRDD1.reduceByKey(lambda x,y:x+y).collect())

在这里插入图片描述

  • 多个RDD Key-Value“转换”运算
    新创建两个rdd数据集
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD2 = sc.parallelize([(3,8)])

在对多个rdd进行计算时,首先考虑使用连接语句,将多个rdd转化为一个rdd,并且为了保证数据的不丢失,要恰当地选择合适的连接方法。下面将进行常见的连接方法的介绍。

  • 内连接运算,join运算可以实现类似数据库的内连接,将两个RDD按照相同的key值join起来,kvRDD1与kvRDD2的key值唯一相同的是3,kvRDD1中有两条key值为3的数据(3,4)和(3,6),而kvRDD2中只有一条key值为3的数据(3,8),所以join的结果是(3,(4,8))和(3,(6,8)),对于没有相同key的数据,将不再进行显示:
    print (kvRDD1.join(kvRDD2).collect())
    在这里插入图片描述

  • 左外连接,使用leftOuterJoin可以实现类似数据库的左外连接,如果kvRDD1的key值对应不到kvRDD2,就会显示None,这是与内连接不同的一点。

print (kvRDD1.leftOuterJoin(kvRDD2).collect())

在这里插入图片描述

  • 右外连接,使用rightOuterJoin可以实现类似数据库的右外连接,如果kvRDD2的key值对应不到kvRDD1,就会显示None
print (kvRDD1.rightOuterJoin(kvRDD2).collect())

在这里插入图片描述

  • 删除相同key值数据,使用subtractByKey运算会删除相同key值得数据:
    print (kvRDD1.subtractByKey(kvRDD2).collect())
    在这里插入图片描述
# Key-Value“动作”运算
#读取第一条数据
print (kvRDD1.first())
#读取前两条数据
print (kvRDD1.take(2))
#读取第一条数据的key值
print (kvRDD1.first()[0])
#读取第一条数据的value值
print (kvRDD1.first()[1])

在这里插入图片描述

按key值统计,使用countByKey函数可以统计各个key值对应的数据的条数:

print (kvRDD1.countByKey())

在这里插入图片描述

lookup查找运算,使用lookup函数可以根据输入的key值来查找对应的Value值:

print (kvRDD1.lookup(3))

在这里插入图片描述

  • 持久化操作
    spark RDD的持久化机制,可以将需要重复运算的RDD存储在内存中,以便大幅提升运算效率,有两个主要的函数:
    persist() 使用persist函数对RDD进行持久化
    kvRDD1.persist()
    unpersist() 使用unpersist函数对RDD进行去持久化

3.2 DataFrame的基本操作
在pyspark中除了rdd之外还有dataframe这种数据格式。他们之间的差别在于:dataframe比rdd的速度快,对于结构化的数据,使用dataframe编写的代码更简洁。对于非结构化数据,建议先使用rdd处理成结构化数据,然后转换成dataframe。在dataFrame中的操作主要可以分为查、增改、合并、统计、删除、去重、格式转换、SQL操作、读写csv文件几个大的方向。我们从数据库中查询到的数据返回的数据结构就是DataFrame格式的。
在进行学习之前,首先从网上下载数据:https://www./datasets/35,下载数据完毕后,打开jupyter notebook进行加载数据,加载代码如下:

import findspark
findspark.init()
import os
import sys
spark_name = os.environ.get('SPARK_HOME',None)
if not spark_name:
    raise ValueErrorError('spark环境没有配置好')
sys.path.insert(0,os.path.join(spark_name,'python'))
sys.path.insert(0,os.path.join(spark_name,'D:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip'))
#(py4j-0.10.6-src.zip位于D:spark-2.3.0-bin-hadoop2.7中python文件夹中lib文件夹内,请根据自己的版本更改)
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(r'C:\Users\Administrator\Desktop\数据\电信客户流失数据\WA_Fn-UseC_-Telco-Customer-Churn.csv')

3.2.1查的操作
行元素查询操作

  • df.show(30) #显示前30行数据,会逐行显示出前10行数据,包含表头
    在这里插入图片描述

  • df.printSchema() #以树的形式打印概要

  • df.dtypes #查看数据类型
    在这里插入图片描述

  • list = df.head(3)与list = df.take(5) #获取头几行数据,以python中的列表形式返回
    在这里插入图片描述

  • int_num = df.count() #查询总行数
    在这里插入图片描述

  • df.select(df.customerID.alias(“customer_ID”)).show() #取别名
    在这里插入图片描述

  • from pyspark.sql.functions import isnull
    df = df.filter(isnull(“Churn”))
    df.show() #查询某列为null的行
    在这里插入图片描述

  • df_list = df.collect()
    print(df_list) #将数据以python的列表格式输出

  • df[“Partner”,“gender”].describe().show() #对df中的数据进行统计,返回常用的一些统计指标的值
    在这里插入图片描述

  • before_dist = df.select(“gender”,“Partner”).count()
    after_dist = df.select(“gender”,“Partner”).distinct().count() #去重并打印出来,也可以使用count()进行计数之后再打印出来
    在这里插入图片描述

  • sample = df.sample(False,0.5,0)
    sample.show() #随机取样,抽取50%的数据出来

  • expr() #将字符串作为一个函数来执行
    from pyspark.sql.functions import expr
    concat_df.select(expr('length(id_pur)’)).show(5) # 返回’ id_pur '列的长度
    在这里插入图片描述

列元素查询操作,列的类型为column,它可以使用pyspark.sql.Column中的所有方法

  • df.columns #获取df中的列名,注意columns后面没有括号
    在这里插入图片描述

  • select()#选取某一列或某几列数据
    例:df.select(“name”) #使用select返回的是dataframe格式,使用df[]在选中>=2个列时返回的才是dataframe对象,否则返回的是column对象。
    df.select(df.a, df.b, df.c) # 选择a、b、c三列
    df.select(df[“a”], df[“b”], df[“c”]) # 选择a、b、c三列
    也可以用df[“b”]进行选择

  • df.where(“customerID='7795-CFOCW’ and SeniorCitizen=0”).show() #选择满足条件的数据,注意在字符串要加上引号
    还有下面这两种写法

    print(df.where(df.gender==0).where(df.over_due_days>=1).count())
    print(df.where((df.gender==0)&(df.over_due_days>=1)).count())
    

在这里插入图片描述

withColumnRenamed(“old_name”,”new_name”)# 重命名列
asc()  #基于指定列返回一个升序表达式
desc()  #基于指定列返回一个降序表达式
astype(dataType)   #转换列的数据类型,跟cast()是同一个函数
cast(dataType)   #转换数据类型
startswith(other)  #判断列中每个值是否以指定字符开头,返回布尔值
endswith(“string”)   #判断列中每个值是否以指定字符结尾,返回布尔值
isNotNull()   #判断列中的值是否不为空
isNull()   #判断列中的值是否为空
like(expression)   #判断列中的值是否满足相似条件,判断条件跟sql语法相同,支持通配符
例:
df.select(df.ip.like('222.94%').alias('ifLike')).show()
orderBy()  #排序,可以指定正向排序或者倒序排列
df.select("customerID").orderBy("customerID").show(10)

在这里插入图片描述

使用窗口函数进行对比:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
df.select("customerID",F.rank().over(Window.orderBy("customerID")).alias("customerID_rank") ).show(5)或者
df.select("customerID",F.rank().over(Window.orderBy(df.customerID.desc())).alias("customerID_rank") ).show(5)或者
df.orderBy(df.customerID.desc()).show(5)   倒序排列
  • sample() #随机抽样
    t1 = df.sample(False, 0.2, 42)
    t2 = df.sample(False, 0.2, 43)
    

其中,withReplacement = True or False代表是否在原数据进行修改。
按条件筛选when / between
when(condition, value1).otherwise(value2),意为:当满足条件condition的值时赋值为values1,不满足条件的则赋值为values2,otherwise表示,不满足条件的情况下,应该赋值何值。
例:

from pyspark.sql import functions as F
df.select(df.customerID,F.when(df.gender=="Male","1").when(df.gender=="Female", "0").otherwise("2").alias("sex")).show(10)

在这里插入图片描述

  • between(lowerBound, upperBound) # 筛选出某个范围内的值,返回的是TRUE or FALSE
    df.select(df.customerID, df.tenure.between(10, 20).alias(“tenure”)).show(5)
    在这里插入图片描述

  • 如果需要进行筛选df中的特定区间内的数据时,可以使用下面这种通过行索引进行筛选的方式。

    from pyspark.sql.functions import monotonically_increasing_id
    dfWithIndex = df.withColumn("index",monotonically_increasing_id())  #为df添加行索引
    dfWithIndex.select(dfWithIndex.index,dfWithIndex.tenure, dfWithIndex.tenure.between("30","50").alias("tenure")).show(5) #筛选特定行
    

在这里插入图片描述

  • filter() #过滤数据

    df = df.filter(df[tenure]>=21)等价于df = df.where(df[tenure]>=21)
    在有多个条件时: df .filter(“id = 1 or c1 = 'b’” ).show()
    过滤null值或nan值时:
    from pyspark.sql.functions import isnan, isnull
    df = df.filter(isnull("tenure"))
    df.show()  # 把a列里面数据为null的筛选出来(代表python的None类型)
    df = df.filter(isnan("tenure "))  # 把a列里面数据为nan的筛选出来(Not a Number,非数字数据)
    

3.2.2增改的操作

  • spark.createDataFrame()、rdd.toDF() #新建数据
    • 第一种,将pandas中的DataFrame转为spark中的DataFrame
    import pandas as pd
    from pyspark.sql import SparkSession   
    spark = SparkSession.builder.getOrCreate()    # 初始化spark会话
    pandas_df = pd.DataFrame({"name":["ss","aa","qq","ee"],"age":[12,18,20,25]})
    spark_df = spark.createDataFrame(pandas_df)#将pandas中的DataFrame转化为pyspark中的DataFrame
    

在这里插入图片描述

  • 第二种,将rdd转为spark中的DataFrame格式

    from pyspark.sql import Row
    from pyspark import SparkContext
    sc = SparkContext()
    row = Row("spe_id", "InOther")
    x = ['x1','x2']
    y = ['y1','y2']
    new_df = sc.parallelize([row(x[i], y[i]) for i in range(2)])
    ss = new_df.toDF()
    

在这里插入图片描述

  • 第三种,将spark中的DataFrame格式转为pandas中的DataFrame,使用toPandas()

    import pandas as pd
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()# 初始化spark会话
    pandas_df = pd.DataFrame({"name":["ss","aa","qq","ee"],"age":[12,18,20,25]})
    spark_df = spark.createDataFrame(pandas_df)#将pandas中的DataFrame转化为pyspark中的DataFrame
    print(type(spark_df))
    new_pandas = spark_df.toPandas()
    print(type(new_pandas))
    

在这里插入图片描述

  • withColumn() #在df中新增数据列,会返回一个新的DataFrame,需要两个参数,第二个参数是对原来的列作何操作,比如原来的列整体加一,不可以从别的列进行取数。

    df1 = df.select("customerID","gender","Partner","tenure")
    withcolumns_df = df1.withColumn("new_col",df1.tenure+3)
    withcolumns_df.show(5)
    

在这里插入图片描述

df = df.withColumn("gender ", df["tenure "].cast("Int"))  #修改列的类型
DF.withColumnRenamed( "id" , "idx" )   #修改列名

3.2.3合并的操作

  • union() #横向拼接,要求合并的两端具有相同的列名及列数
df2 = df.select("customerID","gender")
df3 = df.select(df.Churn.alias("customerID"),df.PaymentMethod.alias("gender"))
df4 = df2.union(df3)#等价于unionAll ()
df4.show(5)

在这里插入图片描述

  • Join() #根据条件进行关联
    • 单字段进行关联
      df5 = df1.select("customerID","gender")
      df6 = df1.select("gender","tenure")
      df7 = df5.join(df6,df5.gender==df6.gender,"left_outer")
      df7.show(5)
      

其中,方法可以为:inner, outer, left_outer, right_outer, leftsemi.
注意,一般使用left_outer进行关联,如果主键不唯一就会造成笛卡尔积,比如下面这种。
在这里插入图片描述

  • 求并集、交集
    实例,构建两个dataframe

    sentenceDataFrame = spark.createDataFrame((
          (1, "asf"),
          (2, "2143"),
          (3, "rfds")
        )).toDF("label", "sentence")
    sentenceDataFrame.show()
    
    sentenceDataFrame1 = spark.createDataFrame((
          (1, "asf"),
          (2, "2143"),
          (4, "f8934y")
        )).toDF("label", "sentence")
    

在这里插入图片描述

  • subtract() #求差集,可指定对某一列求差集,如不指定,将对所有列求差集

    newDF = sentenceDataFrame1.select("sentence").subtract(sentenceDataFrame.select("sentence"))
    

在这里插入图片描述

  • intersect() #交集,可以指定对某一列求交集,如不指定,将对所有列求交集

    newDF2 = sentenceDataFrame1.select("sentence").intersect(sentenceDataFrame.select("sentence"))
    

在这里插入图片描述

  • union() #并集,与上面的表连接的语句相同

    newDF3 = sentenceDataFrame1.select("sentence").union(sentenceDataFrame.select("sentence"))
    

在这里插入图片描述

  • union+distinct #求并集并进行去重,结果可与上图union对比

    newDF4 = sentenceDataFrame1.select("sentence").union(sentenceDataFrame.select("sentence")).distinct()
    

在这里插入图片描述

  • explode()与split() #对某列的数据以固定的分隔符进行分隔,并新增一列

    from pyspark.sql import functions as F
    df11 = df.select("Contract")
    df11.withColumn("Contract_d",F.explode(F.split(df11.Contract,"-"))).show(5)
    

在这里插入图片描述

对于分隔符不唯一的会造成这种情况。

3.2.4统计的操作

  • df.stat.freqItems([“tenure”, “gender”], 0.4).show() # 频数统计与筛选,根据tenure与gender字段,统计该字段值出现频率在40%以上的内容
    在这里插入图片描述

  • crosstab() #交叉分析
    df.crosstab('tenure’, 'Partner’).show(5)
    在这里插入图片描述

  • groupBy() #分组函数

  • df8.groupby('gender’).agg({'tenure’: 'mean’}).show(5)
    在这里插入图片描述

另一个使用实例:df8.groupby('gender’).count().show()#分组并计数
在这里插入图片描述

  • 在分组后使用多个函数进行统计,返回的是DataFrame类型

    from pyspark.sql import functions as F
    df8.groupBy("gender").agg(F.avg("tenure"), F.min("tenure"), F.max("tenure")).show(5)
    

在这里插入图片描述

  • Pivot() #多行转多列,如果一个有三列值的表格使用了分组后再进行pivot转换,那么将会有一列的值会变为转换后的列名。

    df=spark.sparkContext.parallelize([[15,399,2],[15,1401,5],     [15,1608,4],[15,20,4], [18,100,3],[18,1401,3],[18,399,1]])                    .toDF(["userID","movieID","rating"])
    df.show()
    #pivot 多行转多列
    resultDF = df.groupBy("userID").pivot("movieID").sum("rating")
    #结果
    resultDF.show()
    

在这里插入图片描述

3.2.5删除的操作

  • df8.drop(“customerID”).columns或者df.drop(df.customerID). columns #删除age的列
    在这里插入图片描述

  • df = df.na.drop() # 按行将行中含有na的整行删除
    在这里插入图片描述

  • df13 = df8.dropna(subset=['customerID’, 'tenure’]) # 指定删除’customerID’或’tenure’中任一一列包含na的行
    在这里插入图片描述

  • fillna() #填充空值,与df.na.fill()相同
    train.fillna(-1).show() #将所有为na的值填充为-1,可指定某列的值填充为均值或其他统计指标。
    means = df1.select(f.avg(df1.oldbalanceDest))
    df1.na.fill({“oldbalanceDest”:means.toPandas().values[0][0]}).show()

3.2.6去重的操作

  • distinct() #返回一个不包含重复记录的DataFrame
    DF.distinct() #返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。
  • dropDuplicates() #根据指定字段去重。类似于select distinct a, b操作
    train.select('Age’,'Gender’).dropDuplicates().show()

3.2.7格式转换的操作

  • Pandas和Spark的DataFrame两者互相转换:

    pandas_df = spark_df.toPandas()#pandas转spark
    spark_df = sqlContext.createDataFrame(pandas_df)  #spark转pandas
    

注:转化为pandas,但是该数据要读入内存,如果数据量大的话,很难跑得动

两者的优缺点:
Pyspark DataFrame是在分布式节点上运行一些数据操作,而pandas是不可能的;
Pyspark DataFrame的数据反映比较缓慢,没有Pandas那么及时反映;
Pyspark DataFrame的数据框是不可变的,不能任意添加列,只能通过合并进行;
pandas比Pyspark DataFrame有更多方便的操作以及很强大
  • RDD与Spark dataframe的相互转换:

    rdd_df = df.rdd
    df = rdd_df.toDF()
    

3.2.8 SQL操作
如果要从hive中读取数据,就要在开始的时候创建一个sparksession的对象。

from pyspark.sql import SparkSession
spark = SparkSession     .builder     .appName("Python Spark SQL basic example")     .config("spark.some.config.option", "some-value")     .getOrCreate()

其中:
在pyspark中换行要 加入\

  • getOrCreate() 指的是如果当前存在一个SparkSession就直接获取,否则新建。
  • enableHiveSupport() 使我们可以从读取或写入数据到hive。

或者我们还可以自定义配置,例如定义spark任务执行时的内存:

from pyspark.sql import SparkSession
myspark = SparkSession.builder     .appName('compute_customer_age')     .config('spark.executor.memory','2g')     .enableHiveSupport()     .getOrCreate()

在创建完sparksession对象之后,就可以从hive中取数据,进行操作。

sql = """
      SELECT id as customer_id,name, register_date
      FROM [db_name].[hive_table_name]
      limit 100
      """
df = myspark.sql(sql)
df.show(20)

将数据保存到数据库中可以使用下面的语句来实现:

DataFrame.write.mode("overwrite").saveAsTable("test_db.test_table2")

3.2.9 读写csv的操作
可以使用SQLContext类中 load/save函数来读取和保存CSV文件:

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
csv_content = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(r'./电信客户流失数据\WA_Fn-UseC_-Telco-Customer-Churn.csv')
csv_content.show(10)  #读取
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv",header="true") #保存

其中,header代表是否显示表头。

注:加载文件也可以使用pandas中的read_csv方法将数据加载为pandas中的DataFrame之后再转为spark中的dataframe进行操作。同理:在保存文件时也可以先试用toPandas方法,再使用pandas中的to_csv方法,更为方便。

使用sparksession中的方法进行读取json与csv文件:

df_sparksession_read = spark.read.csv(r"E: \数据\欺诈数据集\PS_7_log.csv",header=True)
df_sparksession_read.show(10)
或:
df_sparksession_read = spark.read.json(r"E: \数据\欺诈json数据集\PS_7_log.json",header=True)
df_sparksession_read.show(10)

3.3 pyspark.sql.functions中的方法简介

  • pyspark.sql.functions.udf #进行自定义函数的使用, 可以一次执行一行遍历实现自定义函数的功能

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType  #函数返回值的类型,要注意原来的数据类型是什么,注意保持一致
    df21 = df.select("tenure")
    def avg_(x):
    if x >= 30:
    return "yes"
    else:
        return "no"
    func = udf(avg_,returnType=StringType())   #注册函数
    df22 = df21.withColumn("avg_", func(df21.tenure))   #调用函数
    df22.select("tenure","avg_").show(5)
    

在这里插入图片描述

  • 在udf中传入多个列或参数时的用法:
    此时数据是这个样子的
    在这里插入图片描述

要新增一列,且新增的一列是通过userID与movieID两列判断或者计算得出的。方法:使用函数的嵌套,将参数间接地传入。

from pyspark.sql import functions as f
def generate_udf(constant_var):
    def test(col1, col2):
        if col1 == col2:
            return col1
        else:
            return constant_var
    return f.udf(test, StringType())

df.withColumn('new_column',generate_udf('default_value')(f.col('userID'), f.col('movieID'))).show()

在这里插入图片描述

使用udf对性能会有负面的影响,如果不是太过于复杂的逻辑,可以使用f.when.when.otherwise()的方式得出想要的结果。

字符串方法

  • 字符串拼接
    from pyspark.sql.functions import concat, concat_ws
    df = spark.createDataFrame([('abcd’,'123’)], ['s’, 'd’])
    1.直接拼接
    df.select(concat(df.s, df.d).alias('s’)).show()
    2.指定拼接符
    df.select(concat_ws(’-’, df.s, df.d).alias('s’)).show()
    在这里插入图片描述

    3.格式化字符串

    from pyspark.sql.functions import format_string
    df = spark.createDataFrame([(5, "hello")], ['a', 'b'])
    df.select(format_string('%d %s', df.a, df.b).alias('v')).withColumnRenamed("v","vv").show()
    

    在这里插入图片描述

    4.查找字符串的位置

    from pyspark.sql.functions import instr
    df = spark.createDataFrame([('abcd',)], ['s'])
    df.select(instr(df.s, 'b').alias('s')).show()
    

    在这里插入图片描述

    5.字符串截取

    from pyspark.sql.functions import substring
    df = spark.createDataFrame([('abcd',)], ['s'])
    df.select(substring(df.s, 1, 2).alias('s')).show()  #1与2表示开始与截取长度
    

    在这里插入图片描述

    6.正则表达式替换

    from pyspark.sql.functions import regexp_replace
    df = spark.createDataFrame([('100sss200',)], ['str'])
    df.select(regexp_replace('str', '(\d)', '-').alias('d')).collect()  #替换类型,正则语句,替换内容
    

在这里插入图片描述

与时间有关的方法

  • 将时间格式进行更改:
    使用pyspark.sql.functions.date_format方法

    from pyspark.sql import functions as F
    df = spark.createDataFrame([('2015-04-08',)], ['dt'])
    df.select(F.date_format('dt', 'yyyyMMdd').alias('date')).collect()
    

在这里插入图片描述

  • 获取当前日期

    from pyspark.sql.functions import current_date
    spark.range(3).withColumn('date',current_date()).show()
    

在这里插入图片描述

  • 获取当前日期时间,

    from pyspark.sql.functions import current_timestamp
    spark.range(3).withColumn('date',current_timestamp()).show()
    

    在这里插入图片描述

  • 将字符串日期改为时间日期格式:

    from pyspark.sql.functions import to_date, to_timestamp
    df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
    df.select(to_date(df.t).alias('date')).show()   # 1.转日期
    df.select(to_timestamp(df.t).alias('dt')).show()   # 2.带时间的日期
    df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).show()   # 3.可以指定日期格式
    

    在这里插入图片描述

  • 获取日期中的年月日

from pyspark.sql.functions import year, month, dayofmonth
df = spark.createDataFrame([('2015-04-08',)], ['a'])
df.select(year('a').alias('year'), 
          month('a').alias('month'),
          dayofmonth('a').alias('day')
  ).show()
#只接受以-连接的日期,如果是以/等其他连接符连接的,需要进行格式转换或者字符替换。

在这里插入图片描述

  • 日期差,月份差

    from pyspark.sql.functions import datediff, months_between 
    

1.日期差

df = spark.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
df.select(datediff(df.d2, df.d1).alias('diff')).show() 

2.月份差

df = spark.createDataFrame([('1997-02-28 10:30:00', '1996-10-30')], ['t', 'd'])
df.select(months_between(df.t, df.d).alias('months')).show()

其他方法:

pyspark.sql.functions.abs(col)   #计算绝对值
pyspark.sql.functions.avg(col)   #聚合函数:返回组中的值的平均值。
pyspark.sql.functions.variance(col)   #返回组中值的总体方差
pyspark.sql.functions.ceil(col)   #计算给定值的上限
pyspark.sql.functions.floor(col)   #计算给定值的下限。
pyspark.sql.functions.collect_list(col)   #返回重复对象的列表。
pyspark.sql.functions.collect_set(col)    #返回一组消除重复元素的对象。
pyspark.sql.functions.count(col)    #返回组中的项数量。
pyspark.sql.functions.countDistinct(col, *cols)   #返回一列或多列的去重计数的新列。
pyspark.sql.functions.initcap(col)    #在句子中将每个单词的第一个字母翻译成大写。
pyspark.sql.functions.isnan(col)    #如果列是NaN,则返回true的表达式
pyspark.sql.functions.lit(col)     #创建一个文字值的列
pyspark.sql.functions.lower(col)   #将字符串列转换为小写
pyspark.sql.functions.reverse(col)   #反转字符串列并将其作为新的字符串列返回
pyspark.sql.functions.sort_array(col, asc=True)   #按升序对给定列的输入数组进行排序
pyspark.sql.functions.split(str, pattern)   #按指定字符进行分隔数据
pyspark.sql.functions. array_min (col)   #计算指定列的最小值
pyspark.sql.functions. array_max (col)   #计算指定列的最大值
pyspark.sql.functions.stddev(col)    #返回组中表达式的无偏样本标准差
pyspark.sql.functions.sumDistinct(col)   #返回表达式中不同值的总和
pyspark.sql.functions.trim(col)   #去除空格
pyspark.sql.functions. greatest (col1,col2)   #求行的最大值,可以计算一行中多列的最大值
pyspark.sql.functions. least (col1,col2)   #求行的最小值,可以计算一行中多列的最小值,也可以用lit()指定常数进行与列的值进行比较

3.4 窗口函数的使用

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark.sql import SQLContext
rddData = sc.parallelize( (Row(c="class1", s=50), Row(c="class2", s=40),                           Row(c="class3", s=70), Row(c="class2", s=49),                           Row(c="class3", s=29), Row(c="class1", s=78)))
sqlContext = SQLContext(sc)
testDF = rddData.toDF()
result = (testDF.select("c", "s", F.row_number().over(Window.partitionBy("c").orderBy("s")).alias("rowNum")))
finalResult = result.where(result.rowNum <= 1).show()

在这里插入图片描述

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多