回到目录
上一讲中基本实现了对数据库的读写分离,而在选择只读数据库上只是随机选择,并没有去检测数据库服务器是否有效,如服务器挂了,SQL服务停了,端口被封了等等,而本讲主要对以上功能进行一个实现,并对配置文件也进行了一些优化,让它更好的支持多个数据库服务器,分别配置各个的账号和密码及数据库服务端口等等,接下来,就来看一下主要的代码吧。
一 配置文件
<!-- ef实现对sql读写分离的配置,sqlserver端采用发布与订阅实现 -->
<add key="readDb" value="
192.168.2.71|1433|background_read1|sa|zzl123,
192.168.2.71|1433|TestWrite_Read_Zzl|sa|zzl123,
192.168.2.29|1433|TestWrite_Read_Zzl|sa|1"
/>
<!-- 只读服务器的sql连接串配置模版-->
<add key ="readDbConnectioin" value="data source={0};initial catalog={1};persist security info=True;user id={2};password={3};multipleactiveresultsets=True;application name=EntityFramework"/>
二 数据库配置实体类
/// <summary>
/// 只读数据库配置实体
/// </summary>
public class ReadDbConfig
{
public ReadDbConfig()
{
Port = 1433;
UserId = "sa";
}
public string Ip { get; set; }
public int Port { get; set; }
public string DbName { get; set; }
public string UserId { get; set; }
public string Password { get; set; }
}
三 对SQL拦截器进行优化,添加了TCP的心跳检测
lock (lockObj)
{
if (readConnList != null && readConnList.Any())
{
foreach (var item in readConnList)
{
//心跳测试,将死掉的服务器IP从列表中移除
var client = new TcpClient();
try
{
client.Connect(new IPEndPoint(IPAddress.Parse(item.Ip), item.Port));
}
catch (SocketException)
{
//异常,没有连接上
readConnList.Remove(item);
}
if (!client.Connected)
{
readConnList.Remove(item);
}
}
}
}
四 对于数据库库端还是这前通过发布和订阅实现的,需要注意的是,这些功能需要使用“机器名”进行链接,使用ip和域名都是无效的
五 下面贡献一下完成的拦截器代码
/// <summary>
/// SQL命令拦截器
/// 主要实现EF的读写分离
/// </summary>
public class SqlCommandInterceptor : DbCommandInterceptor
{
static SqlCommandInterceptor()
{
InitConfig();
initSysTimer.Enabled = true;
initSysTimer.Elapsed += initSysTimer_Elapsed;
initSysTimer.Start();
sysTimer.Enabled = true;
sysTimer.Elapsed += sysTimer_Elapsed;
sysTimer.Start();
}
private static object lockObj = new object();
/// <summary>
/// 定期找没有在线的数据库服务器
/// </summary>
private static Timer sysTimer = new Timer(6);
/// <summary>
/// 系统配置文件轮训读时间间隔
/// </summary>
private static Timer initSysTimer = new Timer(60000 * 10);
/// <summary>
/// 读库,从库集群,写库不用设置走默认的EF框架
/// </summary>
private static List<ReadDbConfig> readConnList;
/// <summary>
/// 配置初始化
/// </summary>
private static void InitConfig()
{
lock (lockObj)
{
var temp = new List<ReadDbConfig>();
var str = System.Configuration.ConfigurationManager.AppSettings["readDb"] ?? string.Empty;
var readList = str.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
if (readList != null && readList.Any())
{
foreach (var item in readList)
{
var configArr = item.Split(new char[] { '|' }, StringSplitOptions.RemoveEmptyEntries);
temp.Add(new ReadDbConfig
{
Ip = configArr[0],
Port = int.Parse(configArr[1]),
DbName = configArr[2],
UserId = configArr[3],
Password = configArr[4],
});
}
}
readConnList = temp;
}
}
#region Private Methods
private static void initSysTimer_Elapsed(object sender, ElapsedEventArgs e)
{
InitConfig();
}
/// <summary>
/// 轮询服务
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private static void sysTimer_Elapsed(object sender, ElapsedEventArgs e)
{
lock (lockObj)
{
if (readConnList != null && readConnList.Any())
{
foreach (var item in readConnList)
{
//心跳测试,将死掉的服务器IP从列表中移除
var client = new TcpClient();
try
{
client.Connect(new IPEndPoint(IPAddress.Parse(item.Ip), item.Port));
}
catch (SocketException)
{
//异常,没有连接上
readConnList.Remove(item);
}
if (!client.Connected)
{
readConnList.Remove(item);
}
}
}
}
}
/// <summary>
/// 处理读库字符串
/// </summary>
/// <returns></returns>
private string GetReadConn()
{
if (readConnList != null && readConnList.Any())
{
var resultConn = readConnList[Convert.ToInt32(Math.Floor((double)new Random().Next(0, readConnList.Count)))];
return string.Format(System.Configuration.ConfigurationManager.AppSettings["readDbConnectioin"]
, resultConn.Ip
, resultConn.DbName
, resultConn.UserId
, resultConn.Password);
}
return string.Empty;
}
/// <summary>
/// 只读库的选择,加工command对象
/// </summary>
/// <param name="command"></param>
private void ReadDbSelect(DbCommand command)
{
if (!string.IsNullOrWhiteSpace(GetReadConn()))//如果配置了读写分离,就去实现
{
if (!command.CommandText.StartsWith("insert", StringComparison.InvariantCultureIgnoreCase))
{
command.Connection.Close();
command.Connection.ConnectionString = GetReadConn();
command.Connection.Open();
}
}
}
#endregion
#region Override Methods
/// <summary>
/// Linq to Entity生成的update,delete
/// </summary>
/// <param name="command"></param>
/// <param name="interceptionContext"></param>
public override void NonQueryExecuting(DbCommand command, DbCommandInterceptionContext<int> interceptionContext)
{
base.NonQueryExecuting(command, interceptionContext);//update,delete等写操作直接走主库
}
/// <summary>
/// 执行sql语句,并返回第一行第一列,没有找到返回null,如果数据库中值为null,则返回 DBNull.Value
/// </summary>
/// <param name="command"></param>
/// <param name="interceptionContext"></param>
public override void ScalarExecuting(DbCommand command, DbCommandInterceptionContext<object> interceptionContext)
{
ReadDbSelect(command);
base.ScalarExecuting(command, interceptionContext);
}
/// <summary>
/// Linq to Entity生成的select,insert
/// 发送到sqlserver之前触发
/// warning:在select语句中DbCommand.Transaction为null,而ef会为每个insert添加一个DbCommand.Transaction进行包裹
/// </summary>
/// <param name="command"></param>
/// <param name="interceptionContext"></param>
public override void ReaderExecuting(DbCommand command, DbCommandInterceptionContext<DbDataReader> interceptionContext)
{
ReadDbSelect(command);
base.ReaderExecuted(command, interceptionContext);
}
/// <summary>
/// 发送到sqlserver之后触发
/// </summary>
/// <param name="command"></param>
/// <param name="interceptionContext"></param>
public override void ReaderExecuted(DbCommand command, DbCommandInterceptionContext<DbDataReader> interceptionContext)
{
base.ReaderExecuted(command, interceptionContext);
}
#endregion
}
回到目录