分享

C# 使用ClickHouse时序数据库封装...

 _明心见性_ 2021-02-22

 2020-12-19 16:29:09 

最后发布:2020-12-19 16:29:09首次发布:2020-12-19 16:29:09
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

一、引用nuget包

  1. ClickHouse.Ado

  2. ClickHouse.Net

二、基础封装类

类库封装借鉴了其它博客

  public class ClickHouseHelper : IDisposable
    {
        private readonly string _connectionString = "Compress=False;BufferSize=32768;SocketTimeout=10000;CheckCompressedHash=False;Compressor=lz4;Host=192.168.0.233;Port=9000;Database=default;User=default;Password=";

        private ClickHouseConnection _clickHouseConnection;

        #region Constructor

        public ClickHouseHelper()
        {
            this.CreateConnection();
        }

        public ClickHouseHelper(string connectionString) : this()
        {
            this._connectionString = connectionString;
        }

        #endregion

        public ClickHouseConnection CreateConnection()
        {
            if (_clickHouseConnection == null)
            {
                var settings = new ClickHouseConnectionSettings(_connectionString);
                var cnn = new ClickHouseConnection(settings);
                if (cnn.State != ConnectionState.Open)
                {
                    cnn.Open();
                }
                _clickHouseConnection = cnn;
            }
            return _clickHouseConnection;
        }

        public void ExecuteNoQuery(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
        {

            try
            {
                if (_clickHouseConnection == null)
                {
                    this.CreateConnection();
                }
                var command = _clickHouseConnection.CreateCommand();
                command.CommandText = sql;
                command.CommandType = commandType;
                AttachParameters(command.Parameters, parameters);
                command.ExecuteNonQuery();
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }
        }

        public void ExecuteNoQuery(string sql, params ClickHouseParameter[] parameters)
        {
            try
            {
                ExecuteNoQuery(sql, CommandType.Text, parameters);
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }
        }

        public T ExecuteScalar<T>(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
        {
            T result;
            try
            {
                if (_clickHouseConnection == null)
                {
                    this.CreateConnection();
                }
                var command = _clickHouseConnection.CreateCommand();
                command.CommandText = sql;
                command.CommandType = commandType;
                AttachParameters(command.Parameters, parameters);
                result = (T)command.ExecuteScalar();
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return result;
        }

        public T ExecuteScalar<T>(string sql, params ClickHouseParameter[] parameters)
        {
            T result;
            try
            {
                result = ExecuteScalar<T>(sql, CommandType.Text, parameters);
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return result;
        }

        public IDataReader ExecuteReader(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
        {
            IDataReader result = null;
            try
            {
                if (_clickHouseConnection == null)
                {
                    this.CreateConnection();
                }
                var command = _clickHouseConnection.CreateCommand();
                command.CommandText = sql;
                command.CommandType = commandType;
                AttachParameters(command.Parameters, parameters);
                result = command.ExecuteReader();
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return result;
        }

        public IDataReader ExecuteReader(string sql, params ClickHouseParameter[] parameters)
        {
            IDataReader result;
            try
            {
                result = ExecuteReader(sql, CommandType.Text, parameters);
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return result;
        }

        /// <summary>
        /// 执行sql返回一个DataTable
        /// </summary>
        /// <param name="sql">sql语句</param>
        /// <param name="commandType">命令类型</param>
        /// <param name="parameters">sql参数</param>
        /// <returns></returns>
        public DataTable ExecuteDataTable(string sql, CommandType commandType, params ClickHouseParameter[] parameters)
        {
            DataTable result = null;
            try
            {
                var dataReader = ExecuteReader(sql, commandType, parameters);
                if (dataReader != null)
                {
                    result = DataReaderToDataTable(dataReader);
                }
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return result;
        }

        /// <summary>
        /// 执行sql返回一个DataTable
        /// </summary>
        /// <param name="sql">sql语句</param>
        /// <param name="parameters">sql参数</param>
        /// <returns></returns>
        public DataTable ExecuteDataTable(string sql, params ClickHouseParameter[] parameters)
        {
            DataTable result;
            try
            {
                result = ExecuteDataTable(sql, CommandType.Text, parameters);
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return result;
        }

        /// <summary>
        /// 执行sql返回指定类型的List
        /// </summary>
        /// <typeparam name="T">需要返回的类型</typeparam>
        /// <param name="sql">sql语句</param>
        /// <param name="commandType">命令类型</param>
        /// <param name="parameters">sql参数</param>
        /// <returns></returns>
        public List<T> ExecuteList<T>(string sql, CommandType commandType, params ClickHouseParameter[] parameters) where T : class
        {
            List<T> resultList = new List<T>();
            try
            {
                var dataReader = ExecuteReader(sql, commandType, parameters);
                if (dataReader != null)
                {
                    resultList = ReaderToList<T>(dataReader);
                }
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return resultList;
        }

        /// <summary>
        /// 执行sql返回指定类型的List
        /// </summary>
        /// <typeparam name="T">需要返回的类型</typeparam>
        /// <param name="sql">sql语句</param>
        /// <param name="parameters">sql参数</param>
        /// <returns></returns>
        public List<T> ExecuteList<T>(string sql, params ClickHouseParameter[] parameters) where T : class
        {
            List<T> resultList = new List<T>();
            try
            {
                resultList = ExecuteList<T>(sql, CommandType.Text, parameters);
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return resultList;

        }

        /// <summary>
        /// DataTable分页;注:传入的sql请自己增加排序条件
        /// </summary>
        /// <param name="sql">sql语句</param>
        /// <param name="pageindex">页码</param>
        /// <param name="pagesize">每页条数</param>
        /// <param name="parameters">sql参数</param>
        /// <returns>返回总条数和分页后数据</returns>
        public (ulong, DataTable) ExecuteDataTableByPagination(string sql, int pageindex, int pagesize, params ClickHouseParameter[] parameters)
        {
            DataTable result;
            ulong totalCount = 0;
            try
            {
                (string countsql, string pagesql) = GetCountAndPageSql(sql, pageindex, pagesize);
                result = ExecuteDataTable(pagesql, CommandType.Text, parameters);
                totalCount = ExecuteScalar<ulong>(countsql);
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return (totalCount, result);
        }

        /// <summary>
        /// List分页;注:传入的sql请自己增加排序条件
        /// </summary>
        /// <typeparam name="T">需要返回的list类型</typeparam>
        /// <param name="sql">sql语句</param>
        /// <param name="pageindex">页码</param>
        /// <param name="pagesize">每页条数</param>
        /// <param name="parameters">sql参数</param>
        /// <returns>返回总条数和分页后数据</returns>
        public (ulong, List<T>) ExecuteListByPagination<T>(string sql, int pageindex, int pagesize, params ClickHouseParameter[] parameters) where T : class
        {
            List<T> result;
            ulong totalCount = 0;
            try
            {
                (string countsql, string pagesql) = GetCountAndPageSql(sql, pageindex, pagesize);
                result = ExecuteList<T>(pagesql, CommandType.Text, parameters);
                totalCount = ExecuteScalar<ulong>(countsql);
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }

            return (totalCount, result);
        }

        /// <summary>
        /// 批量新增数据;注:单条增加请使用ExecuteNonQuery
        /// </summary>
        /// <typeparam name="T">数据类型</typeparam>
        /// <param name="sourceList">源数据</param>
        /// <param name="tbName">需要插入的表名;注:不填默认为类名</param>
        public void BulkInsert<T>(List<T> sourceList, string tbName = "") where T : class
        {
            tbName = string.IsNullOrEmpty(tbName) ? typeof(T).Name : tbName;
            try
            {
                string insertClickHouseSql = $"INSERT INTO {tbName} ({GetColumns<T>()}) VALUES @bulk;";
                if (_clickHouseConnection == null)
                {
                    this.CreateConnection();
                }
                var command = _clickHouseConnection.CreateCommand();
                command.CommandText = insertClickHouseSql;
                command.Parameters.Add(new ClickHouseParameter
                {
                    ParameterName = "bulk",
                    Value = List2AList(sourceList)
                });
                command.ExecuteNonQuery();
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }
        }

        #region private

        private List<dynamic[]> List2AList<T>(List<T> sourceList)
        {
            List<dynamic[]> result = new List<dynamic[]>();
            sourceList.ForEach(u =>
            {
                var dic = GetColumnsAndValue(u);
                result.Add(dic.Select(i => i.Value).ToArray());
            });
            return result;
        }

        private string GetColumns<T>()
        {
            try
            {
                var dic = GetColumnsAndValue<T>(default(T));
                return string.Join(",", dic.Select(u => u.Key).ToArray());
            }
            catch (Exception e)
            {
                this.Dispose();
                throw;
            }
        }

        private Dictionary<string, object> GetColumnsAndValue<T>(T u)
        {
            try
            {
                Dictionary<string, object> dic = new Dictionary<string, object>();
                Type t = typeof(T);
                if (u != null)
                {
                    t = u.GetType();
                }
                var columns = t.GetProperties(BindingFlags.Public | BindingFlags.Instance);
                foreach (var item in columns)
                {
                    object v = null;
                    if (u != null)
                    {
                        v = item.GetValue(u);
                    }
                    dic.TryAdd(item.Name, v);
                }

                return dic;
            }
            catch (Exception e)
            {
                this.Dispose();
                throw e;
            }
        }

        private void AttachParameters(ClickHouseParameterCollection parametersCollection, ClickHouseParameter[] parameters)
        {
            foreach (var item in parameters)
            {
                parametersCollection.Add(item);
            }
        }

        /// <summary>
        ///  将IDataReader转换为DataTable
        /// </summary>
        /// <param name="reader"></param>
        /// <returns></returns>
        private static DataTable DataReaderToDataTable(IDataReader reader)
        {
            DataTable objDataTable = new DataTable("Table");
            int intFieldCount = reader.FieldCount;
            for (int intCounter = 0; intCounter < intFieldCount; ++intCounter)
            {
                objDataTable.Columns.Add(reader.GetName(intCounter).ToUpper(), reader.GetFieldType(intCounter));
            }
            objDataTable.BeginLoadData();
            object[] objValues = new object[intFieldCount];
            while (reader.NextResult())
            {
                while (reader.Read())
                {
                    reader.GetValues(objValues);
                    objDataTable.LoadDataRow(objValues, true);
                }
            }
            reader.Close();
            objDataTable.EndLoadData();
            return objDataTable;
        }

        private static T ReaderToModel<T>(IDataReader dr)
        {
            try
            {
                using (dr)
                {
                    if (dr.Read())
                    {
                        List<string> list = new List<string>(dr.FieldCount);
                        for (int i = 0; i < dr.FieldCount; i++)
                        {
                            list.Add(dr.GetName(i).ToLower());
                        }
                        T model = Activator.CreateInstance<T>();
                        foreach (PropertyInfo pi in model.GetType().GetProperties(BindingFlags.GetProperty | BindingFlags.Public | BindingFlags.Instance))
                        {
                            if (list.Contains(pi.Name.ToLower()))
                            {
                                if (!IsNullOrDBNull(dr[pi.Name]))
                                {
                                    pi.SetValue(model, HackType(dr[pi.Name], pi.PropertyType), null);
                                }
                            }
                        }
                        return model;
                    }
                }
                return default(T);
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        private static List<T> ReaderToList<T>(IDataReader dr)
        {
            using (dr)
            {
                List<string> field = new List<string>(dr.FieldCount);
                for (int i = 0; i < dr.FieldCount; i++)
                {
                    field.Add(dr.GetName(i).ToLower());
                }
                List<T> list = new List<T>();
                while (dr.NextResult())
                {
                    while (dr.Read())
                    {
                        T model = Activator.CreateInstance<T>();
                        foreach (PropertyInfo property in model.GetType().GetProperties(BindingFlags.GetProperty | BindingFlags.Public | BindingFlags.Instance))
                        {
                            if (field.Contains(property.Name.ToLower()))
                            {
                                if (!IsNullOrDBNull(dr[property.Name]))
                                {
                                    property.SetValue(model, HackType(dr[property.Name], property.PropertyType), null);
                                }
                            }
                        }
                        list.Add(model);
                    }
                }
                return list;
            }
        }

        //这个类对可空类型进行判断转换,要不然会报错
        private static object HackType(object value, Type conversionType)
        {
            if (conversionType.IsGenericType && conversionType.GetGenericTypeDefinition().Equals(typeof(Nullable<>)))
            {
                if (value == null)
                    return null;

                System.ComponentModel.NullableConverter nullableConverter = new System.ComponentModel.NullableConverter(conversionType);
                conversionType = nullableConverter.UnderlyingType;
            }
            return Convert.ChangeType(value, conversionType);
        }

        private static bool IsNullOrDBNull(object obj)
        {
            return ((obj is DBNull) || string.IsNullOrEmpty(obj.ToString())) ? true : false;
        }

        private (string, string) GetCountAndPageSql(string sql, int pageindex, int pagesize)
        {
            string countSql = $"SELECT COUNT(1) count FROM ({sql}) A";
            string pageSql = $"select * from ({sql}) LIMIT {pagesize} OFFSET {(pageindex - 1) * pagesize}";
            return (countSql, pageSql);
        }


        public void Dispose()
        {
            _clickHouseConnection?.Dispose();
            _clickHouseConnection = null;
            GC.Collect();
        }
        #endregion
    }

三、注意事项

1、ClickHouse 数据库对应端口为tcp_port中设置的端口。
2、时区问题。Clickhosue中取出来的时候会多8个小时,之前一度怀疑安装时服务器时区不对,但实际上都是正确的,只能手动将时间通过ToLocalTime转成本地时区。
3、批量插数据。批量插数据的时候如果传入一个List的话,对应的类需要增加GetEnumerator方法。例如:

   /// <summary>
    /// 测试
    /// </summary>
    public class flink_user_2
    {
        public long id { get; set; }

        public string name { get; set; }

        public int age { get; set; }

        public string sex { get; set; }

        public string phone { get; set; }

        /// <summary>
        /// 批量插入必须
        /// </summary>
        /// <returns></returns>
        public IEnumerator GetEnumerator()
        {
            yield return name;
            yield return age;
            yield return sex;
            yield return phone;
        }

    }

四、使用示例

 public class DBContext
    {
        public void getTest()
        {
            ClickHouseHelper helper = new ClickHouseHelper();

            var db = helper.ExecuteList<Models.flink_user_2>("select * from flink_user_2");
            foreach (var item in db)
            {
                Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(item));
            }

            Console.WriteLine($"总数:{db.Count}");

        }


        public void InserTest()
        {
            ClickHouseHelper click = new ClickHouseHelper();
            List<flink_user_2> flinks = new List<flink_user_2>();
            for (int i = 0; i < 100000000; i++)
            {
                if (flinks.Count > 10000)
                {
                    Console.WriteLine("插入...." + flinks.Count + "条");
                    click.BulkInsert<flink_user_2>(flinks, "flink_user_2");
                    flinks.Clear();
                }

                flinks.Add(new flink_user_2()
                {
                    age = i,
                    id = i,
                    name = "测试" + i,
                    phone = "12345678911",
                    sex = "男"
                });
            }
            Console.WriteLine("开始进行插入....");

            click.BulkInsert<flink_user_2>(flinks, "flink_user_2");

        }
    }

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多