Nutch学习记录:Injector

#Nutch学习记录:Injector

因为nutch是建立在Hadoop架构之上的,在以后的程序代码中使用了MapReduce算法和HDFS(分布式文件系统),所以建议先把这些方面基本的知识看一下,这样在看到nutch代码中一些MapReduce的配置操作时也能理解什么意思。

我们看到在Crawl类的处理代码中,最先执行了下面的一句:

1
injector.inject(crawlDb, rootUrlDir);

这一句调用的就是Injector类的inject()函数,目的是初始化抓取工作。Nutch的爬行初始的url来源主要有两个地方,首先是urls/urls.txt这个文件,我们可以在里面指定一些url;另一个是Nutch的爬行数据库crawldb(当然第一次爬行是没有的,第一次必须在utls.txt中指定url)。每次爬行的初始化工作就是将这两个地方的url合并。

Injector类做的事情主要归纳如下:

  • 对urls/urls.txt中的初始url进行规格化和过滤,并将结果存入临时目录下。
  • 将上述结果与老的数据库(crawldb/current)合并,产生一个新的爬行数据库,并替换老的数据库current。

第一次MapReduce任务

我们从inject()这个入口函数看起,看代码时忽略一些提示信息的输出等无关紧要的代码,只看一些重点部分。首先建立一个临时目录tempDir,用于第一次MapReduce任务的输出目录。接下来建立了第一个MapReduce任务,并做了相应配置如下:

1
2
3
4
5
6
7
8
9
10
JobConf sortJob = new NutchJob(getConf());  // 建立一个MapReduce任务sortJob
sortJob.setJobName("inject " + urlDir);
FileInputFormat.addInputPath(sortJob, urlDir); // 为该任务的指定输入文件的路径
sortJob.setMapperClass(InjectMapper.class); // 指定Mapper类
FileOutputFormat.setOutputPath(sortJob, tempDir); // 任务的输出路径为刚才建立的临时目录
sortJob.setOutputFormat(SequenceFileOutputFormat.class); // 指定输出格式
sortJob.setOutputKeyClass(Text.class); // 指定输出的键的类型
sortJob.setOutputValueClass(CrawlDatum.class); // 指定输出的值的类型
sortJob.setLong("injector.current.time", System.currentTimeMillis());
JobClient.runJob(sortJob); // 启动任务

以上主要是MapReduce配置方面的知识,重申一遍:应该先找这方面的文章大体了解一下。MapReduce的核心思想是:把整个任务流程分为Map阶段和Reduce阶段,分别由Mapper的具体实现(如这里是InjectMapper类)和Reducer的具体实现来执行相关操作。操作分别定义在Mapper的map()函数和Reducer的reduce()函数中。其中map()函数接受一个key/value(键/值对)值作为输入,然后产生一个中间的key/value值的集合。之后MapReduce框架自动把具有相同key的key/value值的value几种到一起,交给reduce()函数处理。

MapReduce任务的主要配置如下:
FileInputFormat.addInputPath(任务, 路径);用于指定输入路径,比如上面指定的输入路径就是urls/,里面有我们事先建立的urls.txt文件,Hadoop框架会自动读取其中的内容分配给各个Mapper。
FileOutputFormat.setOutputPath(任务, 路径);指定任务的输出路径
任务.setMapperClass(类名);指定Mapper,我们自定义一个类,继承Mapper接口并、实现其中的map()方法后,就可以作为一个Mapper。
任务.setReducerClass(类名);指定Reducer,我们自定义一个类,继承Reducer接口并、实现其中的reduce()方法后,就可以作为一个Reducer。(Mapper和Reducer并不是必须指定的,就像上面这个任务就只有Mapper,没有Reducer)
任务.setOutputKeyClass(类型);指定输出key/value对的key的类型,上面类型Text是Hadoop中定义的类,存放的是url。
任务.setOutputValueClass(类型);指定输出key/value对的value的类型,上面类型CrawlDatum是Nutch定义的类,用于存放url的状态信息。

配置MapReduce后,通过JobClient.runJob()方法启动任务,之后任务的处理便有Mapper和Reducer来依次执行。我们先看Mapper——InjectMapper类。

InjectMapper

在查看map()函数之前,我们先来看一下configure()函数,这个函数用于获取一些后面用到的配置信息。这些配置信息是由Hadoop的Configuration类加载并传递过来的,加载的文件除了Hadoop本身的一些必须配置之外,与Nutch有关的配置主要存放在nutch-site.xml,nutch-default.xml,nutch-tool.xml这三个文件,获取的参数可以在其中查找,里面有关于该参数的详细描述。(这方面前一篇文章有提到)

1
2
3
4
5
6
7
8
9
10
public void configure(JobConf job) {
this.jobConf = job;
urlNormalizers = new
URLNormalizers(job,URLNormalizers.SCOPE_INJECT); // 得到url规格化器,使url的表达正规化,操作依据在conf目录下的相关配置文件中
interval = jobConf.getInt("db.fetch.interval.default", 2592000); // 获取参数:抓取时间间隔,默认30天
filters = new URLFilters(jobConf); // 获取url过滤器,比如过滤掉ftp:开头的url、以.gif结尾的url等等,具体规则见相关配置文件
scfilters = new ScoringFilters(jobConf); // 得到分数过滤器,用于抓取过程中url的得分计算
scoreInjected = jobConf.getFloat("db.score.injected", 1.0f); // 获取参数:新加入的url的初始得分
curTime = job.getLong("injector.current.time", System.currentTimeMillis());
}

接下来进入map()函数,urls/urls.txt文件中存放我们初始想要注入的url,每个url占据一行,在url后面可以自己指定一些参数。该函数主要是对urls/urls.txt文件中的url进行规格化和过滤,并给它分配一个初始得分,此外若urls.txt文件中的url有指定的参数,则记录下来。

该函数的输入的key/value数据中的key是每个url在文件中的偏移量,value是url本身,这种包装是Hadoop框架自动做好的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public void map(WritableComparable key, Text value, OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
String url = value.toString(); // 得到url
if (url != null && url.trim().startsWith("#")) { // 忽略文件中以“#”开始的行
return;
}
//如果想指定元信息,则这些元信息必须用\t分隔,并用name=value形式指定
float customScore = -1f;
int customInterval = interval;
Map<String, String> metadata = new TreeMap<String, String>(); // 如果有元信息,用于存储元信息(即文件中每一行的url后面跟着指定的信息,可以不指定)
if (url.indexOf("\t") != -1) { // 有元信息,则对这些元信息处理
String[] splits = url.split("\t");
url = splits[0];
for (int s = 1; s < splits.length; s++) {
int indexEquals = splits[s].indexOf("=");
if (indexEquals == -1) {
continue;
}
String metaname = splits[s].substring(0, indexEquals);
String metavalue = splits[s].substring(indexEquals + 1);
if (metaname.equals(nutchScoreMDName)) { //若是保留的元信息,直接记录下来使用,其它的先存储起来
try {
customScore = Float.parseFloat(metavalue);
} catch (NumberFormatException nfe) {
}
} else if (metaname.equals(nutchFetchIntervalMDName)) {
try {
customInterval = Integer.parseInt(metavalue);
} catch (NumberFormatException nfe) {
}
} else
metadata.put(metaname, metavalue);
}
}
try {
url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); // 规格化url
url = filters.filter(url); // 过滤url
} catch (Exception e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Skipping " + url + ":" + e);
}
url = null;
}
if (url != null) { // 如果url没有被过滤掉
value.set(url);
CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, customInterval); // 为它构造一个CrawlDatum对象,记录相应的状态
datum.setFetchTime(curTime); // 加入“fetchTime”信息(这里其实没用,并非真实的抓取时间)
// 将元信息(若有)加入CrawlDatum中
Iterator<String> keysIter = metadata.keySet().iterator();
while (keysIter.hasNext()) {
String keymd = keysIter.next();
String valuemd = metadata.get(keymd);
datum.getMetaData().put(new Text(keymd), new Text(valuemd));
}
// 若知道了分数元信息,则将它加入CrawlDatum;否则将从配置文件中获得的初始得分信息加入
if (customScore != -1)
datum.setScore(customScore);
else
datum.setScore(scoreInjected);
try {
scfilters.injectedScore(value, datum); // 对得分过滤
} catch (ScoringFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Cannot filter injected score for url " + url + ", using default (" + e.getMessage() + ")");
}
}
output.collect(value, datum); // 收集数据,key是url,value是描述该url状态信息的CrawlDatum
}
}

CrawlDatum是用于记录url的信息,其中一个很重要的信息是当前url的status(状态),在上面代码中,status是CrawlDatum.STATUS_INJECTED,表示是新注入的,在以后各个阶段会有相应的status。

再说一点:Nutch在爬行过程中并不是随机选择页面爬行,而是会根据“重要程度”,因此会对各个待爬行url打分,做这个工作的就是上面代码中的scfilters对象,它实际上用到的是一个插件,位于plugins/scoring-opic目录下,类是OPICScoringFilter。在以后爬行的许多阶段,都会调用其中的方法对url进行打分。其中使用的算法是OPIC算法,其主要思想是:“每个页面有一个初始得分,爬行过程中每个页面会把自己的得分平均分配给它的链接”。可以看一下源代码。

该MapReduce任务只有Mapper,没有Reducer,因此到此第一次MapReduce任务就结束了。收集的是<url,CrawlDatum>数据,存放在临时目录中。

第二次MapReduce任务

接下来又创建了一个MapReduce任务,用于将刚才临时目录里的数据与爬行数据库中的原有数据合并。如下:

1
2
3
4
JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);  // 在CrawlDb类的createJob()里建立一个MapReduce任务
FileInputFormat.addInputPath(mergeJob, tempDir); // 输入路径是上一次MapReduce产生的临时目录
mergeJob.setReducerClass(InjectReducer.class); // 设置Reducer,这个会替换之前设置的
JobClient.runJob(mergeJob); // 启动任务

可以看到,这个任务是通过调用CrawlDb类的createJob()函数创建的,我们进入这个函数看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static JobConf createJob(Configuration config, Path crawlDb)
throws IOException {
Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); // 建立一个临时目录newCrawlDb
JobConf job = new NutchJob(config); // 建立一个MapReduce任务
job.setJobName("crawldb " + crawlDb);
Path current = new Path(crawlDb, CURRENT_NAME);
if (FileSystem.get(job).exists(current)) {
FileInputFormat.addInputPath(job, current); //将crwaldb/current目录(老爬行数据库)作为输入
}
job.setInputFormat(SequenceFileInputFormat.class); // 指定输入格式:读取基于Hadoop的二进制文件,这个格式可加快读数据速度
job.setMapperClass(CrawlDbFilter.class); // 指定Mapper
job.setReducerClass(CrawlDbReducer.class); // 指定Reducer
FileOutputFormat.setOutputPath(job, newCrawlDb); // 指定输出路径是临时目录newCrawlDb
job.setOutputFormat(MapFileOutputFormat.class); // Hadoop中定义的一种部分使用索引键的格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);

return job;
}

可以看出:在该函数中建立了一个MapReduce任务,并进行了相应的配置。这里有几点需要注意:

  1. MapReduce任务可以指定多个输入路径,例如这里在createJob()函数里指定了输入路径是crawldb/current目录,即老的爬行数据库;在外面inject()函数中又指定了tempDir,(即上次任务的输出)作为输入路径,这样才能对老url与新注入的url进行合并。
  2. 可能你会发现在createJob()函数里制定了Reducer为CrawlDbReducer类,在外面又指定了Reducer为InjectReducer类。这里要说明的是Reducer只能有一个,后面指定的Reducer类会覆盖前面指定的,也就是该任务的Reducer是InjectReducer类。

接下来我们进入Mapper和Reducer去看看实际做了什么。

CrawlDbFilter

我们发现该类的map()函数很简单,只是又做了一次规格化和过滤。

InjectReducer

前面讲过了MapReduce机制,map之后,会将具有共同key的value集中到一起。因此加入原先的数据库中(crawldb/current)有一个www.baidu.com,新注入的url里也有一个www.baidu.com,当然它们各自对应一个CrawlDatum,里面存储的状态等信息也不同。在reduce阶段处理的时候上面两个数据因为key(即url)相同,所以该key对应的两个value(两个CrawlDatum对象)会集中在一起处理。

处理策略如下:

  • 若一个key只对应一个CrawlDatum,并且其status是CrawlDatum.STATUS_INJECTED,表示是新注入的url信息,则将其状态更改为CrawlDatum.STATUS_DB_UNFETCHED,表示该url待抓取。
  • 若一个key只对应一个CrawlDatum,并且其status不是CrawlDatum.STATUS_INJECTED,表示是老数据库中的url信息,直接收集。
  • 若一个key对应两个CrawlDatum,则优先收集老数据库中的CrawlDatum。原因很简单,同一个url,老数据库中的CrawlDatum存储的信息肯定比新的CrawlDatum信息多,比如上次抓取时间并以此来判断当前是不是应该执行抓取等。

下面看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void reduce(Text key, Iterator<CrawlDatum> values,
OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
boolean oldSet = false;
while (values.hasNext()) { // 一个key可能对于多个value
CrawlDatum val = values.next();
if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { // 是新注入的url信息
injected.set(val);
injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); //将新注入的状态改为STATUS_DB_UNFETCHED(待抓取)
} else { // 是老数据库中的url信息
old.set(val);
oldSet = true;
}
}
CrawlDatum res = null;
if (oldSet) res = old; // 不覆盖原有值,即优先收集老数据库中的url信息
else res = injected;
output.collect(key, res);
}

这个MapReduce也执行完了,接下来回到inject()函数继续向下执行,如下:

CrawlDb.install(mergeJob, crawlDb);

第二个MapReduce任务的输出(即合并后的爬行数据库)的目录是第二个临时目录newCrawlDb,我们需要把它替代current作为新的爬行数据库。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void install(JobConf job, Path crawlDb) throws IOException {
Path newCrawlDb = FileOutputFormat.getOutputPath(job); // 得到上次的输出,即newCrawlDb目录
FileSystem fs = new JobClient(job).getFs();
Path old = new Path(crawlDb, "old");
Path current = new Path(crawlDb, CURRENT_NAME); // 即crawldb/current目录
if (fs.exists(current)) {
if (fs.exists(old))
fs.delete(old, true);
fs.rename(current, old); // 将crawldb/current重命名为old
}
fs.mkdirs(crawlDb); // 重新建立crawldb目录
fs.rename(newCrawlDb, current); // 因为newCrawlDb是建立下crawldb下的,直接重命名为current
if (fs.exists(old))
fs.delete(old, true); // 删除old
Path lock = new Path(crawlDb, LOCK_NAME);
LockUtil.removeLockFile(fs, lock);
}

接下来,是删除第一个临时目录tempDir,

1
2
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);

OK!Injector阶段结束了!我们回到Crawl类发现接下来进入一个循环,循环中首先调用了如下一行代码:
Path[] segs = generator.generate(crawlDb, segments, -1, topN, System.currentTimeMillis());
这行代码就根据Injector阶段得到的爬行数据库从中选出一部分url等待抓取。具体的操作在下一篇文章中分析。

该阶段数据处理的流程图如下: