分享

python操作sqlite示例(支持多进程/线程同时操作) - 学步园

 DDI2014 2015-10-13

python操作sqlite的示例代码

import sqlite3
import threading
import time


def nomal_producer(conn, interval=0.1, max_items=None):
    '''
    Producer: insert rows into the `datas` table, one per iteration.

    Each row gets the content 'content <n>' (n counts up from 0) and a
    false flag.  The connection is switched to autocommit
    (isolation_level = None), so every INSERT is committed immediately.

    conn      -- an open sqlite3 connection whose database has a `datas` table
    interval  -- seconds to sleep after each insert (default 0.1)
    max_items -- stop after this many inserts; None (default) loops forever
    '''
    conn.isolation_level = None
    conn.row_factory = sqlite3.Row
    counter = 0
    while max_items is None or counter < max_items:
        cur = conn.cursor()
        cur.execute('INSERT INTO datas(content, flag) VALUES (?, ?);',
                    ('content %s' % counter, False))
        counter += 1
        time.sleep(interval)


def nomal_consumer(conn, interval=0.5, max_rounds=None):
    '''
    Consumer: repeatedly fetch up to 10 of the oldest rows and delete them.

    Every non-empty batch is printed before its rows are deleted one by one.
    The connection is switched to autocommit (isolation_level = None), so
    every DELETE is committed immediately.

    conn       -- an open sqlite3 connection whose database has a `datas` table
    interval   -- seconds to sleep after each polling round (default 0.5)
    max_rounds -- stop after this many rounds; None (default) loops forever
    '''
    conn.isolation_level = None
    # Rows must be addressable by column name for the r['id'] lookup below.
    conn.row_factory = sqlite3.Row
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        cur = conn.cursor()
        cur.execute('SELECT * FROM datas ORDER BY id LIMIT 10;')
        records = cur.fetchall()
        if records:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('begin to delete: ')
            print(records)
            for r in records:
                conn.execute('DELETE FROM datas WHERE id = ?;', (r['id'],))
        rounds += 1
        time.sleep(interval)


if __name__ == '__main__':
    # init db
    # check_same_thread=False only lifts sqlite3's "same thread" check so the
    # connection can be handed to other threads; it does not serialize access.
    conn = sqlite3.connect('./db.sqlite', check_same_thread=False)
    # Create the table the producer/consumer rely on; without this a fresh
    # database file fails with "no such table: datas".
    conn.execute('CREATE TABLE IF NOT EXISTS datas ('
                 'id INTEGER PRIMARY KEY AUTOINCREMENT, '
                 'content TEXT, flag BOOLEAN);')
    conn.commit()
    # init thread
    producer = threading.Thread(target=nomal_producer, args=(conn,))
    consumer = threading.Thread(target=nomal_consumer, args=(conn,))
    # start threads
    producer.start()
    consumer.start()

在多线程操作sqlite的示例代码中,采用producer和consumer的模式来处理,没有特殊之处,但需要注意的是:在建立sqlite3的connection的时候,需要设置check_same_thread = False。
另外,为了达到真正的thread-safe,可以对python的sqlite3做进一步封装,以达到仅有一个thread在操作sqlite,原理很简单,就是使用queue来处理所有操作请求并同时将结果返回到另外一个queue中去,示例代码如下:

import sqlite3
import traceback
from threading import Thread

try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3


class SqliteMultithread(Thread):
    '''
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.

    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).  The worker thread owns
    the only sqlite connection; callers talk to it exclusively through `self.reqs`.
    '''

    # Marker the worker puts on a result queue after the last row of a SELECT.
    _NO_MORE = '--no more--'

    def __init__(self, filename, autocommit, journal_mode):
        '''
        Store the settings and start the worker thread immediately.

        filename     -- database path handed to sqlite3.connect
        autocommit   -- if true, connect with isolation_level=None and commit
                        after every executed request
        journal_mode -- value interpolated into `PRAGMA journal_mode = ...`
        '''
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.daemon = True   # do not keep the interpreter alive for this thread
        self.start()

    def run(self):
        '''
        Worker loop: open the connection, then serve queued requests until '--close--'.

        A request that raises no longer kills the worker.  If the request carries
        a result queue, the exception is put on it for the waiting caller;
        otherwise the traceback is printed.  Either way the loop continues with
        the next request.
        '''
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        try:
            # PRAGMA values cannot be bound as parameters, hence the interpolation;
            # journal_mode comes from the constructor, not from request data.
            conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
            conn.text_factory = str
            cursor = conn.cursor()
            cursor.execute('PRAGMA synchronous=OFF')
            while True:
                req, arg, res = self.reqs.get()
                if req == '--close--':
                    break
                try:
                    if req == '--commit--':
                        conn.commit()
                    else:
                        cursor.execute(req, arg)
                        if res is not None:
                            for rec in cursor:
                                res.put(rec)
                        if self.autocommit:
                            conn.commit()
                        if res is not None:
                            # Sent last so a failure above reaches the caller
                            # instead of landing behind the end marker.
                            res.put(self._NO_MORE)
                except Exception as exc:
                    if res is not None:
                        # Wake the caller blocked in select() and let it re-raise.
                        res.put(exc)
                    else:
                        traceback.print_exc()
        finally:
            conn.close()

    def execute(self, req, arg=None, res=None):
        '''
        `execute` calls are non-blocking: just queue up the request and return immediately.

        req -- SQL text (or the internal '--commit--' / '--close--' commands)
        arg -- parameters for the statement; None is sent as an empty tuple
        res -- optional Queue that receives the result rows followed by an end marker
        '''
        self.reqs.put((req, arg or tuple(), res))

    def executemany(self, req, items):
        '''Queue `req` once per element of `items`, using each element as the parameters.'''
        for item in items:
            self.execute(req, item)

    def select(self, req, arg=None):
        '''
        Unlike sqlite's native select, this select doesn't handle iteration efficiently.

        The result of `select` starts filling up with values as soon as the
        request is dequeued, and although you can iterate over the result normally
        (`for res in self.select(): ...`), the entire result will be in memory.

        This is a generator: the request is queued when iteration begins.  If the
        statement fails in the worker thread, the exception is re-raised here.
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        while True:
            rec = res.get()
            if isinstance(rec, Exception):
                raise rec
            if rec == self._NO_MORE:
                break
            yield rec

    def select_one(self, req, arg=None):
        '''Return only the first row of the SELECT, or None if there are no matching rows.'''
        return next(self.select(req, arg), None)

    def commit(self):
        '''Queue a commit; like `execute`, this returns before the commit has run.'''
        self.execute('--commit--')

    def close(self):
        '''
        Queue the close command and wait for the worker thread to finish.

        Waiting matters because the worker is a daemon thread: without the join,
        requests still sitting in the queue could be dropped at interpreter exit.
        '''
        self.execute('--close--')
        self.join()
#endclass SqliteMultithread

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约