分享

dagster用于AI量化投资的数据流编排

 AI量化实验室 2023-10-12 发布于北京

原创文章第172篇,专注“个人成长与财富自由、世界运作的逻辑,AI量化投资"。

大年初一,祝大家兔年离财务自由越来越近。

无论是量化投资还是算法模型,当然这二者会统一到“AI量化”这个大命题下边——都需要一个pipeline的编排调度。

无论是量化还是算法,我们花了大量的时间,在数据整理、准备下。策略或者模型反倒花的时间很少。这本身是不对的,若是这个过程不标准,很多工作需要重复去做,浪费时间,容易出错,还没有积累就不应该了。

所以,反复调研了airflow, prefect,dagster。

目前看dagster还是比较契合。尽管这次引入asset的概念搞得有点云里雾里,但看它的案例即有量化,也为AI,也我们的场景契合。

python更多以函数为计算单元。

下面这个函数是官方的一个tutorial,这个“资产”其实就是从远程拉一个“谷物”信息的csv,解析成python的list[dict]。

import csv
import requests
from dagster import asset


@asset
def cereals():
response = requests.get("https://docs./assets/cereal.csv")
lines = response.text.split("\n")
cereal_rows = [row for row in csv.DictReader(lines)]

return cereal_rows

if __name__ == '__main__':
rows = cereals()
print(rows)

原始表格的信息如上。

这个函数如何直接使用,就是一个函数,本身可以直接运行,所以debug是很方便的。——这是我们要求的低侵入性,就是业务代码不需要有太多的改动,可以在框架里呈现。

而当我们使用 dagster dev -f cereal.py时,可以启动UI:

只要文件里定义了asset,就可以“物化“, 其实就是运行这个”函数“。

我们看下”物化“具体做了什么,就是把这个数组,保存在本地storage/cereal文件里。

使用dagit -f也可以运行,但没有启动deamon等服务,dagster dev方便开发调试。

资产可以形成图,上下游依赖。有时候获得相当的资源会失败,那只需要重建这一部分即可。这里的思想挺好的。

从工作流到数据流。

如上都是基于内存计算。

我们看一下远程下载zip文件

@asset
def cereal_ratings_zip() -> None:
urllib.request.urlretrieve(
"https://rcel.app/assets/cereal-ratings.csv.zip",
"cereal-ratings.csv.zip",
)

这里的一个逻辑上,从远程下载csv,并对数据进行筛选和排序后,与远程下载zip文件 解压后的结果合并计算。

asset的核心逻辑是把”计算逻辑“与”IO逻辑“分离。当使用了外部系统或不易分离时,也可以使用没有参数或返回值的asset。计算逻辑独立后,易于测试,不容易出bug。

仔细口味下来,asset的设计还是有精妙之处。

每个计算单元都是极简的逻辑,这样可以确保功能的复用,而且长链条的计算,中间容易出错,这样可以避免从头开始。

目前我们量化平台的数据流是从 tushare/baostock或者更多其它的数据源,把数据采取,预处理后存储到mongo,mysql或者es里备查。回测时,全市场数据导入到hdf5,计算特征工程,再写入到hdf5,然后从hdf5里加载数据进行回测。

另外每天的计算,需要更新增量数据后,全市场的指标都要重新计算后,更新到mysql。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多