分享

Python+MongoDB导入股票日线及分钟线数据

 imelee 2017-09-11

本文演示了采用Python脚本,配合MongoDB数据库,读取股票数据文件,并写入数据库的过程。 导入数据库的方法支持单线程及多线程两种方式。经过本地开发机测试,开启2-3个线程时,导入速度快于单线程导入;当开启线程超过4个之后,导入速度反而小于单线程;猜测可能是跟机器CPU核心数有关,建议开始CPU核心数N-1的线程。例如,若CPU核心数为8时,可以开启7个线程进行导入,可大幅提高导入速度。 Python操作MongoDB,需要引入pymongo模块,可自行搜索该模块的安装方法,不再赘述。以下是Python代码:

util_file.py 读取数据文本,导入数据库的公共方法类

#!/usr/bin/env python
#coding:utf-8

import threading
import pymongo
import time

class UtilFile:
    def IsSubString(SubStrList,Str):
        flag=True  
        for substr in SubStrList:  
            if not(substr in Str):  
                flag=False  
        return flag

    def GetFileList(FindPath,FlagStr=[]):
        import os  
        FileList=[]  
        FileNames=os.listdir(FindPath)  
        if (len(FileNames)>0):  
           for fn in FileNames:  
               if (len(FlagStr)>0):  
                   #返回指定类型的文件名  
                   if (UtilFile.IsSubString(FlagStr,fn)):  
                       #fullfilename=os.path.join(FindPath,fn)  
                       #FileList.append(fullfilename)  
                       FileList.append(fn)
               else:  
                   #默认直接返回所有文件名  
                   #fullfilename=os.path.join(FindPath,fn)  
                   #FileList.append(fullfilename)  
                   FileList.append(fn)

        #对文件名排序  
        if (len(FileList)>0):  
            FileList.sort()       
        return FileList

    '''
    函数功能:读取通达信数据导出文件内容,导入到数据库中 
    入参:
    conn_db: 导入所使用的数据库
    file_dir: 数据文件目录
    file_list: 数据目录下文件名List
    file_type: 数据文件类型。枚举值。 M1:1分钟线数据; D: 日线数据
    tb_pre_name: 数据表名称前缀
    print_level: 打印级别。枚举值。0:不打印; 1:按文件打印; 2:按记录打印
    thread_name: 线程名称(打印输出用)
    '''
    def ImpDataFromFile(conn_db, file_dir, file_list, file_type, tb_pre_name, print_level, thread_name):
        # file_list = UtilFile.GetFileList(file_dir)    #获取1分钟线目录下文件名清单
        file_num = len(file_list)       #文件总数量
        file_prct = round(1/file_num,4)     #每文件占总数比


        # 读取文件清单中每个文件的内容,并解析每行记录 {
        file_cnt = 0
        for file in file_list:
            file_cnt += 1   #当前处理文件序号
            tm_start = time.clock()
            #计算文件中的总行数 {
            op = open(file_dir+'/'+file)
            record_num = 0
            for line in op:
                record_num += 1
            #计算文件中的总行数 }

            #将文件逐行读入表中 {
            op = open(file_dir+'/'+file)
            record_cnt = 0
            for line in op:
                if not("数据来源" in line):
                    content_temp = line.split('\n')
                    content = content_temp[0].split('\t')

                    if file_type == "M1":
                        record = {
                                    "date":content[0], #日期
                                    "time":content[1], #时间
                                    "begin":float(content[2]), #开盘
                                    "high":float(content[3]), #最高
                                    "low":float(content[4]), #最低    
                                    "end":float(content[5]), #收盘
                                    "vol":int(content[6]), #成交量
                                    "amt":float(content[7])#成交额
                                 }
                    elif file_type == "D":
                        record = {
                                    "date":content[0], #日期
                                    "begin":float(content[1]), #开盘
                                    "high":float(content[2]), #最高
                                    "low":float(content[3]), #最低    
                                    "end":float(content[4]), #收盘
                                    "vol":int(content[5]), #成交量
                                    "amt":float(content[6])#成交额
                                 }

                    clct = conn_db[tb_pre_name+file]    #打开/创建 数据表

                    #计算处理百分比 {
                    record_cnt += 1
                    if print_level == 2:
                        file_prcs = file_prct*(file_cnt-1)
                        record_prct = round(record_cnt/record_num,4)
                        ttl_prcs = round((file_prcs+file_prct*record_prct)*100,2)
                        print(thread_name+"\t"+str(ttl_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(record_cnt)+"/"+str(record_num))
                    #计算处理百分比 }

                    #确保索引存在 {
                    if file_type == "M1":
                        clct.ensure_index([("date", pymongo.DESCENDING), ("time", pymongo.DESCENDING)])
                        if clct.find({"date":content[0],"time":content[1]}).count() > 0:
                            pass        #防止重复导入
                        else:
                            clct.insert(record)
                    elif file_type == "D":
                        clct.ensure_index([("date", pymongo.DESCENDING)])
                        if clct.find({"date":content[0]}).count() > 0:
                            pass        #防止重复导入
                        else:
                            clct.insert(record)
                    #确保索引存在 }

            #将文件逐行读入表中 }

            tm_finish = time.clock()
            tm_spend = round(tm_finish - tm_start, 2)
            if print_level == 1:
                file_prcs = round(file_prct*file_cnt*100,2)
                print(thread_name+"\t"+str(file_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(tm_spend)+"s")
        # 读取文件清单中每个文件的内容,并解析每行记录 }
        print(thread_name+"\t"+"Import Done!")

    '''
    函数功能:使用多线程方式导入数据入口方法 
    入参:
    imp_thread_num: 开启线程数
    conn_db: 导入所使用的数据库
    file_dir: 数据文件目录
    file_type: 数据文件类型。枚举值。 M1:1分钟线数据; D: 日线数据
    tb_pre_name: 数据表名称前缀
    print_level: 打印级别。枚举值。0:不打印; 1:按文件打印; 2:按记录打印
    '''
    def ImpDataFromFileMultThrd(imp_thread_num, conn_db, file_dir, file_type, tb_pre_name, print_level):
        file_list = UtilFile.GetFileList(file_dir, ".txt")
        thrd_num = imp_thread_num
        thrd_task = []
        for i in range(thrd_num):
            thrd_task.append([])

        for i in range(len(file_list)):
            idx = divmod(i,thrd_num)[1]
            thrd_task[idx].append(file_list[i])

        print(thrd_task)

        for i in range(thrd_num):
            thrd = MyThread(i,"Thread-"+str(i+1),[conn_db, file_dir, thrd_task[i-1], file_type, tb_pre_name, print_level])
            thrd.start()

class MyThread (threading.Thread):   #继承父类threading.Thread
    def __init__(self, threadID, name, func_args):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name

        self.conn_db = func_args[0]
        self.file_dir = func_args[1]
        self.file_list = func_args[2]
        self.file_type = func_args[3]
        self.tb_pre_name = func_args[4]
        self.print_level = func_args[5]

    def run(self):                   #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 
        print("Starting " + self.name)
        UtilFile.ImpDataFromFile(self.conn_db, self.file_dir, self.file_list, self.file_type, self.tb_pre_name, self.print_level, self.name)
        print("Exiting " + self.name)

util_otr.py 一些其它常用的公共函数方法

#!/usr/bin/env python
#coding:utf-8

'''
函数功能:根据传入的字符串,删除名称匹配的聚集 
入参:
db_conn: 数据库连接
clct_substr: 需匹配名称的字符串
'''
def DelClcts(db_conn, clct_substr):
    a = db_conn.collection_names()
    b = []
    for i in a:
        if clct_substr in i:
            b.append(i)
            db_conn[i].drop()
            print("Drop:"+i)

'''
函数功能:对Dict按Key排序
入参:
dic: 数据字典
'''
def SortDictKeys(dic): 
    return sorted(dic.items(), key=lambda d: d[0])

main.py 导入的主程序,入口方法

#!/usr/bin/env python
#coding:utf-8


from util_file import UtilFile
# from util_thread import MyThread
import pymongo

#************************************************************************
#常量定义清单
#************************************************************************
#文件
stck_m1_file_dir = "D:/new_tdx/T0002/export/min1"   #1分钟线数据文件目录
stck_day_file_dir = "D:/new_tdx/T0002/export/day"   #1分钟线数据文件目录

#数据库
ip = '127.0.0.1'
port = 27017
db_name = 'robin'

#表名称
bs_m1_pre = 'BS_M1_'#基础表_1分钟线数据表前缀
bs_day_pre = 'BS_DAY_'#基础表_日线数据表前缀

#************************************************************************
#主程序
#************************************************************************
conn = pymongo.MongoClient(host=ip, port=port)      #连接Mongodb数据库服务器
db = conn[db_name]      #选择存储数据库


#************************************************************************
##从文件中导入数据到基础表
#************************************************************************
# UtilFile.ImpDataFromFileMultThrd(2, db, stck_m1_file_dir, "M1", bs_m1_pre, 1)     #导入1分钟线数据
UtilFile.ImpDataFromFileMultThrd(2, db, stck_day_file_dir, "D", bs_day_pre, 1)      #导入日线数据

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多