分享

Duckdb,很适合AI量化的一个基于本地文件系统的的olap分析引擎(代码下载)

 AI量化实验室 2023-10-12 发布于北京

原创文章第292篇,专注“个人成长与财富自由、世界运作的逻辑与投资"。

今天说说duckdb。之前有简单提过AI量化的基础工程设施:dagster&duckdb

duckdb可以轻松访问多个csv和parquet文件,作为本地的分析引擎很好用。

Mongo持久化,简单易用,适合交易数据增量更新。但分析性能不够。

Duckdb,一个嵌入式的olap引擎。

Mongo数据历史按symbolcsv(parquet),然后当月更新增量包覆盖即可。

@asset(description='增量(当月的增量数据)从多个库中整合宽表入csv', group_name='stocks')
def load_data_to_csv(stock_list_from_mongo):
for i, s in enumerate(stock_list_from_mongo):
get_dagster_logger().debug('主板股票一共:{},当前{},代码:{}'.format(len(stock_list_from_mongo), i, s))

date_str = datetime.now().strftime('%Y%m%d')
date = date_str[:6] + '01'
DIR = DATA_DIR_CSVS.joinpath(date_str[:6])
DIR.mkdir(exist_ok=True, parents=True)
df_all = loader_mongo.load_stock_datas(s, date)
if df_all is None:
continue
df_all.to_csv(DIR.joinpath('{}.csv'.format(s.replace('.', '_'))))

duckdb特别神奇的地方,可以对这多个文件夹下面的csv进行查询,而且性能特别好——如果使用pandas,我们需要遍历这些文件夹,然后把csv读成dataframe,然后concat在一起。——pandas读csv是需要把全量,所有列都一并读入。

duckdb可以使用sql直接查询这个文件夹:使用csvs/*/*.csv,只查询close,date,symbol数据库,某一天的数据:( 这里是不是有点像qlib的数据格式,qlib的文件存储最不方便的地方是增量更新,我这里按csv可以直接覆盖当月,确保数据不会重复,duckdb性能足够好)

当然也可以使用parquet格式,更适合大数据。在小文件下,parquet的压缩不明显,如果月度数据的话,parquet文件比csv还大。

import duckdb
import pandas

from quant_project.config import DATA_DIR_CSVS

df = duckdb.query(
"""
select symbol,date,close from '{}/*.csv'
where date = '20230809'
""".format(DATA_DIR_CSVS.resolve())
)
print(df.df())

我们的任务编排如下:

每天收盘后更新日线,日线指标,后面还有基本面,财务数据,其他数据等,然后把增量数据更新到csv里,形成当月的增量包,我们客户端下载这个增量包到对应位置即可。

在服务器上的运行界面:

我实现了Duckdbloader:比之前的CSVloader和Hdf5loader都要简洁和高效得多。

class Duckdbloader(Dataloader):
def __init__(self, symbols, columns, start_date='20100101',
end_date=datetime.now().strftime('%Y%m%d')):
super().__init__(None, symbols, start_date, end_date)
self.columns = columns
self.columns.extend(['symbol', 'date'])

def _load_dfs(self):
dfs = []
for symbol in self.symbols:
df = duckdb.query(
"""
select {} from '{}/*/*.csv'
where symbol = '{}' and date >= '{}' and date <= '{}'
""".format(','.join(self.columns), CSVS_DIR.resolve(), symbol, self.start_date, self.end_date)
).df()
df['date'] = df['date'].apply(lambda x:str(x))
dfs.append(df)
return dfs

代码已经提交到星球,大版本预计周日发布。知识星球与开源项目:万物之中,希望至美

前段时间,“有点被流量裹挟”的感觉。

每个用户有不同的诉求,不同的文章,代码发出来之后,大家的反馈不一。确实,这个领域如果要抓眼球,很容易,但不长久。——我们的目标是星辰大海。

有一些小白用户加入进来,问pycharm, python的环境如何安装,依赖包如何解决。也有一些人上来就问策略怎么自动化实盘。——客观讲,这些都不是我期望的目标用户。

给出系统的源代码,是为了引发讨论,学习与交流,它当下肯定不是一个成熟的”产品“。商用金融产品怎么可以交付代码呢。

值得庆幸的是,星球里有一些老用户了解我的逻辑。

回归初心,做长期有价值的事情!

——利用AI模型,把当下A股5000多支股票里,最合适的,得分最高的挑选出来。

我们先构建沪深300成份股列表:还是使用tushare的数据:

# 指定日期的指数成份股
def index_components(index_symbol, date):
# 拉取数据
df = pro.index_weight(**{
"index_code": index_symbol,
"trade_date": date,
"start_date": "",
"end_date": "",
"ts_code": "",
"limit": "",
"offset": ""
}, fields=[
"index_code",
"con_code",
"trade_date",
"weight"
])
return df

从数据工程的角度,计算少量几支指数会比较容易,当数据量上来的时候,工程的复杂度会指数级上升,甚至完全不是一个技术栈。

今天与一个朋友吃饭,说了一个比喻:盖两层的房子,可以木质材料就可以,如果盖十层,就需要钢筋混凝土,如果需要几十层到上百层,那么需要的技术更不一样的。

当前的大模型也是如此,看似就是从十亿、百亿到千亿。百亿可能很多公司都能解决,但要跨越千亿,那就需要很多工程上的积累。

量化数据也是,百级的数据,几十个因子,可以很容易处理,但几千支股票,上百个甚至更多的因子,那就需要在工程上好好设计一下。

有一点思考,我们的目标是星辰大海,所以星球寻找是志同道合的人群,少而精,因此我进一步提高了门槛。

回到我们的工程设计,mongo等数据库是支持增量更新,我们会到查询最新的数据日期,然后增量去获取数据,mongo数据库有一个好处,就是_id如果重复会自动跳过,这个特别好,我们可以批量覆盖,不必担心说数据重复。

但无论是数据分析,还是多因子回测,都需要对因子进行预计算,整合大宽表等。

年化29.3%,大类资产ETF动量轮动,添加商品与货币ETF,实盘跟进中(代码+数据下载)

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章