How to perform a word count on text data in HDFS

Overview

This example counts the words in text files stored in HDFS.

Who is this for?

This guide is for Spark cluster users who want to run Python code under the YARN resource manager that reads and processes files stored in HDFS.

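Before you start

To run this example, download the cluster-spark-wordcount.py example script and the cluster-download-wc-data.py script.

For this example, you need Spark running with the YARN resource manager and the Hadoop Distributed File System (HDFS). You can install Spark, YARN, and HDFS with an enterprise Hadoop distribution such as Cloudera CDH or Hortonworks HDP.

You also need valid Amazon Web Services (AWS) credentials.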
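Before going further, you can check that the command-line tools this example calls are on the PATH of the node you are working from. The tools are hadoop (for distcp) and spark-submit. The following is a minimal sketch that assumes both tools are expected under exactly those names. The helper name missing_tools is hypothetical.

```python
import shutil

# Command-line tools used in this example (assumed to be on the PATH).
REQUIRED_TOOLS = ('hadoop', 'spark-submit')


def missing_tools(tools=REQUIRED_TOOLS):
    """Return the entries of `tools` that cannot be found on the PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == '__main__':
    missing = missing_tools()
    if missing:
        print('Missing tools: ' + ', '.join(missing))
    else:
        print('All required tools found.')
```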

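Load HDFS data

First, we load the sample text data into the HDFS data store. The following script copies the sample text data (approximately 6.4 GB) from a public Amazon S3 bucket to the HDFS data store on the cluster.

Download the cluster-download-wc-data.py script to your cluster and insert your AWS credentials in the AWS_KEY and AWS_SECRET variables.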

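import subprocess

# Insert your AWS credentials here.
AWS_KEY = ''
AWS_SECRET = ''

# Source: the blaze-data bucket, addressed with the s3n:// scheme
# and the credentials embedded in the URL.
s3_path = 's3n://{0}:{1}@blaze-data/enron-email'.format(AWS_KEY, AWS_SECRET)

# Destination: /tmp/enron on the cluster's default HDFS file system.
cmd = ['hadoop', 'distcp', s3_path, 'hdfs:///tmp/enron']

# check_call raises CalledProcessError if distcp exits with an error.
subprocess.check_call(cmd)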
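Hard-coding credentials in a script makes them easy to leak. The minimal variant below reads them from the environment instead. It assumes you export the keys as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY; these are the conventional AWS names, not variables the script above reads. It builds the same distcp command and uses subprocess.check_call, so a failed copy raises CalledProcessError. The function names are hypothetical.

```python
import os
import shutil
import subprocess


def build_distcp_cmd(aws_key, aws_secret,
                     bucket_path='blaze-data/enron-email',
                     dest='hdfs:///tmp/enron'):
    """Return the `hadoop distcp` argument list used by the download script."""
    # Same s3n:// URL form as the script above, with the credentials inlined.
    s3_path = 's3n://{0}:{1}@{2}'.format(aws_key, aws_secret, bucket_path)
    return ['hadoop', 'distcp', s3_path, dest]


def copy_enron_to_hdfs():
    """Copy the sample data into HDFS using credentials from the environment."""
    # Assumption: the keys are exported under these conventional AWS names.
    key = os.environ.get('AWS_ACCESS_KEY_ID', '')
    secret = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
    if not key or not secret:
        raise SystemExit('Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY first.')
    if shutil.which('hadoop') is None:
        raise SystemExit('hadoop not found on PATH; run this on a cluster node.')
    # check_call raises CalledProcessError if distcp exits with a non-zero status.
    subprocess.check_call(build_distcp_cmd(key, secret))
```

To use it, export the two variables on a cluster node and then call copy_enron_to_hdfs().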

Note: On smaller instance sizes, the hadoop distcp command may cause HDFS to fail because of memory limits.

Run the cluster-download-wc-data.py script on the Spark cluster.

python cluster-download-wc-data.py

After a few minutes, the text data will be in the HDFS data store on the cluster and ready for analysis.
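To confirm that the copy finished, you can check how much data landed under /tmp/enron. It should be roughly the 6.4 GB mentioned above. The minimal sketch below runs the standard hdfs dfs -du -s -h command through subprocess, in the same style as the download script. It assumes the hdfs client is on the PATH. The function names are hypothetical.

```python
import shutil
import subprocess


def hdfs_usage_cmd(path='/tmp/enron'):
    """Build an `hdfs dfs -du` command that reports one total for `path`."""
    # -s: summarize the whole directory; -h: human-readable sizes.
    return ['hdfs', 'dfs', '-du', '-s', '-h', path]


def hdfs_usage(path='/tmp/enron'):
    """Run the command on a cluster node and return its text output."""
    if shutil.which('hdfs') is None:
        raise SystemExit('hdfs not found on PATH; run this on a cluster node.')
    # check_output raises CalledProcessError if the command fails (e.g. missing path).
    return subprocess.check_output(hdfs_usage_cmd(path)).decode('utf-8')
```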

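Run the job

Download the cluster-spark-wordcount.py example script to your cluster. This script reads the text files loaded into HDFS in the previous section and counts all of the words.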
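The Spark job below chains the transformations filter, flatMap, map, reduceByKey, and sortByKey. To see what each stage produces without a cluster, here is a small pure-Python sketch that mirrors the same steps on an in-memory list of lines. It illustrates the logic only and does not replace the distributed job. The function name is hypothetical.

```python
from collections import Counter


def wordcount_top(lines, n=100):
    """Mirror the Spark pipeline: return the n most frequent words as (count, word)."""
    # filter(lambda x: len(x) > 0): drop empty lines.
    nonempty_lines = [line for line in lines if len(line) > 0]
    # flatMap(lambda x: x.split(' ')): split each line on single spaces.
    words = [word for line in nonempty_lines for word in line.split(' ')]
    # map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y): occurrences per word.
    counts = Counter(words)
    # map(swap).sortByKey(False): (count, word) pairs, highest count first.
    pairs = sorted(((c, w) for w, c in counts.items()),
                   key=lambda pair: pair[0], reverse=True)
    return pairs[:n]
```

For example, wordcount_top(['the cat', '', 'the dog', 'the cat sat'], 2) returns [(3, 'the'), (2, 'cat')].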

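# cluster-spark-wordcount.py
from pyspark import SparkConf
from pyspark import SparkContext

# IP address of the HDFS head node.
HDFS_MASTER = 'HEAD_NODE_IP'

conf = SparkConf()
# Run on YARN with the driver on the submitting machine (Spark 1.x syntax;
# Spark 2.0 and later deprecate 'yarn-client' in favor of 'yarn').
conf.setMaster('yarn-client')
conf.setAppName('spark-wordcount')
conf.set('spark.executor.instances', '10')  # request 10 executors
sc = SparkContext(conf=conf)

# Read every .txt file one directory level below /tmp/enron.
distFile = sc.textFile('hdfs://{0}:9000/tmp/enron/*/*.txt'.format(HDFS_MASTER))

# Drop empty lines and report how many remain.
nonempty_lines = distFile.filter(lambda x: len(x) > 0)
print('Nonempty lines: {0}'.format(nonempty_lines.count()))

# Split each line into words on single spaces.
words = nonempty_lines.flatMap(lambda x: x.split(' '))

# Count each word, swap to (count, word), and sort by count in descending order.
wordcounts = words.map(lambda x: (x, 1)) \
                  .reduceByKey(lambda x, y: x + y) \
                  .map(lambda x: (x[1], x[0])) \
                  .sortByKey(False)

print('Top 100 words:')
print(wordcounts.take(100))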

Replace the HEAD_NODE_IP text with the IP address of the head node.
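As an alternative to editing the file, you could pass the head node address on the command line. The minimal sketch below assumes a single optional argument and keeps the script's port 9000 and path pattern. The helper input_glob is hypothetical. With no address, it falls back to an hdfs:/// path. Like the hdfs:///tmp/enron destination in the download script, that path should resolve against the cluster's default file system (fs.defaultFS).

```python
import sys

# Same port and path pattern as cluster-spark-wordcount.py.
NAMENODE_PORT = 9000
INPUT_PATTERN = '/tmp/enron/*/*.txt'


def input_glob(head_node=None):
    """Build the textFile() path, optionally pinned to a specific head node."""
    if head_node:
        return 'hdfs://{0}:{1}{2}'.format(head_node, NAMENODE_PORT, INPUT_PATTERN)
    # No host given: rely on the default file system, like hdfs:///tmp/enron.
    return 'hdfs://' + INPUT_PATTERN


if __name__ == '__main__':
    # Usage: python <script> [HEAD_NODE_IP]
    print(input_glob(sys.argv[1] if len(sys.argv) > 1 else None))
```

In the word count script, you would then call sc.textFile(input_glob(...)) instead of formatting HDFS_MASTER into the path.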

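Run the script on the Spark cluster using spark-submit. The output shows the 100 most frequent words in the sample text data as (count, word) pairs, as returned by the Spark script.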

54.237.100.240: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/13 04:58:42 INFO SparkContext: Running Spark version 1.4.0

[...]

15/06/26 04:32:03 INFO YarnScheduler: Removed TaskSet 7.0, whose tasks have all completed, from pool
15/06/26 04:32:03 INFO DAGScheduler: ResultStage 7 (runJob at PythonRDD.scala:366) finished in 0.210 s
15/06/26 04:32:03 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:366, took 18.124243 s
[(288283320, ''), (22761900, '\t'), (19583689, 'the'), (13084511, '\t0'), (12330608, '-'),
(11882910, 'to'), (11715692, 'of'), (10822018, '0'), (10251855, 'and'), (6682827, 'in'),
(5463285, 'a'), (5226811, 'or'), (4353317, '/'), (3946632, 'for'), (3695870, 'is'),
(3497341, 'by'), (3481685, 'be'), (2714199, 'that'), (2650159, 'any'), (2444644, 'shall'),
(2414488, 'on'), (2325204, 'with'), (2308456, 'Gas'), (2268827, 'as'), (2265197, 'this'),
(2180110, '$'), (1996779, '\t$0'), (1903157, '12:00:00'), (1823570, 'The'), (1727698, 'not'),
(1626044, 'such'), (1578335, 'at'), (1570484, 'will'), (1509361, 'has'), (1506064, 'Enron'),
(1460737, 'Inc.'), (1453005, 'under'), (1411595, 'are'), (1408357, 'from'), (1334359, 'Data'),
(1315444, 'have'), (1310093, 'Energy'), (1289975, 'Set'), (1281998, 'Technologies,'),
(1280088, '***********'), (1238125, '\t-'), (1176380, 'all'), (1169961, 'other'), (1166151, 'its'),
(1132810, 'an'), (1127730, '&'), (1112331, '>'), (1111663, 'been'), (1098435, 'This'),
(1054291, '0\t0\t0\t0\t'), (1021797, 'States'), (971255, 'you'), (971180, 'which'), (961102, '.'),
(945348, 'I'), (941903, 'it'), (939439, 'provide'), (902312, 'North'), (867218, 'Subject:'),
(851401, 'Party'), (845111, 'America'), (840747, 'Agreement'), (810554, '#N/A\t'), (807259, 'may'),
(800753, 'please'), (798382, 'To'), (771784, '\t$-'), (753774, 'United'), (740472, 'if'),
(739731, '\t0.00'), (723399, 'Power'), (699294, 'To:'), (697798, 'From:'), (672727, 'Date:'),
(661399, 'produced'), (652527, '2001'), (651164, 'format'), (650637, 'Email'), (646922, '3.0'),
(645078, 'licensed'), (644200, 'License'), (642700, 'PST'), (641426, 'cite'), (640441, 'Creative'),
(640089, 'Commons'), (640066, 'NSF'), (639960, 'EML,'), (639949, 'Attribution'),
(639938, 'attribution,'), (639936, 'ZL'), (639936, '(http://www.zlti.com)."'), (639936, '"ZL'),
(639936, 'X-ZLID:'), (639936, '<http://creativecommons.org/licenses/by/3.0/us/>'), (639936, 'X-SDOC:')]
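Several of the most frequent "words" in this output are the empty string and tab-containing tokens such as '\t' and '\t0'. This follows from x.split(' '), which splits only on single space characters. Consecutive, leading, or trailing spaces therefore yield empty strings, and tabs stay attached to neighbouring text. The pure-Python sketch below uses a made-up sample line to show the difference.

```python
line = 'Subject:  Gas\tprices '

# What the Spark script does: split on single spaces only.
print(line.split(' '))
# ['Subject:', '', 'Gas\tprices', '']

# split() with no argument splits on any run of whitespace and drops empty strings.
print(line.split())
# ['Subject:', 'Gas', 'prices']
```

One possible refinement is to use nonempty_lines.flatMap(lambda x: x.split()) in the script. The original script does not do this, and the change would alter the counts shown above.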

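Troubleshooting

If something goes wrong, consult the FAQ/known issues page.

More information

For more information, see the Spark and PySpark documentation pages.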