TextAdapter 第一步

基本用法

IOPro 通过附加到数据源(例如本地 CSV 文件)来工作。在开始之前,让我们创建一个示例 CSV 文件来使用:

from random import random, randint, shuffle
import string

NUMROWS = 10
with open('data/table.csv','w') as data:
    # Header
    for n in range(1,5):
        print("f%d" % n, end=",", file=data)
    print("comment", file=data)

    # Body
    letters = list(string.ascii_letters)
    for n in range(NUMROWS):
        shuffle(letters)
        s = "".join(letters[:randint(5,20)])
        vals = (n, randint(1000,2000), random(), random()*100, s)
        print("%d,%d,%f,%f,%s" % vals, file=data)

让我们读入此处创建的本地 CSV 文件。由于这个小文件很容易放入内存,因此可以使用csvpandas 模块,但我们将演示适用于更大数据的接口和功能。

>>> import iopro
>>> adapter = iopro.text_adapter('data/table.csv', parser='csv')
>>> adapter.get_field_names()
['f1', 'f2', 'f3', 'f4', 'comment']

我们可以为正在读取的 CSV 文件的列中的值指定数据类型;但首先我们看一下 IOPro 的 TextAdapter 自动发现使用的数据类型的能力。

我们可以要求 IOPro 的 TextAdapter 解析文本并使用切片符号从 CSV 文件的选定部分返回 NumPy 数组中的记录:

>>> # the inferred datatypes
>>> array = adapter[:]
>>> array.dtype
dtype([('f1', '<u8'), ('f2', '<u8'), ('f3', '<f8'), ('f4', '<f8'),
       ('comment', 'O')])

让我们定义字段数据类型(例如:将字段 0 设置为 16 位无符号整数,将字段 3 设置为 32 位浮点数)。

按摩数据类型:

>>> adapter.set_field_types({0: 'u2', 3:'f4'})
>>> array = adapter[:]
>>> array.dtype
dtype([('f1', '<u2'), ('f2', '<u8'), ('f3', '<f8'), ('f4', '<f4'),
       ('comment', 'O')])

前五项记录:

>>> array = adapter[0:5]
>>> print(array)
[(0, 1222, 0.926116, 84.44437408447266, 'MlzvBRyquns')
 (1, 1350, 0.553585, 81.03726959228516, 'ikgEauJeTZvd')
 (2, 1932, 0.710919, 31.59865951538086, 'uUQmHJFZhnirecAvx')
 (3, 1494, 0.622391, 57.90607452392578, 'iWQBAZodkfHODtI')
 (4, 1981, 0.820246, 40.848018646240234, 'igxeXdBpqE')]

读取最后五条记录:

>>> array = adapter[-5:]
>>> print(array)
[(5, 1267, 0.694631, 6.999039173126221, 'bRSrwitHeY')
 (6, 1166, 0.37465, 38.7022705078125, 'qzbMgVThXtHpfDNrd')
 (7, 1229, 0.390566, 55.338134765625, 'hyarmvWi')
 (8, 1816, 0.201106, 59.74718475341797, 'DcHymelRusO')
 (9, 1416, 0.725697, 42.50992965698242, 'QMUGRAwe')]

读取所有其他记录:

>>> array = adapter[::2]
>>> print(array)
[(0, 1222, 0.926116, 84.44437408447266, 'MlzvBRyquns')
 (2, 1932, 0.710919, 31.59865951538086, 'uUQmHJFZhnirecAvx')
 (4, 1981, 0.820246, 40.848018646240234, 'igxeXdBpqE')
 (6, 1166, 0.37465, 38.7022705078125, 'qzbMgVThXtHpfDNrd')
 (8, 1816, 0.201106, 59.74718475341797, 'DcHymelRusO')]

仅读取第一和第二、第三字段:

>>> array = adapter[[0,1,2]][:]
>>> list(array)
[(0, 1222, 0.926116),
 (1, 1350, 0.553585),
 (2, 1932, 0.710919),
 (3, 1494, 0.622391),
 (4, 1981, 0.820246),
 (5, 1267, 0.694631),
 (6, 1166, 0.37465),
 (7, 1229, 0.390566),
 (8, 1816, 0.201106),
 (9, 1416, 0.725697)]

仅读取名为 'f2' 和 'comment' 的字段:

>>> array = adapter[['f2','comment']][:]
>>> list(array)
[(1222, 'MlzvBRyquns'),
 (1350, 'ikgEauJeTZvd'),
 (1932, 'uUQmHJFZhnirecAvx'),
 (1494, 'iWQBAZodkfHODtI'),
 (1981, 'igxeXdBpqE'),
 (1267, 'bRSrwitHeY'),
 (1166, 'qzbMgVThXtHpfDNrd'),
 (1229, 'hyarmvWi'),
 (1816, 'DcHymelRusO'),
 (1416, 'QMUGRAwe')]

JSON 支持

可以通过为解析器参数指定 'json' 来解析 JSON 格式的文本数据:

文件内容data/one.json

{"id":123, "name":"xxx"}

单个 JSON 对象:

>>> adapter = iopro.text_adapter('data/one.json', parser='json')
>>> adapter[:]
array([(123, 'xxx')],
      dtype=[('id', '<u8'), ('name', 'O')])

目前,根级别的每个 JSON 对象都被解释为单个 NumPy 记录。每个 JSON 对象都可以是数组的一部分,也可以由换行符分隔。IOPro 可以解析的有效 JSON 文档示例,以及 NumPy 数组结果:

文件内容data/two.json

[{"id":123, "name":"xxx"}, {"id":456, "name":"yyy"}]

两个 JSON 对象的数组:

>>> iopro.text_adapter('data/two.json', parser='json')[:]
array([(123, 'xxx'), (456, 'yyy')],
      dtype=[('id', '<u8'), ('name', 'O')])

文件内容data/three.json

{"id":123, "name":"xxx"}
{"id":456, "name":"yyy"}

由换行符分隔的两个 JSON 对象:

>>> iopro.text_adapter('data/three.json', parser='json')[:]
array([(123, 'xxx'), (456, 'yyy')],
      dtype=[('id', '<u8'), ('name', 'O')])

在适配器中按摩数据

自定义函数可用于在读取值时修改值。

>>> import iopro, io, math
>>> stream = io.StringIO('3,abc,3.3\n7,xxx,9.9\n4,,')
>>> adapter = iopro.text_adapter(stream, parser='csv', field_names=False)

覆盖第一个字段的默认转换器:

>>> adapter.set_converter(0, lambda x: math.factorial(int(x)))
>>> adapter[:]
array([(6, 'abc', 3.3), (5040, 'xxx', 9.9), (24, '', nan)],
      dtype=[('f0', '<u8'), ('f1', 'O'), ('f2', '<f8')])

我们还可以强制数据类型并为缺失数据设置填充值。

将数据类型应用于列:

>>> stream = io.StringIO('3,abc,3.3\n7,xxx,9.9\n4,,')
>>> adapter = iopro.text_adapter(stream, parser='csv', field_names=False)
>>> adapter.set_field_types({1:'S3', 2:'f4'})
>>> adapter[:]
array([(3, b'abc', 3.299999952316284), (7, b'xxx', 9.899999618530273),
       (4, b'', nan)],
      dtype=[('f0', '<u8'), ('f1', 'S3'), ('f2', '<f4')])

为每个字段中的缺失值设置填充值:

>>> adapter.set_fill_values({1:'ZZZ', 2:999.999})
>>> adapter[:]
array([(3, b'abc', 3.299999952316284), (7, b'xxx', 9.899999618530273),
       (4, b'ZZZ', 999.9990234375)],
      dtype=[('f0', '<u8'), ('f1', 'S3'), ('f2', '<f4')])

结合正则表达式和类型转换

后面的部分将更详细地讨论正则表达式。这个例子是对 IOPro 使用它们的快速浏览。

文件内容data/transactions.csv

$2.56, 50%, September 20 1978
$1.23, 23%, April 5 1981

组合功能:

>>> import iopro
>>> regex_string = '\$(\d)\.(\d{2}),\s*([0-9]+)\%,\s*([A-Za-z]+)'
>>> adapter = iopro.text_adapter('data/transactions.csv',
...                              parser='regex',
...                              regex_string=regex_string,
...                              field_names=False,
...                              infer_types=False)

设置字段的 dtype 及其名称:

>>> adapter.set_field_types({0:'i2', 1:'u2', 2:'f4', 3:'S10'})
>>> adapter.set_field_names(['dollars', 'cents', 'percentage', 'month'])
>>> adapter[:]
array([(2, 56, 50.0, b'September'), (1, 23, 23.0, b'April')],
      dtype=[('dollars', '<i2'), ('cents', '<u2'),
             ('percentage', '<f4'), ('month', 'S10')])

高级文本适配器

iopro.loadtext()iopro.genfromtxt()

在 IOPro 中有两个密切相关的功能。loadtext(),我们一直在研究,它做出了一个更乐观的假设,即您的数据格式良好。genfromtxt()有许多处理混乱数据的参数,以及处理丢失数据的特殊行为。

loadtext()已经具有高度可配置性,可以处理多种 CSV 和其他分隔格式下的数据。genfromtxt()包含这些参数的超集。

Gzip 支持

IOPro 可以动态解压缩 gzip 压缩的数据,只需指明一个 compression关键字参数。

>>> adapter = iopro.text_adapter('data.gz', parser='csv', compression='gzip')
>>> array = adapter[:]

除了无需先解压缩即可存储和处理压缩数据之外,您也无需为此牺牲任何性能。例如,一个 419 MB 的 CSV 数字数据测试文件和一个 105 MB 的相同数据用 gzip 压缩文件,以下是测试机器上的运行时间,用于将每个文件的全部内容加载到 NumPy 数组中。机器之间的确切性能会有所不同,尤其是在具有 HDD 和 SSD 架构的机器之间。:

-  uncompressed: 13.38 sec
-  gzip compressed: 14.54 sec

在测试中,压缩文件需要的时间稍长,但考虑在使用 IOPro 加载之前必须将文件解压缩到磁盘:

  • 未压缩:13.38 秒
  • gzip 压缩:14.54 秒
  • gzip 压缩(解压缩到磁盘,然后加载):21.56 秒

索引 CSV 数据

IOPro 最有用的功能之一是能够索引数据以允许快速随机查找。

例如,要检索我们上面使用的压缩 109 MB 数据集的最后一条记录:

>>> adapter = iopro.text_adapter('data.gz', parser='csv', compression='gzip')
>>> array = adapter[-1]

将最后一条记录检索到 NumPy 数组中需要 14.82 秒。这与读取整个数组的时间大致相同,因为必须解析整个数据集才能获得最后一条记录。

为了加快查找速度,我们可以建立一个索引:

>>> adapter.create_index('index_file')

上述方法在内存中创建索引并将其保存到磁盘,耗时 9.48 秒。现在再次查找和读取最后一条记录时,只需 0.02 秒。

重新加载索引只需要 0.18 秒。如果您建立一次索引,您将永远获得对数据的即时随机访问(假设数据保持静态):

>>> adapter = iopro.text_adapter('data.gz', parser='csv',
...                              compression='gzip',
...                              index_name='index_file')

让我们用一个中等大小的例子来试试。您可以从Exoplanets Data Explorer站点下载此数据。

>>> adapter = iopro.text_adapter('data/exoplanets.csv.gz',
...                              parser='csv', compression='gzip')
>>> print(len(adapter[:]), "rows")
>>> print(', '.join(adapter.field_names[:3]),
...       '...%d more...\n   ' % (adapter.field_count-6),
...       ', '.join(adapter.field_names[-3:]))
2042 rows
name, mass, mass_error_min ...73 more...
    star_teff, star_detected_disc, star_magnetic_field
>>> adapter.field_types
{0: dtype('O'),
 1: dtype('float64'),
 2: dtype('float64'),
 3: dtype('O'),
 4: dtype('float64'),
 5: dtype('float64'),
 6: dtype('float64'),
 7: dtype('float64'),
 8: dtype('O'),
 9: dtype('float64'),
 [... more fields ...]
 69: dtype('float64'),
 70: dtype('float64'),
 71: dtype('float64'),
 72: dtype('float64'),
 73: dtype('float64'),
 74: dtype('O'),
 75: dtype('float64'),
 76: dtype('float64'),
 77: dtype('O'),
 78: dtype('uint64')}

做一些计时(使用 IPython 魔法):

>>> %time row = adapter[-1]
CPU times: user 35 ms, sys: 471 µs, total: 35.5 ms
Wall time: 35.5 ms
>>> %time adapter.create_index('data/exoplanets.index')
CPU times: user 15.7 ms, sys: 3.35 ms, total: 19.1 ms
Wall time: 18.6 ms
>>> %time row = adapter[-1]
CPU times: user 18.3 ms, sys: 1.96 ms, total: 20.3 ms
Wall time: 20.1 ms
>>> new_adapter = iopro.text_adapter('data/exoplanets.csv.gz', parser='csv',
...                                  compression='gzip',
...                                  index_name='data/exoplanets.index')
>>> %time row = new_adapter[-1]
CPU times: user 17.3 ms, sys: 2.12 ms, total: 19.4 ms
Wall time: 19.4 ms

正则表达式

有些人在遇到问题时会想“我知道,我会使用正则表达式”。现在他们有两个问题。——杰米·扎温斯基

IOPro 支持使用正则表达式来帮助解析杂乱的数据。以以下在 Internet 上找到的实际 NASDAQ 股票数据片段为例:

文件内容data/stocks.csv

Name,Symbol,Exchange,Range
Apple,AAPL,NasdaqNM,363.32 - 705.07
Google,GOOG,NasdaqNM,523.20 - 774.38
Microsoft,MSFT,NasdaqNM,24.30 - 32.95

前三个字段很简单:名称、符号和交换。第四个字段存在一些问题。让我们试试 IOPro 的基于正则表达式的解析器:

>>> regex_string = '([A-Za-z]+),([A-Z]{1,4}),([A-Za-z]+),'\
...                '(\d+.\.\d{2})\s*\-\s*(\d+.\.\d{2})'
>>> adapter = iopro.text_adapter('data/stocks.csv', parser='regex',
...                              regex_string=regex_string)
>>> # Notice that header does not now match the regex
>>> print(adapter.field_names)
['Name,Symbol,Exchange,Range', '', '', '', '']
>>> # We can massage the headers to reflect our match pattern
>>> info = adapter.field_names[0].split(',')[:3]
>>> adapter.field_names =  info + ["Low", "High"]
>>> adapter[:]
array([('Apple', 'AAPL', 'NasdaqNM', 363.32, 705.07),
       ('Google', 'GOOG', 'NasdaqNM', 523.2, 774.38),
       ('Microsoft', 'MSFT', 'NasdaqNM', 24.3, 32.95)],
       dtype=[('Name', 'O'), ('Symbol', 'O'),
              ('Exchange', 'O'), ('Low', '<f8'), ('High', '<f8')])

正则表达式很紧凑,通常难以阅读,但它们也非常强大。通过将上述正则表达式与分组运算符 '(' 和 ')' 一起使用,我们可以准确定义每条记录应如何解析为字段。让我们将其分解为各个领域:

  • ([A-Za-z]+) 定义输出数组中的第一个字段(股票名称)
  • ([A-Z]{1-4}) 定义第二个(股票代码)
  • ([A-Za-z]+) 定义第三个(交换名称)
  • (\d+.\.\d{2}) 定义第四个字段(低价)
  • \s*\-\s* 被跳过,因为它不是组的一部分
  • (\d+.\.\d{2}) 定义第五个字段(高价)

输出数组包含五个字段:三个字符串字段和两个浮点字段。正是我们想要的。

S3 支持

IOPro 可以解析存储在亚马逊 S3 云存储服务中的 CSV 数据。为了访问 S3 文件,您需要指定一些凭据以及您正在访问的资源。

前两个参数是您的 AWS 访问密钥和秘密密钥,然后是 S3 存储桶名称和密钥名称。S3 CSV 数据以 128K 块的形式下载并直接从内存中解析,无需将整个 S3 数据集保存到本地磁盘。

让我们来看看我们从 Health Insurance Marketplace 数据中存储的内容。BeautifulSoup 有一些代码只是为了美化原始 XML 查询结果。

>>> import urllib.request
>>> url = 'http://s3.amazonaws.com/product-training/'
>>> xml = urllib.request.urlopen(url).read()
>>> import bs4, re
>>> r = re.compile(r'^(\s*)', re.MULTILINE)
>>> def display(bs, encoding=None, formatter="minimal", indent=4):
...     print(r.sub(r'\1' * indent, bs.prettify(encoding, formatter)))
>>> display(bs4.BeautifulSoup(xml, "xml"))
<?xml version="1.0" encoding="utf-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    <Name>
        product-training
    </Name>
    <Prefix/>
    <Marker/>
    <MaxKeys>
        1000
    </MaxKeys>
    <IsTruncated>
        false
    </IsTruncated>
    <Contents>
        <Key>
            BusinessRules.csv
        </Key>
        <LastModified>
            2016-06-25T00:03:20.000Z
        </LastModified>
        <ETag>
            "a565ebede6a7e6e060cd4526a7ae4345"
        </ETag>
        <Size>
            8262590
        </Size>
        <StorageClass>
            STANDARD
        </StorageClass>
    </Contents>
    <Contents>
        [... more files ...]
    </Contents>
</ListBucketResult>

以简单的形式,我们可以看到一些 S3 资源的详细信息。让我们访问其中之一。请注意,您需要填写实际的 AWS 访问密钥和秘密密钥。

>>> user_name = "class1"
>>> aws_access_key = "ABCD"
>>> aws_secret_key = "EFGH/IJK"
>>> bucket = 'product-training'
>>> key_name = 'BusinessRules.csv' # 21k lines, 8MB
>>> # key_name = 'PlanAttributes.csv' # 77k lines, 95MB
>>> # key_name = 'Rate.csv.gzip' # 13M lines, 2GB raw, 110MB compressed
>>> adapter = iopro.s3_text_adapter(aws_access_key, aws_secret_key,
...                                 bucket, key_name)
>>> # Don't try with the really large datasets, works with the default one
>>> df = adapter.to_dataframe()
>>> df.iloc[:6,:6]
营业年 州代码 发行人 ID 来源名称 版本号 进口日期
0 2014年 AL 82285 个人信息系统 7 2014-01-21 08:29:49
1 2014年 AL 82285 个人信息系统 7 2014-01-21 08:29:49
2 2014年 AL 82285 个人信息系统 7 2014-01-21 08:29:49
3 2014年 AL 82285 个人信息系统 7 2014-01-21 08:29:49
4 2014年 AL 82285 个人信息系统 7 2014-01-21 08:29:49
5 2014年 AZ 17100 个人信息系统 7 2013-10-15 07:27:56

与基于磁盘的 CSV 数据一样,IOPro 还可以为 S3 数据构建索引,并使用该索引进行快速随机访问查找。如果使用 IOPro 创建索引文件并与 S3 数据集一起存储在云中,IOPro 可以使用此远程索引下载和解析请求的记录子集。这允许您生成一个索引文件并与数据集一起在云端共享,并且不需要其他人下载整个索引文件来使用它。