Avro.jl:附加记录时遇到问题(更新)
我特别不熟悉 Avro 和 Avro.jl。我在编写记录然后以稍后可以阅读的方式附加第二条记录时遇到困难。我还需要能够读取 python 和 Julia 中的文件。
我选择使用 Avro,因为它是基于行的,并且我需要在模拟运行时一次添加一行(即记录)。CSV 不起作用,因为我的数据没有固定的列。
——
更新:我在这里插入我遇到的问题的简化版本。
以下代码尝试将多条记录写入一个文件,然后读取它们。在 Julia 中,它只读取第一条记录。在python中,我可以读取所有记录。我曾尝试使用 Avro.writetable 但没有取得多大成功。
import Avro,StructTypes,Tables,JSON3,Base
import Random
District=Dict{String,Int}
Districting=Dict{String,District}
Base.@kwdef struct Map
name::String
districting::Districting
desc::String=""
numLevels::Int=1
end
#
#
districting=Districting()
d=District()
t=()
#
keys=["P1","P2","P3","C1","B1"]
io=open("map1Test.avro","w")
#
rng = Random.MersenneTwister(1234);
#
for i=1:4
for j=1:3
for k in keys
d[k]=convert(Int,floor(100*Random.rand(rng)))
end
districting[string("D",j)]=d
end
t=(name=string("map",i),levels=1,districting=districting)
print("write : ",t,"n")
Avro.write(io,t)
write(io,"n")
end
close(io)
#
print("nn")
#
asc=Avro.schematype(typeof(t))
tType=typeof(t)
JSON3.write(asc)
io=open("mapAutoGen.avsc","w")
print("avsc: ",JSON3.write(asc),"n")
print("type: ",tType,"n")
write(io,JSON3.write(asc))
close(io)
#
print("nn")
rec=Any[]
io=open("map1Test.avro","r")
while eof(io)==false
m=Avro.read(io,tType)
print("reading :",m,"n")
end
#
close(io)
输出是:
write : (name = "map1", levels = 1, districting = Dict("D2" => Dict("C1" => 95, "B1" => 64, "P2" => 1, "P1" => 64, "P3" => 6), "D3" => Dict("C1" => 95, "B1" => 64, "P2" => 1, "P1" => 64, "P3" => 6), "D1" => Dict("C1" => 95, "B1" => 64, "P2" => 1, "P1" => 64, "P3" => 6)))
write : (name = "map2", levels = 1, districting = Dict("D2" => Dict("C1" => 12, "B1" => 37, "P2" => 9, "P1" => 3, "P3" => 31), "D3" => Dict("C1" => 12, "B1" => 37, "P2" => 9, "P1" => 3, "P3" => 31), "D1" => Dict("C1" => 12, "B1" => 37, "P2" => 9, "P1" => 3, "P3" => 31)))
write : (name = "map3", levels = 1, districting = Dict("D2" => Dict("C1" => 30, "B1" => 37, "P2" => 69, "P1" => 4, "P3" => 36), "D3" => Dict("C1" => 30, "B1" => 37, "P2" => 69, "P1" => 4, "P3" => 36), "D1" => Dict("C1" => 30, "B1" => 37, "P2" => 69, "P1" => 4, "P3" => 36)))
write : (name = "map4", levels = 1, districting = Dict("D2" => Dict("C1" => 75, "B1" => 3, "P2" => 61, "P1" => 28, "P3" => 66), "D3" => Dict("C1" => 75, "B1" => 3, "P2" => 61, "P1" => 28, "P3" => 66), "D1" => Dict("C1" => 75, "B1" => 3, "P2" => 61, "P1" => 28, "P3" => 66)))
avsc: {"type":"record","name":"Record_8558689697622909467","fields":[{"name":"name","type":{"type":"string"},"order":"ascending"},{"name":"levels","type":{"type":"long"},"order":"ascending"},{"name":"districting","type":{"type":"map","values":{"type":"map","values":{"type":"long"}}},"order":"ascending"}]}
type: NamedTuple{(:name, :levels, :districting), Tuple{String, Int64, Dict{String, Dict{String, Int64}}}}
reading :(name = "map1", levels = 1, districting = Dict("D2" => Dict("C1" => 95, "B1" => 64, "P2" => 1, "P1" => 64, "P3" => 6), "D3" => Dict("C1" => 95, "B1" => 64, "P2" => 1, "P1" => 64, "P3" => 6), "D1" => Dict("C1" => 95, "B1" => 64, "P2" => 1, "P1" => 64, "P3" => 6)))
但是,以下 python 代码读取文件并执行我想要的操作:
import fastavro
parsed_schema=fastavro.schema.load_schema("mapAutoGen.avsc")
fp=open('map1Test.avro', 'rb')
while True:
try:
record=fastavro.schemaless_reader(fp, parsed_schema)
print(record)
except:
break
fp.close()
以下输出
输出
{'name': 'map1', 'levels': 1, 'districting': {'D2': {'C1': 95, 'B1': 64, 'P2': 1, 'P1': 64, 'P3': 6}, 'D3': {'C1': 95, 'B1': 64, 'P2': 1, 'P1': 64, 'P3': 6}, 'D1': {'C1': 95, 'B1': 64, 'P2': 1, 'P1': 64, 'P3': 6}}}
{'name': 'x08map2', 'levels': 1, 'districting': {'D2': {'C1': 12, 'B1': 37, 'P2': 9, 'P1': 3, 'P3': 31}, 'D3': {'C1': 12, 'B1': 37, 'P2': 9, 'P1': 3, 'P3': 31}, 'D1': {'C1': 12, 'B1': 37, 'P2': 9, 'P1': 3, 'P3': 31}}}
{'name': 'x08map3', 'levels': 1, 'districting': {'D2': {'C1': 30, 'B1': 37, 'P2': 69, 'P1': 4, 'P3': 36}, 'D3': {'C1': 30, 'B1': 37, 'P2': 69, 'P1': 4, 'P3': 36}, 'D1': {'C1': 30, 'B1': 37, 'P2': 69, 'P1': 4, 'P3': 36}}}
{'name': 'x08map4', 'levels': 1, 'districting': {'D2': {'C1': 75, 'B1': 3, 'P2': 61, 'P1': 28, 'P3': 66}, 'D3': {'C1': 75, 'B1': 3, 'P2': 61, 'P1': 28, 'P3': 66}, 'D1': {'C1': 75, 'B1': 3, 'P2': 61, 'P1': 28, 'P3': 66}}}
——
我在下面做了一些实验,希望能解释我遇到的问题。
这是摘要:
- 经验1:我可以在Julia中写入和读取单个记录,但无法在python中读取
- 实验 2:我可以在 Julia 中一次写入和读取两条记录,并在 Julia 中将它们作为单个记录读取。Python不读取文件
- 实验 3:在 julia 中写入两条记录(一个接一个),然后在 Julia 中读取它们(一次全部)。在 python 中没有运气。
- 在一次使用writetable,写两个记录,并在阅读它们都朱莉娅和蟒蛇。在 Julia 中,它们是一次性阅读的。在 python 3 中,一次一个。
- 使用 writetable,对一个文件进行两次写入。朱莉娅什么也没读。Python 设法通过第二条记录读取第一条记录。
总之,我希望能够记录是字典(或包含字典的元组),然后关闭文件,重新打开它,然后写入另一条记录。然后我希望能够用 Julia 或 python 编写。
实验一
import Avro,StructTypes,Tables,JSON3,Base
display(smap)
display(smap2)
输出:
Dict{String, Int64} with 25 entries:
"["ROWAN","32","371590519022049"]" => 4
"["RUTHERFORD","05A","371619611022065"]" => 7
"["RANDOLPH","AN","371510303011002"]" => 2
"["CLEVELAND","POLKVL","370459501022046"]" => 9
Dict{String, Int64} with 25 entries:
"["ROWAN","32","371590519022049"]" => 4
"["RANDOLPH","AN","371510303011002"]" => 2
"["GRANVILLE","TYHO","370779706012012"]" => 1
io=open("map-1.avro","w")
Avro.write(io,smap)
close(io)
io=open("map-1.avro","r")
while eof(io)==false
m=Avro.read(io,typeof(smap2))
display(Dict(m))
end
close(io)
输出
Dict{String, Int64} with 25 entries:
"["ROWAN","32","371590519022049"]" => 4
"["RUTHERFORD","05A","371619611022065"]" => 7
"["RANDOLPH","AN","371510303011002"]" => 2
"["CLEVELAND","POLKVL","370459501022046"]" => 9
我尝试将文件读入 python 失败。
sch2=fastavro.schema.load_schema("rec2.avsc")
with open('map-1.avro', 'rb') as fo:
avro_reader = fastavro.reader(fo,sch2)
for record in avro_reader:
print(record)
print("---------n")
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-65-c1108c8faa22> in <module>
1 sch2=fastavro.schema.load_schema("rec2.avsc")
2 with open('map-1.avro', 'rb') as fo:
----> 3 avro_reader = fastavro.reader(fo,sch2)
4 for record in avro_reader:
5 print(record)
*output from python*
fastavro/_read.pyx in fastavro._read.reader.__init__()
fastavro/_read.pyx in fastavro._read.file_reader.__init__()
fastavro/_read.pyx in fastavro._read._read_data()
fastavro/_read.pyx in fastavro._read.read_record()
fastavro/_read.pyx in fastavro._read._read_data()
fastavro/_read.pyx in fastavro._read.read_map()
fastavro/_read.pyx in fastavro._read._read_data()
fastavro/_read.pyx in fastavro._read.read_bytes()
ValueError: read length must be non-negative or -1
实验二
io=open("map-3.avro","w"
d=Dict("map1"=> smap,"map2"=>smap2)
Avro.write(io,d)
close(io)
io=open("map-3.avro","r")
while eof(io)==false
m=Avro.read(io,typeof(d))
display(Dict(m))
end
close(io)
输出 2
Dict{String, Dict{String, Int64}} with 2 entries:
"map2" => Dict("["ROWAN","32","371590519022049"]"=>4, "["RANDOLPH","…
"map1" => Dict("["ROWAN","32","371590519022049"]"=>4, "["RUTHERFORD",…
实验三
io=open("map-4.avro","w")
Avro.write(io,smap)
Avro.write(io,smap2)
close(io)
io=open("map-4.avro","r")
while eof(io)==false
m=Avro.read(io,typeof(smap))
display(Dict(m))
end
close(io)
输出
Dict{String, Int64} with 25 entries:
"["ROWAN","32","371590519022049"]" => 4
"["RUTHERFORD","05A","371619611022065"]" => 7
"["RANDOLPH","AN","371510303011002"]" => 2
"["CLEVELAND","POLKVL","370459501022046"]" => 9
"["GRANVILLE","TYHO","370779706012012"]" => 1
实验四
s=Avro.parseschema("rec.avsc")
d=Dict("map1"=>smap,"map2"=>smap2)
io=open("map-table.avro","w")
it=Avro.writetable(io,d,sch=s)
close(io)
io=open("map-table.avro","r")
while eof(io)==false
m=Avro.readtable(io)
display(Dict(m)["map1"])
display(Dict(m)["map2"])
结束关闭(io)
输出
Dict{String, Int64} with 25 entries:
"["ROWAN","32","371590519022049"]" => 4
"["RUTHERFORD","05A","371619611022065"]" => 7
"["RANDOLPH","AN","371510303011002"]" => 2
"["CLEVELAND","POLKVL","370459501022046"]" => 9
Dict{String, Int64} with 25 entries:
"["ROWAN","32","371590519022049"]" => 4
"["RANDOLPH","AN","371510303011002"]" => 2
"["GRANVILLE","TYHO","370779706012012"]" => 1
蟒蛇代码
with open('map-table.avro', 'rb') as fo:
avro_reader = fastavro.reader(fo,sch)
for record in avro_reader:
print(record)
print("---------n")
蟒蛇输出
{'first': 'map2', 'second': {'["ROWAN","32","371590519022049"]': 4, '["RANDOLPH","AN","371510303011002"]': 2, '["GRANVILLE","TYHO","370779706012012"]': 1, '["GUILFORD","G66"]': 3, '["MONTGOMERY","T2"]': 4, '["CLEVELAND","POLKVL","370459501022038"]': 9, '["DURHAM","28"]': 11}}
---------
{'first': 'map1', 'second': {'["ROWAN","32","371590519022049"]': 4, '["RUTHERFORD","05A","371619611022065"]': 7, '["RANDOLPH","AN","371510303011002"]': 2, '["CLEVELAND","POLKVL","370459501022046"]': 9, '["GRANVILLE","TYHO","370779706012012"]':13}}
---------
实验四
print("Writing n")
io=open("map-table2.avro","w")
d=Dict("map1"=>smap)
it=Avro.writetable(io,d,sch=s)
d=Dict("map2"=>smap2)
it=Avro.writetable(io,d,sch=s)
close(io)
print("reading n")
io=open("map-table2.avro","r")
while eof(io)==false
print("-----n")
m=Avro.readtable(io)
display(Dict(m))
end
close(io)
输出
写作阅读
ArgumentError: invalid Array dimensions
Stacktrace:
[1] Array
@ ./boot.jl:448 [inlined]
[2] readwithschema(#unused#::Type{Avro.Record{(:first, :second), Tuple{String,
Dict{String, Int64}}, 2}}, sch::Avro.RecordType, buf::Vector{UInt8}, pos::Int64, comp::Nothing)
@ Avro ~/.julia/packages/Avro/JEoRa/src/tables.jl:176
[3] readtable(buf::Vector{UInt8}, pos::Int64, kw::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Avro ~/.julia/packages/Avro/JEoRa/src/tables.jl:166
[4] #readtable#33
@ ~/.julia/packages/Avro/JEoRa/src/tables.jl:156 [inlined]
[5] readtable(io::IOStream)
@ Avro ~/.julia/packages/Avro/JEoRa/src/tables.jl:156
[6] top-level scope
@ ./In[69]:13
[7] eval
@ ./boot.jl:360 [inlined]
[8] include_string(mapexpr::typeof(REPL.softscope), mod::Module, code::String, filename::String)
@ Base ./loading.jl:1094
Python代码
with open('map-table2.avro', 'rb') as fo:
avro_reader = fastavro.reader(fo,sch)
for record in avro_reader:
print(record)
print("---------n")
{'first': 'map1', 'second': {'["ROWAN","32","371590519022049"]': 4, '["RUTHERFORD","05A","371619611022065"]': 7, '["RANDOLPH","AN","371510303011002"]': 2, '["CLEVELAND","POLKVL","370459501022046"]': 9}
---------
这是两个架构文件:
rec2.avsc
{"type": "record",
"name": "Record_366361102733404708",
"fields": [
{"type": "map", "values": {"type": "long"},
"order": "ascending"}]}
rec.avsc
{"type": "record","name": "Record_366361102733404708","fields": [{"name": "first", "type": {"type": "string"}, "order": "ascending"}, {"name": "second", "type": {"type": "map", "values": {"type": "long"}}, "order": "ascending"}]}
任何帮助将非常受欢迎。如果我不遵循最佳实践,我愿意改变我所写内容的确切外观。