polars_arrow/io/ipc/read/
file.rs

1use std::io::{Read, Seek, SeekFrom};
2use std::sync::Arc;
3
4use arrow_format::ipc::planus::ReadAsRoot;
5use arrow_format::ipc::FooterRef;
6use polars_error::{polars_bail, polars_err, PolarsResult};
7use polars_utils::aliases::{InitHashMaps, PlHashMap};
8
9use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
10use super::common::*;
11use super::schema::fb_to_schema;
12use super::{Dictionaries, OutOfSpecKind, SendableIterator};
13use crate::array::Array;
14use crate::datatypes::{ArrowSchemaRef, Metadata};
15use crate::io::ipc::IpcSchema;
16use crate::record_batch::RecordBatchT;
17
18/// Metadata of an Arrow IPC file, written in the footer of the file.
19#[derive(Debug, Clone)]
20pub struct FileMetadata {
21    /// The schema that is read from the file footer
22    pub schema: ArrowSchemaRef,
23
24    /// The custom metadata that is read from the schema
25    pub custom_schema_metadata: Option<Arc<Metadata>>,
26
27    /// The files' [`IpcSchema`]
28    pub ipc_schema: IpcSchema,
29
30    /// The blocks in the file
31    ///
32    /// A block indicates the regions in the file to read to get data
33    pub blocks: Vec<arrow_format::ipc::Block>,
34
35    /// Dictionaries associated to each dict_id
36    pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,
37
38    /// The total size of the file in bytes
39    pub size: u64,
40}
41
42/// Read the row count by summing the length of the of the record batches
43pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
44    let (_, footer_len) = read_footer_len(reader)?;
45    let footer = read_footer(reader, footer_len)?;
46    let (_, blocks) = deserialize_footer_blocks(&footer)?;
47
48    get_row_count_from_blocks(reader, &blocks)
49}
50
51///  Read the row count by summing the length of the of the record batches in blocks
52pub fn get_row_count_from_blocks<R: Read + Seek>(
53    reader: &mut R,
54    blocks: &[arrow_format::ipc::Block],
55) -> PolarsResult<i64> {
56    let mut message_scratch: Vec<u8> = Default::default();
57
58    blocks
59        .iter()
60        .map(|block| {
61            let message = get_message_from_block(reader, block, &mut message_scratch)?;
62            let record_batch = get_record_batch(message)?;
63            record_batch.length().map_err(|e| e.into())
64        })
65        .sum()
66}
67
68pub(crate) fn get_dictionary_batch<'a>(
69    message: &'a arrow_format::ipc::MessageRef,
70) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {
71    let header = message
72        .header()
73        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
74        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
75    match header {
76        arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),
77        _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
78    }
79}
80
81fn read_dictionary_block<R: Read + Seek>(
82    reader: &mut R,
83    metadata: &FileMetadata,
84    block: &arrow_format::ipc::Block,
85    dictionaries: &mut Dictionaries,
86    message_scratch: &mut Vec<u8>,
87    dictionary_scratch: &mut Vec<u8>,
88) -> PolarsResult<()> {
89    let message = get_message_from_block(reader, block, message_scratch)?;
90    let batch = get_dictionary_batch(&message)?;
91
92    let offset: u64 = block
93        .offset
94        .try_into()
95        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
96
97    let length: u64 = block
98        .meta_data_length
99        .try_into()
100        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
101
102    read_dictionary(
103        batch,
104        &metadata.schema,
105        &metadata.ipc_schema,
106        dictionaries,
107        reader,
108        offset + length,
109        metadata.size,
110        dictionary_scratch,
111    )
112}
113
114/// Reads all file's dictionaries, if any
115/// This function is IO-bounded
116pub fn read_file_dictionaries<R: Read + Seek>(
117    reader: &mut R,
118    metadata: &FileMetadata,
119    scratch: &mut Vec<u8>,
120) -> PolarsResult<Dictionaries> {
121    let mut dictionaries = Default::default();
122
123    let blocks = if let Some(blocks) = &metadata.dictionaries {
124        blocks
125    } else {
126        return Ok(PlHashMap::new());
127    };
128    // use a temporary smaller scratch for the messages
129    let mut message_scratch = Default::default();
130
131    for block in blocks {
132        read_dictionary_block(
133            reader,
134            metadata,
135            block,
136            &mut dictionaries,
137            &mut message_scratch,
138            scratch,
139        )?;
140    }
141    Ok(dictionaries)
142}
143
144pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {
145    let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());
146
147    if footer[4..] != ARROW_MAGIC_V2 {
148        if footer[..4] == ARROW_MAGIC_V1 {
149            polars_bail!(ComputeError: "feather v1 not supported");
150        }
151        return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));
152    }
153    let footer_len = footer_len
154        .try_into()
155        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
156
157    Ok((end, footer_len))
158}
159
160/// Reads the footer's length and magic number in footer
161fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
162    // read footer length and magic number in footer
163    let end = reader.seek(SeekFrom::End(-10))? + 10;
164
165    let mut footer: [u8; 10] = [0; 10];
166
167    reader.read_exact(&mut footer)?;
168    decode_footer_len(footer, end)
169}
170
171fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {
172    // read footer
173    reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
174
175    let mut serialized_footer = vec![];
176    serialized_footer.try_reserve(footer_len)?;
177    reader
178        .by_ref()
179        .take(footer_len as u64)
180        .read_to_end(&mut serialized_footer)?;
181    Ok(serialized_footer)
182}
183
184fn deserialize_footer_blocks(
185    footer_data: &[u8],
186) -> PolarsResult<(FooterRef, Vec<arrow_format::ipc::Block>)> {
187    let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
188        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;
189
190    let blocks = footer
191        .record_batches()
192        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
193        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
194
195    let blocks = blocks
196        .iter()
197        .map(|block| {
198            block.try_into().map_err(|err| {
199                polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err))
200            })
201        })
202        .collect::<PolarsResult<Vec<_>>>()?;
203    Ok((footer, blocks))
204}
205
206pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef> {
207    arrow_format::ipc::FooterRef::read_as_root(footer_data)
208        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))
209}
210
211pub(super) fn deserialize_schema_ref_from_footer(
212    footer: arrow_format::ipc::FooterRef,
213) -> PolarsResult<arrow_format::ipc::SchemaRef> {
214    footer
215        .schema()
216        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?
217        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))
218}
219
220/// Get the IPC blocks from the footer containing record batches
221pub(super) fn iter_recordbatch_blocks_from_footer(
222    footer: arrow_format::ipc::FooterRef,
223) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {
224    let blocks = footer
225        .record_batches()
226        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
227        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
228
229    Ok(blocks.iter().map(|block| {
230        block
231            .try_into()
232            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))
233    }))
234}
235
236pub(super) fn iter_dictionary_blocks_from_footer(
237    footer: arrow_format::ipc::FooterRef,
238) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>
239{
240    let dictionaries = footer
241        .dictionaries()
242        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;
243
244    Ok(dictionaries.map(|dicts| {
245        dicts.into_iter().map(|block| {
246            block.try_into().map_err(|err| {
247                polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err))
248            })
249        })
250    }))
251}
252
253pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
254    let footer = deserialize_footer_ref(footer_data)?;
255    let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;
256    let dictionaries = iter_dictionary_blocks_from_footer(footer)?
257        .map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
258        .transpose()?;
259    let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
260    let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;
261
262    Ok(FileMetadata {
263        schema: Arc::new(schema),
264        ipc_schema,
265        blocks,
266        dictionaries,
267        size,
268        custom_schema_metadata: custom_schema_metadata.map(Arc::new),
269    })
270}
271
272/// Read the Arrow IPC file's metadata
273pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
274    let start = reader.stream_position()?;
275    let (end, footer_len) = read_footer_len(reader)?;
276    let serialized_footer = read_footer(reader, footer_len)?;
277    deserialize_footer(&serialized_footer, end - start)
278}
279
280pub(crate) fn get_record_batch(
281    message: arrow_format::ipc::MessageRef,
282) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {
283    let header = message
284        .header()
285        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
286        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
287    match header {
288        arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),
289        _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
290    }
291}
292
293fn get_message_from_block_offset<'a, R: Read + Seek>(
294    reader: &mut R,
295    offset: u64,
296    message_scratch: &'a mut Vec<u8>,
297) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
298    // read length
299    reader.seek(SeekFrom::Start(offset))?;
300    let mut meta_buf = [0; 4];
301    reader.read_exact(&mut meta_buf)?;
302    if meta_buf == CONTINUATION_MARKER {
303        // continuation marker encountered, read message next
304        reader.read_exact(&mut meta_buf)?;
305    }
306    let meta_len = i32::from_le_bytes(meta_buf)
307        .try_into()
308        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
309
310    message_scratch.clear();
311    message_scratch.try_reserve(meta_len)?;
312    reader
313        .by_ref()
314        .take(meta_len as u64)
315        .read_to_end(message_scratch)?;
316
317    arrow_format::ipc::MessageRef::read_as_root(message_scratch)
318        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))
319}
320
321pub(super) fn get_message_from_block<'a, R: Read + Seek>(
322    reader: &mut R,
323    block: &arrow_format::ipc::Block,
324    message_scratch: &'a mut Vec<u8>,
325) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
326    let offset: u64 = block
327        .offset
328        .try_into()
329        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
330
331    get_message_from_block_offset(reader, offset, message_scratch)
332}
333
334/// Reads the record batch at position `index` from the reader.
335///
336/// This function is useful for random access to the file. For example, if
337/// you have indexed the file somewhere else, this allows pruning
338/// certain parts of the file.
339/// # Panics
340/// This function panics iff `index >= metadata.blocks.len()`
341#[allow(clippy::too_many_arguments)]
342pub fn read_batch<R: Read + Seek>(
343    reader: &mut R,
344    dictionaries: &Dictionaries,
345    metadata: &FileMetadata,
346    projection: Option<&[usize]>,
347    limit: Option<usize>,
348    index: usize,
349    message_scratch: &mut Vec<u8>,
350    data_scratch: &mut Vec<u8>,
351) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
352    let block = metadata.blocks[index];
353
354    let offset: u64 = block
355        .offset
356        .try_into()
357        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
358
359    let length: u64 = block
360        .meta_data_length
361        .try_into()
362        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
363
364    let message = get_message_from_block_offset(reader, offset, message_scratch)?;
365    let batch = get_record_batch(message)?;
366
367    read_record_batch(
368        batch,
369        &metadata.schema,
370        &metadata.ipc_schema,
371        projection,
372        limit,
373        dictionaries,
374        message
375            .version()
376            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,
377        reader,
378        offset + length,
379        metadata.size,
380        data_scratch,
381    )
382}