如何进行自然语言处理¶
概览¶
此示例提供了一个使用NLTK 库的简单 PySpark 作业。NLTK 是用于自然语言处理的流行 Python 包。本示例将演示 Python 库在集群上的安装、Spark 与 YARN 资源管理器的使用以及 Spark 作业的执行。
这是给谁的?¶
本指南适用于希望使用 YARN 资源管理器运行 Python 代码的 Spark 集群用户。本指南将向您展示如何将第三方 Python 库与 Spark 集成。
开始之前¶
要执行此示例,请下载:或
.cluster-spark-nltk.py example script
cluster-spark-nltk.ipynb example notebook
对于此示例,您需要使用 YARN 资源管理器运行 Spark。您可以使用Cloudera CDH 或Hortonworks HDP等企业 Hadoop 发行版安装 Spark 和 YARN 。
安装 NLTK ¶
如果您有权安装软件包,则acluster
可以将 NLTK 安装为 conda 软件包。
acluster conda install nltk
您应该会从每个节点看到与此类似的输出,这表明该软件包已在整个集群中成功安装:
Node "ip-10-140-235-89.ec2.internal":
Successful actions: 1/1
Node "ip-10-154-10-144.ec2.internal":
Successful actions: 1/1
Node "ip-10-31-96-152.ec2.internal":
Successful actions: 1/1
为了使用完整的 NLTK 库,您需要下载 NLTK 项目的数据。您可以使用该命令下载所有集群节点上的数据。acluster cmd
acluster cmd 'sudo /opt/anaconda/bin/python -m nltk.downloader -d /usr/share/nltk_data all'
几分钟后,您应该会看到与此类似的输出。
Execute command "sudo /opt/anaconda/bin/python -m nltk.downloader -d /usr/share/nltk_data all" target: "*" cluster: "d"
All nodes (x3) response: [nltk_data] Downloading collection 'all'
[nltk_data] |
[nltk_data] | Downloading package abc to /usr/share/nltk_data...
[nltk_data] | Unzipping corpora/abc.zip.
[nltk_data] | Downloading package alpino to /usr/share/nltk_data...
[nltk_data] | Unzipping corpora/alpino.zip.
[nltk_data] | Downloading package biocreative_ppi to
[nltk_data] | /usr/share/nltk_data...
....
[nltk_data] | Unzipping models/bllip_wsj_no_aux.zip.
[nltk_data] | Downloading package word2vec_sample to
[nltk_data] | /usr/share/nltk_data...
[nltk_data] | Unzipping models/word2vec_sample.zip.
[nltk_data] |
[nltk_data] Done downloading collection all
运行作业¶
这是在 PySpark 中运行 Spark + NLTK 示例的完整脚本。
# cluster-spark-nltk.py
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('spark-nltk')
sc = SparkContext(conf=conf)
data = sc.textFile('file:///usr/share/nltk_data/corpora/state_union/1972-Nixon.txt')
def word_tokenize(x):
import nltk
return nltk.word_tokenize(x)
def pos_tag(x):
import nltk
return nltk.pos_tag([x])
words = data.flatMap(word_tokenize)
print words.take(10)
pos_word = words.map(pos_tag)
print pos_word.take(5)
让我们来看看上面的代码示例。首先,我们将创建一个 SparkContext。请注意,用于集群管理的 Anaconda 默认不会创建 SparkContext。在本例中,我们使用 YARN 资源管理器。
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('spark-nltk')
sc = SparkContext(conf=conf)
创建 SparkContext 后,我们可以将一些数据加载到 Spark 中。在这种情况下,数据文件来自 NLTK 提供的示例文档之一。
data = sc.textFile('file:///usr/share/nltk_data/corpora/state_union/1972-Nixon.txt')
接下来,我们编写一个函数来导入nltk
和调用nltk.word_tokenize
. 该函数被映射到在上一步中读取的文本文件。
def word_tokenize(x):
import nltk
return nltk.word_tokenize(x)
words = data.flatMap(word_tokenize)
我们可以flatMap
通过返回数据集中的一些单词来确认操作是否有效。
print words.take(10)
最后,NTLK 的POS-tagger可用于查找每个单词的词性。
def pos_tag(x):
import nltk
return nltk.pos_tag([x])
pos_word = words.map(pos_tag)
print pos_word.take(5)
使用spark-submit脚本在 Spark 集群上运行脚本。输出显示从 Spark 脚本返回的单词,包括flatMap
操作的结果和POS-tagger
.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/13 05:14:29 INFO SparkContext: Running Spark version 1.4.0
[...]
['Address',
'on',
'the',
'State',
'of',
'the',
'Union',
'Delivered',
'Before',
'a']
[...]
[[('Address', 'NN')],
[('on', 'IN')],
[('the', 'DT')],
[('State', 'NNP')],
[('of', 'IN')]]