Heritrix 3.1.0 Source Code Analysis (16)
Published: 2019-06-26


Next we analyze the methods related to the BdbFrontier object's CrawlURI next() method.

    /**
     * Return the next CrawlURI eligible to be processed (and presumably
     * visited/fetched) by a worker thread.
     *
     * Relies on the readyClassQueues having been loaded with
     * any work queues that are eligible to provide a URI.
     *
     * @return next CrawlURI eligible to be processed, or null if none available
     *
     * @see org.archive.crawler.framework.Frontier#next()
     */
    protected CrawlURI findEligibleURI() {
        // wake any snoozed queues
        wakeQueues();
        // consider rescheduled URIS
        checkFutures();

        // find a non-empty ready queue, if any
        // TODO: refactor to untangle these loops, early-exits, etc!
        WorkQueue readyQ = null;
        findauri: while(true) {
            findaqueue: do {
                String key = readyClassQueues.poll();
                if(key==null) {
                    // no ready queues; try to activate one
                    if(!getInactiveQueuesByPrecedence().isEmpty()
                        && highestPrecedenceWaiting < getPrecedenceFloor()) {
                        activateInactiveQueue();
                        continue findaqueue;
                    } else {
                        // nothing ready or readyable
                        break findaqueue;
                    }
                }
                readyQ = getQueueFor(key);
                if(readyQ==null) {
                    // readyQ key wasn't in all queues: unexpected
                    logger.severe("Key "+ key +
                        " in readyClassQueues but not allQueues");
                    break findaqueue;
                }
                if(readyQ.getCount()==0) {
                    // readyQ is empty and ready: it's exhausted
                    readyQ.noteExhausted();
                    readyQ.makeDirty();
                    readyQ = null;
                    continue;
                }
                if(!inProcessQueues.add(readyQ)) {
                    // double activation; discard this and move on
                    // (this guard allows other enqueuings to ready or
                    // the various inactive-by-precedence queues to
                    // sometimes redundantly enqueue a queue key)
                    readyQ = null;
                    continue;
                }
                // queue has gone 'in process'
                readyQ.considerActive();
                readyQ.setWakeTime(0); // clear obsolete wake time, if any
                readyQ.setSessionBudget(getBalanceReplenishAmount());
                readyQ.setTotalBudget(getQueueTotalBudget());
                if (readyQ.isOverSessionBudget()) {
                    deactivateQueue(readyQ);
                    readyQ.makeDirty();
                    readyQ = null;
                    continue;
                }
                if (readyQ.isOverTotalBudget()) {
                    retireQueue(readyQ);
                    readyQ.makeDirty();
                    readyQ = null;
                    continue;
                }
            } while (readyQ == null);

            if (readyQ == null) {
                // no queues left in ready or readiable
                break findauri;
            }

            returnauri: while(true) { // loop left by explicit return or break on empty
                CrawlURI curi = null;
                curi = readyQ.peek(this);
                if(curi == null) {
                    // should not reach
                    logger.severe("No CrawlURI from ready non-empty queue "
                            + readyQ.classKey + "\n"
                            + readyQ.shortReportLegend() + "\n"
                            + readyQ.shortReportLine() + "\n");
                    break returnauri;
                }

                // from queues, override names persist but not map source
                curi.setOverlayMapsSource(sheetOverlaysManager);
                // TODO: consider optimizations avoiding this recalc of
                // overrides when not necessary
                sheetOverlaysManager.applyOverlaysTo(curi);
                // check if curi belongs in different queue
                String currentQueueKey;
                try {
                    KeyedProperties.loadOverridesFrom(curi);
                    currentQueueKey = getClassKey(curi);
                } finally {
                    KeyedProperties.clearOverridesFrom(curi);
                }
                if (currentQueueKey.equals(curi.getClassKey())) {
                    // curi was in right queue, emit
                    noteAboutToEmit(curi, readyQ);
                    return curi;
                }
                // URI's assigned queue has changed since it
                // was queued (eg because its IP has become
                // known). Requeue to new queue.
                // TODO: consider synchronization on readyQ
                readyQ.dequeue(this,curi);
                doJournalRelocated(curi);
                curi.setClassKey(currentQueueKey);
                decrementQueuedCount(1);
                curi.setHolderKey(null);
                sendToQueue(curi);
                if(readyQ.getCount()==0) {
                    // readyQ is empty and ready: it's exhausted
                    // release held status, allowing any subsequent
                    // enqueues to again put queue in ready
                    // FIXME: tiny window here where queue could
                    // receive new URI, be readied, fail not-in-process?
                    inProcessQueues.remove(readyQ);
                    readyQ.noteExhausted();
                    readyQ.makeDirty();
                    readyQ = null;
                    continue findauri;
                }
            }
        }

        if(inProcessQueues.size()==0) {
            // Nothing was ready or in progress or imminent to wake; ensure
            // any piled-up pending-scheduled URIs are considered
            uriUniqFilter.requestFlush();
        }

        // if truly nothing ready, wait a moment before returning null
        // so that loop in surrounding next() has a chance of getting something
        // next time
        if(getTotalEligibleInactiveQueues()==0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //
            }
        }

        // nothing eligible
        return null;
    }

This method is rather long, so let's first look at the void wakeQueues() method.

    /**
     * Wake any queues sitting in the snoozed queue whose time has come.
     */
    protected void wakeQueues() {
        DelayedWorkQueue waked;
        while((waked = snoozedClassQueues.poll())!=null) {
            WorkQueue queue = waked.getWorkQueue(this);
            queue.setWakeTime(0);
            queue.makeDirty();
            reenqueueQueue(queue);
        }
        // also consider overflow (usually empty)
        if(!snoozedOverflow.isEmpty()) {
            synchronized(snoozedOverflow) {
                Iterator<DelayedWorkQueue> iter =
                    snoozedOverflow.headMap(System.currentTimeMillis()).values().iterator();
                while(iter.hasNext()) {
                    DelayedWorkQueue dq = iter.next();
                    iter.remove();
                    snoozedOverflowCount.decrementAndGet();
                    WorkQueue queue = dq.getWorkQueue(this);
                    queue.setWakeTime(0);
                    queue.makeDirty();
                    reenqueueQueue(queue);
                }
            }
        }
    }

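snoozedClassQueues.poll() takes from the snoozed queue an element whose wake time has arrived. The wake time of the corresponding WorkQueue is reset to 0, and the queue is then reassigned to wherever it now belongs: either the inactive queues or the queue of queues ready to be crawled.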
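The "only expired elements come out" behaviour can be tried in isolation. The following is a minimal sketch, assuming snoozedClassQueues behaves like a java.util.concurrent.DelayQueue (the article does not show its declaration). SnoozeSketch, SnoozedKey and wakeExpired are hypothetical names invented for this illustration and are not part of Heritrix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class SnoozeSketch {
    /** Hypothetical stand-in for DelayedWorkQueue: a class key plus a wake time. */
    static final class SnoozedKey implements Delayed {
        final String classKey;
        final long wakeTimeMillis;

        SnoozedKey(String classKey, long wakeTimeMillis) {
            this.classKey = classKey;
            this.wakeTimeMillis = wakeTimeMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until wake-up; zero or negative means "expired".
            long remaining = wakeTimeMillis - System.currentTimeMillis();
            return unit.convert(remaining, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // The queue in this sketch only ever holds SnoozedKey instances.
            return Long.compare(wakeTimeMillis, ((SnoozedKey) other).wakeTimeMillis);
        }
    }

    /** Drains every expired entry, mirroring the while-poll loop in wakeQueues(). */
    static List<String> wakeExpired(DelayQueue<SnoozedKey> snoozed) {
        List<String> woken = new ArrayList<>();
        SnoozedKey key;
        // poll() returns null as soon as the head has not expired yet.
        while ((key = snoozed.poll()) != null) {
            woken.add(key.classKey);
        }
        return woken;
    }

    public static void main(String[] args) {
        DelayQueue<SnoozedKey> snoozed = new DelayQueue<>();
        long now = System.currentTimeMillis();
        snoozed.add(new SnoozedKey("expired.example", now - 1_000));
        snoozed.add(new SnoozedKey("later.example", now + 60_000));
        System.out.println(wakeExpired(snoozed));
    }
}
```

The entry whose wake time lies a minute in the future stays in the queue, which is why the loop in wakeQueues() can poll unconditionally without waking queues too early.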

    /**
     * Enqueue the given queue to either readyClassQueues or inactiveQueues,
     * as appropriate.
     *
     * @param wq
     */
    protected void reenqueueQueue(WorkQueue wq) {
        //TODO:SPRINGY set overrides by queue?
        getQueuePrecedencePolicy().queueReevaluate(wq);
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("queue reenqueued: " +
                wq.getClassKey());
        }
        if(highestPrecedenceWaiting < wq.getPrecedence()
            || wq.getPrecedence() >= getPrecedenceFloor()) {
            // if still over budget, deactivate
            deactivateQueue(wq);
        } else {
            readyQueue(wq);
        }
    }

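The method first re-evaluates the queue's precedence, then places the WorkQueue wq into either the inactive queues or the queue of queues ready to be crawled.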
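The two-way decision is easier to follow with concrete numbers. Below is a minimal sketch of just the condition in reenqueueQueue, with smaller numbers meaning higher precedence. ReenqueueSketch, Destination and choose are hypothetical names for this illustration, and the floor value 255 is only an example, not a value taken from Heritrix.

```java
class ReenqueueSketch {
    enum Destination { READY, INACTIVE }

    /**
     * Mirrors the if/else in reenqueueQueue(): a queue goes back to the
     * inactive set when a better-ranked queue is already waiting, or when
     * its own precedence has reached the floor.
     */
    static Destination choose(int queuePrecedence,
                              int highestPrecedenceWaiting,
                              int precedenceFloor) {
        boolean outrankedByWaitingQueue = highestPrecedenceWaiting < queuePrecedence;
        boolean atOrPastFloor = queuePrecedence >= precedenceFloor;
        return (outrankedByWaitingQueue || atOrPastFloor)
                ? Destination.INACTIVE
                : Destination.READY;
    }

    public static void main(String[] args) {
        // Nothing is waiting (MAX_VALUE) and the queue is above the floor.
        System.out.println(choose(1, Integer.MAX_VALUE, 255));
        // A precedence-1 queue is waiting, so a precedence-3 queue must yield.
        System.out.println(choose(3, 1, 255));
    }
}
```

A queue with the same precedence as the best waiting queue is still readied, because the comparison is strict.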

We analyzed deactivateQueue(wq) in the previous article (it adds the WorkQueue wq to the inactive queues), so here we look at readyQueue(wq).

    /**
     * Put the given queue on the readyClassQueues queue
     * @param wq
     */
    protected void readyQueue(WorkQueue wq) {
//        assert Thread.currentThread() == managerThread;
        try {
            readyClassQueues.put(wq.getClassKey());
            if(logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE,
                        "queue readied: " + wq.getClassKey());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.err.println("unable to ready queue "+wq);
            // propagate interrupt up
            throw new RuntimeException(e);
        }
    }

This method adds the WorkQueue wq (more precisely, its classKey) to readyClassQueues, the queue of queues ready to be crawled.

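Returning to void wakeQueues(): its second half takes the queues whose snooze time has expired out of the snoozedOverflow container and re-enqueues each WorkQueue wq in the same way. snoozedOverflow is a sorted Map that holds the overflowed snoozed queues (stored by key). Judging from the headMap(System.currentTimeMillis()) call, its keys are wake times rather than precedences.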
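The headMap-and-remove idiom used for snoozedOverflow (and again for futureUris in checkFutures) can be reproduced with a plain java.util.TreeMap. This is a sketch under that assumption: the real container is BDB-backed, and OverflowSketch and drainDue are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class OverflowSketch {
    /**
     * Removes and returns every value whose key (a wake time in
     * milliseconds) is strictly earlier than 'now'.
     */
    static List<String> drainDue(SortedMap<Long, String> byWakeTime, long now) {
        List<String> due = new ArrayList<>();
        // headMap(now) is a live view of the entries with key < now.
        Iterator<String> iter = byWakeTime.headMap(now).values().iterator();
        while (iter.hasNext()) {
            due.add(iter.next());
            iter.remove(); // also removes the entry from the backing map
        }
        return due;
    }

    public static void main(String[] args) {
        SortedMap<Long, String> overflow = new TreeMap<>();
        overflow.put(100L, "a.example");
        overflow.put(200L, "b.example");
        overflow.put(300L, "c.example");
        System.out.println(drainDue(overflow, 250L)); // the two entries before 250
        System.out.println(overflow.keySet());        // only the later entry is left
    }
}
```

Because headMap excludes its upper bound, an entry whose wake time equals the current millisecond is picked up on the next call rather than this one.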

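Back in CrawlURI findEligibleURI(), the void checkFutures() method looks for CrawlURI objects whose scheduled delay has elapsed and passes them to receive(curi), so that they are added back into the BDB database.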

    /**
     * Check for any future-scheduled URIs now eligible for reenqueuing
     */
    protected void checkFutures() {
//        assert Thread.currentThread() == managerThread;
        // TODO: consider only checking this every set interval
        if(!futureUris.isEmpty()) {
            synchronized(futureUris) {
                Iterator<CrawlURI> iter =
                    futureUris.headMap(System.currentTimeMillis())
                        .values().iterator();
                while(iter.hasNext()) {
                    CrawlURI curi = iter.next();
                    curi.setRescheduleTime(-1); // unless again set elsewhere
                    iter.remove();
                    futureUriCount.decrementAndGet();
                    receive(curi);
                }
            }
        }
    }

Continuing on, String key = readyClassQueues.poll() takes the head element (the classKey of a WorkQueue wq) from readyClassQueues, the queue of queues ready to be crawled.
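What happens to a polled key before its queue is used can be condensed into a few lines. The following is a simplified sketch of the inner findaqueue loop only, assuming plain in-memory collections. It leaves out budgets, activation of inactive queues and logging, and ReadyPollSketch, nextInProcess and pendingCounts are hypothetical names for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

class ReadyPollSketch {
    /**
     * Polls ready keys until one names a non-empty queue that is not
     * already in process; returns null when the ready keys run out.
     */
    static String nextInProcess(Queue<String> readyKeys,
                                Map<String, Integer> pendingCounts,
                                Set<String> inProcess) {
        String key;
        while ((key = readyKeys.poll()) != null) {
            if (pendingCounts.getOrDefault(key, 0) == 0) {
                continue; // exhausted queue: nothing to hand out
            }
            if (!inProcess.add(key)) {
                continue; // double activation: key was enqueued redundantly
            }
            return key;
        }
        return null;
    }

    public static void main(String[] args) {
        Queue<String> readyKeys =
                new ArrayDeque<>(Arrays.asList("a.example", "a.example", "b.example", "c.example"));
        Map<String, Integer> pendingCounts = new HashMap<>();
        pendingCounts.put("a.example", 2);
        pendingCounts.put("b.example", 0);
        pendingCounts.put("c.example", 1);
        Set<String> inProcess = new HashSet<>();
        System.out.println(nextInProcess(readyKeys, pendingCounts, inProcess));
        System.out.println(nextInProcess(readyKeys, pendingCounts, inProcess));
        System.out.println(nextInProcess(readyKeys, pendingCounts, inProcess));
    }
}
```

Relying on the boolean returned by Set.add is what lets other code paths enqueue the same key more than once without a queue being handed to two workers.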

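If the ready queue has no elements, and the highest precedence waiting among the inactive queues is better than the precedence floor, a queue from inactiveQueues is activated: a suitable WorkQueue wq is placed into readyClassQueues, the queue of queues ready to be crawled.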

activateInactiveQueue()

    /**
     * Activate an inactive queue, if any are available.
     */
    protected boolean activateInactiveQueue() {
        for (Entry<Integer, Queue<String>> entry: getInactiveQueuesByPrecedence().entrySet()) {
            int expectedPrecedence = entry.getKey();
            Queue<String> queueOfWorkQueueKeys = entry.getValue();
            while (true) {
                synchronized (getInactiveQueuesByPrecedence()) {
                    String workQueueKey = queueOfWorkQueueKeys.poll();
                    if (workQueueKey == null) {
                        break;
                    }
                    WorkQueue candidateQ = (WorkQueue) this.allQueues.get(workQueueKey);
                    if (candidateQ.getPrecedence() > expectedPrecedence) {
                        // queue demoted since placed; re-deactivate
                        deactivateQueue(candidateQ);
                        candidateQ.makeDirty();
                        continue;
                    }
                    updateHighestWaiting(expectedPrecedence);
                    try {
                        // readyClassQueues holds the keys of the queues that are ready to be crawled
                        readyClassQueues.put(workQueueKey);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return true;
                }
            }
        }
        return false;
    }
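Stripped of synchronization and the demotion check, the activation walk reduces to "take the first key from the best non-empty precedence bucket". Here is a minimal sketch of that reduced form, assuming an in-memory TreeMap in place of the frontier's own structures. ActivationSketch and activateOne are hypothetical names for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;

class ActivationSketch {
    /**
     * Moves one queue key from the inactive buckets to the ready queue,
     * visiting buckets in ascending key order (best precedence first).
     * Returns false when every bucket is empty.
     */
    static boolean activateOne(SortedMap<Integer, Queue<String>> inactiveByPrecedence,
                               Queue<String> readyKeys) {
        for (Map.Entry<Integer, Queue<String>> entry : inactiveByPrecedence.entrySet()) {
            String key = entry.getValue().poll();
            if (key != null) {
                readyKeys.add(key);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        SortedMap<Integer, Queue<String>> inactive = new TreeMap<>();
        inactive.put(1, new ArrayDeque<String>());
        inactive.put(2, new ArrayDeque<String>(Arrays.asList("x.example", "y.example")));
        inactive.put(5, new ArrayDeque<String>(Arrays.asList("z.example")));
        Queue<String> ready = new ArrayDeque<>();
        while (activateOne(inactive, ready)) {
            // keep activating until nothing is left
        }
        System.out.println(ready);
    }
}
```

The real method additionally re-deactivates a candidate whose precedence has worsened since it was filed, and refreshes highestPrecedenceWaiting before readying the key.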

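updateHighestWaiting then updates the value of the highest precedence (that is, the smallest number) among the inactive queues in inactiveQueues.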

    /**
     * Recalculate the value of the highest-precedence queue waiting
     * among inactive queues.
     *
     * @param startFrom start looking at this precedence value
     */
    protected void updateHighestWaiting(int startFrom) {
        // probe for new highestWaiting
        for(int precedenceKey : getInactiveQueuesByPrecedence().tailMap(startFrom).keySet()) {
            if(!getInactiveQueuesByPrecedence().get(precedenceKey).isEmpty()) {
                highestPrecedenceWaiting = precedenceKey;
                return;
            }
        }
        // nothing waiting
        highestPrecedenceWaiting = Integer.MAX_VALUE;
    }

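The method above takes the entries of inactiveQueues whose precedence is greater than or equal to the given value (tailMap includes startFrom itself), then sets highestPrecedenceWaiting to the smallest precedence among them that still has a non-empty queue (inactiveQueues is sorted). If no such queue exists, highestPrecedenceWaiting becomes Integer.MAX_VALUE.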
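The inclusive lower bound of tailMap is easy to check with a small experiment. This sketch assumes getInactiveQueuesByPrecedence() returns a standard java.util.SortedMap, and it returns the result instead of assigning a field. HighestWaitingSketch and highestWaiting are hypothetical names for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;

class HighestWaitingSketch {
    /**
     * Returns the smallest precedence >= startFrom whose bucket is
     * non-empty, or Integer.MAX_VALUE when nothing is waiting.
     */
    static int highestWaiting(SortedMap<Integer, Queue<String>> inactiveByPrecedence,
                              int startFrom) {
        for (Map.Entry<Integer, Queue<String>> entry
                : inactiveByPrecedence.tailMap(startFrom).entrySet()) {
            if (!entry.getValue().isEmpty()) {
                return entry.getKey();
            }
        }
        return Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        SortedMap<Integer, Queue<String>> inactive = new TreeMap<>();
        inactive.put(1, new ArrayDeque<String>());
        inactive.put(3, new ArrayDeque<String>(Arrays.asList("a.example")));
        inactive.put(5, new ArrayDeque<String>(Arrays.asList("b.example")));
        System.out.println(highestWaiting(inactive, 1)); // skips the empty bucket at 1
        System.out.println(highestWaiting(inactive, 3)); // bucket 3 itself is included
        System.out.println(highestWaiting(inactive, 6)); // nothing at or after 6
    }
}
```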

---------------------------------------------------------------------------

This Heritrix 3.1.0 source code analysis series is my own original work.

When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

Article link: http://www.cnblogs.com/chenying99/archive/2013/04/21/3033510.html
