在本节中,我们将尝试清理和解析日志数据集,以便真正从每个日志消息中提取包含有意义信息的结构化属性。 日志数据理解 如果你熟悉 Web 服务器日志,你将认识到上面显示的数据是通用的日志格式。 我们需要使用一些特定的技术来解析、匹配和提取日志数据中的这些属性。 使用正则表达式进行数据解析和提取 接下来,我们必须将半结构化的日志数据解析为单独的列。我们将使用专门的内置函数regexp_extract()进行解析。此函数将针对具有一个或多个捕获组的正则表达式匹配列,并允许提取其中一个匹配的组。我们将对希望提取的每个字段使用一个正则表达式。 到目前为止,你一定已经听说或使用了大量正则表达式。如果你发现正则表达式令人困惑,并且希望了解更多关于正则表达式的信息,我们建议你访问RegexOne 网站。你可能还会发现,Goyvaerts 和 Levithan 编写的《正则表达式手册》是非常有用的参考资料。 让我们看下我们使用的数据集中的日志总数。 print((base_df.count(), len(base_df.columns))) #Output (3461613, 1) 看起来我们总共有大约 346 万条日志消息。一个不小的数字!让我们提取并查看一些日志消息。 sample_logs=[item['value'] for item in base_df.take(15)] sample_logs 提取主机名 让我们尝试编写一些正则表达式来从日志中提取主机名。 host_pattern=r'(^\S+\.[\S+\.]+\S+)\s' hosts=[re.search(host_pattern, item)(1) if re.search(host_pattern, item) else 'no match' for item in sample_logs] hosts ['199.72.81.55’, ** 'unicomp6.unicomp.net’,** ** '199.120.110.21’,** ** 'burger.letters’,** …, …, ** 'unicomp6.unicomp.net’,** ** 'd104.aa.net’,** ** 'd104.aa.net’]** 提取时间戳 现在让我们尝试使用正则表达式从日志中提取时间戳字段。 ts_pattern=r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]' timestamps=[re.search(ts_pattern, item)(1) for item in sample_logs] timestamps ['01/Jul/1995:00:00:01 -0400’, '01/Jul/1995:00:00:06 -0400’, '01/Jul/1995:00:00:09 -0400’, …, …, '01/Jul/1995:00:00:14 -0400’, '01/Jul/1995:00:00:15 -0400’, '01/Jul/1995:00:00:15 -0400’] 提取 HTTP 请求方法、URI 和协议 现在让我们尝试使用正则表达式从日志中提取 HTTP 请求方法、URI 和协议模式字段。 method_uri_protocol_pattern=r'"(\S+)\s(\S+)\s*(\S*)"' method_uri_protocol=[re.search(method_uri_protocol_pattern, item)s() if re.search(method_uri_protocol_pattern, item) else 'no match' for item in sample_logs] method_uri_protocol [('GET’, '/history/apollo/’, 'HTTP/1.0’), ** ('GET’, '/shuttle/countdown/’, 'HTTP/1.0’),** …, …, ** ('GET’, '/shuttle/countdown/count.gif’, 'HTTP/1.0’),** ** ('GET’, '/images/NASA-logosmall.gif’, 'HTTP/1.0’)]** 提取 HTTP 状态码 现在让我们尝试使用正则表达式从日志中提取 HTTP 状态码。 content_size_pattern=r'\s(\d+)$' content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs] print(content_size) ['200’, '200’, '200’, '304’, …, '200’, '200’] 提取 HTTP 响应内容大小 现在让我们尝试使用正则表达式从日志中提取 HTTP 响应内容大小。 content_size_pattern=r'\s(\d+)$' content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs] print(content_size) ['6245’, '3985’, '4085’, '0’, …, '1204’, '40310’, '786’] 把它们放在一起 现在,让我们尝试利用前面构建的所有正则表达式模式,并使用regexp_extract(…)方法构建 DataFrame,所有日志属性都整齐地提取到各自的列中。 from pyspark.sqlctions import regexp_extract logs_df=base_df.select(regexp_extract('value', host_pattern, 1).alias('host'), regexp_extract('value', ts_pattern, 1).alias('timestamp'), regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'), regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'), regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'), regexp_extract('value', status_pattern, 1).cast('integer').alias('status'), regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size')) logs_df(10, truncate=True) print((logs_df.count(), len(logs_df.columns))) 查找缺失值 缺失值和空值是数据分析和机器学习的祸根。让我们看看我们的数据解析和提取逻辑是如何工作的。首先,让我们验证原始数据框中有没有空行。 (base_df .filter(base_df['value'] .isNull()) .count()) 0 没问题!现在,如果我们的数据解析和提取毕业证书工作正常,我们就不应该有任何可能存在空值的行。让我们来试试吧! bad_rows_df=logs_df.filter(logs_df['host'].isNull()| logs_df['timestamp'].isNull() | logs_df['method'].isNull() | logs_df['endpoint'].isNull() | logs_df['status'].isNull() | logs_df['content_size'].isNull()| logs_df['protocol'].isNull()) bad_rows_df.count() 33905 哎哟!看起来我们的数据中有超过 33K 的缺失值!我们能搞定吗? 请记住,这不是一个常规的 pandas DataFrame,你无法直接查询并获得哪些列为空。我们所谓的大数据集驻留在磁盘上,它可能存在于 Spark 集群中的多个节点上。那么我们如何找出哪些列有可能为空呢? 查找 Null 值 我们通常可以使用以下技术找出哪些列具有空值。(注意:这种方法是从 StackOverflow 上的一个绝妙的回答改造而来的。) from pyspark.sqlctions import col from pyspark.sqlctions import sum as spark_sum def count_null(col_name): return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name) # Build up a list of column expressions, one per column. exprs=[count_null(col_name) for col_name in logs_df.columns] # Run the aggregation. The *exprs converts the list of expressions into # variable function arguments. logs_df.agg(*exprs)() 看起来status列中有一个缺失值而其它的都在content_size列中。让我们看看能不能找出问题所在! 处理 HTTP 状态中的空值 状态列解析使用的原始正则表达式是: regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer') .alias( 'status') 是否有更多的数字使正则表达式出错?还是数据点本身的问题?让我们试着找出答案。 注意:在下面的表达式中,~表示“非”。 null_status_df=base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s')) null_status_df.count() 1 让我们看看这条糟糕的记录是什么样子? null_status_df(truncate=False) 看起来像一条有很多信息丢失的记录!让我们通过日志数据解析管道来传递它。 bad_status_df=null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'), regexp_extract('value', ts_pattern, 1).alias('timestamp'), regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'), regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'), regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'), regexp_extract('value', status_pattern, 1).cast('integer').alias('status'), regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size')) bad_status_df(truncate=False) (图片) 看起来这条记录本身是一个不完整的记录,没有有用的信息,最好的选择是删除这条记录,如下所示! logs_df=logs_df[logs_df['status'].isNotNull()] exprs=[count_null(col_name) for col_name in logs_df.columns] logs_df.agg(*exprs)() 处理 HTTP content size 列中的空值 根据之前的正则表达式,content_size列的原始解析正则表达式为: regexp_extract('value', r'\s(\d+)$', 1).cast('integer') .alias('content_size') 原始数据集中是否有数据丢失?让我们试着找出答案吧!我们首先尝试找出基本 DataFrame 中可能缺少内容大小的记录。 null_content_size_df=base_df.filter(~base_df['value'].rlike(r'\s\d+$')) null_content_size_df.count() 33905 这个数值似乎与处理后的 DataFrame 中缺失的内容大小的数量相匹配。让我们来看看我们的数据框中缺少内容大小的前十条记录。 null_content_size_df.take(10) 很明显,糟糕的原始数据记录对应错误响应,其中没有发回任何内容,服务器为content_size字段发出了一个“-”。 因为我们不想从我们的分析中丢弃这些行,所以我们把它们代入或填充为 0。 修复 content_size 为 null 的行 最简单的解决方案是像前面讨论的那样,用 0 替换logs_df中的 null 值。Spark DataFrame API 提供了一组专门为处理 null 值而设计的函数和字段,其中包括: fillna():用指定的非空值填充空值。 na:它返回一个DataFrameNaFunctions对象,其中包含许多用于在空列上进行操作的函数。 有几种方法可以调用这个函数。最简单的方法就是用已知值替换所有空列。但是,为了安全起见,最好传递一个包含(column_name, value)映射的 Python 字典。这就是我们要做的。下面是文档中的一个示例: >>> df4.na.fill({'age': 50, 'name': 'unknown'})() +---+------+-------+ |age|height| name| +---+------+-------+ | 10| 80| Alice| | 5| null| Bob| | 50| null| Tom| | 50| null|unknown| +---+------+-------+ 现在我们使用这个函数,用 0 填充content_size字段中所有缺失的值! logs_df=logs_df.na.fill({'content_size': 0}) exprs=[count_null(col_name) for col_name in logs_df.columns] logs_df.agg(*exprs)() 看,没有缺失值了! 处理时间字段(时间戳) 现在我们有了一个干净的、已解析的 DataFrame,我们必须将 timestamp 字段解析为一个实际的时间戳。通用的日志格式时间有点不标准。用户定义函数(UDF)是解析它最直接的方法。 from pyspark.sqlctions import udf month_map={ 'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7, 'Aug':8, 'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12 } def parse_clf_time(text): """ Convert Common Log time format into a Python datetime object Args: text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz] Returns: a string suitable for passing to CAST('timestamp') """ # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format( int(text[7:11]), month_map[text[3:6]], int(text[0:2]), int(text[12:14]), int(text[15:17]), int(text[18:20]) ) 现在,让我们使用这个函数来解析 DataFrame 中的time列。 udf_parse_time=udf(parse_clf_time) logs_df=(logs_df.select('*', udf_parse_time(logs_df['timestamp']) .cast('timestamp') .alias('time')) .drop('timestamp') logs_df(10, truncate=True) 一切看起来都很好!让我们通过检查 DataFrame 的模式来验证这一点。 logs_df.printSchema() root |-- host: string (nullable=true) |-- method: string (nullable=true) |-- endpoint: string (nullable=true) |-- protocol: string (nullable=true) |-- status: integer (nullable=true) |-- content_size: integer (nullable=false) |-- time: timestamp (nullable=true) 现在,让我们缓存logs_df,因为我们将在下一部分的数据分析部分中大量地使用它! logs_df.cache() |
|
来自: 新用户0175WbuX > 《待分类》