#Nutch学习记录:Injector
因为nutch是建立在Hadoop架构之上的,在以后的程序代码中使用了MapReduce算法和HDFS(分布式文件系统),所以建议先把这些方面基本的知识看一下,这样在看到nutch代码中一些MapReduce的配置操作时也能理解什么意思。
我们看到在Crawl类的处理代码中,最先执行了下面的一句:
1 | injector.inject(crawlDb, rootUrlDir); |
这一句调用的就是Injector类的inject()函数,目的是初始化抓取工作。Nutch的爬行初始的url来源主要有两个地方,首先是urls/urls.txt这个文件,我们可以在里面指定一些url;另一个是Nutch的爬行数据库crawldb(当然第一次爬行是没有的,第一次必须在utls.txt中指定url)。每次爬行的初始化工作就是将这两个地方的url合并。
Injector类做的事情主要归纳如下:
- 对urls/urls.txt中的初始url进行规格化和过滤,并将结果存入临时目录下。
- 将上述结果与老的数据库(crawldb/current)合并,产生一个新的爬行数据库,并替换老的数据库current。
第一次MapReduce任务
我们从inject()这个入口函数看起,看代码时忽略一些提示信息的输出等无关紧要的代码,只看一些重点部分。首先建立一个临时目录tempDir,用于第一次MapReduce任务的输出目录。接下来建立了第一个MapReduce任务,并做了相应配置如下:
1 | JobConf sortJob = new NutchJob(getConf()); // 建立一个MapReduce任务sortJob |
以上主要是MapReduce配置方面的知识,重申一遍:应该先找这方面的文章大体了解一下。MapReduce的核心思想是:把整个任务流程分为Map阶段和Reduce阶段,分别由Mapper的具体实现(如这里是InjectMapper类)和Reducer的具体实现来执行相关操作。操作分别定义在Mapper的map()函数和Reducer的reduce()函数中。其中map()函数接受一个key/value(键/值对)值作为输入,然后产生一个中间的key/value值的集合。之后MapReduce框架自动把具有相同key的key/value值的value几种到一起,交给reduce()函数处理。
MapReduce任务的主要配置如下:FileInputFormat.addInputPath(任务, 路径);
用于指定输入路径,比如上面指定的输入路径就是urls/,里面有我们事先建立的urls.txt文件,Hadoop框架会自动读取其中的内容分配给各个Mapper。FileOutputFormat.setOutputPath(任务, 路径);
指定任务的输出路径任务.setMapperClass(类名);
指定Mapper,我们自定义一个类,继承Mapper接口并、实现其中的map()方法后,就可以作为一个Mapper。任务.setReducerClass(类名);
指定Reducer,我们自定义一个类,继承Reducer接口并、实现其中的reduce()方法后,就可以作为一个Reducer。(Mapper和Reducer并不是必须指定的,就像上面这个任务就只有Mapper,没有Reducer)任务.setOutputKeyClass(类型);
指定输出key/value对的key的类型,上面类型Text是Hadoop中定义的类,存放的是url。任务.setOutputValueClass(类型);
指定输出key/value对的value的类型,上面类型CrawlDatum是Nutch定义的类,用于存放url的状态信息。
配置MapReduce后,通过JobClient.runJob()方法启动任务,之后任务的处理便有Mapper和Reducer来依次执行。我们先看Mapper——InjectMapper类。
InjectMapper
在查看map()函数之前,我们先来看一下configure()函数,这个函数用于获取一些后面用到的配置信息。这些配置信息是由Hadoop的Configuration类加载并传递过来的,加载的文件除了Hadoop本身的一些必须配置之外,与Nutch有关的配置主要存放在nutch-site.xml,nutch-default.xml,nutch-tool.xml这三个文件,获取的参数可以在其中查找,里面有关于该参数的详细描述。(这方面前一篇文章有提到)
1 | public void configure(JobConf job) { |
接下来进入map()函数,urls/urls.txt文件中存放我们初始想要注入的url,每个url占据一行,在url后面可以自己指定一些参数。该函数主要是对urls/urls.txt文件中的url进行规格化和过滤,并给它分配一个初始得分,此外若urls.txt文件中的url有指定的参数,则记录下来。
该函数的输入的key/value数据中的key是每个url在文件中的偏移量,value是url本身,这种包装是Hadoop框架自动做好的。
1 | public void map(WritableComparable key, Text value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) |
CrawlDatum是用于记录url的信息,其中一个很重要的信息是当前url的status(状态),在上面代码中,status是CrawlDatum.STATUS_INJECTED,表示是新注入的,在以后各个阶段会有相应的status。
再说一点:Nutch在爬行过程中并不是随机选择页面爬行,而是会根据“重要程度”,因此会对各个待爬行url打分,做这个工作的就是上面代码中的scfilters对象,它实际上用到的是一个插件,位于plugins/scoring-opic目录下,类是OPICScoringFilter。在以后爬行的许多阶段,都会调用其中的方法对url进行打分。其中使用的算法是OPIC算法,其主要思想是:“每个页面有一个初始得分,爬行过程中每个页面会把自己的得分平均分配给它的链接”。可以看一下源代码。
该MapReduce任务只有Mapper,没有Reducer,因此到此第一次MapReduce任务就结束了。收集的是<url,CrawlDatum>数据,存放在临时目录中。
第二次MapReduce任务
接下来又创建了一个MapReduce任务,用于将刚才临时目录里的数据与爬行数据库中的原有数据合并。如下:
1 | JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); // 在CrawlDb类的createJob()里建立一个MapReduce任务 |
可以看到,这个任务是通过调用CrawlDb类的createJob()函数创建的,我们进入这个函数看一下:
1 | public static JobConf createJob(Configuration config, Path crawlDb) |
可以看出:在该函数中建立了一个MapReduce任务,并进行了相应的配置。这里有几点需要注意:
- MapReduce任务可以指定多个输入路径,例如这里在createJob()函数里指定了输入路径是crawldb/current目录,即老的爬行数据库;在外面inject()函数中又指定了tempDir,(即上次任务的输出)作为输入路径,这样才能对老url与新注入的url进行合并。
- 可能你会发现在createJob()函数里制定了Reducer为CrawlDbReducer类,在外面又指定了Reducer为InjectReducer类。这里要说明的是Reducer只能有一个,后面指定的Reducer类会覆盖前面指定的,也就是该任务的Reducer是InjectReducer类。
接下来我们进入Mapper和Reducer去看看实际做了什么。
CrawlDbFilter
我们发现该类的map()函数很简单,只是又做了一次规格化和过滤。
InjectReducer
前面讲过了MapReduce机制,map之后,会将具有共同key的value集中到一起。因此加入原先的数据库中(crawldb/current)有一个www.baidu.com,新注入的url里也有一个www.baidu.com,当然它们各自对应一个CrawlDatum,里面存储的状态等信息也不同。在reduce阶段处理的时候上面两个数据因为key(即url)相同,所以该key对应的两个value(两个CrawlDatum对象)会集中在一起处理。
处理策略如下:
- 若一个key只对应一个CrawlDatum,并且其status是CrawlDatum.STATUS_INJECTED,表示是新注入的url信息,则将其状态更改为CrawlDatum.STATUS_DB_UNFETCHED,表示该url待抓取。
- 若一个key只对应一个CrawlDatum,并且其status不是CrawlDatum.STATUS_INJECTED,表示是老数据库中的url信息,直接收集。
- 若一个key对应两个CrawlDatum,则优先收集老数据库中的CrawlDatum。原因很简单,同一个url,老数据库中的CrawlDatum存储的信息肯定比新的CrawlDatum信息多,比如上次抓取时间并以此来判断当前是不是应该执行抓取等。
下面看代码:
1 | public void reduce(Text key, Iterator<CrawlDatum> values, |
这个MapReduce也执行完了,接下来回到inject()函数继续向下执行,如下:
CrawlDb.install(mergeJob, crawlDb);
第二个MapReduce任务的输出(即合并后的爬行数据库)的目录是第二个临时目录newCrawlDb,我们需要把它替代current作为新的爬行数据库。代码如下:
1 | public static void install(JobConf job, Path crawlDb) throws IOException { |
接下来,是删除第一个临时目录tempDir,
1 | FileSystem fs = FileSystem.get(getConf()); |
OK!Injector阶段结束了!我们回到Crawl类发现接下来进入一个循环,循环中首先调用了如下一行代码:
Path[] segs = generator.generate(crawlDb, segments, -1, topN, System.currentTimeMillis());
这行代码就根据Injector阶段得到的爬行数据库从中选出一部分url等待抓取。具体的操作在下一篇文章中分析。
该阶段数据处理的流程图如下: