Stream bytes chunks to csv rows in Python

I need to process large remote CSV files line by line without downloading them entirely.

Below is the closest I have got. I iterate over byte chunks from Azure, with some code to handle truncated lines. But this cannot work if a CSV value contains a newline, because I have no way to tell a newline inside a value apart from a CSV row newline.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    # https://docs.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.storagestreamdownloader?view=azure-python
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()

    truncated_line = ''
    for chunk in file_handle.chunks():
        # have the previous truncated line appended to the next block
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        for line in lines[0:len(lines)-2]:
            yield line
        truncated_line = lines[len(lines)-1]

    # process the last chunk (same code)
    chunk_txt = truncated_line
    lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
    for line in lines[0:len(lines)-2]:
        yield line
    truncated_line = lines[len(lines)-1]

Ideally I would use csv.DictReader(), but I could not, because it downloads the file entirely.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer) # THIS DOWNLOADS THE FILE ENTIRELY
    csvreader = csv.DictReader(buffer, delimiter=";")
    return csvreader

Here is an update using some hints from @H.Leger

Please note that this still does not work

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?

EDIT: Final solution based on @paiv's answer

EDIT: Updated the solution to use io instead of codecs for faster parsing

import io
import csv
import ctypes as ct

# bytes chunk iterator to python stream adapter 
# /sf/answers/4728331821/

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False
    
    def readable(self):
        return True
        
    def writable(self):
        return False
    
    def seekable(self):
        return False
        
    def close(self):
        self.closed = True
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res



# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()
# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())
# decode bytes stream to utf-8
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='') 

# update csv field limit to handle large fields
# /sf/answers/3816205991/
csv.field_size_limit(int(ct.c_ulong(-1).value // 2)) 

csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    print(row)

Answer

Disclaimer: I know little about Azure specifics. Ultimately, you would want to stream separate chunks too.

In Python, given a file object, you can set up CSV streaming this way:

import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)

Now you can iterate over csvreader, and it will read file_object in a streaming fashion.

Edit: as suggested by @Martijn Pieters, we can gain performance by using TextIOWrapper instead of codecs:

text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

Check the notes in the csv module regarding the newline parameter.
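To see what newline='' buys you here, a minimal self-contained check (the io.BytesIO and the sample data below are stand-ins invented for the demo, not part of the Azure setup):

import csv
import io

# made-up sample: semicolon-delimited, with a quoted value that contains a newline
raw = io.BytesIO(b'id;comment\n1;"line one\nline two"\n2;plain\n')
text_stream = io.TextIOWrapper(raw, encoding='utf-8', newline='')

rows = list(csv.DictReader(text_stream, delimiter=';'))
print(len(rows))                 # 2 -- the embedded newline did not split the first row
print(repr(rows[0]['comment']))  # 'line one\nline two', still a single field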

But Azure's StorageStreamDownloader does not provide Python's file object interface. It has a .chunks() generator (which I assume will issue separate HTTP requests to retrieve each next chunk).

You can adapt .chunks() into a file object with a simple adapter:

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res

And use it like:

downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())

Be sure to configure the DictReader with the appropriate CSV dialect.
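For the semicolon-delimited, double-quoted files in the question, that configuration could look like this (the dialect values are taken from the question's final snippet, and text_stream is the wrapper built above):

# dialect settings matching the question's data: ';' separated, '"' quoted
csvreader = csv.DictReader(
    text_stream,        # the text stream wrapped around ChunksAdapter above
    delimiter=';',
    quotechar='"',
    quoting=csv.QUOTE_MINIMAL,
)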

And set appropriate values for max_single_get_size and max_chunk_get_size on the blob client.
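As a rough sketch of that last point: azure.storage.blob.BlobServiceClient documents both settings as constructor keyword arguments. The account URL and credential below are placeholders, the values shown are the documented defaults, and whether the Data Lake client used in the question accepts the same keyword arguments is an assumption to verify against the SDK docs.

from azure.storage.blob import BlobServiceClient

# these sizes control how much data the initial download call and each subsequent chunk fetch pull
service = BlobServiceClient(
    "https://<account>.blob.core.windows.net",  # placeholder account URL
    credential="<credential>",                  # placeholder credential
    max_single_get_size=32 * 1024 * 1024,       # documented default: 32 MiB
    max_chunk_get_size=4 * 1024 * 1024,         # documented default: 4 MiB
)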

  • Why use codecs at all? io.TextIOWrapper() would be more robust.
