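I recently became interested in analyzing massive data sets, and my previous company needed to split its database, so I wanted to try using multiple threads to solve the problem of merging query result sets.
With massive data volumes, simple table partitioning can no longer keep up with the operations on a table, and the data has to be split across several tables or several databases. Splitting greatly reduces the load on any single database or table, but it makes queries more awkward: the data has to be fetched from each table or database separately. Is there a way to bring the results back together? Querying the shards in parallel with multiple threads is one approach that both speeds up execution and collects the result sets.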
First, create a shared database connection class:
- package com.threadExecutor;
-
- import java.sql.*;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- /**
-  * Created by jimmy on 2014/10/26.
-  */
- public class NormalJDBC {
-     private Connection connection;
-     private PreparedStatement ps;
-     private ResultSet resultSet;
-
-     // Opens a connection; returns null if the driver is missing or the connection fails.
-     public Connection getConnection(){
-         try {
-             Class.forName("com.mysql.jdbc.Driver");
-             connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","test","test");
-         } catch (ClassNotFoundException e) {
-             e.printStackTrace();
-         } catch (SQLException e) {
-             e.printStackTrace();
-         }
-         return connection;
-     }
-
-     // Reads every row of one table into a list of column-name -> value maps.
-     // tableName is concatenated into the SQL text, so it must come from trusted code, never from user input.
-     public List<Map> getQuery(String tableName){
-         List<Map> list = new ArrayList<>();
-         String sql = "select id,name,salary from " + tableName;
-         connection = getConnection();
-         if (connection == null) {
-             return list; // connection failed; the error has already been printed
-         }
-         try {
-             ps = connection.prepareStatement(sql);
-             resultSet = ps.executeQuery();
-             while (resultSet.next()){
-                 Map<String, Object> map = new HashMap<>();
-                 map.put("id",resultSet.getInt(1));
-                 map.put("name",resultSet.getString(2));
-                 map.put("salary",resultSet.getInt(3));
-                 list.add(map);
-             }
-         } catch (SQLException e) {
-             e.printStackTrace();
-         } finally {
-             // Release the JDBC resources on both the success and the failure path.
-             close(connection,ps,resultSet);
-         }
-         return list;
-     }
-
-     // Closes resources in reverse order of creation: result set, statement, connection.
-     public void close(Connection conn,PreparedStatement ps,ResultSet rs){
-         try {
-             if(rs!=null){
-                 rs.close();
-             }
-             if(ps!=null){
-                 ps.close();
-             }
-         }catch (SQLException e){
-             e.printStackTrace();
-         }
-         try {
-             // Closed in its own try block so a failure above cannot leak the connection.
-             if(conn!=null){
-                 conn.close();
-             }
-         }catch (SQLException e){
-             e.printStackTrace();
-         }
-     }
-
- }
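Because getQuery builds its SQL by concatenating the table name, and JDBC placeholders cannot stand in for identifiers such as table names, one option is to check the name before using it. The following is a minimal sketch, assuming the shard tables are named test followed by digits as in this post. The class ShardTableNames and its methods are hypothetical helpers, not part of the original code.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not in the original post): validates shard table names
// before they are concatenated into SQL text.
class ShardTableNames {
    // Assumption: shard tables are called test1, test2, ... as in this post.
    private static final Pattern VALID = Pattern.compile("test\\d+");

    static boolean isValid(String tableName) {
        // matches() requires the whole string to fit the pattern
        return tableName != null && VALID.matcher(tableName).matches();
    }

    // Returns the name unchanged, or throws if it does not look like a shard table.
    static String requireValid(String tableName) {
        if (!isValid(tableName)) {
            throw new IllegalArgumentException("Unexpected table name: " + tableName);
        }
        return tableName;
    }

    public static void main(String[] args) {
        System.out.println(isValid("test1"));                   // true
        System.out.println(isValid("test1; drop table test2")); // false
    }
}
```

With a helper like this, the query text could be built from requireValid(tableName) instead of the raw argument.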
Next, write the worker class. It uses a task that returns a value (a Callable):
- package com.threadExecutor;
-
- import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.CountDownLatch;
-
- /**
-  * Created by jimmy on 2014/10/26.
-  */
- public class NewJDBCThread implements Callable<List>{
-     private String tableName;
-     // Lets the main thread continue only after the worker threads finish.
-     // Not used at the moment: the results come back through the Future instead.
-     private CountDownLatch latch;
-
-     public NewJDBCThread(String tableName,CountDownLatch latch){
-         this.tableName = tableName;
-         this.latch = latch;
-     }
-
-     public NewJDBCThread(String tableName){
-         this.tableName = tableName;
-     }
-
-     // Queries one table and returns its rows; runs on a pool thread.
-     @Override
-     public List call() {
-         NormalJDBC normalJDBC = new NormalJDBC();
-         List list1 = normalJDBC.getQuery(this.tableName);
-         System.out.println(Thread.currentThread().getName()+" "+list1.size());
-         try {
-             Thread.sleep(1000); // artificial delay so the timing difference is easy to see
-         } catch (InterruptedException e) {
-             Thread.currentThread().interrupt(); // keep the interrupt status for the caller
-         }
-         //latch.countDown();
-         return list1;
-     }
- }
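The CountDownLatch mentioned in the comment is the other common way to make the main thread wait for its workers. The following is a minimal sketch of that alternative, with a simulated query standing in for NormalJDBC. The class LatchDemo and the method fakeQuery are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LatchDemo {
    // Stand-in for a real shard query: returns three made-up rows for the table.
    static List<String> fakeQuery(String tableName) {
        List<String> rows = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            rows.add(tableName + "-row" + i);
        }
        return rows;
    }

    // Queries test1..testN in parallel and waits on a latch before returning.
    static List<String> queryAll(int shardCount) {
        // Workers write into a shared list, so it has to be thread-safe.
        List<String> merged = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(shardCount);
        ExecutorService executor = Executors.newFixedThreadPool(shardCount);
        try {
            for (int i = 1; i <= shardCount; i++) {
                final String table = "test" + i;
                executor.execute(() -> {
                    try {
                        merged.addAll(fakeQuery(table));
                    } finally {
                        latch.countDown(); // always count down, even if the query throws
                    }
                });
            }
            latch.await(); // blocks until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        System.out.println(queryAll(4).size()); // 12
    }
}
```

With a latch the workers must share a thread-safe collection and the row order depends on scheduling. Returning the rows through a Callable and a Future avoids both concerns.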
The main thread:
- package com.threadExecutor;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
-
- /**
-  * Created by jimmy on 2014/10/26.
-  */
- public class MergeResult {
-     public static void main(String[] args) throws InterruptedException, ExecutionException {
-         long start = System.currentTimeMillis();
-         List<Object> list = new ArrayList<>();
-         // Initialize the thread pool
-         ExecutorService executor = Executors.newCachedThreadPool();
-         try {
-             // Create the task list, one task per table (test1 to test4)
-             List<NewJDBCThread> threads = new ArrayList<>();
-             for(int i=1;i<5;i++){
-                 threads.add(new NewJDBCThread("test"+i));
-             }
-             // Run all tasks; invokeAll blocks until every task has completed
-             List<Future<List>> results = executor.invokeAll(threads);
-             // Collect the result sets
-             for(Future<List> future:results){
-                 list.addAll(future.get()); // append each task's rows to the merged result in the main thread
-             }
-         } finally {
-             // Without shutdown, idle pool threads keep the JVM alive after main returns
-             executor.shutdown();
-         }
-         System.out.println("Elapsed time (ms): "+(System.currentTimeMillis()-start)+", rows: "+list.size());
-     }
- }
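The same fan-out and merge pattern can be tried without a database. The following is a minimal sketch with typed results instead of raw lists. The class ShardMergeDemo and the method queryShard are hypothetical, and queryShard returns made-up rows in place of NormalJDBC.getQuery. It assumes the table list is not empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ShardMergeDemo {
    // Hypothetical stand-in for NormalJDBC.getQuery: builds rowCount made-up rows.
    static List<Map<String, Object>> queryShard(String tableName, int rowCount) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int i = 1; i <= rowCount; i++) {
            Map<String, Object> row = new HashMap<>();
            row.put("id", i);
            row.put("name", tableName + "-" + i);
            row.put("salary", 1000 * i);
            rows.add(row);
        }
        return rows;
    }

    // Runs one task per table in parallel and concatenates the results.
    static List<Map<String, Object>> mergeAll(List<String> tables, int rowsPerTable) {
        ExecutorService executor = Executors.newFixedThreadPool(tables.size());
        try {
            List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<>();
            for (String table : tables) {
                tasks.add(() -> queryShard(table, rowsPerTable));
            }
            List<Map<String, Object>> merged = new ArrayList<>();
            // invokeAll returns the futures in the same order as the task list,
            // so the merged rows follow the order of the tables.
            for (Future<List<Map<String, Object>>> future : executor.invokeAll(tasks)) {
                merged.addAll(future.get());
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for shard queries", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A shard query failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("test1", "test2", "test3", "test4");
        System.out.println(mergeAll(tables, 2).size()); // 8
    }
}
```

A fixed pool sized to the number of tables is used here only to keep the sketch bounded; the cached pool from the post would work the same way for four tasks.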
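In my test, because each task sleeps for one second, running the four tasks one after another in a single thread took a little over 4400 ms, with database execution accounting for a bit over 400 ms. With multiple threads the whole run took only a little over 1400 ms, roughly the time of a single task. This confirms that the multithreaded approach meets the requirement.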
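The difference comes from the tasks overlapping instead of queuing. The following small sketch reproduces the effect with simulated delays only and no database. The class TimingDemo and its methods are hypothetical, and the absolute numbers will differ from the measurements in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TimingDemo {
    // Simulated task: sleeps for delayMs, like the Thread.sleep in the worker class.
    static Callable<Integer> sleepy(long delayMs) {
        return () -> {
            Thread.sleep(delayMs);
            return 1;
        };
    }

    // Runs the tasks one after another on the calling thread; returns elapsed ms.
    static long runSequentially(int taskCount, long delayMs) {
        long start = System.nanoTime();
        try {
            for (int i = 0; i < taskCount; i++) {
                sleepy(delayMs).call();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Runs the tasks on a pool with one thread per task; returns elapsed ms.
    static long runInParallel(int taskCount, long delayMs) {
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        long start = System.nanoTime();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(sleepy(delayMs));
            }
            executor.invokeAll(tasks); // returns once every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("sequential ms: " + runSequentially(4, 200));
        System.out.println("parallel ms:   " + runInParallel(4, 200));
    }
}
```

Sequential time grows with the number of tasks, while parallel time stays close to the slowest single task as long as the pool has a thread for each one.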
This is my first blog post. Criticism is welcome, and pointers are even more welcome!