Heritrix 3.1.0 Source Code Analysis (Part 12)
Published: 2019-06-28


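Next we look at the CrawlURI next() method of the BdbFrontier class, which returns the next CrawlURI object waiting to be crawled.

The method is not defined in BdbFrontier itself. It lives in AbstractFrontier, the parent of BdbFrontier's parent class (BdbFrontier extends WorkQueueFrontier, which extends AbstractFrontier).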

org.archive.crawler.frontier.BdbFrontier

           org.archive.crawler.frontier.AbstractFrontier

    /* (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#next()
     */
    public CrawlURI next() throws InterruptedException {
        CrawlURI crawlable = null;
        while(crawlable==null) {
            outboundLock.readLock().lockInterruptibly();
            // try filling outbound until we get something to work on
            crawlable = findEligibleURI();
            outboundLock.readLock().unlock();
        }
        return crawlable;
    }

next() in turn calls the CrawlURI findEligibleURI() method, which BdbFrontier inherits from its parent class WorkQueueFrontier.

    /**
     * Return the next CrawlURI eligible to be processed (and presumably
     * visited/fetched) by a a worker thread.
     *
     * Relies on the readyClassQueues having been loaded with
     * any work queues that are eligible to provide a URI.
     *
     * @return next CrawlURI eligible to be processed, or null if none available
     *
     * @see org.archive.crawler.framework.Frontier#next()
     */
    protected CrawlURI findEligibleURI() {
        // wake any snoozed queues
        wakeQueues();
        // consider rescheduled URIS
        checkFutures();

        // find a non-empty ready queue, if any
        // TODO: refactor to untangle these loops, early-exits, etc!
        WorkQueue readyQ = null;
        findauri: while(true) {
            findaqueue: do {
                String key = readyClassQueues.poll();
                if(key==null) {
                    // no ready queues; try to activate one
                    if(!getInactiveQueuesByPrecedence().isEmpty()
                        && highestPrecedenceWaiting < getPrecedenceFloor()) {
                        activateInactiveQueue();
                        continue findaqueue;
                    } else {
                        // nothing ready or readyable
                        break findaqueue;
                    }
                }
                readyQ = getQueueFor(key);
                if(readyQ==null) {
                    // readyQ key wasn't in all queues: unexpected
                    logger.severe("Key "+ key +
                        " in readyClassQueues but not allQueues");
                    break findaqueue;
                }
                if(readyQ.getCount()==0) {
                    // readyQ is empty and ready: it's exhausted
                    readyQ.noteExhausted();
                    readyQ.makeDirty();
                    readyQ = null;
                    continue;
                }
                if(!inProcessQueues.add(readyQ)) {
                    // double activation; discard this and move on
                    // (this guard allows other enqueuings to ready or
                    // the various inactive-by-precedence queues to
                    // sometimes redundantly enqueue a queue key)
                    readyQ = null;
                    continue;
                }
                // queue has gone 'in process'
                readyQ.considerActive();
                readyQ.setWakeTime(0); // clear obsolete wake time, if any
                readyQ.setSessionBudget(getBalanceReplenishAmount());
                readyQ.setTotalBudget(getQueueTotalBudget());
                if (readyQ.isOverSessionBudget()) {
                    deactivateQueue(readyQ);
                    readyQ.makeDirty();
                    readyQ = null;
                    continue;
                }
                if (readyQ.isOverTotalBudget()) {
                    retireQueue(readyQ);
                    readyQ.makeDirty();
                    readyQ = null;
                    continue;
                }
            } while (readyQ == null);

            if (readyQ == null) {
                // no queues left in ready or readiable
                break findauri;
            }

            returnauri: while(true) { // loop left by explicit return or break on empty
                CrawlURI curi = null;
                curi = readyQ.peek(this);
                if(curi == null) {
                    // should not reach
                    logger.severe("No CrawlURI from ready non-empty queue "
                            + readyQ.classKey + "\n"
                            + readyQ.shortReportLegend() + "\n"
                            + readyQ.shortReportLine() + "\n");
                    break returnauri;
                }

                // from queues, override names persist but not map source
                curi.setOverlayMapsSource(sheetOverlaysManager);
                // TODO: consider optimizations avoiding this recalc of
                // overrides when not necessary
                sheetOverlaysManager.applyOverlaysTo(curi);
                // check if curi belongs in different queue
                String currentQueueKey;
                try {
                    KeyedProperties.loadOverridesFrom(curi);
                    currentQueueKey = getClassKey(curi);
                } finally {
                    KeyedProperties.clearOverridesFrom(curi);
                }
                if (currentQueueKey.equals(curi.getClassKey())) {
                    // curi was in right queue, emit
                    noteAboutToEmit(curi, readyQ);
                    return curi;
                }
                // URI's assigned queue has changed since it
                // was queued (eg because its IP has become
                // known). Requeue to new queue.
                // TODO: consider synchronization on readyQ
                readyQ.dequeue(this,curi);
                doJournalRelocated(curi);
                curi.setClassKey(currentQueueKey);
                decrementQueuedCount(1);
                curi.setHolderKey(null);
                sendToQueue(curi);
                if(readyQ.getCount()==0) {
                    // readyQ is empty and ready: it's exhausted
                    // release held status, allowing any subsequent
                    // enqueues to again put queue in ready
                    // FIXME: tiny window here where queue could
                    // receive new URI, be readied, fail not-in-process?
                    inProcessQueues.remove(readyQ);
                    readyQ.noteExhausted();
                    readyQ.makeDirty();
                    readyQ = null;
                    continue findauri;
                }
            }
        }

        if(inProcessQueues.size()==0) {
            // Nothing was ready or in progress or imminent to wake; ensure
            // any piled-up pending-scheduled URIs are considered
            uriUniqFilter.requestFlush();
        }

        // if truly nothing ready, wait a moment before returning null
        // so that loop in surrounding next() has a chance of getting something
        // next time
        if(getTotalEligibleInactiveQueues()==0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //
            }
        }

        // nothing eligible
        return null;
    }
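The core of the queue-selection loop can be hard to see through the labels and early exits. The following is a minimal sketch of just two of its ideas: polling queue keys from a ready list, and using a Set as an "in process" guard so that a redundantly enqueued key is skipped. ReadyQueueDemo and all of its members are hypothetical names invented for illustration; budgets, precedence, snoozing and requeueing to a different class key are deliberately left out, so this is not how Heritrix itself is structured.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, heavily simplified model of the ready-queue selection.
class ReadyQueueDemo {
    // Keys of queues that may have work (a key can appear more than once).
    final Queue<String> readyKeys = new ArrayDeque<>();
    // All known queues, by class key.
    final Map<String, Deque<String>> allQueues = new HashMap<>();
    // Keys of queues that currently have a URI handed out.
    final Set<String> inProcess = new HashSet<>();

    void schedule(String key, String uri) {
        allQueues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(uri);
        readyKeys.add(key); // deliberately allows redundant enqueues
    }

    // Return the head URI of the first usable ready queue, or null.
    String findEligible() {
        String key;
        while ((key = readyKeys.poll()) != null) {
            Deque<String> q = allQueues.get(key);
            if (q == null || q.isEmpty()) {
                continue; // unknown or exhausted queue
            }
            if (!inProcess.add(key)) {
                continue; // double activation: already in process
            }
            return q.peek(); // queue has gone 'in process'
        }
        return null; // nothing ready
    }

    // Mark the handed-out URI as done and re-ready the queue if needed.
    void finished(String key) {
        Deque<String> q = allQueues.get(key);
        q.poll();
        inProcess.remove(key);
        if (!q.isEmpty()) {
            readyKeys.add(key);
        }
    }

    public static void main(String[] args) {
        ReadyQueueDemo f = new ReadyQueueDemo();
        f.schedule("a.com", "http://a.com/1");
        f.schedule("a.com", "http://a.com/2");
        f.schedule("b.com", "http://b.com/1");
        System.out.println(f.findEligible());
        System.out.println(f.findEligible());
    }
}
```

In this sketch the second call skips the redundant "a.com" key because Set.add returns false for it, which mirrors the role of the inProcessQueues.add(readyQ) check in the listing above.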

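First, the WorkQueue object is looked up by its ClassKey; here it is a BdbWorkQueue. (How the ClassKey value is obtained involves the work queue scheduling of Heritrix 3.1.0, which will be analyzed in a later article.)

Then the BdbWorkQueue object's CrawlURI peek(final WorkQueueFrontier frontier) method is called; it is defined in the parent class WorkQueue.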

    /**
     * Return the topmost queue item -- and remember it,
     * such that even later higher-priority inserts don't
     * change it.
     *
     * TODO: evaluate if this is really necessary
     * @param frontier Work queues manager
     *
     * @return topmost queue item, or null
     */
    public synchronized CrawlURI peek(final WorkQueueFrontier frontier) {
        if(peekItem == null && count > 0) {
            try {
                peekItem = peekItem(frontier);
            } catch (IOException e) {
                //FIXME better exception handling
                logger.log(Level.SEVERE,"peek failure",e);
                e.printStackTrace();
                // throw new RuntimeException(e);
            }
            if(peekItem != null) {
                lastPeeked = peekItem.toString();
            }
        }
        return peekItem;
    }
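The "peek and remember" behaviour described in the Javadoc above can be illustrated in isolation. The sketch below is only a model: StickyPeekQueue is a hypothetical class, it stores plain integers (lower value meaning higher priority) in a java.util.PriorityQueue rather than CrawlURI objects in a database, and its dequeue() is a simplification.

```java
import java.util.PriorityQueue;

// Hypothetical model of a queue whose peeked head is cached until dequeued.
class StickyPeekQueue {
    private final PriorityQueue<Integer> items = new PriorityQueue<>();
    private Integer peekItem; // remembered head; null when nothing is cached

    synchronized void enqueue(int priority) {
        items.add(priority);
    }

    // Return the head and remember it, so later inserts cannot replace it.
    synchronized Integer peek() {
        if (peekItem == null && !items.isEmpty()) {
            peekItem = items.peek();
        }
        return peekItem;
    }

    // Remove the remembered head and clear the cache.
    synchronized void dequeue() {
        if (peekItem != null) {
            items.remove(peekItem);
            peekItem = null;
        }
    }

    public static void main(String[] args) {
        StickyPeekQueue q = new StickyPeekQueue();
        q.enqueue(5);
        System.out.println(q.peek()); // 5
        q.enqueue(1);                 // higher priority arrives later
        System.out.println(q.peek()); // still 5: the peeked item is remembered
        q.dequeue();
        System.out.println(q.peek()); // 1
    }
}
```

The point of the cache is that the caller which peeked an item and the later dequeue agree on which item is meant, even if something with higher priority was inserted in between.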

peek() in turn calls the CrawlURI peekItem(final WorkQueueFrontier frontier) method, implemented in:

org.archive.crawler.frontier.BdbWorkQueue

    protected CrawlURI peekItem(final WorkQueueFrontier frontier)
    throws IOException {
        final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
            .getWorkQueues();
        DatabaseEntry key = new DatabaseEntry(origin);
        CrawlURI curi = null;
        int tries = 1;
        while(true) {
            try {
                curi = queues.get(key);
            } catch (DatabaseException e) {
                LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e);
            }

            // ensure CrawlURI, if any,  came from acceptable range:
            if(!ArchiveUtils.startsWith(key.getData(),origin)) {
                LOGGER.severe(
                    "inconsistency: "+classKey+"("+
                    getPrefixClassKey(origin)+") with " + getCount() + " items gave "
                    + curi +"("+getPrefixClassKey(key.getData()));
                // clear curi to allow retry
                curi = null;
                // reset key to original origin for retry
                key.setData(origin);
            }

            if (curi!=null) {
                // success
                break;
            }

            if (tries>3) {
                LOGGER.severe("no item where expected in queue "+classKey);
                break;
            }
            tries++;
            LOGGER.severe("Trying get #" + Integer.toString(tries)
                    + " in queue " + classKey + " with " + getCount()
                    + " items using key "
                    + getPrefixClassKey(key.getData()));
        }
        return curi;
    }
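The "acceptable range" check in peekItem makes sense if many virtual queues share one sorted key space and a lookup can land on an entry belonging to a neighbouring queue. The sketch below models that idea with a TreeMap. It rests on assumptions: that the underlying lookup behaves like a "nearest key after the given one" search, and that string keys are an adequate stand-in for the byte[] keys. PrefixRangeDemo, CAP and the key layout are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: several virtual queues inside one sorted map.
class PrefixRangeDemo {
    static final char CAP = '\0'; // terminator appended to the class key

    // Return the first item after 'origin' whose key still starts with
    // 'origin', or null if the nearest entry belongs to another queue.
    static String peekItem(TreeMap<String, String> store, String origin) {
        Map.Entry<String, String> e = store.higherEntry(origin);
        if (e == null || !e.getKey().startsWith(origin)) {
            return null; // nothing, or result is outside this queue's range
        }
        return e.getValue();
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("a.com" + CAP + "001", "http://a.com/1");
        store.put("b.com" + CAP + "001", "http://b.com/1");
        System.out.println(peekItem(store, "a.com" + CAP));
        // The queue for aa.com is empty; the nearest key belongs to b.com.
        System.out.println(peekItem(store, "aa.com" + CAP));
    }
}
```

Without the prefix check, the second lookup in this model would hand back a URI that belongs to a different queue.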

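As the code above shows, the call then goes to a method of the BdbMultipleWorkQueues object, with DatabaseEntry key = new DatabaseEntry(origin) passed as the argument.

Here origin is a byte[] derived from the classKey of the BdbWorkQueue work queue; it is computed in the BdbWorkQueue constructor: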

    /**
     * Create a virtual queue inside the given BdbMultipleWorkQueues
     *
     * @param classKey
     */
    public BdbWorkQueue(String classKey, BdbFrontier frontier) {
        super(classKey);
        this.origin = BdbMultipleWorkQueues.calculateOriginKey(classKey);
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(getPrefixClassKey(this.origin) + " " + classKey);
        }
        // add the queue-front 'cap' entry; see...
        // http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
        frontier.getWorkQueues().addCap(origin);
    }

The computation itself is done by the static method byte[] calculateOriginKey(String classKey) of the BdbMultipleWorkQueues class:

    /**
     * Calculate the 'origin' key for a virtual queue of items
     * with the given classKey. This origin key will be a
     * prefix of the keys for all items in the queue.
     *
     * @param classKey String key to derive origin byte key from
     * @return a byte array key
     */
    static byte[] calculateOriginKey(String classKey) {
        byte[] classKeyBytes = null;
        int len = 0;
        try {
            classKeyBytes = classKey.getBytes("UTF-8");
            len = classKeyBytes.length;
        } catch (UnsupportedEncodingException e) {
            // should be impossible; all JVMs must support UTF-8
            e.printStackTrace();
        }
        byte[] keyData = new byte[len+1];
        System.arraycopy(classKeyBytes,0,keyData,0,len);
        keyData[len]=0;
        return keyData;
    }
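To make the result concrete, here is a small standalone sketch that reproduces the same calculation (the UTF-8 bytes of the class key followed by a single zero byte) and shows one consequence of the trailing zero: the origin of one class key is not a prefix of the origin of a longer class key that begins with the same characters. OriginKeyDemo and its startsWith helper are hypothetical names written for this illustration, not Heritrix classes, and whether this prefix property was the motivation for the zero byte is a guess.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical standalone re-creation of the origin key calculation.
class OriginKeyDemo {
    // UTF-8 bytes of classKey plus one trailing zero byte.
    static byte[] originKey(String classKey) {
        byte[] bytes = classKey.getBytes(StandardCharsets.UTF_8);
        return Arrays.copyOf(bytes, bytes.length + 1); // copyOf pads with 0
    }

    // True if 'array' begins with all bytes of 'prefix'.
    static boolean startsWith(byte[] array, byte[] prefix) {
        if (prefix.length > array.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (array[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(originKey("ab"))); // [97, 98, 0]
        // "ab\0" is not a prefix of "abc\0", so the two queues stay apart.
        System.out.println(startsWith(originKey("abc"), originKey("ab"))); // false
    }
}
```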

Finally, the CrawlURI get(DatabaseEntry headKey) method of the BdbMultipleWorkQueues object was covered in an earlier article of this series, so it is not reproduced here.

---------------------------------------------------------------------------

This Heritrix 3.1.0 source code analysis series is my own original work.

When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

Article link: http://www.cnblogs.com/chenying99/archive/2013/04/17/3025414.html
