June 22, 2017 · Source: CSDN
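This article shows how to use Spark to analyze web server logs.
Server logs are a common and information-rich kind of big data, and Spark provides tools for analyzing them quickly and simply. Log data can come from many sources, such as networks, files, servers, application logs and user-generated content. It can be used to monitor servers, improve business and customer insight, build recommendation systems, detect fraud, and more.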
First, import the libraries we will use:
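```python
import re
import datetime
# Test is a helper from the notebook environment the original code was written for (not part of PySpark)
from databricks_test_helper import Test
```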
Load the log file and take a quick look at the data:
```python
import sys
import os

log_file_path = "..."

# sqlContext is provided by the notebook; on Spark 2.x and later, spark.read.text(...) is equivalent
base_df = sqlContext.read.text(log_file_path)

base_df.printSchema()
base_df.show(truncate=False)
```
Exploratory data analysis
Web server logs usually follow the Common Log Format, which contains these fields:
remotehost rfc931 authuser [date] "request" status bytes
| field | meaning |
|---|---|
| remotehost | Remote hostname (or IP number if DNS hostname is not available). |
| rfc931 | The remote logname of the user. We don't really care about this field. |
| authuser | The username of the remote user, as authenticated by the HTTP server. |
| [date] | The date and time of the request. |
| "request" | The request, exactly as it came from the browser or client. |
| status | The HTTP status code the server sent back to the client. |
| bytes | The number of bytes (Content-Length) transferred to the client. |
Next, we parse these fields into separate columns with the built-in `regexp_extract()` function:
```python
from pyspark.sql.functions import regexp_extract

# regexp_extract returns '' when a pattern does not match; with non-ANSI casting,
# cast('integer') then turns that '' into null
split_df = base_df.select(
    regexp_extract('value', r'^([^\s]+)\s', 1).alias('host'),  # first token, without the trailing space
    regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})]', 1).alias('timestamp'),
    regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
    regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
    regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_df.show(truncate=False)
```
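You can check the patterns on a single line before running them over the whole DataFrame. The sketch below is a plain-Python analogue, not Spark code, and rests on three assumptions:

- Python's `re` treats these simple patterns the same way as the Java regex engine behind `regexp_extract`.
- Like `regexp_extract`, it returns an empty string when a pattern does not match.
- The sample line is hypothetical.

```python
import re

# Hypothetical sample line in Common Log Format (not from the article's dataset)
SAMPLE = ('host1.example.com - - [01/Aug/1995:00:00:01 -0400] '
          '"GET /index.html HTTP/1.0" 200 1839')

PATTERNS = {
    'host': r'^([^\s]+)\s',
    'timestamp': r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})]',
    'path': r'^.*"\w+\s+([^\s]+)\s+HTTP.*"',
    'status': r'^.*"\s+([^\s]+)',
    'content_size': r'^.*\s+(\d+)$',
}

def parse_line(line):
    """Mimic regexp_extract(..., 1): group 1 of the match, or '' when nothing matches."""
    fields = {}
    for name, pattern in PATTERNS.items():
        m = re.search(pattern, line)
        fields[name] = m.group(1) if m else ''
    return fields

print(parse_line(SAMPLE))
```

Replace the trailing `1839` with `-` and `content_size` comes back as an empty string. In the Spark version, that empty string becomes a null after `cast('integer')`.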
Data cleaning
First, check whether the raw dataset contains any null rows:
```python
base_df.filter(base_df['value'].isNull()).count()
```
Next, do a quick check for rows in which any parsed column is null:
```python
bad_rows_df = split_df.filter(split_df['host'].isNull() |
                              split_df['timestamp'].isNull() |
                              split_df['path'].isNull() |
                              split_df['status'].isNull() |
                              split_df['content_size'].isNull())
bad_rows_df.count()
```
Some rows still contain nulls. Let's find out which columns are affected:
```python
from pyspark.sql.functions import col, sum as spark_sum  # alias avoids shadowing Python's built-in sum

def count_null(col_name):
    # isNull() yields a boolean; cast it to 0/1 and add up the values to count the nulls
    return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

exprs = [count_null(col_name) for col_name in split_df.columns]

split_df.agg(*exprs).show()
```
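The per-column null count works by turning each `isNull()` test into 0 or 1 and adding up the results. The plain-Python sketch below does the same arithmetic. The rows are hypothetical, and `None` stands in for a Spark null.

```python
# Hypothetical parsed rows; None plays the role of a Spark null
rows = [
    {'host': 'a.example.com', 'status': 200, 'content_size': 512},
    {'host': 'b.example.com', 'status': 404, 'content_size': None},
    {'host': 'c.example.com', 'status': None, 'content_size': None},
]

def count_nulls(rows, columns):
    """Same idea as sum(col(c).isNull().cast('integer')): add 1 per null, 0 otherwise."""
    return {c: sum(int(r[c] is None) for r in rows) for c in columns}

print(count_nulls(rows, ['host', 'status', 'content_size']))
# {'host': 0, 'status': 1, 'content_size': 2}
```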
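Earlier we used `\d+` to match the digits at the end of each line. Could some lines lack a valid content size? Let's count the lines that do not end in digits: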
```python
# Lines that do not end in one or more digits
bad_content_size_df = base_df.filter(~ base_df['value'].rlike(r'\d+$'))
bad_content_size_df.count()
```
Let's look at some of these bad rows:
```python
from pyspark.sql.functions import lit, concat

# Append '*' so the true end of each line (including any trailing characters) is visible
bad_content_size_df.select(concat(bad_content_size_df['value'], lit('*'))).show(truncate=False)
```
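The `rlike(r'\d+$')` filter keeps only the lines that do not end in digits. In the Common Log Format, the bytes field is written as `-` when no content was sent, which is one way such lines arise. The plain-Python sketch below reproduces the filter and the `'*'` marker trick. Its lines are hypothetical, and it uses `re.search` because `rlike` also matches anywhere in the string.

```python
import re

# Hypothetical raw lines; the second ends with '-' instead of a byte count
lines = [
    'a.example.com - - [01/Aug/1995:00:00:01 -0400] "GET /a.html HTTP/1.0" 200 512',
    'b.example.com - - [01/Aug/1995:00:00:02 -0400] "GET /b.html HTTP/1.0" 304 -',
]

# Equivalent of filter(~ value.rlike(r'\d+$')): keep lines NOT ending in digits
bad = [line for line in lines if not re.search(r'\d+$', line)]

# Appending '*' (as concat(value, lit('*')) does) shows exactly where each line ends
for line in bad:
    print(line + '*')
```

If the bad lines in the real data end in `-`, filling `content_size` with 0 (the next step) matches what the format means.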
The simplest fix is to replace the null `content_size` values with 0 using `na.fill()` (an alias of `fillna()`):
```python
cleaned_df = split_df.na.fill({'content_size': 0})
```
Check that all nulls have been handled:
```python
exprs = [count_null(col_name) for col_name in cleaned_df.columns]

cleaned_df.agg(*exprs).show()
```
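Next we convert the timestamps. The log's date format (`dd/Mon/yyyy:hh:mm:ss` followed by a time zone offset) is not one that `cast('timestamp')` can parse directly. The simplest approach is a user-defined function (UDF) that rewrites it into a string that can be cast: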
```python
from pyspark.sql.functions import udf

month_map = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
    'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(s):
    """ Convert a Common Log Format time string into a string Spark can cast to a timestamp
    Args:
        s (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a 'yyyy-mm-dd hh:mm:ss' string suitable for passing to CAST('timestamp');
        the (+/-)zzzz offset is ignored
    """
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
        int(s[7:11]),       # year
        month_map[s[3:6]],  # month
        int(s[0:2]),        # day
        int(s[12:14]),      # hour
        int(s[15:17]),      # minute
        int(s[18:20])       # second
    )

u_parse_time = udf(parse_clf_time)

logs_df = cleaned_df.select('*', u_parse_time(cleaned_df['timestamp']).cast('timestamp').alias('time')).drop('timestamp')
total_log_entries = logs_df.count()
```
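The slicing in `parse_clf_time` relies on fixed character positions and discards the `(+/-)zzzz` offset. The sketch below compares it with Python's `datetime.strptime`, which parses the same string and keeps the offset. It is plain Python, the timestamp is hypothetical, and `%b` is assumed to run under an English/C locale.

```python
from datetime import datetime

MONTHS = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
          'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}

def parse_clf_time(s):
    # Fixed-position slicing, as in the UDF above; the offset is discarded
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
        int(s[7:11]), MONTHS[s[3:6]], int(s[0:2]),
        int(s[12:14]), int(s[15:17]), int(s[18:20]))

def parse_clf_time_with_tz(s):
    # Format-driven parsing that keeps the offset as tzinfo
    return datetime.strptime(s, '%d/%b/%Y:%H:%M:%S %z')

ts = '01/Aug/1995:00:00:01 -0400'   # hypothetical value
print(parse_clf_time(ts))            # 1995-08-01 00:00:01
aware = parse_clf_time_with_tz(ts)
print(aware.isoformat())             # 1995-08-01T00:00:01-04:00
```

On Spark 2.2 and later, `pyspark.sql.functions.to_timestamp` with a pattern such as `'dd/MMM/yyyy:HH:mm:ss Z'` may remove the need for a Python UDF. That pattern is an assumption, so verify it against the datetime-pattern documentation for your Spark version.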
Now let's look at summary statistics for the size of the content the web server returned (`content_size`):
```python
content_size_summary_df = logs_df.describe(['content_size'])
content_size_summary_df.show()
```
We can also compute these values directly with Spark SQL functions:
```python
from pyspark.sql import functions as sqlFunctions

content_size_stats = (logs_df
                      .agg(sqlFunctions.min(logs_df['content_size']),
                           sqlFunctions.avg(logs_df['content_size']),
                           sqlFunctions.max(logs_df['content_size']))
                      .first())

print('Using SQL functions:')
print('Content Size Avg: {1:,.2f}; Min: {0:.2f}; Max: {2:,.0f}'.format(*content_size_stats))
```
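In the format string, the positional fields `{1}`, `{0}` and `{2}` pick the average, minimum and maximum, in that order, out of the `(min, avg, max)` row. The plain-Python sketch below runs the same computation and formatting on hypothetical sizes.

```python
# Hypothetical content_size values
sizes = [0, 512, 1839, 7280]

# Same order as the agg() call: (min, avg, max)
stats = (min(sizes), sum(sizes) / len(sizes), max(sizes))

# {1} is the average, {0} the minimum, {2} the maximum
summary = 'Content Size Avg: {1:,.2f}; Min: {0:.2f}; Max: {2:,.0f}'.format(*stats)
print(summary)  # Content Size Avg: 2,407.75; Min: 0.00; Max: 7,280
```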
Let's look at the HTTP status codes in the log:
```python
status_to_count_df = (logs_df
                      .groupBy('status')
                      .count()
                      .sort('status')
                      .cache())

status_to_count_length = status_to_count_df.count()
print('Found %d response codes' % status_to_count_length)
status_to_count_df.show()

# These expected values are specific to the log file used in the original article
assert status_to_count_length == 7
assert status_to_count_df.take(100) == [(200, 940847), (302, 16244), (304, 79824), (403, 58), (404, 6185), (500, 2), (501, 17)]
```
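Grouping by status and counting is just a frequency count sorted by key. The plain-Python analogue below uses hypothetical status codes, with `Counter` standing in for `groupBy().count()`. In the Spark version, `.cache()` keeps this small result in memory because `count()`, `show()` and the assertions all reuse it.

```python
from collections import Counter

# Hypothetical status codes, one per request
statuses = [200, 200, 304, 404, 200, 302, 404, 200]

# Like groupBy('status').count().sort('status')
status_counts = sorted(Counter(statuses).items())
print('Found %d response codes' % len(status_counts))  # Found 4 response codes
print(status_counts)  # [(200, 4), (302, 1), (304, 1), (404, 2)]
```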
The notebook's built-in `display()` function can chart this result:
```python
display(status_to_count_df)
```
Because the counts differ by several orders of magnitude, we can compare their logarithms instead:
```python
log_status_to_count_df = status_to_count_df.withColumn('log(count)', sqlFunctions.log(status_to_count_df['count']))

display(log_status_to_count_df)
```
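Called with a single argument, `sqlFunctions.log` is the natural logarithm. The plain-Python sketch below uses hypothetical counts to show how it compresses a range of several orders of magnitude.

```python
import math

# Hypothetical counts per status code
counts = {200: 900000, 404: 6000, 500: 2}

# Natural logarithm, matching the single-argument sqlFunctions.log(col)
log_counts = {status: round(math.log(n), 2) for status, n in counts.items()}
print(log_counts)  # {200: 13.71, 404: 8.7, 500: 0.69}
```

A status that occurs exactly once gets log(1) = 0, so its bar would not be visible on this scale.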
For more control over the chart, we can use a plotting library such as matplotlib:
```python
# spark_notebook_helpers is a helper module from the original notebook environment (not part of PySpark)
from spark_notebook_helpers import prepareSubplot, np, plt, cm

data = log_status_to_count_df.drop('count').collect()
x, y = zip(*data)  # x: status codes, y: log(count)
index = np.arange(len(x))
bar_width = 0.7
colorMap = 'Set1'
cmap = cm.get_cmap(colorMap)

fig, ax = prepareSubplot(np.arange(0, 6, 1), np.arange(0, 14, 2))
plt.bar(index, y, width=bar_width, color=cmap(0))
plt.xticks(index, x)  # matplotlib >= 2.0 centers bars on index by default
display(fig)
```
Now let's find the hosts that accessed the server more than 10 times:
```python
host_sum_df = (logs_df
               .groupBy('host')
               .count())

host_more_than_10_df = (host_sum_df
                        .filter(host_sum_df['count'] > 10)
                        .select(host_sum_df['host']))

print('Any 20 hosts that have accessed more than 10 times:\n')
host_more_than_10_df.show(truncate=False)
```
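The filter uses a strict `> 10`, so a host with exactly 10 requests is excluded. The plain-Python sketch below uses a hypothetical list of hosts.

```python
from collections import Counter

# Hypothetical host column, one entry per request
hosts = ['a.example.com'] * 12 + ['b.example.com'] * 10 + ['c.example.com'] * 3

host_counts = Counter(hosts)                                    # groupBy('host').count()
frequent = sorted(h for h, n in host_counts.items() if n > 10)  # filter(count > 10)
print(frequent)  # ['a.example.com']
```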
Next, we plot the number of hits per path (URI), from the most to the least requested:
```python
paths_df = (logs_df
            .groupBy('path')
            .count()
            .sort('count', ascending=False))

# DataFrames have no .map() in Spark 2+, so collect Rows and unpack them in Python;
# only the top 1000 paths are plotted, so limit before collecting to the driver
paths_counts = [(r[0], r[1]) for r in paths_df.select('path', 'count').limit(1000).collect()]

paths, counts = zip(*paths_counts)

colorMap = 'Accent'
cmap = cm.get_cmap(colorMap)
index = np.arange(len(counts))  # may be fewer than 1000 paths

fig, ax = prepareSubplot(np.arange(0, 1000, 100), np.arange(0, 70000, 10000))
plt.xlabel('Paths')
plt.ylabel('Number of Hits')
plt.plot(index, counts, color=cmap(0), linewidth=3)
plt.axhline(linewidth=2, color='#999999')
display(fig)
```
Find the ten paths most often answered with a status other than 200:
```python
from pyspark.sql.functions import desc

not200DF = logs_df.filter('status != 200')
not200DF.show(10)

logs_sum_df = not200DF.groupBy('path').count().sort(desc('count'))

print('Top Ten failed URLs:')
logs_sum_df.show(10, False)

top_10_err_urls = [(row[0], row[1]) for row in logs_sum_df.take(10)]
```
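Note that `status != 200` also counts responses such as 302 and 304, which are redirects or cache validations rather than failures. If only errors are wanted, filtering on `status >= 400` is one alternative. The plain-Python sketch below shows the count-and-rank step both ways, using hypothetical (path, status) pairs.

```python
from collections import Counter

# Hypothetical (path, status) pairs
requests = [
    ('/index.html', 200), ('/missing.gif', 404), ('/missing.gif', 404),
    ('/old/page.html', 404), ('/cgi-bin/x', 500), ('/missing.gif', 404),
    ('/index.html', 304), ('/old/page.html', 404),
]

not_200 = Counter(path for path, status in requests if status != 200)     # as in the article
errors_only = Counter(path for path, status in requests if status >= 400)  # 4xx/5xx only

print(not_200.most_common(10))
print(errors_only.most_common(10))
```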
Number of unique hosts
```python
unique_host_count = logs_df.select('host').distinct().count()
print('Unique hosts: {0}'.format(unique_host_count))
```
Number of unique hosts per day
```python
from pyspark.sql.functions import dayofmonth

day_to_host_pair_df = logs_df.select('host', dayofmonth('time').alias('day'))
day_group_hosts_df = day_to_host_pair_df.drop_duplicates()  # one row per (host, day)
daily_hosts_df = day_group_hosts_df.groupBy('day').count().sort('day').cache()

print('Unique hosts per day:')
daily_hosts_df.show(30, False)
```
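`dayofmonth` returns only the day number (1 to 31), so this grouping assumes the log covers a single month. With data spanning several months, August 1 and September 1 would fall into the same group; grouping on `to_date('time')` would keep them apart. The plain-Python sketch below shows the dedupe-then-count logic on hypothetical (host, time) pairs.

```python
from collections import Counter
from datetime import datetime

# Hypothetical (host, time) pairs; the last one is in a different month
log = [
    ('a.example.com', datetime(1995, 8, 1, 9, 0)),
    ('a.example.com', datetime(1995, 8, 1, 17, 30)),
    ('b.example.com', datetime(1995, 8, 1, 10, 0)),
    ('a.example.com', datetime(1995, 8, 2, 8, 0)),
    ('c.example.com', datetime(1995, 9, 1, 12, 0)),
]

# Like select('host', dayofmonth('time')).drop_duplicates() then groupBy('day').count()
by_day_of_month = Counter(day for day, _ in {(t.day, h) for h, t in log})

# Grouping on the full calendar date keeps different months apart
by_date = Counter(d for d, _ in {(t.date(), h) for h, t in log})

print(sorted(by_day_of_month.items()))  # [(1, 3), (2, 1)]
print(sorted(by_date.items()))
```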
Average number of requests per host per day
```python
from pyspark.sql.functions import col, dayofmonth

# Rename the count column so it does not clash with daily_hosts_df['count'] after the join
total_req_per_day_df = (logs_df
                        .groupBy(dayofmonth('time').alias('day'))
                        .count()
                        .withColumnRenamed('count', 'total_reqs'))

avg_daily_req_per_host_df = (total_req_per_day_df
                             .join(daily_hosts_df, 'day')
                             .select('day', (col('total_reqs') / col('count')).alias('avg_reqs_per_host_per_day'))
                             .sort('day')
                             .cache())

print('Average number of daily requests per host is:\n')
avg_daily_req_per_host_df.show()
```
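The average divides a day's total requests by the number of distinct hosts seen that day, with the two matched by an inner join on `day`. In Spark, `/` between two integer columns yields a fractional (double) result, so no explicit cast is needed. The plain-Python sketch below uses hypothetical per-day totals.

```python
# Hypothetical per-day aggregates
total_req_per_day = {1: 30, 2: 12}  # requests per day
daily_hosts = {1: 4, 2: 3}          # unique hosts per day

# Inner join on the day key, then divide
avg_reqs_per_host_per_day = {
    day: total_req_per_day[day] / daily_hosts[day]
    for day in sorted(total_req_per_day.keys() & daily_hosts.keys())
}
print(avg_reqs_per_host_per_day)  # {1: 7.5, 2: 4.0}
```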
Exploring 404 responses
Count the 404 responses:
```python
not_found_df = logs_df.filter('status = 404').cache()
print('Found {0} 404 URLs'.format(not_found_df.count()))
```
List the distinct paths that returned 404:
```python
not_found_paths_df = not_found_df.select('path')
unique_not_found_paths_df = not_found_paths_df.distinct()

print('404 URLs:\n')
unique_not_found_paths_df.show(n=40, truncate=False)
```
Top 20 paths that returned 404
```python
top_20_not_found_df = not_found_df.groupBy('path').count().sort('count', ascending=False)

print('Top Twenty 404 URLs:\n')
top_20_not_found_df.show(n=20, truncate=False)
```
Top 25 hosts that received 404 responses
```python
hosts_404_count_df = not_found_df.groupBy('host').count().sort('count', ascending=False)

print('Top 25 hosts that generated errors:\n')
hosts_404_count_df.show(n=25, truncate=False)
```
404 errors per day
```python
errors_by_date_sorted_df = (not_found_df
                            .groupBy(dayofmonth('time').alias('day'))
                            .count()
                            .sort('day')
                            .cache())

print('404 Errors by day:\n')
errors_by_date_sorted_df.show()
```
Days with the most 404 errors
```python
top_err_date_df = errors_by_date_sorted_df.sort('count', ascending=False)

print('Top Five Dates for 404 Requests:\n')
top_err_date_df.show(5)
```
404 errors per hour
```python
from pyspark.sql.functions import hour

hour_records_sorted_df = (not_found_df
                          .groupBy(hour('time').alias('hour'))
                          .count()
                          .sort('hour')
                          .cache())

print('404 errors by hour:\n')
hour_records_sorted_df.show(24)
```
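`groupBy(hour('time'))` only produces rows for hours with at least one 404, so quiet hours are missing rather than shown as 0. If the result is plotted, filling in all 24 hours gives an evenly spaced series. The plain-Python sketch below uses hypothetical timestamps.

```python
from collections import Counter
from datetime import datetime

# Hypothetical timestamps of 404 responses
not_found_times = [
    datetime(1995, 8, 1, 0, 5),
    datetime(1995, 8, 1, 0, 40),
    datetime(1995, 8, 1, 13, 2),
]

by_hour = Counter(t.hour for t in not_found_times)  # like groupBy(hour('time')).count()
hourly = [(h, by_hour[h]) for h in range(24)]       # Counter returns 0 for hours with no 404s
print(hourly[:2], hourly[13])  # [(0, 2), (1, 0)] (13, 1)
```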
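With Spark we can quickly analyze many aspects of web server logs and extract the information we need. The detailed look at 404 responses in particular helps surface operational problems and improve the user experience. Above all, Spark's distributed processing handles large volumes of data in a short time, and the results can be presented visually as charts.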