Spark实现PageRank

搜索引擎的几个技术要点

最近浏览了搜索引擎的发展历程,简单总结下。搜索引擎需要解决的主要问题包含但不限于:建立资料库,建立关键字-页面号的索引,确定页面排序。三者的经典解决办法分别为:爬虫技术(Spider)、倒排索引(Inversed-Index)数据结构、排序算法(TF-IDF、PageRank等)解决。当然此处未考虑技术细节如如何应对反爬虫、如何分词等,现代搜索引擎也绝不是简单的几个算法堆砌就可以实现的。

爬虫部分不多说,通过http协议去互联网上爬取并保存到自己的数据库,道理简单,细说也是一个相当繁琐的过程。

倒排索引,其中key是关键词,value是一个页面编号集合(假设资料库中每个页面有唯一编号),这样就可以根据关键字迅速的找到页面。建立倒排索引有诸多方法,简单的如通过扫描页面(视为terms的集合)对词条的正向索引,生成倒排表。

TF-IDF

接下来解决页面排序问题。第一代搜索引擎主要依赖于词频统计来排序。最出名的要数TF-IDF了。
关于TF-IDF算法,TF:term frequency,IDF:inverse document frequency。TF即词频,代表词条在文档中出现的频率;IDF即反向文件频率,指包含特定词条的文件的频率。
tf
idf
tf-idf
如上公式所述,tf做了归一化,分子是词条ti在文件dj中出现的次数,分母是所有词条在dj中出现的次数。idf则等于文件总数对出现了词条ti的文件数求商再取对数。二者之积即代表tf-idf值,某一特定文件内的高词语频率,以及该词语在整个文件集合中的低文件频率,可以产生出高权重的tf-idf。因此,tf-idf倾向于过滤掉常见的词语,保留重要的词语。

PageRank

###PageRank原理简述
PageRank部分,由斯坦福大学博士研究生Sergey Brin和Lwraence Page等提出的。PageRank算法是Google搜索引擎的核心排序算法,是Google成为全球最成功的搜索引擎的重要因素之一,同时开启了链接分析研究的热潮。

一个页面的重要程度用PageRank(记为PR)来衡量。该算法基于以下两个假设:

  1. 数量假设,页面P的前置页面越多,代表P的重要程度即PR越高
  2. 质量假设,页面P的前置页面PR值越高,P的PR值就越高

假设一个页面A被另一个页面B引用。可看成B推荐A,B将其重要程度(PR值)平均的分配B所引用的所有页面,所以越多页面引用A,则越多的页面分配PR值给A,PR值也就越高,A越重要;另外,B越重要,它所引用的页面能分配到的PR值就越多,A的PR值也就越高,也就越重要。

摆脱上面的例子,看图说话,页面间的关系可以用有向图来表示,如下图

可以用向量V来表示初始PR值,根据图的关系可以得转移矩阵T。通过迭代公式Vn=T·Vn-1,最终可以得到稳定的PR向量。
转移矩阵
然而事情没有这般顺利,如果页面间关系图中出现终止点(出度为0)或陷阱点(只有自环),最终的PR值会失去意义。因此Larry Page等人引入了α来抵消这种影响,迭代公式为:Vn=αTVn-1+(1-α)V0。我理解,跳转有α的几率按转移矩阵进行,如果遇到极端情况则有(1-α)的几率摆脱,通过加上初始的PR向量V0实现。

###Spark实现
《Spark快速大数据分析》中有一段关于PageRank算法的描述:

PageRank是执行多次连接的一个迭代算法,因此它是RDD分区操作的一个很好的用例。算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算。

  1. 将每个页面的排序值初始化为1.0。
  2. 在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。
  3. 将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。

最后两个步骤会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank值。在实际操作中,收敛通常需要大约10轮迭代。

以下图页面间关系为例。其中D是终止点,且接收其他所有网页的出链,直觉判断D的PR值会比较高。

实现代码如下:

def pageRank(sc: SparkContext): Unit = {
//Define alpha
val alpha = 0.85
val iterCnt = 20
//Init relation graph of pages
val links = sc.parallelize(
  List(
    ("A", List("A", "C", "D")),
    ("B", List("D")),
    ("C", List("B", "D")),
    ("D", List()))
)
  //Take advantage of partitions and save in mem cache
  .partitionBy(new HashPartitioner(2))
  .persist()
//Init pageRanks
var ranks = links.mapValues(_ => 1.0)

//Iteration
for (i <- 0 until iterCnt) {
  val contributions = links.join(ranks).flatMap{
    case (_, (linkList, rank)) =>
      linkList.map(dest => (dest, rank / linkList.size))
  }
  ranks = contributions.reduceByKey((x, y) => x + y)
    .mapValues(v => {
      (1 - alpha) + alpha * v
    })
}
//Display final pageRanks
ranks.sortByKey().foreach(println)
}

最终打印结果符合预期:

(A,0.209304961834908)
(B,0.23895744275236194)
(C,0.209304961834908)
(D,0.5013847328443557)

###分析
先从代码层面分析。其实就是对上面描述的实现,基本和书中代码示例一样,不过这里为了方便展示把link替换成了List嵌套结构。其中ABCD代表四个节点(页面),每个节点可链接至其他页面。数据结构类似图中的邻接表。ranks中的values代表PR值,初值为1.0。
links和ranks在join内连接操作后,flatmap将出链中所有link的contribution计算出,再reduce合并,注意引入α,思路还是很巧妙的。迭代到一定次数后结束。
注意flatmap处后接partialFunction,第一次看到偏函数还是有点懵(Stack Overflow)。

关于Partition带来的好处:
在实际操作中,页面的出链可能很多,为了避免重复做分区操作,这里用persist方法将links的分区缓存起来。
ranks由links通过mapValues得到,key的分区信息得以保留,这样links和ranks在做诸如join操作时就会避免额外通信开销,因为他们相同的key必然hash在同一partition。
分区采用hashPartitioner,如果不考虑特殊的分区逻辑,比如同host的页面要在一个分区的话,这里选择哪种partitioner没有硬性要求。

不过代码跑出来有Closure warning,貌似是偏函数语句造成的,待查。

###总结
PageRank算法有其鲜明的优点:算法不依赖查询,所有的PR值计算可以离线完成(听说Google最初发明MapReduce就是为了加速PageRank算法),极大加快了查询时响应时间。它的缺点也明显:对新页面不友好,没有页面引用新页面,其PR值必定不高,容易形成马太效应。

##参考

  1. 《Spark快速大数据分析》
  2. https://blog.csdn.net/guoziqing506/article/details/70702449
  3. https://blog.csdn.net/gamer_gyt/article/details/47443877
  4. https://www.cnblogs.com/csxf/p/3584511.html?utm_source=tuicool&utm_medium=referral
  5. https://zh.wikipedia.org/wiki/Tf-idf