Stream bytes chunks to csv rows in Python
I need to process large remote CSV files line by line without downloading them entirely.
Below is the closest I got. I iterate over byte chunks from Azure, and have some code to handle truncated lines. But this cannot work if a CSV value contains a newline, as I am not able to tell apart value newlines from CSV row newlines.
# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    # https://docs.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.storagestreamdownloader?view=azure-python
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    truncated_line = ''
    for chunk in file_handle.chunks():
        # prepend the previous truncated line to the next chunk
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n')  # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        for line in lines[:-1]:
            yield line
        # keep the trailing partial line for the next chunk
        truncated_line = lines[-1]
    # yield whatever remains after the last chunk
    if truncated_line:
        yield truncated_line
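To make the failure mode concrete, here is a small illustration with a quoted value spanning two lines:

text = 'id;comment\n1;"first line\nsecond line"\n'
print(text.split('\n'))
# ['id;comment', '1;"first line', 'second line"', '']
# the quoted value is torn across two "rows"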
Ideally I would use csv.DictReader(), but I could not make that work, as it downloads the file entirely.
# this does not work
import io
import csv

def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer)  # THIS DOWNLOADS THE FILE ENTIRELY
    # note: DictReader also expects a text stream, not a bytes buffer
    csvreader = csv.DictReader(buffer, delimiter=";")
    return csvreader
Here is an update using some of the hints from @H.Leger.
Please note that this still does not work.
import codecs
import csv

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?
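The reason: csv.reader treats every item yielded by the iterator as one line, while codecs.iterdecode yields whole decoded chunks that contain many newlines. A minimal reproduction:

import csv

# each item of the iterable is taken as a single line, so a string
# with an embedded newline triggers the same error
list(csv.reader(['a;b\nc;d'], delimiter=';'))
# _csv.Error: new-line character seen in unquoted field ...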
EDIT: final solution, based on the answer from @paiv.
EDIT: updated the solution to use io instead of codecs for faster parsing.
import io
import csv
import ctypes as ct

# bytes chunk iterator to python stream adapter
# /sf/answers/4728331821/
class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def close(self):
        self.closed = True

    def read(self, size):
        # refill the buffer from the chunk iterator when it runs dry
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res
# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()

# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())

# decode bytes stream to utf-8
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

# update csv field limit to handle large fields
# /sf/answers/3816205991/
csv.field_size_limit(int(ct.c_ulong(-1).value // 2))

csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    print(row)
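For reference, the same adapter can also be written as an io.RawIOBase subclass, letting io.BufferedReader supply buffering and the readable/closed bookkeeping. This is an alternative sketch, not part of the solution above:

import io

class ChunksRawIO(io.RawIOBase):
    def __init__(self, chunks):
        super().__init__()
        self.chunks = chunks
        self.buf = b''

    def readable(self):
        return True

    def readinto(self, b):
        # refill from the chunk iterator, then copy into the caller's buffer;
        # returning 0 signals EOF once the iterator is exhausted
        if not self.buf:
            self.buf = next(self.chunks, b'')
        n = min(len(b), len(self.buf))
        b[:n] = self.buf[:n]
        self.buf = self.buf[n:]
        return n

# text_stream = io.TextIOWrapper(io.BufferedReader(ChunksRawIO(downloader.chunks())),
#                                encoding='utf-8', newline='')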
Answer
Disclaimer: I know little about Azure specifics. Ultimately, you would want to stream separate chunks too.
In Python, given a file object, you can set up CSV streaming this way:
import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)
Now you can iterate over csvreader, and it will read file_object in a streaming fashion.
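As a quick self-contained check (with io.BytesIO standing in for the remote file object), a quoted value spanning two lines comes through as a single field:

import codecs
import csv
import io

file_object = io.BytesIO(b'name;note\nalice;"line one\nline two"\n')
text_stream = codecs.getreader('utf-8')(file_object)
for row in csv.DictReader(text_stream, delimiter=';'):
    print(row)
# {'name': 'alice', 'note': 'line one\nline two'}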
Edit: as suggested by @Martijn Pieters, we can gain performance by using TextIOWrapper instead of codecs:
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')
Check the note on the newline parameter in the csv module docs.
But Azure's StorageStreamDownloader does not provide a Python file object interface. It has the .chunks() generator (which, I assume, will issue separate HTTP requests to retrieve each next chunk).
You can adapt .chunks() to a file object with a simple adapter:
class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''

    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res
And use it like:
downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())
Be sure to configure DictReader with the appropriate CSV dialect.
And set appropriate values for max_single_get_size and max_chunk_get_size on the blob client.
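For illustration, a sketch of passing those sizes at client construction; the account URL and credential here are placeholders, and the size values are arbitrary:

from azure.storage.filedatalake import DataLakeServiceClient

client = DataLakeServiceClient(
    account_url="https://<account>.dfs.core.windows.net",  # placeholder
    credential=credential,                                 # placeholder
    max_single_get_size=4 * 1024 * 1024,  # size of the initial range request
    max_chunk_get_size=4 * 1024 * 1024,   # size of each subsequent chunk
)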
Comment from @Martijn Pieters: Why use codecs at all? io.TextIOWrapper() would be more robust.