MapReduce中reduce阶段iterator如何遍历两遍和所遇到的问题

#MapReduce中reduce阶段iterator如何遍历两遍和所遇到的问题

在写MapReduce程序时,遇到了这样一个问题:在 reduce 阶段遍历了一次 Iterable 之后,再次遍历的时候,values.hasNext()值始终为false,即iterator只能单向遍历一次。

此时,我需要对values中的内容进行两次遍历才能得到我想要的结果。

StackOverFlow上错误的解决方法如下:

http://stackoverflow.com/questions/6111248/iterate-twice-on-values

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void reduce(Text host, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {

List<CrawlDatum> cache = new LinkedList<CrawlDatum>();
// first loop and caching
while (values.hasNext()) {
CrawlDatum datum = values.next();
doSomethingWithValue();
cache.add(datum);//出错点
}

// second loop
for(IntWritable value:cache) {
doSomethingElseThatCantBeDoneInFirstLoop(value);
}
}

思路:创建一个链表,在第一遍遍历时将所需要再次遍历的内容加入链表,第一次遍历结束后即可得到我们所需要的链表,再对链表进行第二次遍历即可解决。
The Iterator you receive from that Iterable’s iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren’t really backed by a Collection, so it’s nontrivial to allow multiple iterations.

错误原因:

1
2
3
4
5
while (values.hasNext()) {
CrawlDatum datum = values.next();
doSomethingWithValue();
cache.add(datum);//出错点
}

对象重用( objects reuse )
reduce方法的javadoc中已经说明了会出现的问题:

The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.

也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text store = new Text(value) 或者 String a = value.toString()),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。

此问题类似于String 是不可变对象但为什么能相加呢?为什么字符串相加不提倡用 String,而用 StringBuilder ?如果你还不清楚这个问题怎么回答,建议你看看这篇《深入理解 String, StringBuffer 与 StringBuilder 的区别》http://my.oschina.net/leejun2005/blog/102377

正确的解决方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void reduce(Text host, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {

List<CrawlDatum> cache = new LinkedList<CrawlDatum>();
// first loop and caching
while (values.hasNext()) {
CrawlDatum datum = values.next();
doSomethingWithValue();
CrawlDatum copy = new CrawlDatum();
copy.set(datum);
cache.add(copy);
}

// second loop
for(IntWritable value:cache) {
doSomethingElseThatCantBeDoneInFirstLoop(value);
}
}