polars_arrow/io/ipc/read/
file.rs1use std::io::{Read, Seek, SeekFrom};
2use std::sync::Arc;
3
4use arrow_format::ipc::planus::ReadAsRoot;
5use arrow_format::ipc::FooterRef;
6use polars_error::{polars_bail, polars_err, PolarsResult};
7use polars_utils::aliases::{InitHashMaps, PlHashMap};
8
9use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
10use super::common::*;
11use super::schema::fb_to_schema;
12use super::{Dictionaries, OutOfSpecKind, SendableIterator};
13use crate::array::Array;
14use crate::datatypes::{ArrowSchemaRef, Metadata};
15use crate::io::ipc::IpcSchema;
16use crate::record_batch::RecordBatchT;
17
/// Metadata of an Arrow IPC file, decoded from the file's footer.
#[derive(Debug, Clone)]
pub struct FileMetadata {
    /// The Arrow schema of the file, built from the footer's flatbuffer schema.
    pub schema: ArrowSchemaRef,

    /// Custom schema-level metadata returned by `fb_to_schema`, if the schema carries any.
    pub custom_schema_metadata: Option<Arc<Metadata>>,

    /// IPC-specific schema information produced alongside `schema` by `fb_to_schema`.
    pub ipc_schema: IpcSchema,

    /// Locations of the record batch messages listed in the footer.
    pub blocks: Vec<arrow_format::ipc::Block>,

    /// Locations of the dictionary batch messages listed in the footer, or `None` when the
    /// footer has no dictionaries entry.
    pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,

    /// Size value passed to `deserialize_footer`. `read_file_metadata` sets it to the byte
    /// distance from the reader's starting position to the end of the file; it is forwarded to
    /// the dictionary and record batch decoders.
    pub size: u64,
}
41
42pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
44 let (_, footer_len) = read_footer_len(reader)?;
45 let footer = read_footer(reader, footer_len)?;
46 let (_, blocks) = deserialize_footer_blocks(&footer)?;
47
48 get_row_count_from_blocks(reader, &blocks)
49}
50
51pub fn get_row_count_from_blocks<R: Read + Seek>(
53 reader: &mut R,
54 blocks: &[arrow_format::ipc::Block],
55) -> PolarsResult<i64> {
56 let mut message_scratch: Vec<u8> = Default::default();
57
58 blocks
59 .iter()
60 .map(|block| {
61 let message = get_message_from_block(reader, block, &mut message_scratch)?;
62 let record_batch = get_record_batch(message)?;
63 record_batch.length().map_err(|e| e.into())
64 })
65 .sum()
66}
67
68pub(crate) fn get_dictionary_batch<'a>(
69 message: &'a arrow_format::ipc::MessageRef,
70) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {
71 let header = message
72 .header()
73 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
74 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
75 match header {
76 arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),
77 _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
78 }
79}
80
81fn read_dictionary_block<R: Read + Seek>(
82 reader: &mut R,
83 metadata: &FileMetadata,
84 block: &arrow_format::ipc::Block,
85 dictionaries: &mut Dictionaries,
86 message_scratch: &mut Vec<u8>,
87 dictionary_scratch: &mut Vec<u8>,
88) -> PolarsResult<()> {
89 let message = get_message_from_block(reader, block, message_scratch)?;
90 let batch = get_dictionary_batch(&message)?;
91
92 let offset: u64 = block
93 .offset
94 .try_into()
95 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
96
97 let length: u64 = block
98 .meta_data_length
99 .try_into()
100 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
101
102 read_dictionary(
103 batch,
104 &metadata.schema,
105 &metadata.ipc_schema,
106 dictionaries,
107 reader,
108 offset + length,
109 metadata.size,
110 dictionary_scratch,
111 )
112}
113
114pub fn read_file_dictionaries<R: Read + Seek>(
117 reader: &mut R,
118 metadata: &FileMetadata,
119 scratch: &mut Vec<u8>,
120) -> PolarsResult<Dictionaries> {
121 let mut dictionaries = Default::default();
122
123 let blocks = if let Some(blocks) = &metadata.dictionaries {
124 blocks
125 } else {
126 return Ok(PlHashMap::new());
127 };
128 let mut message_scratch = Default::default();
130
131 for block in blocks {
132 read_dictionary_block(
133 reader,
134 metadata,
135 block,
136 &mut dictionaries,
137 &mut message_scratch,
138 scratch,
139 )?;
140 }
141 Ok(dictionaries)
142}
143
144pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {
145 let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());
146
147 if footer[4..] != ARROW_MAGIC_V2 {
148 if footer[..4] == ARROW_MAGIC_V1 {
149 polars_bail!(ComputeError: "feather v1 not supported");
150 }
151 return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));
152 }
153 let footer_len = footer_len
154 .try_into()
155 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
156
157 Ok((end, footer_len))
158}
159
160fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
162 let end = reader.seek(SeekFrom::End(-10))? + 10;
164
165 let mut footer: [u8; 10] = [0; 10];
166
167 reader.read_exact(&mut footer)?;
168 decode_footer_len(footer, end)
169}
170
171fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {
172 reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
174
175 let mut serialized_footer = vec![];
176 serialized_footer.try_reserve(footer_len)?;
177 reader
178 .by_ref()
179 .take(footer_len as u64)
180 .read_to_end(&mut serialized_footer)?;
181 Ok(serialized_footer)
182}
183
184fn deserialize_footer_blocks(
185 footer_data: &[u8],
186) -> PolarsResult<(FooterRef, Vec<arrow_format::ipc::Block>)> {
187 let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
188 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;
189
190 let blocks = footer
191 .record_batches()
192 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
193 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
194
195 let blocks = blocks
196 .iter()
197 .map(|block| {
198 block.try_into().map_err(|err| {
199 polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err))
200 })
201 })
202 .collect::<PolarsResult<Vec<_>>>()?;
203 Ok((footer, blocks))
204}
205
206pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef> {
207 arrow_format::ipc::FooterRef::read_as_root(footer_data)
208 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))
209}
210
211pub(super) fn deserialize_schema_ref_from_footer(
212 footer: arrow_format::ipc::FooterRef,
213) -> PolarsResult<arrow_format::ipc::SchemaRef> {
214 footer
215 .schema()
216 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?
217 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))
218}
219
220pub(super) fn iter_recordbatch_blocks_from_footer(
222 footer: arrow_format::ipc::FooterRef,
223) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {
224 let blocks = footer
225 .record_batches()
226 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
227 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
228
229 Ok(blocks.iter().map(|block| {
230 block
231 .try_into()
232 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))
233 }))
234}
235
236pub(super) fn iter_dictionary_blocks_from_footer(
237 footer: arrow_format::ipc::FooterRef,
238) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>
239{
240 let dictionaries = footer
241 .dictionaries()
242 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;
243
244 Ok(dictionaries.map(|dicts| {
245 dicts.into_iter().map(|block| {
246 block.try_into().map_err(|err| {
247 polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err))
248 })
249 })
250 }))
251}
252
253pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
254 let footer = deserialize_footer_ref(footer_data)?;
255 let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;
256 let dictionaries = iter_dictionary_blocks_from_footer(footer)?
257 .map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
258 .transpose()?;
259 let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
260 let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;
261
262 Ok(FileMetadata {
263 schema: Arc::new(schema),
264 ipc_schema,
265 blocks,
266 dictionaries,
267 size,
268 custom_schema_metadata: custom_schema_metadata.map(Arc::new),
269 })
270}
271
272pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
274 let start = reader.stream_position()?;
275 let (end, footer_len) = read_footer_len(reader)?;
276 let serialized_footer = read_footer(reader, footer_len)?;
277 deserialize_footer(&serialized_footer, end - start)
278}
279
280pub(crate) fn get_record_batch(
281 message: arrow_format::ipc::MessageRef,
282) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {
283 let header = message
284 .header()
285 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
286 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
287 match header {
288 arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),
289 _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
290 }
291}
292
293fn get_message_from_block_offset<'a, R: Read + Seek>(
294 reader: &mut R,
295 offset: u64,
296 message_scratch: &'a mut Vec<u8>,
297) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
298 reader.seek(SeekFrom::Start(offset))?;
300 let mut meta_buf = [0; 4];
301 reader.read_exact(&mut meta_buf)?;
302 if meta_buf == CONTINUATION_MARKER {
303 reader.read_exact(&mut meta_buf)?;
305 }
306 let meta_len = i32::from_le_bytes(meta_buf)
307 .try_into()
308 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
309
310 message_scratch.clear();
311 message_scratch.try_reserve(meta_len)?;
312 reader
313 .by_ref()
314 .take(meta_len as u64)
315 .read_to_end(message_scratch)?;
316
317 arrow_format::ipc::MessageRef::read_as_root(message_scratch)
318 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))
319}
320
321pub(super) fn get_message_from_block<'a, R: Read + Seek>(
322 reader: &mut R,
323 block: &arrow_format::ipc::Block,
324 message_scratch: &'a mut Vec<u8>,
325) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
326 let offset: u64 = block
327 .offset
328 .try_into()
329 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
330
331 get_message_from_block_offset(reader, offset, message_scratch)
332}
333
334#[allow(clippy::too_many_arguments)]
342pub fn read_batch<R: Read + Seek>(
343 reader: &mut R,
344 dictionaries: &Dictionaries,
345 metadata: &FileMetadata,
346 projection: Option<&[usize]>,
347 limit: Option<usize>,
348 index: usize,
349 message_scratch: &mut Vec<u8>,
350 data_scratch: &mut Vec<u8>,
351) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
352 let block = metadata.blocks[index];
353
354 let offset: u64 = block
355 .offset
356 .try_into()
357 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
358
359 let length: u64 = block
360 .meta_data_length
361 .try_into()
362 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
363
364 let message = get_message_from_block_offset(reader, offset, message_scratch)?;
365 let batch = get_record_batch(message)?;
366
367 read_record_batch(
368 batch,
369 &metadata.schema,
370 &metadata.ipc_schema,
371 projection,
372 limit,
373 dictionaries,
374 message
375 .version()
376 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,
377 reader,
378 offset + length,
379 metadata.size,
380 data_scratch,
381 )
382}