摘要:我描述了一个简单的面试问题(计算唯一单词的频率),以各种语言解决它,并将性能进行比较。对于每种语言,我已经包括一个简单,惯用的解决方案以及通过分析更优化的方法。
我在过去几年中进行了许多编码访谈,以及我想问的问题之一是:
编写一个程序以计算标准输入的唯一单词的频率,然后用频率打印出来,首先订购最常见的。例如,给定此输入:
我认为这是一个良好的面试问题,因为它比FizzBuzz解决了,但它不会遭受“在这个白板上倒置二叉树”的问题。这是程序员在现实生活中编写脚本的那种东西,它显示了他们是否了解文件I / O,哈希表(地图)以及如何使用他们的语言的排序功能。排序部分中有一点棘手,因为大多数哈希表都没有订购,如果它们是,它是通过键或插入顺序而不是值。
候选人有一个基本解决方案后,您可以在各种不同方向上推动它:大写内容如何?标点?它是如何用相同频率的两个单词?什么是性能瓶颈可能是什么?在大o方面如何票价?内存用法是什么?大致您的程序需要多长时间来处理1GB文件?您的解决方案是否仍然适用于1TB?等等。或者您可以在“软件工程”方向上,讨论错误处理,可测试性,将其转换为硬化的命令行实用程序等。
一个基本解决方案读取文件行逐行,转换为小写,将每一行拆分为单词,并计算哈希表中的频率。完成后,它将哈希表转换为单词计数对列表,按计数排序(最大的第一个),并将它们打印出来。
在Python中,使用普通地区的一个明显的解决方案可能看起来像这样(引入所示):
counts = {}在sys中的行。 STDIN:单词=行。降低 ()。单词的拆分()单词:counts [word] = counts。获取(Word,0)+ 1对=排序(计数。项目(),key = lambda kv:kv [1],反向= true)对于word,成对计数:print(word,count)
如果候选人是Pythonista,它们可能会使用CollectionS.defaultdict ventics.counter.counter - 请参阅下面的使用后者代码。在这种情况下,我会问他们如何在引擎盖下工作,或者它们如何用普通词典解决它。
顺便提一下,这个问题将几十年前为两位计算机科学家之间的巫师决斗设定了场景。 1986年,Jon Bentley要求Donald Knuth炫耀“识字编程”并解决这个问题的解决方案,他提出了一个精致的十页的持剑杰作。然后Doug McIlroy(Unix管道的发明者)使用TR,SORT和UNIQ使用单行UNIX Shell版本回复。
在任何情况下,我现在一直在玩这个问题,我想看看这个程序在各种语言中看起来像什么,以及他们将如何运行,两者都有一个简单的惯用解决方案,并且更优化版本。我在文章中包括大型代码片段,但每个版本的完整来源都在我的benhoyt / countwords存储库中。或者您可以欺骗并直接跳转到性能结果。
每个程序必须从标准输入中读取并打印唯一的空间分开的单词的频率,从最常见的是最小频繁的顺序。为了使我们的解决方案简单且保持一致,这里是我努力的(自我强加)的限制:
案例:程序必须将单词正常化为小写,因此“”“该”应显示为输出中的“3”。
单词:用空格分隔的东西 - 忽略标点符号。这确实使程序不太有用,但我不希望这成为象征化战斗。
ASCII:只需支持空白处理和小写操作即可支持ASCII即可。大多数优化的变体都这样做。
订购:如果两个单词的频率相同,则其在输出中的顺序无关紧要。我使用归一化脚本来确保输出正确。
线程:它应该在单个机器上的单个线程中运行(尽管我经常在访谈中讨论并发)。
内存:不要将整个文件读入内存。缓冲IT线路是可以的,或者用最大缓冲区大小为64KB的块。也就是说,可以在内存中保持整个单词映射地图(我们假设输入是实际语言的文本,而不是完整的随机唯一词)。
文字:假设输入文件是文本,具有比缓冲区大小短的“合理”长度线。
安全:即使对于优化的变体,也不要使用不安全的语言功能,并不会下拉到装配。
哈希:不要滚动我们自己的哈希表(除了优化的C版本外)。
我们的测试输入文件将是国王詹姆斯圣经的文本,连续十次连接。我从Gutenberg.org寻求这一点,用ASCII报价字符替换智能报价,并使用CAT将其乘以十个以获取43MB参考输入文件。
让我们来编码!下面的解决方案是我解决了它们的顺序。
惯用的python版本可能会使用collections.counter。 Python的集合图书馆真的很好 - 谢谢雷蒙德Hettinger!这与你可以得到的简单:
counts =集合。在sys中的柜台()。 STDIN:单词=行。降低 ()。拆分()计数。单词的更新(单词),计数计数。 most_common():打印(Word,Count)
这是Unicode感知,可能是我在“现实生活”中写的。它实际上非常有效,因为所有低级的东西都是在c:读取文件中的所有低级的东西,转换为小写和拆分在空格上,更新计数器,以及计数器的排序。
但让我们试着优化! Python附带一个名为cprofile的分析模块。它很容易使用 - 只需使用Python3 -m cprofile运行您的程序。我评论了最终的打印呼吁,以避免与程序的产出进行分析输出混合 - 无论如何,它相当忽略不计。以下是输出(-s tottime在每个函数中的总时间排序):
$ python3-m cprofile -s tottime simple.py&lt; kjvbible_x10.txt 6997799函数调用(6997787原始呼叫)在3.872秒排序:内部时间ncalls tottime percall percall percall filename:lineno(功能)998170 1.361 0.000 1.361 0.000 {构建-in方法_collections._count_elements} 1 0.911 0.911 3.872 3.872 simple.py:1(< ;module& gt;)998170 0.415 0.415 0.415 0.415 0.000 {方法&#39;拆分&#39; &#39; str&#39;对象} 998171 0.405 0.000 2.388 0.000 __Init__.py:608(update)998170 0.270 0.622 0.000 {内置方法构成。998170 0.182 0.30 0.10 abc.py:96 (_xinstancecheck__)998170 0.170 0.170 0.000 {内置在方法_abc._abc_instanceCheck} 998170 0.134 0.10134 0.134 0.000 {方法&#39;较低的&#39; &#39; str&#39; Objects} 5290 0.009 0.0.018 0.018 0.018 0.010 0.000 0.009 0.009 0.009 0.000 {内置方法_codecs.utf_8_decode} 1 0.007 0.007 0.007 {内置方法构成。7/1 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 {内置方法_abc._abc_subclasscheck} 1 0.000 0.007 0.007 __init__.py:559(最多_common)1 0.000 0.000 0.000 0.000 __init__.py:540 (_mit__)1 0.000 0.000 3.872 3.872 {内置方法构建insize.exec} 7 / 1 0.000 0.000 0.000 0.000 abc.py:100 (_Subclasscheck__)7 0.000 0.000 0.000 _Collections_abc.py:392 (_Subclasshook__)1 0.000 0.000 0.000 {方法&#39;禁用&#39; &#39; _lsprof.profiler&#39;对象} 1 0.000 0.000 0.000 0.000 {方法&#39;物品&#39; &#39; dict&#39;对象}
998,170是输入中的行数,因为我们正在阅读逐行,我们正在调用多次的函数并执行Python循环。
在Simple.py中花费的大量时间都显示了如何(相对)慢它是执行Python字节码的速度 - 主循环是纯Python,再次执行998,170次。
counter.update呼叫isinstance,它增加了。我想直接调用C函数_count_elements,但这是一个实现细节,我决定它落入“不安全”的类别。
我们需要做的主要内容是减少主Python循环周围的次数,因此减少对所有这些功能的调用次数。所以让我们在64kb块中阅读它:
counts =集合。计数器()剩余=&#39;&#39;虽然真实:块= sys。 stdin。读取(64 * 1024)如果没有块:打破chunk =剩余+ chunk last_lf = chunk。 rfind(&#39; \ n&#39;)#进程到最后lf字符如果last_lf == - 1:剩余=&#39;&#39;否则:剩余=块[last_lf + 1:] chunk = chunk [:last_lf] counts。更新(块。leow()。拆分())用于单词,计数计数。 most_common():打印(Word,Count)
而不是我们的主循环处理一次42个字符(平均线长度),我们一次处理65,536(最后的部分线)。我们还在读取和处理相同数量的字节,但我们现在在C中做大部分,而不是在Python循环中。许多优化的解决方案使用这种基本方法 - 在更大的块中处理事物。
剖析输出现在看起来好多了。 _count_elements和str.split函数仍在大部分时间,但它们仅被调用662次而不是998170(在大约64kb的时间而不是42字节):
$ python3-m cprofile -s tottime优化.py&lt; kjvbible_x10.txt 7980函数调用(7968原始呼叫)在1.280秒排序:内部时间ncalls tottime percall cumpere percall文件名:lineno(功能)662 0.870 0.001 0.870 0.001 {构建-in方法_collections._count_elements} 662 0.278 0.000 0.278 0.000 {方法&#39;拆分&#39; &#39; str&#39;物体} 1 0.080 0.080 1.280 1.280 1.280优化.TY: &#39; str&#39;对象} 663 0.010 0.016 0.000 0.016 0.000 {方法&#39;阅读&#39; &#39; _io.textiowrapper&#39;对象} 1 0.007 0.007 0.007 0.007 {内置方法构建.SORTED} 664 0.004 0.000 0.004 0.000 {内置方法_CodeCS.UTF_8_DECODE} 663 0.001 0.000 0.872 0.001 0.000 0.872 0.001 __Init_.py:608(Hudate)664 0.001 0.005 0.000编解码器。 PY:319(解码)662 0.001 0.001 0.000 {内置方法构建.Isinstance} 662 0.000 0.001 0.000 0.001 0.000 0.001 0.000【内置方法_abc._abc_instanceCheck} 662 0.000 0.000 0.000 {方法&#39; rfind&#39; &#39; str&#39; Objects} 664 0.000 0.000 0.000 0.000 CODECS.PY:331(GETTSTATE)662 0.000 0.001 0.000 ABC.YCECHECK__)7/1 0.000 0.000 0.000 0.000 {内置方法_abc._abc_subclasscheck} 1 0.000 0.000 0.007 0.007 __init__ .py:559(most_common)1 0.000 0.000 0.000 0.000 __init__.py:540 (_mit__)7/1 0.000 0.000 0.000 0.000 abc.py:100 (_Subclasscheck__)1 0.000 0.000 1.280 1.280 {内置方法构建.Exec} 7 0.000 0.000 0.000 0.000 _Collections_abc.py:392 (_Mubclasshook__)1 0.000 0.000 0.000 {方法&#39;禁用&#39; &#39; _lsprof.profiler&#39;对象} 1 0.000 0.000 0.000 0.000 {方法&#39;物品&#39; &#39; dict&#39;对象}
我还发现,通过Python解决方案,读取和处理字节VS STR不会产生明显的区别(UTF_8_decode相对较远)。此外,大约2kb的任何缓冲大小比64kb都不多得多 - 我注意到许多系统具有4kb的默认缓冲区大小,这似乎是非常合理的。
我尝试了各种其他方式来提高性能,但这是我可以使用标准Python管理的最佳状态。 (我尝试使用Pypy优化编译器运行它,但由于某种原因,它显着慢。在C中完成。如果您发现更好的方法,请告诉我。
一个简单的惯用的GO版本可能会使用Bufio.scanner与scanwords作为拆分功能。 Go没有像Python的Collection.Counter这样的东西,但它很容易使用Map [String] Int进行计数,以及用于排序操作的一片单词计数对:
func main(){scanner:= bufio。 newscanner(操作系统。stdin)扫描仪。拆分(bufio。scanwords)counts:= make(map [string] int)扫描仪。扫描(){word:= strings。 tolower(扫描仪。text())计数[word] ++}如果错误:=扫描仪。呃 (); err!= nil {fmt。 fprintln(操作系统。stderr,err)操作系统。退出(1)}} var有序[]计数Word,count:=范围计数{已排序=附加(有序,计数{word,count})}排序。切片(订购,Func(i,J int)bool {返回订购[i]。count&gt;命令[j]。count})for _,count:=排序范围{fmt。 println(string(count.word),count。count)}}}键入count struct {word字符串count int}
简单的GO版本明显比简单的Python版本更快,但只能比优化的Python版本快一点(并且几乎双倍代码行数 - 肯定更多的样板和低级问题)。
要使用Go的Profiler,您必须将几行代码添加到程序的开始:
f,err:=操作系统。创建(&#34; cpuprofile&#34;)如果err!= nil {fmt。 fprintf(操作系统。stderr,&#34;无法创建CPU配置文件:%v \ n&#34;,err)操作系统。退出(1)}如果错误:= pprof。 startcpuprofile(f); err!= nil {fmt。 fprintf(操作系统。stderr,#34;无法启动CPU配置文件:%v \ n&#34;,err)操作系统。退出(1)}推迟pprof。 stopcpuprofile()
运行程序后,您可以使用此命令查看CPU配置文件(单击以查看图片全大小):
结果很有趣,但没有意外 - 每次单词热门循环中的操作一直在。扫描仪中花了一个好的时间,另一个块在分配字符串中即可插入地图,因此让我们尝试优化这些部分。
为了提高扫描,我们将基本上制作Bufio.Scanner和Scanword的剪切版本(并在适当的操作中进行ACIII到更低的操作)。要减少分配,我们将使用地图[字符串] * int而不是映射[字符串] int所以我们只需每一个单词分配一次,而不是每一个增量(Martinmöhrmann给了我这个提示的Gophers Slack #performance通道)。
请注意,我花了几个迭代和分析通过来实现这一结果。一个在一起的步骤是仍然使用bufio.scanner,但是使用自定义拆分功能,扫描唱片函数。但是,它有点快,而且更难,避免bufio.scanner完全。我尝试的另一件事是一个自定义哈希表,但我决定没有出于Go版本的范围,而不是在任何情况下比地图[字符串] * int更快。
func main(){offset:= 0 buf:= make([]字节,64 * 1024)计数:= make(map [string] * int)for {//读取输入的64kb块块直到eof。 n,err:=操作系统。 stdin。读取(Buf [offset:])如果Err!= nil&amp;&amp;呃!= io。 ef {fmt。 fprintln(操作系统。stderr,err)操作系统。退出(1)}如果n == 0 {break} // expset剩余的剩余时间加上读取的字节数。 chunk:= buf [:offset + n] //在块读取中找到最后的行末端字符。 lastlf:= bytes。 LastIndexbyte(块,&#39; \ n&#39;)toprocess:= chunk如果lastlf!= - 1 {toprocess = chunk [:lastlf]} //循环通过toprocess切片和计数单词。启动:= - 1 // start -1在空格中运行i,c:=范围toprocess {//在我们走的时候转换为ASCII小写。如果c&gt; =&#39; a&#39; &amp;&amp; C&lt;&#39; z&#39; {c = c +(&#39; a&#39; - &#39; a&#39;)toprocess [i] = c}如果start&gt; = 0 {//在一个字中,查找单词的结尾(空白)。如果c <=&#39; &#39; {//计算这个词!递增(计数,Toprocess [开始:i])start = - 1}} else {//在空格中,查找单词(非空间)的开始。如果c&gt; &#39; &#39; {start = i}}} //计算最后一个单词,如果有的话。如果开始&gt; = 0&amp;&amp;开始&lt; len(toprocess){递增(计数,toprocess [start:])} //复制剩余的字节(不完整的线)到缓冲区的开始。如果lastlf!= - 1 {剩余:= chunk [lastlf + 1:]复制(buf,剩余)offset = len(剩余)} els {offset = 0}} var有序[]计数,count:=范围计数{已订购=附加(有序,计数{word,* count})}排序。切片(订购,Func(i,J int)bool {返回订购[i]。count&gt;命令[j]。count})for _,count:=排序范围{fmt。 println(字符串(计数字),计数。计数)}}}}}}}}} func增量(counts map [string] * int,word [] byte){如果p,确定:= counts [string(word)]; OK {// Word已在Map中,递增现有int通过指针。 * p ++返回} // word不在地图中,插入新int。 n:= 1 counts [string(word)] =&amp; n}
分析结果现在非常平坦 - 几乎所有内容都在主循环或地图访问中:
这是一个有趣的练习,Go给你一个公平的低级控制(你可以进一步进一步 - 内存映射I / O,自定义哈希表等)。但是,程序员时间是有价值的,上面的优化版本不是我想要测试或维护的东西。这是棘手的代码,并且有很多潜力的偏离一误错误(如果没有一些错误,我会感到惊讶)。在实践中,我可能会用一个bufio.scanner用scanwords,bytes.tolower和地图[字符串] * int技巧。
自从我上次使用它,C ++很长的路:C ++ 11中的许多好东西,然后在C ++ 14,17和20中更多。功能,到处都是!它绝对是旧学校C ++的宗旨,但错误消息仍然是一团糟。这是我想出的简单版本(通过代码评论堆栈交换的一些帮助,使其有点惯用):
int main(){std :: string word; std :: unordered_map&lt; std :: string,int&gt;计数;而(std :: cin&gt; word){std :: transform(word。begine begine(),word。结束(),word。begen(),[](无符号char c){return std :: tolower( C ); }); ++计数[Word]; }如果(std :: cin。bad()){std :: cerr lt;&lt; &#34;读取stdin \ n&#34错误; ;返回1; } std :: vector&lt; std ::对&lt; std :: string,int&gt;&gt;订购(计数。begin(),counts。结束()); std :: sort(命令。begin(),订购。结束(),[](auto const&amp; a,auto const&amp; b){返回a。第二&gt; b。第二;}; for(auto const&amp; count:downered){std :: cout&lt;&lt;数数 。首先&lt; &#34; &#34; &lt;&lt;数数 。第二&lt; &#34; \ n&#34; ; }}
优化此时,首先要做的是使用已启用的优化(G ++ -O2)编译。我有点像这样的事实,你不必担心这个 - 优化始终打开。
我注意到I / O相对较慢。事实证明,在每个I / O操作后,您可以在程序开始时拒绝与C stdio函数同步的魔法咒语。这条线使它速度几乎运行了两倍:
GCC可以生成与GPROF一起使用的分析报告。这是几行看起来像 - 我孩子不是:
索引%的时间自我称为名称13 Frame_Dummy [1] [1] 100.0 0.01 0.00 0 + 13 Frame_Dummy [1] 13 Frame_Dummy [1] ------------------ ------------------------- 0.00 0.00 0.00 0 0.00 0.00 32187/32187 STD :: Vector&lt; std :: pair \&lt; std:
......