如何进行自然语言处理



概览

此示例提供了一个使用NLTK 库的简单 PySpark 作业。NLTK 是用于自然语言处理的流行 Python 包。本示例将演示 Python 库在集群上的安装、Spark 与 YARN 资源管理器的使用以及 Spark 作业的执行。

这是给谁的?

本指南适用于希望使用 YARN 资源管理器运行 Python 代码的 Spark 集群用户。本指南将向您展示如何将第三方 Python 库与 Spark 集成。

开始之前

要执行此示例,请下载:或 .cluster-spark-nltk.py example scriptcluster-spark-nltk.ipynb example notebook

对于此示例,您需要使用 YARN 资源管理器运行 Spark。您可以使用Cloudera CDHHortonworks HDP等企业 Hadoop 发行版安装 Spark 和 YARN 。

安装 NLTK

如果您有权安装软件包,则acluster可以将 NLTK 安装为 conda 软件包。

acluster conda install nltk

您应该会从每个节点看到与此类似的输出,这表明该软件包已在整个集群中成功安装:

Node "ip-10-140-235-89.ec2.internal":
    Successful actions: 1/1
Node "ip-10-154-10-144.ec2.internal":
    Successful actions: 1/1
Node "ip-10-31-96-152.ec2.internal":
    Successful actions: 1/1

为了使用完整的 NLTK 库,您需要下载 NLTK 项目的数据。您可以使用该命令下载所有集群节点上的数据。acluster cmd

acluster cmd 'sudo /opt/anaconda/bin/python -m nltk.downloader -d /usr/share/nltk_data all'

几分钟后,您应该会看到与此类似的输出。

Execute command "sudo /opt/anaconda/bin/python -m nltk.downloader -d /usr/share/nltk_data all" target: "*" cluster: "d"
    All nodes (x3) response: [nltk_data] Downloading collection 'all'
[nltk_data]    |
[nltk_data]    | Downloading package abc to /usr/share/nltk_data...
[nltk_data]    |   Unzipping corpora/abc.zip.
[nltk_data]    | Downloading package alpino to /usr/share/nltk_data...
[nltk_data]    |   Unzipping corpora/alpino.zip.
[nltk_data]    | Downloading package biocreative_ppi to
[nltk_data]    |     /usr/share/nltk_data...

....

[nltk_data]    |   Unzipping models/bllip_wsj_no_aux.zip.
[nltk_data]    | Downloading package word2vec_sample to
[nltk_data]    |     /usr/share/nltk_data...
[nltk_data]    |   Unzipping models/word2vec_sample.zip.
[nltk_data]    |
[nltk_data]  Done downloading collection all

运行作业

这是在 PySpark 中运行 Spark + NLTK 示例的完整脚本。

# cluster-spark-nltk.py
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('spark-nltk')
sc = SparkContext(conf=conf)

data = sc.textFile('file:///usr/share/nltk_data/corpora/state_union/1972-Nixon.txt')

def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])

words = data.flatMap(word_tokenize)
print words.take(10)

pos_word = words.map(pos_tag)
print pos_word.take(5)

让我们来看看上面的代码示例。首先,我们将创建一个 SparkContext。请注意,用于集群管理的 Anaconda 默认不会创建 SparkContext。在本例中,我们使用 YARN 资源管理器。

from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('spark-nltk')
sc = SparkContext(conf=conf)

创建 SparkContext 后,我​​们可以将一些数据加载到 Spark 中。在这种情况下,数据文件来自 NLTK 提供的示例文档之一。

data = sc.textFile('file:///usr/share/nltk_data/corpora/state_union/1972-Nixon.txt')

接下来,我们编写一个函数来导入nltk和调用nltk.word_tokenize. 该函数被映射到在上一步中读取的文本文件。

def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)

words = data.flatMap(word_tokenize)

我们可以flatMap通过返回数据集中的一些单词来确认操作是否有效。

print words.take(10)

最后,NTLK 的POS-tagger可用于查找每个单词的词性。

def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])

pos_word = words.map(pos_tag)
print pos_word.take(5)

使用spark-submit脚本在 Spark 集群上运行脚本。输出显示从 Spark 脚本返回的单词,包括flatMap操作的结果和POS-tagger.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/13 05:14:29 INFO SparkContext: Running Spark version 1.4.0

[...]

['Address',
 'on',
 'the',
 'State',
 'of',
 'the',
 'Union',
 'Delivered',
 'Before',
 'a']

[...]

[[('Address', 'NN')],
 [('on', 'IN')],
 [('the', 'DT')],
 [('State', 'NNP')],
 [('of', 'IN')]]

故障排除

如果出现问题,请查阅常见问题/已知问题页面。

更多信息

有关更多信息,请参阅SparkPySpark文档页面。

有关 NLTK 的更多信息,请参阅NLTK 书籍 <http://www.nltk.org/book/>