分享

多线程map(转载)

 hehffyy 2017-01-03

概述

  • 作用
  • 应用场景
  • 效果
  • 代码示例
作用
  • 想在 map 或 reduce 里启动多线程
应用场景
  • map 端或 reduce 端的 CPU 密集型作业
效果
  • 公司原来有一个 CPU 密集型算法作业,在 20 台服务器上运行需要 55 个小时左右;改用我自己改进的多线程 map 后,在同样的前提条件下只需运行 7 小时
代码示例
  • 多线程map 示例代码
  • 自带的 多线程map 源代码
public class MultithreadedMapper<K1, V1, K2, V2> 
  extends Mapper<K1, V1, K2, V2> {

  private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
  private Class<? extends Mapper<K1,V1,K2,V2>> mapClass;
  private Context outer;
  private List<MapRunner> runners;

  /**
   * The number of threads in the thread pool that will run the map function.
   * @param job the job
   * @return the number of threads
   */
  public static int getNumberOfThreads(JobContext job) {
    return job.getConfiguration().
            getInt("mapred.map.multithreadedrunner.threads", 10);
  }

  /**
   * Set the number of threads in the pool for running maps.
   * @param job the job to modify
   * @param threads the new number of threads
   */
  public static void setNumberOfThreads(Job job, int threads) {
    job.getConfiguration().setInt("mapred.map.multithreadedrunner.threads", 
                                  threads);
  }

  /**
   * Get the application's mapper class.
   * @param <K1> the map's input key type
   * @param <V1> the map's input value type
   * @param <K2> the map's output key type
   * @param <V2> the map's output value type
   * @param job the job
   * @return the mapper class to run
   */
  @SuppressWarnings("unchecked")
  public static <K1,V1,K2,V2>
  Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
    return (Class<Mapper<K1,V1,K2,V2>>) 
         job.getConfiguration().getClass("mapred.map.multithreadedrunner.class",
                                         Mapper.class);
  }
  
  /**
   * Set the application's mapper class.
   * @param <K1> the map input key type
   * @param <V1> the map input value type
   * @param <K2> the map output key type
   * @param <V2> the map output value type
   * @param job the job to modify
   * @param cls the class to use as the mapper
   */
  public static <K1,V1,K2,V2> 
  void setMapperClass(Job job, 
                      Class<? extends Mapper<K1,V1,K2,V2>> cls) {
    if (MultithreadedMapper.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException("Can't have recursive " + 
                                         "MultithreadedMapper instances.");
    }
    job.getConfiguration().setClass("mapred.map.multithreadedrunner.class",
                                    cls, Mapper.class);
  }

  /**
   * Run the application's maps using a thread pool.
   */
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    outer = context;
    int numberOfThreads = getNumberOfThreads(context);
    mapClass = getMapperClass(context);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
                " threads");
    }
    
    runners =  new ArrayList<MapRunner>(numberOfThreads);
    for(int i=0; i < numberOfThreads; ++i) {
      MapRunner thread = new MapRunner(context);
      thread.start();
      runners.add(i, thread);
    }
    for(int i=0; i < numberOfThreads; ++i) {
      MapRunner thread = runners.get(i);
      thread.join();
      Throwable th = thread.throwable;
      if (th != null) {
        if (th instanceof IOException) {
          throw (IOException) th;
        } else if (th instanceof InterruptedException) {
          throw (InterruptedException) th;
        } else {
          throw new RuntimeException(th);
        }
      }
    }
  }

  private class SubMapRecordReader extends RecordReader<K1,V1> {
    private K1 key;
    private V1 value;
    private Configuration conf;

    @Override
    public void close() throws IOException {
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public void initialize(InputSplit split, 
                           TaskAttemptContext context
                           ) throws IOException, InterruptedException {
      conf = context.getConfiguration();
    }


    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      synchronized (outer) {
        if (!outer.nextKeyValue()) {
          return false;
        }
        key = ReflectionUtils.copy(outer.getConfiguration(),
                                   outer.getCurrentKey(), key);
        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
        return true;
      }
    }

    public K1 getCurrentKey() {
      return key;
    }

    @Override
    public V1 getCurrentValue() {
      return value;
    }
  }
  
  private class SubMapRecordWriter extends RecordWriter<K2,V2> {

    @Override
    public void close(TaskAttemptContext context) throws IOException,
                                                 InterruptedException {
    }

    @Override
    public void write(K2 key, V2 value) throws IOException,
                                               InterruptedException {
      synchronized (outer) {
        outer.write(key, value);
      }
    }  
  }

  private class SubMapStatusReporter extends StatusReporter {

    @Override
    public Counter getCounter(Enum<?> name) {
      return outer.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return outer.getCounter(group, name);
    }

    @Override
    public void progress() {
      outer.progress();
    }

    @Override
    public void setStatus(String status) {
      outer.setStatus(status);
    }
    
  }

  private class MapRunner extends Thread {
    private Mapper<K1,V1,K2,V2> mapper;
    private Context subcontext;
    private Throwable throwable;

    MapRunner(Context context) throws IOException, InterruptedException {
      mapper = ReflectionUtils.newInstance(mapClass, 
                                           context.getConfiguration());
      subcontext = new Context(outer.getConfiguration(), 
                            outer.getTaskAttemptID(),
                            new SubMapRecordReader(),
                            new SubMapRecordWriter(), 
                            context.getOutputCommitter(),
                            new SubMapStatusReporter(),
                            outer.getInputSplit());
    }

    public Throwable getThrowable() {
      return throwable;
    }

    @Override
    public void run() {
      try {
        mapper.run(subcontext);
      } catch (Throwable ie) {
        throwable = ie;
      }
    }
  }
}

  • 我改进后的多线程map
public class DemoMultithreadedMapper extends Mapper<LongWritable, CookieInterestInfoWritableWithDolphinAlgorithm, LongWritable, AllCookieInterestInfoWritableInternal> {
private static final Log LOG = LogFactory.getLog(DemoMultithreadedMapper.class);
private static HashMap<Long, CookieInterestInfoWritableWithDolphinAlgorithm>[] cookies = new HashMap[12];
static {
for (int i = 0; i < cookies.length; i++) {
cookies[i] = new HashMap<Long, CookieInterestInfoWritableWithDolphinAlgorithm>();

}
}
private Context outer;
private List<MapRunner> runners;
protected void setup(Context context) throws IOException,InterruptedException {
@Override
public void run(Context context) throws IOException, InterruptedException {
// load model
setup(context);
// add key value to cookies
int ord = 0;
while (context.nextKeyValue()) {
// full fill this 13 tables
// add key value to pools
cookies[ord].put(context.getCurrentKey().get(),new CookieInterestInfoWritableWithDolphinAlgorithm(context.getCurrentValue().getLocation(), context.getCurrentValue()
.getStaticStore(), topicValueOut, context
.getCurrentValue().getDayNums(), context
.getCurrentValue().getDomainIds(), context.getCurrentValue().getTypeStrings(), context.getCurrentValue().getTime(), context.getCurrentValue().getImportance()));
}
// thread start
outer = context;
int numberOfThreads = cookies.length;
if (LOG.isDebugEnabled()) {
LOG.debug("Configuring multithread runner to use "
+ numberOfThreads + " threads");
}

runners = new ArrayList<MapRunner>(cookies.length);
for (int i = 0; i < cookies.length; i++) {
MapRunner thread = new MapRunner(context, i);
thread.start();
runners.add(i, thread);
}
for (int i = 0; i < cookies.length; ++i) {
MapRunner thread = runners.get(i);
thread.join();
Throwable th = thread.throwable;
if (th != null) {
if (th instanceof IOException) {
throw (IOException) th;
} else if (th instanceof InterruptedException) {
throw (InterruptedException) th;
} else {
throw new RuntimeException(th);
}
}
}

}

private class SubMapRecordReader extends
RecordReader<LongWritable, CookieInterestInfoWritableWithDolphinAlgorithm> {
private LongWritable key;
private CookieInterestInfoWritableWithDolphinAlgorithm value;
private Configuration conf;

@Override
public void close() throws IOException {
}

@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
conf = context.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
synchronized (outer) {
if (!outer.nextKeyValue()) {
return false;
}
key = ReflectionUtils.copy(outer.getConfiguration(),
outer.getCurrentKey(), key);
value = ReflectionUtils.copy(conf, outer.getCurrentValue(),
value);
return true;
}
}

public LongWritable getCurrentKey() {
return key;
}

@Override
public CookieInterestInfoWritableWithDolphinAlgorithm getCurrentValue() {
return value;
}
}

private class SubMapRecordWriter extends
RecordWriter<LongWritable, AllCookieInterestInfoWritableInternal> {

@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
}

@Override
public void write(LongWritable key,
AllCookieInterestInfoWritableInternal value)
throws IOException, InterruptedException {
synchronized (outer) {
outer.write(key, value);
}
}
}

private class SubMapStatusReporter extends StatusReporter {

@Override
public Counter getCounter(Enum<?> name) {
return outer.getCounter(name);
}

@Override
public Counter getCounter(String group, String name) {
return outer.getCounter(group, name);
}

@Override
public void progress() {
outer.progress();
}

@Override
public void setStatus(String status) {
outer.setStatus(status);
}

}

private class MapRunner extends Thread {

private Context subcontext;
private Throwable throwable;
private int id;

MapRunner(Context context, int idx) throws IOException,
InterruptedException {
subcontext = new Context(outer.getConfiguration(),
outer.getTaskAttemptID(), new SubMapRecordReader(),
new SubMapRecordWriter(), context.getOutputCommitter(),
new SubMapStatusReporter(), outer.getInputSplit());
this.id = idx;
}

@Override
public void run() {
try {
HashMap<Long, CookieInterestInfoWritableWithDolphinAlgorithm> map = cookies[id];
for (Entry<Long, CookieInterestInfoWritableWithDolphinAlgorithm> entry : map
.entrySet()) {
Long key = entry.getKey();
CookieInterestInfoWritableWithDolphinAlgorithm value = entry.getValue();
// prepare data for zhengyiyuan
// do something
subcontext.write(new LongWritable(key), valueOut);
}
} catch (Throwable ie) {
throwable = ie;
}
}
}

}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多