需求描述
一份数据集描述每个站点在各个时间段内的总降雨量,可以将这个时间段的每分钟平均降雨量视为这1分钟的降雨量,最终整理合并出按整点小时统计的降雨量。例如结果是6:00-7:00,7:00-8:00等。
按照需求方的计算思路是先按照如下方法计算:
然后重采样到小时的单位。
但是原始数据集有上百万条,经过这样处理后可能出现上亿条中间数据,很可能导致内存不足。所以我出场使用一些几乎不使用额外内存的巧妙方式。
需求解决
首先读取数据集:
import pandas as pd
df = pd.read_csv("river/data1980.csv", parse_dates=["s_date", "e_date"])
df.sort_values(by=['code', 's_date'], ignore_index=True, inplace=True)
print(df.shape)
df.head(20)
(532390, 4)
咱们的测试数据共48万条。可以很明显的看到角标11到角标12的时间段再按小时拆分后存在重叠时间段,需要合并。
最终完整处理代码为:
result = []
# last_code用于记录上一个站
last_code = None
for code, s_date, e_date, flow in df.itertuples(False):
# 当前时间段的总秒数
total_minutes = (e_date - s_date).total_seconds()
# 按小时整点处切分当前时间段,并处理边界问题
dates = pd.date_range(s_date.ceil('H'), e_date.floor('H'), freq='h').to_list()
if len(dates) == 0 or s_date != dates[0]:
dates.insert(0, s_date)
if len(dates) == 1 or e_date != dates[-1]:
dates.append(e_date)
# 对于同一站而言判断当前起始时间是否可以和前一条数据合并
cur_minutes = (dates[1]-dates[0]).total_seconds()
if code == last_code and result[-1][1]+pd.tseries.offsets.Hour() >= dates[1]:
result[-1][2] = dates[1]
result[-1][3] += cur_minutes*flow/total_minutes
else:
result.append([code, dates[0], dates[1],
cur_minutes*flow/total_minutes])
# 处理除第一个时间段以外的剩余时间段
for start_date, end_date in zip(dates[1:-1], dates[2:]):
cur_minutes = (end_date-start_date).total_seconds()
result.append([code, start_date, end_date,
cur_minutes*flow/total_minutes])
last_code = code
result = pd.DataFrame(result, columns=["code", "start_date", "end_date", "flow"])
result.start_date = result.start_date.dt.floor("H")
result.end_date = result.end_date.dt.ceil("H")
result.sort_values(by=['code', 'end_date'], ignore_index=True,
ascending=True, inplace=True)
print(result.shape)
result.head(100)
共134万多条结果数据:
(1346022, 4)
截取一部分有代表性的合并结果示例:
后面需求方又要求缺失断层的时间段用0填充,基于前面的代码修改,经初步测试出现严重的内存不足,于是进一步升级分区处理,每个站点处理完成结果便落地磁盘。
最终代码为:
import os
for i, (code, df_split) in enumerate(df.groupby("code"), 1):
print(i, code)
result = []
for code, s_date, e_date, flow in df_split.itertuples(False):
# 当前时间段的总秒数
total_minutes = (e_date - s_date).total_seconds()
# 按小时整点处切分当前时间段,并处理边界问题
dates = pd.date_range(s_date.ceil(
'H'), e_date.floor('H'), freq='h').to_list()
if len(dates) == 0 or s_date != dates[0]:
dates.insert(0, s_date)
if len(dates) == 1 or e_date != dates[-1]:
dates.append(e_date)
cur_minutes = (dates[1]-dates[0]).total_seconds()
# 判断当前起始时间是否可以和前一条数据合并
if result and result[-1][1]+pd.tseries.offsets.Hour() >= dates[1]:
result[-1][2] = dates[1]
result[-1][3] += cur_minutes*flow/total_minutes
else:
# 与前一条数据相差超过1小时的需要填充0
if result and result[-1][2]+pd.tseries.offsets.Hour() <= dates[0]:
date_zeros = pd.date_range(
result[-1][2].ceil('H'), dates[0].floor('H'), freq='h')
for start_date, end_date in zip(date_zeros[:-1], date_zeros[1:]):
result.append([code, start_date, end_date, 0])
result.append([code, dates[0], dates[1],
cur_minutes*flow/total_minutes])
# 处理除第一个时间段以外的剩余时间段
for start_date, end_date in zip(dates[1:-1], dates[2:]):
cur_minutes = (end_date-start_date).total_seconds()
result.append([code, start_date, end_date,
cur_minutes*flow/total_minutes])
result = pd.DataFrame(
result, columns=["code", "start_date", "end_date", "flow"])
result.start_date = result.start_date.dt.floor("H")
result.end_date = result.end_date.dt.ceil("H")
result.sort_values(by=['end_date'], ignore_index=True,
ascending=True, inplace=True)
file = "result.csv"
result.to_csv(file, index=False, mode='a', header=not os.path.exists(file))
经过近8分钟的漫长等待终于处理完成,结果文件达到1.16GB。
从结果文件可以看到能够顺利的插入空值0: