一、引用 NuGet 包
ClickHouse.Ado
ClickHouse.Net
二、基础封装类
类库封装借鉴了其它博客
/// <summary>
/// Convenience wrapper around one ClickHouse.Ado connection. It offers helpers for
/// non-query commands, scalars, readers, DataTable/List materialization, simple
/// LIMIT/OFFSET pagination and bulk inserts.
/// </summary>
/// <remarks>
/// Whenever a command fails the helper closes its connection before rethrowing; the next
/// call lazily opens a fresh one through <see cref="CreateConnection"/>.
/// </remarks>
public class ClickHouseHelper : IDisposable
{
    /// <summary>Connection string used by the parameterless constructor.</summary>
    private const string DefaultConnectionString = "Compress=False;BufferSize=32768;SocketTimeout=10000;CheckCompressedHash=False;Compressor=lz4;Host=192.168.0.233;Port=9000;Database=default;User=default;Password=";

    private readonly string _connectionString;
    private ClickHouseConnection _clickHouseConnection;

    #region Constructor

    /// <summary>Creates a helper that connects with the built-in default connection string.</summary>
    public ClickHouseHelper() : this(DefaultConnectionString)
    {
    }

    /// <summary>Creates a helper that connects with the supplied connection string.</summary>
    /// <param name="connectionString">ClickHouse.Ado connection string.</param>
    /// <exception cref="ArgumentException"><paramref name="connectionString"/> is null or blank.</exception>
    public ClickHouseHelper(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("A connection string is required.", nameof(connectionString));
        }

        // The connection string must be stored BEFORE the connection is opened; chaining to
        // the parameterless constructor first would open the connection with the default string.
        _connectionString = connectionString;
        CreateConnection();
    }

    #endregion

    /// <summary>
    /// Returns the helper's connection, creating and opening it on first use
    /// (or after it was disposed).
    /// </summary>
    /// <returns>The open connection owned by this helper.</returns>
    public ClickHouseConnection CreateConnection()
    {
        if (_clickHouseConnection != null)
        {
            return _clickHouseConnection;
        }

        var settings = new ClickHouseConnectionSettings(_connectionString);
        var connection = new ClickHouseConnection(settings);
        try
        {
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
            }
        }
        catch
        {
            // Do not leak a half-opened connection when Open() fails.
            connection.Dispose();
            throw;
        }

        _clickHouseConnection = connection;
        return connection;
    }

    /// <summary>Executes a statement that returns no result set.</summary>
    /// <param name="sql">SQL text.</param>
    /// <param name="commandType">Command type assigned to the command.</param>
    /// <param name="parameters">Optional command parameters.</param>
    public void ExecuteNoQuery(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
    {
        RunCommand(sql, commandType, parameters, command => command.ExecuteNonQuery());
    }

    /// <summary>Executes a text statement that returns no result set.</summary>
    /// <param name="sql">SQL text.</param>
    /// <param name="parameters">Optional command parameters.</param>
    public void ExecuteNoQuery(string sql, params ClickHouseParameter[] parameters)
    {
        ExecuteNoQuery(sql, CommandType.Text, parameters);
    }

    /// <summary>Executes a query and returns its scalar result converted to <typeparamref name="T"/>.</summary>
    /// <typeparam name="T">Expected result type.</typeparam>
    /// <param name="sql">SQL text.</param>
    /// <param name="commandType">Command type assigned to the command.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>
    /// The scalar value; <c>default(T)</c> when the command yields null or <see cref="DBNull"/>.
    /// </returns>
    public T ExecuteScalar<T>(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
    {
        return RunCommand(sql, commandType, parameters, command =>
        {
            object value = command.ExecuteScalar();
            if (value == null || value is DBNull)
            {
                return default(T);
            }

            if (value is T typed)
            {
                return typed;
            }

            // A boxed value of another numeric type cannot be unboxed straight to T.
            return (T)HackType(value, typeof(T));
        });
    }

    /// <summary>Executes a text query and returns its scalar result.</summary>
    /// <typeparam name="T">Expected result type.</typeparam>
    /// <param name="sql">SQL text.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>The scalar value, see the overload taking a <see cref="CommandType"/>.</returns>
    public T ExecuteScalar<T>(string sql, params ClickHouseParameter[] parameters)
    {
        return ExecuteScalar<T>(sql, CommandType.Text, parameters);
    }

    /// <summary>Executes a query and returns the reader. The caller owns (and must close) the reader.</summary>
    /// <param name="sql">SQL text.</param>
    /// <param name="commandType">Command type assigned to the command.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>The reader produced by the command.</returns>
    public IDataReader ExecuteReader(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
    {
        return RunCommand(sql, commandType, parameters, command => command.ExecuteReader());
    }

    /// <summary>Executes a text query and returns the reader. The caller owns the reader.</summary>
    /// <param name="sql">SQL text.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>The reader produced by the command.</returns>
    public IDataReader ExecuteReader(string sql, params ClickHouseParameter[] parameters)
    {
        return ExecuteReader(sql, CommandType.Text, parameters);
    }

    /// <summary>Executes a query and loads every row into a <see cref="DataTable"/>.</summary>
    /// <param name="sql">SQL text.</param>
    /// <param name="commandType">Command type assigned to the command.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>The loaded table, or null when no reader was produced.</returns>
    public DataTable ExecuteDataTable(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
    {
        try
        {
            IDataReader reader = ExecuteReader(sql, commandType, parameters);
            return reader == null ? null : DataReaderToDataTable(reader);
        }
        catch
        {
            Dispose();
            throw;
        }
    }

    /// <summary>Executes a text query and loads every row into a <see cref="DataTable"/>.</summary>
    /// <param name="sql">SQL text.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>The loaded table, or null when no reader was produced.</returns>
    public DataTable ExecuteDataTable(string sql, params ClickHouseParameter[] parameters)
    {
        return ExecuteDataTable(sql, CommandType.Text, parameters);
    }

    /// <summary>Executes a query and maps every row to a new <typeparamref name="T"/>.</summary>
    /// <typeparam name="T">Row type; needs a public parameterless constructor.</typeparam>
    /// <param name="sql">SQL text.</param>
    /// <param name="commandType">Command type assigned to the command.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>The mapped rows; an empty list when there are none.</returns>
    public List<T> ExecuteList<T>(string sql, CommandType commandType, params ClickHouseParameter[] parameters) where T : class
    {
        try
        {
            IDataReader reader = ExecuteReader(sql, commandType, parameters);
            return reader == null ? new List<T>() : ReaderToList<T>(reader);
        }
        catch
        {
            Dispose();
            throw;
        }
    }

    /// <summary>Executes a text query and maps every row to a new <typeparamref name="T"/>.</summary>
    /// <typeparam name="T">Row type; needs a public parameterless constructor.</typeparam>
    /// <param name="sql">SQL text.</param>
    /// <param name="parameters">Optional command parameters.</param>
    /// <returns>The mapped rows; an empty list when there are none.</returns>
    public List<T> ExecuteList<T>(string sql, params ClickHouseParameter[] parameters) where T : class
    {
        return ExecuteList<T>(sql, CommandType.Text, parameters);
    }

    /// <summary>
    /// Runs a paged query and returns the page as a <see cref="DataTable"/>.
    /// The supplied SQL must contain its own ORDER BY for stable paging.
    /// </summary>
    /// <param name="sql">Query to page over (it is wrapped as a sub-query).</param>
    /// <param name="pageindex">1-based page number.</param>
    /// <param name="pagesize">Rows per page.</param>
    /// <param name="parameters">Optional parameters, applied to both the page and the count query.</param>
    /// <returns>The total row count of <paramref name="sql"/> and the requested page.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="pageindex"/> is below 1 or <paramref name="pagesize"/> is negative.
    /// </exception>
    public (ulong, DataTable) ExecuteDataTableByPagination(string sql, int pageindex, int pagesize, params ClickHouseParameter[] parameters)
    {
        (string countSql, string pageSql) = GetCountAndPageSql(sql, pageindex, pagesize);
        DataTable page = ExecuteDataTable(pageSql, CommandType.Text, parameters);

        // The count query wraps the same SQL, so it needs the same parameters.
        ulong totalCount = ExecuteScalar<ulong>(countSql, CommandType.Text, parameters);
        return (totalCount, page);
    }

    /// <summary>
    /// Runs a paged query and returns the page as a list of <typeparamref name="T"/>.
    /// The supplied SQL must contain its own ORDER BY for stable paging.
    /// </summary>
    /// <typeparam name="T">Row type; needs a public parameterless constructor.</typeparam>
    /// <param name="sql">Query to page over (it is wrapped as a sub-query).</param>
    /// <param name="pageindex">1-based page number.</param>
    /// <param name="pagesize">Rows per page.</param>
    /// <param name="parameters">Optional parameters, applied to both the page and the count query.</param>
    /// <returns>The total row count of <paramref name="sql"/> and the requested page.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="pageindex"/> is below 1 or <paramref name="pagesize"/> is negative.
    /// </exception>
    public (ulong, List<T>) ExecuteListByPagination<T>(string sql, int pageindex, int pagesize, params ClickHouseParameter[] parameters) where T : class
    {
        (string countSql, string pageSql) = GetCountAndPageSql(sql, pageindex, pagesize);
        List<T> page = ExecuteList<T>(pageSql, CommandType.Text, parameters);

        // The count query wraps the same SQL, so it needs the same parameters.
        ulong totalCount = ExecuteScalar<ulong>(countSql, CommandType.Text, parameters);
        return (totalCount, page);
    }

    /// <summary>
    /// Inserts many rows with a single <c>INSERT ... VALUES @bulk</c> command.
    /// For a single row use <see cref="ExecuteNoQuery(string, ClickHouseParameter[])"/>.
    /// </summary>
    /// <typeparam name="T">Row type; its public readable instance properties become the column list.</typeparam>
    /// <param name="sourceList">Rows to insert. An empty list is a no-op.</param>
    /// <param name="tbName">Target table; defaults to the name of <typeparamref name="T"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sourceList"/> is null.</exception>
    public void BulkInsert<T>(List<T> sourceList, string tbName = "") where T : class
    {
        if (sourceList == null)
        {
            throw new ArgumentNullException(nameof(sourceList));
        }

        if (sourceList.Count == 0)
        {
            return;
        }

        // NOTE(review): the table name is interpolated into the SQL text (identifiers cannot
        // be parameterized) - only pass trusted table names.
        string tableName = string.IsNullOrEmpty(tbName) ? typeof(T).Name : tbName;

        try
        {
            // One property array drives both the column list and every value row, so the
            // two can never disagree on order or length.
            PropertyInfo[] properties = GetReadableProperties(typeof(T));
            string insertClickHouseSql = $"INSERT INTO {tableName} ({GetColumns(properties)}) VALUES @bulk;";

            var command = CreateConnection().CreateCommand();
            command.CommandText = insertClickHouseSql;
            command.Parameters.Add(new ClickHouseParameter
            {
                ParameterName = "bulk",
                Value = List2AList(sourceList, properties)
            });
            command.ExecuteNonQuery();
        }
        catch
        {
            Dispose();
            throw;
        }
    }

    /// <summary>Closes and releases the underlying connection. Safe to call repeatedly.</summary>
    public void Dispose()
    {
        _clickHouseConnection?.Dispose();
        _clickHouseConnection = null;
    }

    #region private

    /// <summary>
    /// Builds a command on the helper's connection, runs <paramref name="execute"/> against it
    /// and, on any failure, disposes the connection before rethrowing.
    /// </summary>
    private TResult RunCommand<TResult>(string sql, CommandType commandType, ClickHouseParameter[] parameters, Func<IDbCommand, TResult> execute)
    {
        try
        {
            var command = CreateConnection().CreateCommand();
            command.CommandText = sql;
            command.CommandType = commandType;
            AttachParameters(command.Parameters, parameters);
            return execute(command);
        }
        catch
        {
            Dispose();
            throw;
        }
    }

    /// <summary>Projects every item to an array of its property values, in <paramref name="properties"/> order.</summary>
    private static List<object[]> List2AList<T>(List<T> sourceList, PropertyInfo[] properties)
    {
        var rows = new List<object[]>(sourceList.Count);
        foreach (T item in sourceList)
        {
            var row = new object[properties.Length];
            for (int i = 0; i < properties.Length; i++)
            {
                // A null item yields a row of nulls.
                row[i] = item == null ? null : properties[i].GetValue(item, null);
            }

            rows.Add(row);
        }

        return rows;
    }

    /// <summary>Joins the property names into a comma separated column list.</summary>
    private static string GetColumns(PropertyInfo[] properties)
    {
        return string.Join(",", properties.Select(p => p.Name));
    }

    /// <summary>Public, readable, non-indexer instance properties of <paramref name="type"/>.</summary>
    private static PropertyInfo[] GetReadableProperties(Type type)
    {
        return type.GetProperties(BindingFlags.Public | BindingFlags.Instance)
                   .Where(p => p.CanRead && p.GetIndexParameters().Length == 0)
                   .ToArray();
    }

    /// <summary>Public, writable, non-indexer instance properties of <paramref name="type"/>.</summary>
    private static PropertyInfo[] GetWritableProperties(Type type)
    {
        return type.GetProperties(BindingFlags.Public | BindingFlags.Instance)
                   .Where(p => p.CanWrite && p.GetIndexParameters().Length == 0)
                   .ToArray();
    }

    /// <summary>Adds every parameter to the command's collection; a null array is treated as empty.</summary>
    private static void AttachParameters(ClickHouseParameterCollection parametersCollection, ClickHouseParameter[] parameters)
    {
        if (parameters == null)
        {
            return;
        }

        foreach (var item in parameters)
        {
            if (item != null)
            {
                parametersCollection.Add(item);
            }
        }
    }

    /// <summary>
    /// Loads every row of <paramref name="reader"/> into a new <see cref="DataTable"/>
    /// (column names upper-cased) and closes the reader, even when loading fails.
    /// </summary>
    /// <param name="reader">Reader to drain.</param>
    /// <returns>The populated table.</returns>
    private static DataTable DataReaderToDataTable(IDataReader reader)
    {
        try
        {
            DataTable table = new DataTable("Table");
            int fieldCount = reader.FieldCount;
            for (int i = 0; i < fieldCount; ++i)
            {
                table.Columns.Add(reader.GetName(i).ToUpper(), reader.GetFieldType(i));
            }

            table.BeginLoadData();
            object[] values = new object[fieldCount];

            // Rows are read per result: NextResult() first, then Read().
            while (reader.NextResult())
            {
                while (reader.Read())
                {
                    reader.GetValues(values);
                    table.LoadDataRow(values, true);
                }
            }

            table.EndLoadData();
            return table;
        }
        finally
        {
            reader.Close();
        }
    }

    /// <summary>
    /// Maps the first available row of <paramref name="dr"/> to a new <typeparamref name="T"/>
    /// and disposes the reader.
    /// </summary>
    /// <returns>The mapped row, or <c>default(T)</c> when the reader has no rows.</returns>
    private static T ReaderToModel<T>(IDataReader dr)
    {
        using (dr)
        {
            Dictionary<string, string> columns = GetColumnLookup(dr);
            PropertyInfo[] properties = GetWritableProperties(typeof(T));

            // Same NextResult-then-Read sequence as ReaderToList and DataReaderToDataTable.
            while (dr.NextResult())
            {
                if (dr.Read())
                {
                    return MapCurrentRow<T>(dr, properties, columns);
                }
            }

            return default(T);
        }
    }

    /// <summary>Maps every row of <paramref name="dr"/> to a new <typeparamref name="T"/> and disposes the reader.</summary>
    private static List<T> ReaderToList<T>(IDataReader dr)
    {
        using (dr)
        {
            Dictionary<string, string> columns = GetColumnLookup(dr);
            // Reflection is done once per query instead of once per row.
            PropertyInfo[] properties = GetWritableProperties(typeof(T));

            List<T> list = new List<T>();
            while (dr.NextResult())
            {
                while (dr.Read())
                {
                    list.Add(MapCurrentRow<T>(dr, properties, columns));
                }
            }

            return list;
        }
    }

    /// <summary>
    /// Case-insensitive map from column name to the exact name reported by the reader, so a
    /// property that differs from its column only in case is still read by the real column name.
    /// </summary>
    private static Dictionary<string, string> GetColumnLookup(IDataReader dr)
    {
        var lookup = new Dictionary<string, string>(dr.FieldCount, StringComparer.OrdinalIgnoreCase);
        for (int i = 0; i < dr.FieldCount; i++)
        {
            string name = dr.GetName(i);
            if (!lookup.ContainsKey(name))
            {
                lookup[name] = name;
            }
        }

        return lookup;
    }

    /// <summary>Creates a <typeparamref name="T"/> and fills it from the reader's current row.</summary>
    private static T MapCurrentRow<T>(IDataReader dr, PropertyInfo[] properties, Dictionary<string, string> columns)
    {
        // Boxed once so property writes also stick when T is a value type.
        object model = Activator.CreateInstance<T>();
        foreach (PropertyInfo property in properties)
        {
            if (!columns.TryGetValue(property.Name, out string columnName))
            {
                continue;
            }

            object raw = dr[columnName];
            if (!IsNullOrDBNull(raw))
            {
                property.SetValue(model, HackType(raw, property.PropertyType), null);
            }
        }

        return (T)model;
    }

    /// <summary>
    /// Converts <paramref name="value"/> to <paramref name="conversionType"/>, unwrapping
    /// <see cref="Nullable{T}"/> targets first because Convert.ChangeType cannot target them.
    /// </summary>
    private static object HackType(object value, Type conversionType)
    {
        Type underlyingType = Nullable.GetUnderlyingType(conversionType);
        if (underlyingType != null)
        {
            if (value == null)
            {
                return null;
            }

            conversionType = underlyingType;
        }

        // Database values are machine data: convert culture-independently.
        return Convert.ChangeType(value, conversionType, System.Globalization.CultureInfo.InvariantCulture);
    }

    /// <summary>
    /// True for a null reference, <see cref="DBNull"/>, or a value whose string form is empty
    /// (such values are skipped when mapping rows).
    /// </summary>
    private static bool IsNullOrDBNull(object obj)
    {
        return obj == null || obj is DBNull || string.IsNullOrEmpty(obj.ToString());
    }

    /// <summary>Builds the COUNT query and the LIMIT/OFFSET query that wrap <paramref name="sql"/>.</summary>
    /// <exception cref="ArgumentException"><paramref name="sql"/> is null or blank.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="pageindex"/> is below 1 or <paramref name="pagesize"/> is negative.
    /// </exception>
    private static (string, string) GetCountAndPageSql(string sql, int pageindex, int pagesize)
    {
        if (string.IsNullOrWhiteSpace(sql))
        {
            throw new ArgumentException("A query is required.", nameof(sql));
        }

        if (pageindex < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(pageindex), pageindex, "Page index is 1-based.");
        }

        if (pagesize < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(pagesize), pagesize, "Page size cannot be negative.");
        }

        // A trailing ';' would be a syntax error once the query is wrapped in parentheses.
        string inner = sql.TrimEnd(' ', '\t', '\r', '\n', ';');

        // Computed as long so large page numbers cannot overflow int.
        long offset = (pageindex - 1L) * pagesize;

        string countSql = $"SELECT COUNT(1) count FROM ({inner}) A";
        string pageSql = $"select * from ({inner}) LIMIT {pagesize} OFFSET {offset}";
        return (countSql, pageSql);
    }

    #endregion
}
三、注意事项
1、端口问题。连接字符串中的 Port 应填写 ClickHouse 服务端配置中 tcp_port 设置的端口(原生 TCP 协议端口,默认为 9000),而不是 HTTP 端口。
2、时区问题。从 ClickHouse 中取出来的时间会多 8 个小时。之前一度怀疑是安装时服务器时区设置不对,但实际上时区都是正确的,只能手动通过 ToLocalTime 将时间转换成本地时区。
3、批量插数据。批量插数据的时候如果传入一个List的话,对应的类需要增加GetEnumerator方法。例如:
/// <summary>
/// Sample row type used by the test code.
/// </summary>
public class flink_user_2
{
    public long id { get; set; }

    public string name { get; set; }

    public int age { get; set; }

    public string sex { get; set; }

    public string phone { get; set; }

    /// <summary>
    /// Enumerates the row's values in the order name, age, sex, phone
    /// (<see cref="id"/> is not part of the sequence). Needed for bulk inserts.
    /// </summary>
    /// <returns>An enumerator over the four values.</returns>
    public IEnumerator GetEnumerator()
    {
        yield return this.name;
        yield return this.age;
        yield return this.sex;
        yield return this.phone;
    }
}
四、使用示例
/// <summary>
/// Usage examples for <c>ClickHouseHelper</c>: reading a table into a list and
/// bulk-inserting generated rows in batches.
/// </summary>
public class DBContext
{
    /// <summary>
    /// Reads every row of <c>flink_user_2</c>, prints each one as JSON and then the row count.
    /// </summary>
    public void getTest()
    {
        // The helper owns a connection, so it is disposed when the example finishes.
        using (ClickHouseHelper helper = new ClickHouseHelper())
        {
            var db = helper.ExecuteList<Models.flink_user_2>("select * from flink_user_2");
            foreach (var item in db)
            {
                Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(item));
            }

            Console.WriteLine($"总数:{db.Count}");
        }
    }

    /// <summary>
    /// Generates 100,000,000 sample rows and inserts them into <c>flink_user_2</c>
    /// in batches of 10,000 rows.
    /// </summary>
    public void InserTest()
    {
        const int totalRows = 100000000;
        const int batchSize = 10000;

        using (ClickHouseHelper click = new ClickHouseHelper())
        {
            List<flink_user_2> flinks = new List<flink_user_2>(batchSize);
            for (int i = 0; i < totalRows; i++)
            {
                flinks.Add(new flink_user_2() { age = i, id = i, name = "测试" + i, phone = "12345678911", sex = "男" });

                // Flush as soon as the batch is full, so each batch is exactly batchSize rows.
                if (flinks.Count >= batchSize)
                {
                    Console.WriteLine("插入...." + flinks.Count + "条");
                    click.BulkInsert<flink_user_2>(flinks, "flink_user_2");
                    flinks.Clear();
                }
            }

            // Insert the remainder, if any; never send an empty bulk insert.
            if (flinks.Count > 0)
            {
                Console.WriteLine("开始进行插入....");
                click.BulkInsert<flink_user_2>(flinks, "flink_user_2");
            }
        }
    }
}