# Nutch Study Notes: Generator

After the Inject phase, control returns to the main method of crawl, which then enters a loop that terminates when the configured crawl depth is reached. Each iteration performs a generate and a fetch operation and produces one segment under crawl/segments, named after the time at which generate was invoked. This post analyzes the Generate operation, which consists of three MapReduce jobs.

The Generator class lives in org.apache.nutch.crawl. Its core method is public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime): based on link scores it produces a fetch list containing the top-N links to crawl and updates the web database. crawl enters the Generate phase by calling this method, as sketched in the example call after the parameter list below. The parameters are:

Path dbDir: the path of the crawldb

Path segments: the path of the segments directory

int numLists: the number of reduce tasks, i.e. the number of fetch-list partitions

long topN: the user-specified top-N; in each crawl round the top-N highest-scoring URLs are added to the fetchlist

long curTime: the time at which generate is called
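
As a point of reference, here is a hypothetical driver snippet showing how crawl might invoke this method. The paths and the topN value are illustrative assumptions, and passing -1 for numLists lets generate derive the partition count from the number of map tasks:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nutch.crawl.Generator;
import org.apache.nutch.util.NutchConfiguration;

// Hypothetical driver sketch; paths and topN are illustrative, not taken from Crawl.java
public class GenerateDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    Generator generator = new Generator(conf);
    // Illustrative arguments; a real crawl would take these from the command line
    Path crawlDb = new Path("crawl/crawldb");
    Path segments = new Path("crawl/segments");
    long topN = 1000L;
    // numLists = -1: let generate() create one partition per map (fetch) task
    Path segment = generator.generate(crawlDb, segments, -1, topN, System.currentTimeMillis());
    System.out.println("new segment: " + segment);
  }
}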

The code of this method is as follows:

public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime) throws IOException {
  JobConf job = new NutchJob(getConf());
  boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
  return generate(dbDir, segments, numLists, topN, curTime, filter, false);
}

It reads from the configuration whether URL filtering should be applied (the default is to filter) and then calls generate(dbDir, segments, numLists, topN, curTime, filter, false), which is where the actual generate work happens. That method first creates a working directory, tempDir, under the system temporary directory.
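
The construction of tempDir itself is elided from the excerpt that follows; a minimal sketch of it, assuming it mirrors the tempDir2 construction shown later for the third job, would be:

// Sketch of how tempDir is created (assumed to mirror the tempDir2 line used by the third job)
Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
    + "/generate-temp-" + System.currentTimeMillis());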

public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime, boolean filter, boolean force) throws IOException {
  ... ...
  JobConf job = new NutchJob(getConf());
  job.setJobName("generate: select " + segment);

  if (numLists == -1) {                // for politeness make
    numLists = job.getNumMapTasks();   // a partition per fetch task,
                                       // i.e. numLists becomes the number of map tasks
  }
  if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
    // override: in local mode force a single reduce task / partition
    LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
    numLists = 1;
  }
  ... ...
  FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); // crawl/crawldb/current
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(Selector.class);
  job.setPartitionerClass(Selector.class);
  job.setReducerClass(Selector.class);

  FileOutputFormat.setOutputPath(job, tempDir);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(FloatWritable.class);
  job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
  job.setOutputValueClass(SelectorEntry.class);
  try {
    JobClient.runJob(job);
  } catch (IOException e) {
    LockUtil.removeLockFile(fs, lock);
    throw e;
  }

The method first sets up the relevant paths and then determines the number of reducers; if Hadoop is running in local mode, numLists is forced to 1.

The first MapReduce job

Next, the first MapReduce job is configured. Its input path is the current web database (crawl/crawldb/current) and its output path is the temporary directory tempDir. The Selector class serves as Mapper, Partitioner and Reducer.

Mapper phase

public void map(Text key, CrawlDatum value, OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
    throws IOException {
  Text url = key;
  if (filter) {
    // If filtering is on don't generate URLs that don't pass URLFilters
    try {
      if (filters.filter(url.toString()) == null)   // URL rejected by the filters
        return;
    } catch (URLFilterException e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
      }
    }
  }
  CrawlDatum crawlDatum = value;

  // check fetch schedule
  if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
    // By default this is the shouldFetch method inherited by DefaultFetchSchedule: it compares
    // the datum's fetch time with the current time, and only URLs whose scheduled fetch time is
    // not later than "now" are considered for the fetch list. Note that "now" (curTime) is
    // configurable and need not be the actual wall-clock time.
    LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" + crawlDatum.getFetchTime() + ", curTime=" + curTime);
    return;
  }

  LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
  if (oldGenTime != null) { // awaiting fetch & update
    if (oldGenTime.get() + genDelay > curTime) // still wait for update
      return;
  }
  float sort = 1.0f;
  try {
    // compute this URL's generate-phase score with the configured ScoringFilters
    sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
  } catch (ScoringFilterException sfe) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
    }
  }
  // sort by decreasing score, using DecreasingFloatComparator
  sortValue.set(sort);
  // record generation time
  crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
  entry.datum = crawlDatum;
  entry.url = (Text)key;
  output.collect(sortValue, entry); // invert for sort by score
}

map first filters the input URL; if the URL is filtered out the method returns, otherwise it continues. It then calls shouldFetch, which compares the current time with the link's fetch time stored in the database to decide whether the link is due for fetching: if the scheduled time has not yet arrived the link is skipped, and even if it has arrived the link still has to survive the later steps before it is actually fetched. Next, ScoringFilters.generatorSortValue computes the link's generate-phase score. This score is collected as the output key, while the datum and url are wrapped into a SelectorEntry object, entry, which is collected as the value. The result of the first map phase is therefore a set of <sortValue, entry> pairs, i.e. a link score paired with the link's information.
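
SelectorEntry is a small Writable container defined as a nested class of Generator. The following is only an approximate sketch of its shape (the field set and serialization are assumptions based on how it is used here, not a verbatim copy of the Nutch source):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.nutch.crawl.CrawlDatum;

// Approximate sketch of Generator.SelectorEntry; not a verbatim copy of the real class
public class SelectorEntrySketch implements Writable {
  public Text url = new Text();               // the URL being considered
  public CrawlDatum datum = new CrawlDatum(); // its record from the crawldb

  public void readFields(DataInput in) throws IOException {
    url.readFields(in);
    datum.readFields(in);
  }

  public void write(DataOutput out) throws IOException {
    url.write(out);
    datum.write(out);
  }
}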

Partitioner phase

public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
  return hostPartitioner.getPartition(((SelectorEntry)value).url, key, numReduceTasks);
}

It partitions by the host name of the URL, so that URLs on the same host are handled by the same reducer, which supports politeness towards that site.
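
hostPartitioner is an instance of PartitionUrlByHost. Its exact code is not reproduced here; the standalone sketch below only illustrates the underlying idea of hashing the host name to pick a partition:

import java.net.MalformedURLException;
import java.net.URL;

// Illustrative sketch of host-based partitioning; not the actual PartitionUrlByHost code
public class HostPartitionSketch {
  public static int partitionByHost(String url, int numReduceTasks) {
    String host;
    try {
      host = new URL(url).getHost().toLowerCase();
    } catch (MalformedURLException e) {
      host = url; // fall back to hashing the whole URL string
    }
    // mask the sign bit so the result is non-negative, then reduce modulo the reducer count
    return (host.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    // URLs on the same host always map to the same partition
    System.out.println(partitionByHost("http://example.com/a.html", 4));
    System.out.println(partitionByHost("http://example.com/b.html", 4));
  }
}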

Reducer phase

public void reduce(FloatWritable key, Iterator<SelectorEntry> values, OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
    throws IOException {
  // key is a link score; values are all SelectorEntry objects that share that score

  while (values.hasNext() && count < limit) {
    SelectorEntry entry = values.next();
    Text url = entry.url;
    String urlString = url.toString();
    URL u = null;

    // skip bad urls, including empty and null urls
    try {
      u = new URL(url.toString());
    } catch (MalformedURLException e) {
      LOG.info("Bad protocol in url: " + url.toString());
      continue;
    }

    String host = u.getHost();
    host = host.toLowerCase();
    String hostname = host;
    ... ...
    try {
      urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
      host = new URL(urlString).getHost();
    } catch (Exception e) {
      LOG.warn("Malformed URL: '" + urlString + "', skipping (" + StringUtils.stringifyException(e) + ")");
      continue;
    }

    // only filter if we are counting hosts
    // maxPerHost is the maximum number of URLs generated per host, read from the
    // configuration; -1 means no per-host limit
    if (maxPerHost > 0) {

      IntWritable hostCount = hostCounts.get(host);
      if (hostCount == null) {
        hostCount = new IntWritable();
        hostCounts.put(host, hostCount);
      }
      // increment hostCount
      hostCount.set(hostCount.get() + 1);
      // skip URL if above the limit per host.
      if (hostCount.get() > maxPerHost) {
        if (hostCount.get() == maxPerHost + 1) {
          // remember the raw hostname that is maxed out
          maxedHosts.add(hostname);
          if (LOG.isInfoEnabled()) {
            LOG.info("Host " + host + " has more than " + maxPerHost + " URLs." + " Skipping additional.");
          }
        }
        continue;
      }
    }
    output.collect(key, entry);
    // Count is incremented only when we keep the URL
    // maxPerHost may cause us to skip it.
    count++;
  }
}

limit is the maximum number of links each reducer may emit. It is computed as limit = job.getLong(CRAWL_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks(), i.e. topN divided by the number of reducers; for example, with topN = 1000 and 4 reducers, each reducer keeps at most 250 links. As long as a reducer has not reached this limit, it keeps pulling links from the incoming values. If the generate.max.per.host property is configured with a positive value, the number of links generated per host is also capped, and URLs beyond that per-host cap are skipped. In the end, reduce collects the <sortScore, selectorEntry> pairs that pass the filters and fit within these limits.
Because the number of collected pairs is bounded by limit, and the reducer's input arrives sorted by link score from high to low (via DecreasingFloatComparator), the low-scoring URLs are the ones dropped once the limit is reached.

The output of the first MapReduce job is therefore still stored in the <score, link info> form.

The second MapReduce job

After the first MapReduce job completes, execution continues in the generate method, which sets up the second MapReduce job:

... ...   
job = new NutchJob(getConf());
job.setJobName("generate: partition " + segment);
job.setInt("partition.url.by.host.seed", new Random().nextInt());

FileInputFormat.addInputPath(job, tempDir);
job.setInputFormat(SequenceFileInputFormat.class);

job.setMapperClass(SelectorInverseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SelectorEntry.class);
job.setPartitionerClass(PartitionUrlByHost.class);
job.setReducerClass(PartitionReducer.class);
job.setNumReduceTasks(numLists);

// output is crawl/segments/yyyyMMddHHmmss/crawl_generate
FileOutputFormat.setOutputPath(job, output);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setOutputKeyComparatorClass(HashComparator.class);

The input of this job is the temporary directory written by the first MapReduce job, and the output path is crawl_generate under the segment directory.

Mapper phase

The Mapper class is SelectorInverseMapper:

public static class SelectorInverseMapper extends MapReduceBase implements Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {

  public void map(FloatWritable key, SelectorEntry value, OutputCollector<Text, SelectorEntry> output, Reporter reporter) throws IOException {
    SelectorEntry entry = (SelectorEntry)value;
    output.collect(entry.url, entry);
  }
}

The map method turns the <score, link info> records produced by the first job into <url, link info> records, keyed by URL so that the reducers can process them.

The Partitioner (PartitionUrlByHost) again assigns each record to a reducer based on a hash of the URL's host name; the HashComparator set as the output key comparator additionally orders the keys by a hash of the URL, which presumably keeps URLs from the same host from appearing in one long consecutive run within a fetch list.

Reducer phase

The Reducer is the PartitionReducer class:

public static class PartitionReducer extends MapReduceBase
    implements Reducer<Text, SelectorEntry, Text, CrawlDatum> {

  public void reduce(Text key, Iterator<SelectorEntry> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
    while (values.hasNext()) {
      SelectorEntry entry = values.next();
      output.collect(entry.url, entry.datum);
    }
  }
}

The reducer's job is equally simple: it converts the mapper's output into the <url, crawldatum> form in which crawldb data is stored. The conversion from <sortScore, selectorEntry> to <url, crawldatum> is split across two jobs rather than done entirely in the map phase because it requires links belonging to the same host to be routed to the same reducer.

The links contained in crawl_generate form the fetch list, i.e. the queue of URLs to be crawled.
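
Because the second job writes crawl_generate with SequenceFileOutputFormat using Text keys and CrawlDatum values, the fetch list can be inspected with an ordinary SequenceFile reader. The helper below is hypothetical (the segment path and the part-file name are assumptions for illustration); in practice the nutch readseg command covers much the same need:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;

// Hypothetical dump utility for crawl_generate; paths are illustrative
public class DumpCrawlGenerate {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path part = new Path("crawl/segments/20100101120000/crawl_generate/part-00000");

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    Text url = new Text();
    CrawlDatum datum = new CrawlDatum();
    try {
      while (reader.next(url, datum)) {   // iterate over the <url, crawldatum> fetch list
        System.out.println(url + "\t" + datum);
      }
    } finally {
      reader.close();
    }
  }
}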

The third MapReduce job

Right after the second MapReduce job, the third one runs. Its purpose is to write the generate timestamps recorded in tempDir back into the web database and to output the updated database to tempDir2:

Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"+ System.currentTimeMillis());   
job = new NutchJob(getConf());
job.setJobName("generate: updatedb " + dbDir);
job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
FileInputFormat.addInputPath(job, tempDir);
FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); // the second of two input paths
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(CrawlDbUpdater.class);
job.setReducerClass(CrawlDbUpdater.class);
job.setOutputFormat(MapFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
FileOutputFormat.setOutputPath(job, tempDir2);

This job takes two input paths: tempDir, the output of the first job (holding the <sortScore, selectorEntry> records), and the current web database crawl/crawldb/current. Both the Mapper and the Reducer are provided by CrawlDbUpdater.

Mapper phase

public void map(WritableComparable key, Writable value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
  if (key instanceof FloatWritable) { // tempDir source: key is a score, value is a SelectorEntry for that score
    SelectorEntry se = (SelectorEntry)value;
    output.collect(se.url, se.datum);
  } else {                            // crawldb source: key is the URL, value is its CrawlDatum
    output.collect((Text)key, (CrawlDatum)value);
  }
}

It normalizes the two different record formats of the input paths into a single <url, crawldatum> form.

Reducer phase

public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
  while (values.hasNext()) {
    CrawlDatum val = values.next();
    if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
      // this copy of the datum came from tempDir and carries a generate timestamp
      LongWritable gt = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
      genTime.set(gt.get());
      if (genTime.get() != generateTime) {
        // stale timestamp from an earlier generate run: keep the datum but do not
        // treat it as generated in this run
        orig.set(val);
        genTime.set(0L);
        continue;
      }
    } else {
      // this copy came from the current crawldb
      orig.set(val);
    }
  }
  if (genTime.get() != 0L) {
    // mark the datum as generated in this run
    orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
  }
  output.collect(key, orig);
}

The reduce task updates the generate time carried in each <url, datum> record and emits the result; the output directory tempDir2 is therefore the up-to-date web database.

After the third MapReduce job finishes, generate installs tempDir2 as the current web database, removes the old crawldb data, and returns the segment path. The Generate phase is then complete.
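
For completeness, the tail of generate() roughly follows the sketch below, assuming the standard CrawlDb.install helper and the usual cleanup of the temporary directories and the crawldb lock file; this is an approximation, not a verbatim excerpt:

// Approximate tail of generate(); not a verbatim excerpt from the Nutch source
try {
  JobClient.runJob(job);            // run the third (update) job
} catch (IOException e) {
  LockUtil.removeLockFile(fs, lock);
  fs.delete(tempDir, true);
  fs.delete(tempDir2, true);
  throw e;
}
CrawlDb.install(job, dbDir);        // promote the job output (tempDir2) to crawl/crawldb/current

// clean up and hand the new segment back to the caller
fs.delete(tempDir, true);
fs.delete(tempDir2, true);
LockUtil.removeLockFile(fs, lock);
return segment;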

The data flow of this phase is illustrated below: the first job turns crawl/crawldb/current into scored <score, SelectorEntry> records in tempDir; the second job converts tempDir into the <url, CrawlDatum> fetch list in crawl_generate; and the third job merges tempDir with the current crawldb into tempDir2, which then replaces the old crawldb.