大数据系列3:用Python编写MapReduce

      vi mapper.py

      输入:

             #!/usr/bin/env python

             importsys

             for linein sys.stdin:

                           line= line.strip()

                           words= line.split()

                           forword in words:

                                         print'%s\t%s' % (word,1)

      chmod +x mapper.py

      vi reducer.py

      输入:

            #!/usr/bin/envpython

            from operator import itemgetter

            import sys

            current_word = None

            current_count = 0

            word = None

            for line in sys.stdin:

                          line = line.strip()

                          word, count =line.split('\t', 1)

                          try:

                                        count =int(count)

                          except ValueError:

                                        continue

                          if current_word ==word:

                                        current_count+= count

                          else:

                                        ifcurrent_word:

                                                      print'%s\t%s' % (current_word, current_count)

                                        current_count= count

                                        current_word= word

            if current_word:

                           print'%s\t%s' % (current_word, current_count)

      chmod +x reducer.py

      本地操作系统测试:

      echo "foo fooquux labs foo bar quux" | ./mapper.py | sort | ./reducer.py

      提交HADOOP集群运行:

      hadoop jar hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar -input input -output output-streaming-python  -mapper /home/ysc/mapper.py -reducer /home/ysc/reducer.py


 

APDPlat旗下十大开源项目

相关推荐