分享

java多线程处理分库分表数据

 WindySky 2018-02-25

最近对海量数据分析突然来了兴趣,加上之前公司有分库的需求,就想用多线程解决数据查询结果集合并问题。

在海量数据的情况下,简单的表分区已经没有办法满足数据表操作需求,这个时候就需要用到数据库分表或者分库。分表和分库能够极大减轻数据库或者表的压力,但是数据查询的时候就比较麻烦,需要分别取对应表或者数据库的数据,那么有没有办法实现数据库合并呢?多线程查询是一个既能提高执行效率又能获取结果集的方法。
首先创建数据库连接公共类:
  1. package com.threadExecutor;  
  2.   
  3.   
  4. import sun.applet.Main;  
  5.   
  6. import java.sql.*;  
  7. import java.util.ArrayList;  
  8. import java.util.HashMap;  
  9. import java.util.List;  
  10. import java.util.Map;  
  11.   
  12. /** 
  13. * Created by jimmy on 2014/10/26. 
  14. */  
  15. public class NormalJDBC {  
  16.     private Connection connection;  
  17.     private PreparedStatement ps;  
  18.     private ResultSet resultSet;  
  19.   
  20.     public Connection getConnection(){  
  21.         try {  
  22.             Class.forName("com.mysql.jdbc.Driver");  
  23.             connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","test","test");  
  24.         } catch (ClassNotFoundException e) {  
  25.             e.printStackTrace();  
  26.         } catch (SQLException e) {  
  27.             e.printStackTrace();  
  28.         }  
  29.         return connection;  
  30.     }  
  31.   
  32.     public List<Map> getQuery(String tableName){  
  33.         List<Map> list = new ArrayList<>();  
  34.         String sql = "select id,name,salary from "+tableName+"";  
  35.         connection = getConnection();  
  36.         try {  
  37.             ps = connection.prepareCall(sql);  
  38.             resultSet = ps.executeQuery();  
  39.             while (resultSet.next()){  
  40.                 Map map = new HashMap<>();  
  41.                 map.put("id",resultSet.getInt(1));  
  42.                 map.put("name",resultSet.getString(2));  
  43.                 map.put("salary",resultSet.getInt(3));  
  44.                 list.add(map);  
  45.             }  
  46.             close(connection,ps,resultSet);  
  47.             //Thread.sleep(1000);  
  48.         } catch (SQLException e) {  
  49.             e.printStackTrace();  
  50.             close(connection,ps,resultSet);  
  51.         } /*catch (InterruptedException e) { 
  52.             e.printStackTrace(); 
  53.         }*/  
  54.   
  55.         return list;  
  56.     }  
  57.   
  58.     public void close(Connection conn,PreparedStatement ps,ResultSet rs){  
  59.         try {  
  60.             if(ps!=null){  
  61.                 ps.close();  
  62.                 ps=null;  
  63.             }  
  64.             if(rs!=null){  
  65.                 rs.close();  
  66.                 rs=null;  
  67.             }  
  68.             if(conn!=null){  
  69.                 conn.close();  
  70.                 conn=null;  
  71.             }  
  72.         }catch (SQLException e){  
  73.             e.printStackTrace();  
  74.         }  
  75.   
  76.     }  
  77.   
  78. }  
然后编写线程处理类,这里使用到的是带返回参数的线程:
  1. package com.threadExecutor;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.concurrent.Callable;  
  6. import java.util.concurrent.CountDownLatch;  
  7.   
  8. /** 
  9. * Created by jimmy on 2014/10/26. 
  10. */  
  11. public class NewJDBCThread implements Callable<List>{  
  12.     private List list=new ArrayList<>();  
  13.     private String tableName;  
  14.     private CountDownLatch latch;//子线程执行完毕之后主线程在执行,现在不适用这方法  
  15.     public NewJDBCThread(String tableName,CountDownLatch latch){  
  16.         this.tableName = tableName;  
  17.         this.latch = latch;  
  18.     }  
  19.   
  20.     public NewJDBCThread(String tableName){  
  21.         this.tableName = tableName;  
  22.     }  
  23.   
  24.   
  25.     @Override  
  26.     public List call() {  
  27.         NormalJDBC normalJDBC = new NormalJDBC();  
  28.         List list1 = normalJDBC.getQuery(this.tableName);  
  29.         System.out.println(Thread.currentThread().getName()+"   "+list1.size());  
  30.         try {  
  31.             Thread.sleep(1000);  
  32.         } catch (InterruptedException e) {  
  33.             e.printStackTrace();  
  34.         }  
  35.         //latch.countDown();//  
  36.         return list1;  
  37.     }  
  38.   
  39.     public List getList(){  
  40.         return this.list;  
  41.     }  
  42. }  
主线程:
  1. package com.threadExecutor;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.concurrent.ExecutionException;  
  6. import java.util.concurrent.ExecutorService;  
  7. import java.util.concurrent.Executors;  
  8. import java.util.concurrent.Future;  
  9.   
  10. /** 
  11. * Created by jimmy on 2014/10/26. 
  12. */  
  13. public class MergeResult {  
  14.     public static void main(String[] args) throws InterruptedException, ExecutionException {  
  15.         long start = System.currentTimeMillis();  
  16.         List list = new ArrayList<>();  
  17.         //线程池初始化  
  18.         ExecutorService executor=(ExecutorService) Executors.newCachedThreadPool();  
  19.         //创建任务列表  
  20.         List<NewJDBCThread> threads = new ArrayList<>();  
  21.         //初始化任务列表  
  22.         for(int i=1;i<5;i++){  
  23.             threads.add(new NewJDBCThread("test"+i));  
  24.         }  
  25.         List<Future<List>> results = null;//接受处理返回的结果集  
  26.         //执行线程处理  
  27.         results = executor.invokeAll(threads);  
  28.         //获取结果集  
  29.         for(Future<List> future:results){  
  30.             list.addAll((List)future.get());//讲线程返回值转为list并加入到主线程结果集中  
  31.         }  
  32.         System.out.println("执行时间:"+(System.currentTimeMillis()-start)+"   数据量"+list.size());  
  33.     }  
  34. }  

经过测试,这里因为线程休眠,如果单线程执行的话,4个线程需要4400多毫秒,数据库执行400多毫秒,但是通过多线程的话只需要1400多毫秒,也就是一条执行的时间。因此是证实多线程执行是符合要求的。
第一次写博客,欢迎拍砖!更加欢迎指点!


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多