配色: 字号:
Scalaz- Free :实践-DB Transaction free style
2016-09-24 | 阅:  转:  |  分享 
  
Scalaz-Free:实践-DBTransactionfreestyle
我一直在不断的提示大家:FP就是MonadicProgramming,是一种特殊的编程风格。在我们熟悉的数据库编程领域能不能实现FP风格呢?我们先设计一些示范例子来分析一下惯用的数据库编程过程:

复制代码
1importscalaz._
2importScalaz._
3importscala.language.higherKinds
4importscala.language.implicitConversions
5importcom.jolbox.bonecp.BoneCP
6importcom.jolbox.bonecp.BoneCPConfig
7importjava.sql.Connection
8importjava.sql.ResultSet
9
10objectfreedbtxns{
11defgetTutorId(courseId:Int,conn:Connection):Int={
12valsqlString="selectTUTORfromCOURSESwhereID="+courseId
13conn.createStatement().executeQuery(sqlString).getInt("ID")
14}
15defgetTutorPay(courseId:Int,conn:Connection):Double={
16valsqlString="selectPAYAMTfromCOURSESwhereID="+courseId
17conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT")
18}
19defgetStudentFee(courseId:Int,conn:Connection):Double={
20valsqlString="selectFEEAMTfromCOURSESwhereID="+courseId
21conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT")
22}
23defupdateTutorPay(tutorId:Int,plusAmt:Double,conn:Connection):Unit={
24valsqlString="updateTUTORSsetPAYABLE=PAYABLE+"+plusAmt.toString+"whereID="+tutorId
25conn.createStatement().executeUpdate(sqlString)
26}
27defupdateStudentFee(studentId:Int,plusAmt:Double,conn:Connection):Unit={
28valsqlString="updateSTUDENTSsetDUEAMT=DUEAMT+"+plusAmt.toString+"whereID="+studentId
29conn.createStatement().executeUpdate(sqlString)
30}
31deffindEmptySeat(courseId:Int,conn:Connection):Int={
32valsqlString="selectIDfromSEATSwhereOCCUPIED=''T''ANDID="+courseId
33conn.createStatement().executeQuery(sqlString).getInt("ID")
34}
35defupdateSeatsStatus(seatId:Int,taken:Boolean,conn:Connection):Unit={
36valsqlString="updateSEATSsetOCCUPIED=''"+taken.toString.toUpperCase.head+"''whereID="+seatId
37conn.createStatement().executeUpdate(sqlString)
38}
复制代码
我这里模拟了一个培训学校内的一些业务。上面设计的是一些基本函数,可以分别对学员、导师、座位进行查询和更新。如果我们需要把更新工作放入事务处理(transaction)内的话我们可以这样做:

复制代码
1defupdateStudent(studentId:Int,courseId:Int):Unit={
2valconfig=newBoneCPConfig()
3valbonecp=newBoneCP(config)
4valconn=bonecp.getConnection()
5conn.setReadOnly(false)
6conn.setAutoCommit(false)
7conn.rollback()
8try{
9valfee=getStudentFee(courseId,conn)
10updateStudentFee(studentId,fee,conn)
11conn.commit()
12}catch{
13case(e:Exception)=>conn.rollback()
14}finally{
15conn.close()
16}
17}
18defupdateStudentAndSeat(studentId:Int,courseId:Int):Unit={
19valconfig=newBoneCPConfig()
20valbonecp=newBoneCP(config)
21valconn=bonecp.getConnection()
22conn.setReadOnly(false)
23conn.setAutoCommit(false)
24conn.rollback()
25try{
26valfee=getStudentFee(courseId,conn)
27updateStudentFee(studentId,fee,conn)
28valseatId=findEmptySeat(courseId,conn)
29updateSeatsStatus(seatId,true,conn)
30conn.commit()
31}catch{
32case(e:Exception)=>conn.rollback()
33}finally{
34conn.close()
35}
36}
复制代码
马上可以发现在我们对这些函数在事务处理内进行组合使用时我们必须重新对事务处理进行设置,无法实现真正意义上的函数组合。如果我们认可FP风格的话,这里起码有两项弊处:一是源代码增加了大量的铺垫(boilerplatecode),重复事务处理设置、二是每个更新函数都会产生副作用,换句话说就是这里那里都会有副作用影响,很难控制,这样就增加了程序的复杂程度,造成代码分析的困难。

我们希望达到的目标:

复制代码
1/
2defupdateStudentAndSeat(studentId:Int):program{
3//findEmptySeat
4//updateStudentFee
5//updateSeatStatus
6}
7
8defrunDBTxn(prg:program){
9//conn=getConnection
10//try
11//run(pre)
12//commit
13//catch
14//rollback
15}
16runDBTxn(updateStudent)
17runDBTxn(updateStudentAndSeat)
18runDBTxn(updateSeatStatus)
19/
复制代码
我们只在一个地方设置和运行事务处理。我们希望能把不同的program传入runDBTxn去运算。这不就是FreeMonad的编程、运算关注分离模式嘛。那我们就试着用FreeMonad来提供数据库事务处理支持。按上篇讨论的设计流程我们先设计ADT:

1caseclassSqlOp[A](run:Connection=>A)
模拟sql指令很简单,两种情况:query或者update。两者都可以用函数run表示:传入Connection,返回结果A,A有可能是Unit。要成为FreeMonad就必须先获取SqlOp的Functor实例:

1caseclassSqlOp[A](run:Connection=>A)
2implicitvalsqlOpFunctor=newFunctor[SqlOp]{
3defmap[A,B](sa:SqlOp[A])(f:A=>B):SqlOp[B]=
4SqlOp{(conn:Connection)=>f(sa.run(conn))}
5}
基本功能的sql操作函数及升格Free:

复制代码
1typeSql[A]=Free[SqlOp,A]
2defgetTutorId(courseId:Int):Sql[Int]=
3Free.liftF(SqlOp{
4(conn:Connection)=>{
5valsqlString="selectTUTORfromCOURSESwhereID="+courseId
6conn.createStatement().executeQuery(sqlString).getInt("ID")
7}
8})
9
10defgetTutorPay(courseId:Int):Sql[Double]=
11Free.liftF(SqlOp{
12(conn:Connection)=>{
13valsqlString="selectPAYAMTfromCOURSESwhereID="+courseId
14conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT")
15}
16})
17defgetStudentFee(courseId:Int):Sql[Double]=
18Free.liftF(SqlOp{
19(conn:Connection)=>{
20valsqlString="selectFEEAMTfromCOURSESwhereID="+courseId
21conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT")
22}
23})
24defupdateTutorPay(tutorId:Int,plusAmt:Double):Sql[Unit]=
25Free.liftF(SqlOp{
26(conn:Connection)=>{
27valsqlString="updateTUTORSsetPAYABLE=PAYABLE+"+plusAmt.toString+"whereID="+tutorId
28conn.createStatement().executeUpdate(sqlString)
29}
30})
31defupdateStudentFee(studentId:Int,plusAmt:Double):Sql[Unit]=
32Free.liftF(SqlOp{
33(conn:Connection)=>{
34valsqlString="updateSTUDENTSsetDUEAMT=DUEAMT+"+plusAmt.toString+"whereID="+studentId
35conn.createStatement().executeUpdate(sqlString)
36}
37})
38deffindEmptySeat(courseId:Int):Sql[Int]=
39Free.liftF(SqlOp{
40(conn:Connection)=>{
41valsqlString="selectIDfromSEATSwhereOCCUPIED=''T''ANDID="+courseId
42conn.createStatement().executeQuery(sqlString).getInt("ID")
43}
44})
45defupdateSeatsStatus(seatId:Int,taken:Boolean):Sql[Unit]=
46Free.liftF(SqlOp{
47(conn:Connection)=>{
48valsqlString="updateSEATSsetOCCUPIED=''"+taken.toString.toUpperCase.head+"''whereID="+seatId
49conn.createStatement().executeUpdate(sqlString)
50}
51})
复制代码
我们现在可以用这些升格成Free的函数来建设AST示范例子:

复制代码
1deftakeSeat(courseId:Int):Sql[Unit]=for{
2emptySeat<-findEmptySeat(courseId)
3_<-updateSeatsStatus(emptySeat,true)
4}yield()
5defaddCourse(studentId:Int,courseId:Int):Sql[Unit]=for{
6fee<-getStudentFee(courseId)
7pay<-getTutorPay(courseId)
8tutorId<-getTutorId(courseId)
9_<-updateStudentFee(studentId,fee)
10_<-updateTutorPay(tutorId,pay)
11_<-takeSeat(courseId)
12}yield()
复制代码
addCourse对基本函数进行了组合,又调用了已经组合过一次的takeSeat,证明AST可以实现高度的函数组合。

下面示范实现相关的Interpreter:

1defrunTransactionImpl[A](conn:Connection,ast:Sql[A]):A=
2ast.resume.fold({
3casex:SqlOp[Sql[A]]=>runTransactionImpl(conn,x.run(conn))
4},
5(a:A)=>a
6)
我们需要一个通用的事务处理方法:

复制代码
1defrunTransaction[A](ast:Sql[A]):Exception\/A={
2valconfig=newBoneCPConfig()
3valbonecp=newBoneCP(config)
4valconn=bonecp.getConnection()
5conn.setReadOnly(false)
6conn.setAutoCommit(false)
7conn.rollback()
8try{
9valresult:A=runTransactionImpl(conn,ast)
10result.right[Exception]
11}catch{
12casee:Exception=>e.left[A]
13}finally{
14conn.close
15}
16}
复制代码
这样,我们可以在一个地方使用事务处理来运算任何事先设计的AST。

我们可以用不同的方法来实现Interpreter。下面就是用Free.foldMap来运算AST的示范。由于我们需要注入Connection,所以采用了SqltoState的自然转换(naturaltransformation):

复制代码
1typeSqlState[A]=State[Connection,A]
2objectSqlToStateextends(SqlOp~>SqlState){
3defapply[A](sa:SqlOp[A]):SqlState[A]=samatch{
4caseSqlOp(f)=>State{
5conn=>(conn,f(conn))
6}
7}
8}
9defrunTransactionImplState[A](conn:Connection,ast:Sql[A])=
10ast.foldMap(SqlToState).run(conn)
复制代码


下面是这个用Free来实现FP风格数据库事务处理的完整示范代码:

复制代码
1importscalaz._
2importScalaz._
3importscala.language.higherKinds
4importscala.language.implicitConversions
5importcom.jolbox.bonecp.BoneCP
6importcom.jolbox.bonecp.BoneCPConfig
7importjava.sql.Connection
8importjava.sql.ResultSet
9
10objectfreedbtxns{
11
12caseclassSqlOp[A](run:Connection=>A)
13implicitvalsqlOpFunctor=newFunctor[SqlOp]{
14defmap[A,B](sa:SqlOp[A])(f:A=>B):SqlOp[B]=
15SqlOp{(conn:Connection)=>f(sa.run(conn))}
16}
17typeSql[A]=Free[SqlOp,A]
18defgetTutorId(courseId:Int):Sql[Int]=
19Free.liftF(SqlOp{
20(conn:Connection)=>{
21valsqlString="selectTUTORfromCOURSESwhereID="+courseId
22conn.createStatement().executeQuery(sqlString).getInt("ID")
23}
24})
25
26defgetTutorPay(courseId:Int):Sql[Double]=
27Free.liftF(SqlOp{
28(conn:Connection)=>{
29valsqlString="selectPAYAMTfromCOURSESwhereID="+courseId
30conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT")
31}
32})
33defgetStudentFee(courseId:Int):Sql[Double]=
34Free.liftF(SqlOp{
35(conn:Connection)=>{
36valsqlString="selectFEEAMTfromCOURSESwhereID="+courseId
37conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT")
38}
39})
40defupdateTutorPay(tutorId:Int,plusAmt:Double):Sql[Unit]=
41Free.www.wang027.comliftF(SqlOp{
42(conn:Connection)=>{
43valsqlString="updateTUTORSsetPAYABLE=PAYABLE+"+plusAmt.toString+"whereID="+tutorId
44conn.createStatement().executeUpdate(sqlString)
45}
46})
47defupdateStudentFee(studentId:Int,plusAmt:Double):Sql[Unit]=
48Free.liftF(SqlOp{
49(conn:Connection)=>{
50valsqlString="updateSTUDENTSsetDUEAMT=DUEAMT+"+plusAmt.toString+"whereID="+studentId
51conn.createStatement().executeUpdate(sqlString)
52}
53})
54deffindEmptySeat(courseId:Int):Sql[Int]=
55Free.liftF(SqlOp{
56(conn:Connection)=>{
57valsqlString="selectIDfromSEATSwhereOCCUPIED=''T''ANDID="+courseId
58conn.createStatement().executeQuery(sqlString).getInt("ID")
59}
60})
61defupdateSeatsStatus(seatId:Int,taken:Boolean):Sql[Unit]=
62Free.liftF(SqlOp{
63(conn:Connection)=>{
64valsqlString="updateSEATSsetOCCUPIED=''"+taken.toString.toUpperCase.head+"''whereID="+seatId
65conn.createStatement().executeUpdate(sqlString)
66}
67})
68
69deftakeSeat(courseId:Int):Sql[Unit]=for{
70emptySeat<-findEmptySeat(courseId)
71_<-updateSeatsStatus(emptySeat,true)
72}yield()
73defaddCourse(studentId:Int,courseId:Int):Sql[Unit]=for{
74fee<-getStudentFee(courseId)
75pay<-getTutorPay(courseId)
76tutorId<-getTutorId(courseId)
77_<-updateStudentFee(studentId,fee)
78_<-updateTutorPay(tutorId,pay)
79_<-takeSeat(courseId)
80}yield()
81
82defrunTransactionImpl[A](conn:Connection,ast:Sql[A]):A=
83ast.resume.fold({
84casex:SqlOp[Sql[A]]=>runTransactionImpl(conn,x.run(conn))
85},
86(a:A)=>a
87)
88defrunTransaction[A](ast:Sql[A]):Exception\/A={
89valconfig=newBoneCPConfig()
90valbonecp=newBoneCP(config)
91valconn=bonecp.getConnection()
92conn.setReadOnly(false)
93conn.setAutoCommit(false)
94conn.rollback()
95try{
96valresult:A=runTransactionImpl(conn,ast)
97result.right[Exception]
98}catch{
99casee:Exception=>e.left[A]
100}finally{
101conn.close
102}
103}
104}
献花(0)
+1
(本文系thedust79首藏)