分享

Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

 新用户0175WbuX 2022-02-16

  在本节中,我们将尝试清理和解析日志数据集,以便真正从每个日志消息中提取包含有意义信息的结构化属性。

  日志数据理解

  如果你熟悉 Web 服务器日志,你将认识到上面显示的数据是通用的日志格式。

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  我们需要使用一些特定的技术来解析、匹配和提取日志数据中的这些属性。

  使用正则表达式进行数据解析和提取

  接下来,我们必须将半结构化的日志数据解析为单独的列。我们将使用专门的内置函数regexp_extract()进行解析。此函数将针对具有一个或多个捕获组的正则表达式匹配列,并允许提取其中一个匹配的组。我们将对希望提取的每个字段使用一个正则表达式。

  到目前为止,你一定已经听说或使用了大量正则表达式。如果你发现正则表达式令人困惑,并且希望了解更多关于正则表达式的信息,我们建议你访问RegexOne 网站。你可能还会发现,Goyvaerts 和 Levithan 编写的《正则表达式手册》是非常有用的参考资料。

  让我们看下我们使用的数据集中的日志总数。

  print((base_df.count(), len(base_df.columns)))

  #Output

  (3461613, 1)

  看起来我们总共有大约 346 万条日志消息。一个不小的数字!让我们提取并查看一些日志消息。

  sample_logs=[item['value'] for item in base_df.take(15)]

  sample_logs

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  提取主机名

  让我们尝试编写一些正则表达式来从日志中提取主机名。

  host_pattern=r'(^\S+\.[\S+\.]+\S+)\s'

  hosts=[re.search(host_pattern, item)(1)

  if re.search(host_pattern, item)

  else 'no match'

  for item in sample_logs]

  hosts

  ['199.72.81.55’,

  ** 'unicomp6.unicomp.net’,**

  ** '199.120.110.21’,**

  ** 'burger.letters’,**

  …,

  …,

  ** 'unicomp6.unicomp.net’,**

  ** 'd104.aa.net’,**

  ** 'd104.aa.net’]**

  提取时间戳

  现在让我们尝试使用正则表达式从日志中提取时间戳字段。

  ts_pattern=r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'

  timestamps=[re.search(ts_pattern, item)(1) for item in sample_logs]

  timestamps

  ['01/Jul/1995:00:00:01 -0400’,

  '01/Jul/1995:00:00:06 -0400’,

  '01/Jul/1995:00:00:09 -0400’,

  …,

  …,

  '01/Jul/1995:00:00:14 -0400’,

  '01/Jul/1995:00:00:15 -0400’,

  '01/Jul/1995:00:00:15 -0400’]

  提取 HTTP 请求方法、URI 和协议

  现在让我们尝试使用正则表达式从日志中提取 HTTP 请求方法、URI 和协议模式字段。

  method_uri_protocol_pattern=r'"(\S+)\s(\S+)\s*(\S*)"'

  method_uri_protocol=[re.search(method_uri_protocol_pattern, item)s()

  if re.search(method_uri_protocol_pattern, item)

  else 'no match'

  for item in sample_logs]

  method_uri_protocol

  [('GET’, '/history/apollo/’, 'HTTP/1.0’),

  ** ('GET’, '/shuttle/countdown/’, 'HTTP/1.0’),**

  …,

  …,

  ** ('GET’, '/shuttle/countdown/count.gif’, 'HTTP/1.0’),**

  ** ('GET’, '/images/NASA-logosmall.gif’, 'HTTP/1.0’)]**

  提取 HTTP 状态码

  现在让我们尝试使用正则表达式从日志中提取 HTTP 状态码。

  content_size_pattern=r'\s(\d+)$'

  content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs]

  print(content_size)

  ['200’, '200’, '200’, '304’, …, '200’, '200’]

  提取 HTTP 响应内容大小

  现在让我们尝试使用正则表达式从日志中提取 HTTP 响应内容大小。

  content_size_pattern=r'\s(\d+)$'

  content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs]

  print(content_size)

  ['6245’, '3985’, '4085’, '0’, …, '1204’, '40310’, '786’]

  把它们放在一起

  现在,让我们尝试利用前面构建的所有正则表达式模式,并使用regexp_extract(…)方法构建 DataFrame,所有日志属性都整齐地提取到各自的列中。

  from pyspark.sqlctions import regexp_extract

  logs_df=base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

  regexp_extract('value', ts_pattern, 1).alias('timestamp'),

  regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

  regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

  regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),

  regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),

  regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

  logs_df(10, truncate=True)

  print((logs_df.count(), len(logs_df.columns)))

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  查找缺失值

  缺失值和空值是数据分析和机器学习的祸根。让我们看看我们的数据解析和提取逻辑是如何工作的。首先,让我们验证原始数据框中有没有空行。

  (base_df

  .filter(base_df['value']

  .isNull())

  .count())

  0

  没问题!现在,如果我们的数据解析和提取毕业证书工作正常,我们就不应该有任何可能存在空值的行。让我们来试试吧!

  bad_rows_df=logs_df.filter(logs_df['host'].isNull()|

  logs_df['timestamp'].isNull() |

  logs_df['method'].isNull() |

  logs_df['endpoint'].isNull() |

  logs_df['status'].isNull() |

  logs_df['content_size'].isNull()|

  logs_df['protocol'].isNull())

  bad_rows_df.count()

  33905

  哎哟!看起来我们的数据中有超过 33K 的缺失值!我们能搞定吗?

  请记住,这不是一个常规的 pandas DataFrame,你无法直接查询并获得哪些列为空。我们所谓的大数据集驻留在磁盘上,它可能存在于 Spark 集群中的多个节点上。那么我们如何找出哪些列有可能为空呢?

  查找 Null 值

  我们通常可以使用以下技术找出哪些列具有空值。(注意:这种方法是从 StackOverflow 上的一个绝妙的回答改造而来的。)

  from pyspark.sqlctions import col

  from pyspark.sqlctions import sum as spark_sum

  def count_null(col_name):

  return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

  # Build up a list of column expressions, one per column.

  exprs=[count_null(col_name) for col_name in logs_df.columns]

  # Run the aggregation. The *exprs converts the list of expressions into

  # variable function arguments.

  logs_df.agg(*exprs)()

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  看起来status列中有一个缺失值而其它的都在content_size列中。让我们看看能不能找出问题所在!

  处理 HTTP 状态中的空值

  状态列解析使用的原始正则表达式是:

  regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer')

  .alias( 'status')

  是否有更多的数字使正则表达式出错?还是数据点本身的问题?让我们试着找出答案。

  注意:在下面的表达式中,~表示“非”。

  null_status_df=base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))

  null_status_df.count()

  1

  让我们看看这条糟糕的记录是什么样子?

  null_status_df(truncate=False)

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  看起来像一条有很多信息丢失的记录!让我们通过日志数据解析管道来传递它。

  bad_status_df=null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

  regexp_extract('value', ts_pattern, 1).alias('timestamp'),

  regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

  regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

  regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),

  regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),

  regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

  bad_status_df(truncate=False)

  (图片)

  看起来这条记录本身是一个不完整的记录,没有有用的信息,最好的选择是删除这条记录,如下所示!

  logs_df=logs_df[logs_df['status'].isNotNull()]

  exprs=[count_null(col_name) for col_name in logs_df.columns]

  logs_df.agg(*exprs)()

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  处理 HTTP content size 列中的空值

  根据之前的正则表达式,content_size列的原始解析正则表达式为:

  regexp_extract('value', r'\s(\d+)$', 1).cast('integer')

  .alias('content_size')

  原始数据集中是否有数据丢失?让我们试着找出答案吧!我们首先尝试找出基本 DataFrame 中可能缺少内容大小的记录。

  null_content_size_df=base_df.filter(~base_df['value'].rlike(r'\s\d+$'))

  null_content_size_df.count()

  33905

  这个数值似乎与处理后的 DataFrame 中缺失的内容大小的数量相匹配。让我们来看看我们的数据框中缺少内容大小的前十条记录。

  null_content_size_df.take(10)

  很明显,糟糕的原始数据记录对应错误响应,其中没有发回任何内容,服务器为content_size字段发出了一个“-”。

  因为我们不想从我们的分析中丢弃这些行,所以我们把它们代入或填充为 0。

  修复 content_size 为 null 的行

  最简单的解决方案是像前面讨论的那样,用 0 替换logs_df中的 null 值。Spark DataFrame API 提供了一组专门为处理 null 值而设计的函数和字段,其中包括:

  fillna():用指定的非空值填充空值。

  na:它返回一个DataFrameNaFunctions对象,其中包含许多用于在空列上进行操作的函数。

  有几种方法可以调用这个函数。最简单的方法就是用已知值替换所有空列。但是,为了安全起见,最好传递一个包含(column_name, value)映射的 Python 字典。这就是我们要做的。下面是文档中的一个示例:

  >>> df4.na.fill({'age': 50, 'name': 'unknown'})()

  +---+------+-------+

  |age|height| name|

  +---+------+-------+

  | 10| 80| Alice|

  | 5| null| Bob|

  | 50| null| Tom|

  | 50| null|unknown|

  +---+------+-------+

  现在我们使用这个函数,用 0 填充content_size字段中所有缺失的值!

  logs_df=logs_df.na.fill({'content_size': 0})

  exprs=[count_null(col_name) for col_name in logs_df.columns]

  logs_df.agg(*exprs)()

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  看,没有缺失值了!

  处理时间字段(时间戳)

  现在我们有了一个干净的、已解析的 DataFrame,我们必须将 timestamp 字段解析为一个实际的时间戳。通用的日志格式时间有点不标准。用户定义函数(UDF)是解析它最直接的方法。

  from pyspark.sqlctions import udf

  month_map={

  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,

  'Aug':8, 'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12

  }

  def parse_clf_time(text):

  """ Convert Common Log time format into a Python datetime object

  Args:

  text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]

  Returns:

  a string suitable for passing to CAST('timestamp')

  """

  # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving

  return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(

  int(text[7:11]),

  month_map[text[3:6]],

  int(text[0:2]),

  int(text[12:14]),

  int(text[15:17]),

  int(text[18:20])

  )

  现在,让我们使用这个函数来解析 DataFrame 中的time列。

  udf_parse_time=udf(parse_clf_time)

  logs_df=(logs_df.select('*', udf_parse_time(logs_df['timestamp'])

  .cast('timestamp')

  .alias('time'))

  .drop('timestamp')

  logs_df(10, truncate=True)

  Apache Spark 实现可扩展日志分析,挖掘系统最大潜力(2)

  一切看起来都很好!让我们通过检查 DataFrame 的模式来验证这一点。

  logs_df.printSchema()

  root

  |-- host: string (nullable=true)

  |-- method: string (nullable=true)

  |-- endpoint: string (nullable=true)

  |-- protocol: string (nullable=true)

  |-- status: integer (nullable=true)

  |-- content_size: integer (nullable=false)

  |-- time: timestamp (nullable=true)

  现在,让我们缓存logs_df,因为我们将在下一部分的数据分析部分中大量地使用它!

  logs_df.cache()

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多