python操作sqlite的示例代码: import timeimport threadingimport sqlite3def nomal_producer(conn): ''' @summary: producer defination ''' counter = 0 conn.isolation_level = None conn.row_factory = sqlite3.Row while True: # insert to db cur = conn.cursor() cur.execute('INSERT INTO datas(content, flag) VALUES (?, ?);', ('content %s'%counter, False)) counter = counter + 1 # conn.commit() time.sleep(0.1)def nomal_consumer(conn): ''' @summary: consumer defination ''' conn.isolation_level = None conn.row_factory = sqlite3.Row while True: # select data cur = conn.cursor() cur.execute('SELECT * FROM datas ORDER BY id LIMIT 10;') records = cur.fetchall() if len(records) > 0: print 'begin to delete: ' print records # delete records for r in records: conn.execute('DELETE FROM datas WHERE id = ?;', (r['id'], )) time.sleep(0.5)if __name__ == '__main__': # init db conn = sqlite3.connect('./db.sqlite', check_same_thread = False) # conn = sqlite3.connect('./db.sqlite') # init thread producer = threading.Thread(target = nomal_producer, args = (conn,)) consumer = threading.Thread(target = nomal_consumer, args = (conn,)) # start threads producer.start() consumer.start() 在多进程操作sqlite的示例代码中,采用producer和consumer的模式来处理,没有特殊之处,但需要注意的是:在建立sqlite3的connection的时候,需要设置check_same_thread = False。 import sqlite3from Queue import Queuefrom threading import Threadclass SqliteMultithread(Thread): ''' Wrap sqlite connection in a way that allows concurrent requests from multiple threads. This is done by internally queueing the requests and processing them sequentially in a separate thread (in the same order they arrived). ''' def __init__(self, filename, autocommit, journal_mode): super(SqliteMultithread, self).__init__() self.filename = filename self.autocommit = autocommit self.journal_mode = journal_mode self.reqs = Queue() # use request queue of unlimited size self.setDaemon(True) # python2.5-compatible self.start() def run(self): if self.autocommit: conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False) else: conn = sqlite3.connect(self.filename, check_same_thread=False) conn.execute('PRAGMA journal_mode = %s' % self.journal_mode) conn.text_factory = str cursor = conn.cursor() cursor.execute('PRAGMA synchronous=OFF') while True: req, arg, res = self.reqs.get() if req == '--close--': break elif req == '--commit--': conn.commit() else: cursor.execute(req, arg) if res: for rec in cursor: res.put(rec) res.put('--no more--') if self.autocommit: conn.commit() conn.close() def execute(self, req, arg=None, res=None): ''' `execute` calls are non-blocking: just queue up the request and return immediately. ''' self.reqs.put((req, arg or tuple(), res)) def executemany(self, req, items): for item in items: self.execute(req, item) def select(self, req, arg=None): ''' Unlike sqlite's native select, this select doesn't handle iteration efficiently. The result of `select` starts filling up with values as soon as the request is dequeued, and although you can iterate over the result normally (`for res in self.select(): ...`), the entire result will be in memory. ''' res = Queue() # results of the select will appear as items in this queue self.execute(req, arg, res) while True: rec = res.get() if rec == '--no more--': break yield rec def select_one(self, req, arg=None): '''Return only the first row of the SELECT, or None if there are no matching rows.''' try: return iter(self.select(req, arg)).next() except StopIteration: return None def commit(self): self.execute('--commit--') def close(self): self.execute('--close--')#endclass SqliteMultithread
|
|