Nutch学习记录:Fetch

#Nutch学习记录:Fetch

一、Fetch过程

Generate阶段产生了待爬行队列,并保存在segment/crawl_generate目录下。在Generate结束后,进入Fethch阶段,该阶段同Generate一样处于循环中,只要未达到指定的爬行深度,就继续。这一阶段是实际抓取网页的过程,它采用1个生产者(QueueFeeder)-多个消费者(FetchThread)的模型。QueueFeeder从输入中读取fetchlist并产生FetchItemQueue队列,该队列中包含了描述将要爬行的项目的FetchItem。这个队列是以要爬行的主机划分的,每个主机对应一个队列。但是在同一时刻,爬行FetchItem的数量受并发线程数的限制。

FetchThread从队列中不断地取出FetchItem,QueueFeeder向其中添加,这样就保持了一个动态平衡,直到FetchItemQueue中的FetchItem都被消耗完了。当队列中FetchItem的数量达到0,爬行结束。

爬行过程中,爬虫会考虑到对待爬行主机的礼貌(politeness),每个主机的爬行礼貌设置可以不同,比如最大并发请求和爬行间隔等。每个FetchItemQueue对象还维护着一个名为inProgress的队列,用于保存正在抓取的FetchItem,当FetchItemQueue返回一个FetchItem后,便从这个队列中移出并放入inProgress队列中。FetchItemQueue还保存FetchItem最后向主机请求的时间。

当FetchThread从FetchItemQueue中请求新的将要抓取的项,队列返回一个项给FetchItem,或者如果由于礼貌性的原因项没有准备好,则返回null。如果队列中还有待爬行项,且所有的项都没有准备好,那么FetchThread就等待,直到有项目准备好或到达timeout时间。

Fetch阶段以org.apache.nutch.crawl.Fetcher类中的fetch(Path segment, int threads, boolean parsing)方法作为入口

该方法的参数如下:

  • Path segment :segment的路径,这里是segment/crawl_generate
  • int threads :用户指定的线程数
  • Boolean :是否进行解析

fetch方法中主要是对一个MapReduce任务进行配置,并运行这个任务。这个任务中只有Map而没有Reduce。出于礼貌规则,Fetcher创建的MapReduce任务关闭了Speculative Execution,并通过定制的InputFormat类保证每个fetchlist不会再被分成多个splits,防止多个线程对同一个站点过度访问。该任务从segments/time/crawl_generate目录下读取数据,由QueueFeeder线程负责将<url, crawldatum>队列中,FetcherTread负责从队列中获取url,并抓取页面。

Speculative Execution: 由于各种原因(负载或资源分布不均、程序bug等),可能会出现一个Job的多个任务的进度不协调,如其它任务都已完成,而某个任务才完成10%。当某个任务满足一定条件时,Hadoop将为该任务启动Speculative task,并和该任务一起执行,哪个任务先完成就使用哪个的结果。

输出格式是FetcherOutputFormat,它是定义在org.apache.nutch.fetcher中,有三种不同的输出,分别根据value类型,对5个目录进行写操作:

  1. CrawlDatum类型:segments/crawl_fetch
  2. Content类型:segments/content
  3. Parse类型 :segments/parse_text,segments/parse_data,segments/crawl_parse

现在再来看一下segments下的每个目录是干什么的:

  1. crawl_generate:需要抓取的url列表(fetchlist),由<url, crawldatum>组成的sequence文件。
  2. crawl_fetch:每个抓取页面的状态报告,比如是否抓取是否成功,reponse code是多少,由<url, crawldatum>组成的map文件。
  3. content:包含下载下来的原数据(raw data),由<url, content>组成的map文件。
  4. parse_text:页面的解析文本,用于建立索引,由<url, ParseText>组成的map文件。
  5. parse_data:包含页面解析后的元数据和outlinks。
  6. crawl_parse:每个被成功抓取和解析的页面的outlinks列表,用于更新crawldb。
    的实例输出到不同的目录中。在最后,runJob(job)调用Fetcher类中的run方法启动任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // Fetcher中的run方法   
    public void run(RecordReader<Text, CrawlDatum> input, OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
    this.output = output;
    this.reporter = reporter;
    this.fetchQueues = new FetchItemQueues(getConf());
    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
    ... ...
    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
    // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
    feeder.start(); // 调用QueueFeeder的run方法

QueueFeeder

方法的前半部分初始化了一个QueueFeeder,QueueFeeder维护着一个FetchItemQueues类型的名为queues的队列,这个队列是主机名和它对应FetchItemQueue的映射,所有相同主机名的抓取项(FetchItem)放入相同主机名对应的FetchItemQueue中。

接着,feeder调用start方法启动QueueFeeder的线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// QueueFeeder中的run方法   
public void run() {
boolean hasMore = true; // reader中是否还有数据
int cnt = 0;
while (hasMore) {
int feed = size - queues.getTotalSize(); // 队列还有多少剩余空间
if (feed <= 0) {
// queues are full - spin-wait until they have some free space
// 队列满,等待
try {
Thread.sleep(1000);
} catch (Exception e) {
}
continue;
} else { // 队列有剩余空间,向其中添加FetchItem LOG.debug("-feeding " + feed + " input urls ...");
while (feed > 0 && hasMore) {
try {
Text url = new Text();
CrawlDatum datum = new CrawlDatum(); hasMore = reader.next(url, datum); // next方法,如果成功读取了下一个<url, datum>,返回true,如果读到末尾,返回false
if (hasMore) {
queues.addFetchItem(url, datum); cnt++;
feed--;
}
} catch (IOException e) {
... ...
return;
}
......
}

该线程的工作比较简单,就是检查队列是否未满,如果未满且输入中还有数据,则将数据添加到FetchItemQueue中。添加到队列的过程分两部进行,先添加到FetchItemQueues,再添加到FetchItemQueue中。

这里在说明一下根据主机进行映射的概念。如果传入fetch方法的parsing参数是true,则以ip + protocol作为queueId存入队列映射,否则以hostname + protocol作为queueId作为queueId存入映射。上面代码中的getTotalSize方法返回totalSize,这个变量是当前FetchItemQueue中队列的FetchItem的个数。

FetchItemQueues和FetchItemQueue中包含的内容和对应关系如下:

QueueFeeder的run对应的流程图如下:

FetcherThread

在启动了QueueFeeder的线程后,Fetcher类run方法的主线程继续执行:

1
2
3
4
for (int i = 0; i < threadCount; i++) { // spawn threads   
new FetcherThread(getConf()).start();
}
......

它根据用户指定的爬行线程数,开启了threadCount个爬行线程抓取网页。其后,当当前活动线程数>0时,就报告线程的状态。直到当前线程数为0时,run方法的主线程也退出。活跃线程数activeThreads是当前活跃的FetchThread的线程的个数。
接下来就分析用于抓取网页的类FetcherThread,该类同样是Fetcher类的一个内部类,继承了Thread,继而可以使用多线程。Fetcher类的run方法中启动了FetchThread的线程,调用其中的run方法,下面对其进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void run() {   
activeThreads.incrementAndGet(); // count threads
FetchItem fit = null;
try {
while (true) {
fit = fetchQueues.getFetchItem(); // 使用Iterator从queues中取出第一个FetchItemQueue,再从queue中取出第一个FetchItem
if (fit == null) {
if (feeder.isAlive() ||fetchQueues.getTotalSize() > 0) { // queue没有准备好返回fetchitem,等待
LOG.debug(getName() + " spin-waiting ...");
// spin-wait.
spinWaiting.incrementAndGet();
try {
Thread.sleep(500); // 阻塞500ms后重新调度
} catch (Exception e) {
}
spinWaiting.decrementAndGet(); continue;
} else {// 该线程的所有工作都已完成,退出
// all done, finish this thread
return;
}
}

进入run方法后,首先为当前活跃线程数activeThreads加1,接下来进入一个死循环,进行网页的抓取。它首先调用getFetchItem方法,如果当前inProgress队列的大小>=每台主机允许的并发爬行线程数,则返回null,否则从所有FetchItemQueue中取出一个已经准备好(已经超过爬行间隔时间)的FetchItem,并加入到inProgress队列中。如果该值为空,则有两种可能,一种是出于礼貌性,队列不能返回FetchItem,则线程阻塞500毫秒后继续请求。另外一种可能是当前线程要的所有工作都已完成,则线程退出。

注意,同一主机对应的FetchItemQueue拥有相同的爬行间隔时间时间(得到爬行间隔时间的方法见后面的分析),在该queue中的一个FetchItem被爬行后,会设置这个队列的下一次请求时间,只有超过这个时间的队列才能返回新的FetchItem。这样便实现了对同一个主机的礼貌性访问控制。

接下来又进入一个循环,循环的终止条件是达到最大重定向次数或者不允许重定向。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
do {   
redirecting = false;
Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
RobotRules rules = protocol.getRobotRules(fit.url, fit.datum); // 获取爬行规则
// rules实际上是RobotRuleSet类的实例,该类实现了RobotRules接口
if (!rules.isAllowed(fit.u)) { // unblock
fetchQueues.finishFetchItem(fit, true);
if (LOG.isDebugEnabled()) {
LOG.debug("Denied by robots.txt: " + fit.url);
}
output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
continue;
}
if (rules.getCrawlDelay() > 0) {
if (rules.getCrawlDelay() > maxCrawlDelay) {// unblock
fetchQueues.finishFetchItem(fit, true); output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
continue;
}else {
FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
fiq.crawlDelay = rules.getCrawlDelay(); }
}

首先根据要爬行的url确定其协议类型并初始化一个对应的Protocol对象,以下以http协议举例,则protocol初始化为org.apache.nutch.protocol包中的http类,其基类org.apache.nutch.protocol.http.api包中的HttpBase类。接着,生成一个RobotRule类型的对象,得到该url对应的站点的robot.txt文件,该文件中规定了爬虫的爬行限制。

如果不允许爬行,则调用finishFetchItem方法从inProgress集合中删除该fit。并设置该url对应datum的status为STATUS_FETCH_GONE,设置下一次爬行时间nextFetchTime。收集<url, datum>。

如果允许,则获取robot.txt中的规定的爬行间隔,如果大于配置文件中规定的最大爬行间隔,则跳过这个网页,调用finishFetchItem方法将这个FetchItem从inProgress队列中移出并报告一个错误,否则将这个FetchItem所在的主机队列FetchItemQueue的爬行间隔设为robot.txt中规定的值。

run方法继续执行:

1
2
3
4
5
ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);   
ProtocolStatus status = output.getStatus();
Content content = output.getContent();
ParseStatus pstatus = null;
fetchQueues.finishFetchItem(fit);

protocol对象调用HttpBase类中的getProtocolOutput方法,该方法再调用Http类中的getResponse方法使用socket获取所请求文件的内容。返回一个ProtocolOutput类的对象。这个类中包含了对应协议请求的输出,包括Content类的内容和ProtocolStatus类的协议状态。然后调用finishFetchItem方法将fit从inProgress队列中移出,解除对该主机队列的阻塞。

接着,通过协议状态码来进行下一步的工作。协议状态码是前面进行http请求获得响应后根据响应的内容填写的,包括WOULDBLOCK,SUCCESS等状态,每种状态处理的方式不同,以下介绍请求后成功获得一个网页的response之后的处理过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
switch (status.getCode()) {
case ProtocolStatus.WOULDBLOCK:
......
case ProtocolStatus.SUCCESS: // got a page
pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS); updateStatus(content.getContent().length); // 增加网页数量和字节数
if (pstatus != null && pstatus.isSuccess()&& pstatus.getMinorCode() == parseStatus.SUCCESS_REDIRECT) {
String newUrl = pstatus.getMessage();
int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
Text redirUrl = handleRedirect( fit.url,fit.datum, urlString, newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,Fetcher.CONTENT_REDIR);
if (redirUrl != null) {
CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, fit.datum.getFetchInterval(), fit.datum.getScore());
if (reprUrl != null) {
newDatum.getMetaData().put( Nutch.WRITABLE_REPR_URL_KEY, new Text(reprUrl));
}
fit = FetchItem.create(redirUrl, newDatum, byIP);
if (fit != null) {
FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
fiq.addInProgressFetchItem(fit);
} else { // stop redirecting
redirecting = false;
}
}
}
break;
case...:

首先,调用output方法,该方法做了以下几个工作:(1)向链接对应的datum写入STATUS_FETCH_SUCCESS状态和当前时间作为抓取时间。向content对应的metadata中写入segment_name_key,score_key,fetch_status_key各自对应的值,其中score_key对应的得分由ScoringFilter算得。(2) 如果配置了parse则进行解析,解析的结果返回为ParseResult类的对象。(3)收集<url, crawldatum>、<url, content>,其中content是未解析的内容以及收集<url, 解析了的内容>。(4) 从content的metadata中拷贝得分,并存入parseData的metadata中。

在output方法返回后,调用updateStatus增加当前爬行的网页数和字节数。如果经过解析,网页中有重定向链接,则将重定向的链接加入到FetchItemQueue中。
在当重定向次数大于规定次数或者不允许重定向时,退出循环。

最后,将当前活跃进程数减1。网页抓取线程结束。

二、总结

Fetcher采用了经典的生产者消费者模型,生产者是QueueFeeder,用于根据主机/协议对向FetchItemQueues中不断地添加待爬行的FetchItem,只要FetchItemQueues队列中还有空间并且有输入数据,就添加,否则阻塞。消费者自然是FetchThread,它负责抓取网页。爬虫总共的线程数由配置决定,默认值是10,还可以指定对于同一个主机队列最大的爬行线程数,它们之和不超过总共的线程数。每个主机队列的最大线程数决定了每个主机队列中inProgress队列的大小,出于礼貌性一般对于同一个主机,只采用一个线程爬行。当待爬行的全部网页被爬行完后,爬行线程退出。

Mapper的分析

在了结了Fetch阶段的整个流程后,我们不禁要问,在分布式的情况下,不同的task node是怎样实现避免爬行相同网站的呢?

这要从Generate阶段说起,回顾Generate的第2次MapReduce,在Mapper输出的时候是以相同主机名作为分发到Reducer的依据的,因此各个Reducer生成了若干个文件,每个文件对应自己处理的那个主机对应的<url, crawldatum>。在Fetcher的输入时,又不对文件进行分片,因此分到各个Mapper上的主机就不会重复,避免了重复爬行的情况。

Fetch阶段流程图