Scalaz-Free:实践-DBTransactionfreestyle 我一直在不断的提示大家:FP就是MonadicProgramming,是一种特殊的编程风格。在我们熟悉的数据库编程领域能不能实现FP风格呢?我们先设计一些示范例子来分析一下惯用的数据库编程过程:
复制代码 1importscalaz._ 2importScalaz._ 3importscala.language.higherKinds 4importscala.language.implicitConversions 5importcom.jolbox.bonecp.BoneCP 6importcom.jolbox.bonecp.BoneCPConfig 7importjava.sql.Connection 8importjava.sql.ResultSet 9 10objectfreedbtxns{ 11defgetTutorId(courseId:Int,conn:Connection):Int={ 12valsqlString="selectTUTORfromCOURSESwhereID="+courseId 13conn.createStatement().executeQuery(sqlString).getInt("ID") 14} 15defgetTutorPay(courseId:Int,conn:Connection):Double={ 16valsqlString="selectPAYAMTfromCOURSESwhereID="+courseId 17conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT") 18} 19defgetStudentFee(courseId:Int,conn:Connection):Double={ 20valsqlString="selectFEEAMTfromCOURSESwhereID="+courseId 21conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT") 22} 23defupdateTutorPay(tutorId:Int,plusAmt:Double,conn:Connection):Unit={ 24valsqlString="updateTUTORSsetPAYABLE=PAYABLE+"+plusAmt.toString+"whereID="+tutorId 25conn.createStatement().executeUpdate(sqlString) 26} 27defupdateStudentFee(studentId:Int,plusAmt:Double,conn:Connection):Unit={ 28valsqlString="updateSTUDENTSsetDUEAMT=DUEAMT+"+plusAmt.toString+"whereID="+studentId 29conn.createStatement().executeUpdate(sqlString) 30} 31deffindEmptySeat(courseId:Int,conn:Connection):Int={ 32valsqlString="selectIDfromSEATSwhereOCCUPIED=''T''ANDID="+courseId 33conn.createStatement().executeQuery(sqlString).getInt("ID") 34} 35defupdateSeatsStatus(seatId:Int,taken:Boolean,conn:Connection):Unit={ 36valsqlString="updateSEATSsetOCCUPIED=''"+taken.toString.toUpperCase.head+"''whereID="+seatId 37conn.createStatement().executeUpdate(sqlString) 38} 复制代码 我这里模拟了一个培训学校内的一些业务。上面设计的是一些基本函数,可以分别对学员、导师、座位进行查询和更新。如果我们需要把更新工作放入事务处理(transaction)内的话我们可以这样做:
复制代码 1defupdateStudent(studentId:Int,courseId:Int):Unit={ 2valconfig=newBoneCPConfig() 3valbonecp=newBoneCP(config) 4valconn=bonecp.getConnection() 5conn.setReadOnly(false) 6conn.setAutoCommit(false) 7conn.rollback() 8try{ 9valfee=getStudentFee(courseId,conn) 10updateStudentFee(studentId,fee,conn) 11conn.commit() 12}catch{ 13case(e:Exception)=>conn.rollback() 14}finally{ 15conn.close() 16} 17} 18defupdateStudentAndSeat(studentId:Int,courseId:Int):Unit={ 19valconfig=newBoneCPConfig() 20valbonecp=newBoneCP(config) 21valconn=bonecp.getConnection() 22conn.setReadOnly(false) 23conn.setAutoCommit(false) 24conn.rollback() 25try{ 26valfee=getStudentFee(courseId,conn) 27updateStudentFee(studentId,fee,conn) 28valseatId=findEmptySeat(courseId,conn) 29updateSeatsStatus(seatId,true,conn) 30conn.commit() 31}catch{ 32case(e:Exception)=>conn.rollback() 33}finally{ 34conn.close() 35} 36} 复制代码 马上可以发现在我们对这些函数在事务处理内进行组合使用时我们必须重新对事务处理进行设置,无法实现真正意义上的函数组合。如果我们认可FP风格的话,这里起码有两项弊处:一是源代码增加了大量的铺垫(boilerplatecode),重复事务处理设置、二是每个更新函数都会产生副作用,换句话说就是这里那里都会有副作用影响,很难控制,这样就增加了程序的复杂程度,造成代码分析的困难。
我们希望达到的目标:
复制代码 1/ 2defupdateStudentAndSeat(studentId:Int):program{ 3//findEmptySeat 4//updateStudentFee 5//updateSeatStatus 6} 7 8defrunDBTxn(prg:program){ 9//conn=getConnection 10//try 11//run(pre) 12//commit 13//catch 14//rollback 15} 16runDBTxn(updateStudent) 17runDBTxn(updateStudentAndSeat) 18runDBTxn(updateSeatStatus) 19/ 复制代码 我们只在一个地方设置和运行事务处理。我们希望能把不同的program传入runDBTxn去运算。这不就是FreeMonad的编程、运算关注分离模式嘛。那我们就试着用FreeMonad来提供数据库事务处理支持。按上篇讨论的设计流程我们先设计ADT:
1caseclassSqlOp[A](run:Connection=>A) 模拟sql指令很简单,两种情况:query或者update。两者都可以用函数run表示:传入Connection,返回结果A,A有可能是Unit。要成为FreeMonad就必须先获取SqlOp的Functor实例:
1caseclassSqlOp[A](run:Connection=>A) 2implicitvalsqlOpFunctor=newFunctor[SqlOp]{ 3defmap[A,B](sa:SqlOp[A])(f:A=>B):SqlOp[B]= 4SqlOp{(conn:Connection)=>f(sa.run(conn))} 5} 基本功能的sql操作函数及升格Free:
复制代码 1typeSql[A]=Free[SqlOp,A] 2defgetTutorId(courseId:Int):Sql[Int]= 3Free.liftF(SqlOp{ 4(conn:Connection)=>{ 5valsqlString="selectTUTORfromCOURSESwhereID="+courseId 6conn.createStatement().executeQuery(sqlString).getInt("ID") 7} 8}) 9 10defgetTutorPay(courseId:Int):Sql[Double]= 11Free.liftF(SqlOp{ 12(conn:Connection)=>{ 13valsqlString="selectPAYAMTfromCOURSESwhereID="+courseId 14conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT") 15} 16}) 17defgetStudentFee(courseId:Int):Sql[Double]= 18Free.liftF(SqlOp{ 19(conn:Connection)=>{ 20valsqlString="selectFEEAMTfromCOURSESwhereID="+courseId 21conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT") 22} 23}) 24defupdateTutorPay(tutorId:Int,plusAmt:Double):Sql[Unit]= 25Free.liftF(SqlOp{ 26(conn:Connection)=>{ 27valsqlString="updateTUTORSsetPAYABLE=PAYABLE+"+plusAmt.toString+"whereID="+tutorId 28conn.createStatement().executeUpdate(sqlString) 29} 30}) 31defupdateStudentFee(studentId:Int,plusAmt:Double):Sql[Unit]= 32Free.liftF(SqlOp{ 33(conn:Connection)=>{ 34valsqlString="updateSTUDENTSsetDUEAMT=DUEAMT+"+plusAmt.toString+"whereID="+studentId 35conn.createStatement().executeUpdate(sqlString) 36} 37}) 38deffindEmptySeat(courseId:Int):Sql[Int]= 39Free.liftF(SqlOp{ 40(conn:Connection)=>{ 41valsqlString="selectIDfromSEATSwhereOCCUPIED=''T''ANDID="+courseId 42conn.createStatement().executeQuery(sqlString).getInt("ID") 43} 44}) 45defupdateSeatsStatus(seatId:Int,taken:Boolean):Sql[Unit]= 46Free.liftF(SqlOp{ 47(conn:Connection)=>{ 48valsqlString="updateSEATSsetOCCUPIED=''"+taken.toString.toUpperCase.head+"''whereID="+seatId 49conn.createStatement().executeUpdate(sqlString) 50} 51}) 复制代码 我们现在可以用这些升格成Free的函数来建设AST示范例子:
复制代码 1deftakeSeat(courseId:Int):Sql[Unit]=for{ 2emptySeat<-findEmptySeat(courseId) 3_<-updateSeatsStatus(emptySeat,true) 4}yield() 5defaddCourse(studentId:Int,courseId:Int):Sql[Unit]=for{ 6fee<-getStudentFee(courseId) 7pay<-getTutorPay(courseId) 8tutorId<-getTutorId(courseId) 9_<-updateStudentFee(studentId,fee) 10_<-updateTutorPay(tutorId,pay) 11_<-takeSeat(courseId) 12}yield() 复制代码 addCourse对基本函数进行了组合,又调用了已经组合过一次的takeSeat,证明AST可以实现高度的函数组合。
下面示范实现相关的Interpreter:
1defrunTransactionImpl[A](conn:Connection,ast:Sql[A]):A= 2ast.resume.fold({ 3casex:SqlOp[Sql[A]]=>runTransactionImpl(conn,x.run(conn)) 4}, 5(a:A)=>a 6) 我们需要一个通用的事务处理方法:
复制代码 1defrunTransaction[A](ast:Sql[A]):Exception\/A={ 2valconfig=newBoneCPConfig() 3valbonecp=newBoneCP(config) 4valconn=bonecp.getConnection() 5conn.setReadOnly(false) 6conn.setAutoCommit(false) 7conn.rollback() 8try{ 9valresult:A=runTransactionImpl(conn,ast) 10result.right[Exception] 11}catch{ 12casee:Exception=>e.left[A] 13}finally{ 14conn.close 15} 16} 复制代码 这样,我们可以在一个地方使用事务处理来运算任何事先设计的AST。
我们可以用不同的方法来实现Interpreter。下面就是用Free.foldMap来运算AST的示范。由于我们需要注入Connection,所以采用了SqltoState的自然转换(naturaltransformation):
复制代码 1typeSqlState[A]=State[Connection,A] 2objectSqlToStateextends(SqlOp~>SqlState){ 3defapply[A](sa:SqlOp[A]):SqlState[A]=samatch{ 4caseSqlOp(f)=>State{ 5conn=>(conn,f(conn)) 6} 7} 8} 9defrunTransactionImplState[A](conn:Connection,ast:Sql[A])= 10ast.foldMap(SqlToState).run(conn) 复制代码
下面是这个用Free来实现FP风格数据库事务处理的完整示范代码:
复制代码 1importscalaz._ 2importScalaz._ 3importscala.language.higherKinds 4importscala.language.implicitConversions 5importcom.jolbox.bonecp.BoneCP 6importcom.jolbox.bonecp.BoneCPConfig 7importjava.sql.Connection 8importjava.sql.ResultSet 9 10objectfreedbtxns{ 11 12caseclassSqlOp[A](run:Connection=>A) 13implicitvalsqlOpFunctor=newFunctor[SqlOp]{ 14defmap[A,B](sa:SqlOp[A])(f:A=>B):SqlOp[B]= 15SqlOp{(conn:Connection)=>f(sa.run(conn))} 16} 17typeSql[A]=Free[SqlOp,A] 18defgetTutorId(courseId:Int):Sql[Int]= 19Free.liftF(SqlOp{ 20(conn:Connection)=>{ 21valsqlString="selectTUTORfromCOURSESwhereID="+courseId 22conn.createStatement().executeQuery(sqlString).getInt("ID") 23} 24}) 25 26defgetTutorPay(courseId:Int):Sql[Double]= 27Free.liftF(SqlOp{ 28(conn:Connection)=>{ 29valsqlString="selectPAYAMTfromCOURSESwhereID="+courseId 30conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT") 31} 32}) 33defgetStudentFee(courseId:Int):Sql[Double]= 34Free.liftF(SqlOp{ 35(conn:Connection)=>{ 36valsqlString="selectFEEAMTfromCOURSESwhereID="+courseId 37conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT") 38} 39}) 40defupdateTutorPay(tutorId:Int,plusAmt:Double):Sql[Unit]= 41Free.www.wang027.comliftF(SqlOp{ 42(conn:Connection)=>{ 43valsqlString="updateTUTORSsetPAYABLE=PAYABLE+"+plusAmt.toString+"whereID="+tutorId 44conn.createStatement().executeUpdate(sqlString) 45} 46}) 47defupdateStudentFee(studentId:Int,plusAmt:Double):Sql[Unit]= 48Free.liftF(SqlOp{ 49(conn:Connection)=>{ 50valsqlString="updateSTUDENTSsetDUEAMT=DUEAMT+"+plusAmt.toString+"whereID="+studentId 51conn.createStatement().executeUpdate(sqlString) 52} 53}) 54deffindEmptySeat(courseId:Int):Sql[Int]= 55Free.liftF(SqlOp{ 56(conn:Connection)=>{ 57valsqlString="selectIDfromSEATSwhereOCCUPIED=''T''ANDID="+courseId 58conn.createStatement().executeQuery(sqlString).getInt("ID") 59} 60}) 61defupdateSeatsStatus(seatId:Int,taken:Boolean):Sql[Unit]= 62Free.liftF(SqlOp{ 63(conn:Connection)=>{ 64valsqlString="updateSEATSsetOCCUPIED=''"+taken.toString.toUpperCase.head+"''whereID="+seatId 65conn.createStatement().executeUpdate(sqlString) 66} 67}) 68 69deftakeSeat(courseId:Int):Sql[Unit]=for{ 70emptySeat<-findEmptySeat(courseId) 71_<-updateSeatsStatus(emptySeat,true) 72}yield() 73defaddCourse(studentId:Int,courseId:Int):Sql[Unit]=for{ 74fee<-getStudentFee(courseId) 75pay<-getTutorPay(courseId) 76tutorId<-getTutorId(courseId) 77_<-updateStudentFee(studentId,fee) 78_<-updateTutorPay(tutorId,pay) 79_<-takeSeat(courseId) 80}yield() 81 82defrunTransactionImpl[A](conn:Connection,ast:Sql[A]):A= 83ast.resume.fold({ 84casex:SqlOp[Sql[A]]=>runTransactionImpl(conn,x.run(conn)) 85}, 86(a:A)=>a 87) 88defrunTransaction[A](ast:Sql[A]):Exception\/A={ 89valconfig=newBoneCPConfig() 90valbonecp=newBoneCP(config) 91valconn=bonecp.getConnection() 92conn.setReadOnly(false) 93conn.setAutoCommit(false) 94conn.rollback() 95try{ 96valresult:A=runTransactionImpl(conn,ast) 97result.right[Exception] 98}catch{ 99casee:Exception=>e.left[A] 100}finally{ 101conn.close 102} 103} 104} |
|