polars_arrow/io/ipc/read/
reader.rs

1use std::io::{Read, Seek};
2
3use polars_error::PolarsResult;
4
5use super::common::*;
6use super::file::{get_message_from_block, get_record_batch};
7use super::{read_batch, read_file_dictionaries, Dictionaries, FileMetadata};
8use crate::array::Array;
9use crate::datatypes::ArrowSchema;
10use crate::record_batch::RecordBatchT;
11
12/// An iterator of [`RecordBatchT`]s from an Arrow IPC file.
13pub struct FileReader<R: Read + Seek> {
14    reader: R,
15    metadata: FileMetadata,
16    // the dictionaries are going to be read
17    dictionaries: Option<Dictionaries>,
18    current_block: usize,
19    projection: Option<ProjectionInfo>,
20    remaining: usize,
21    data_scratch: Vec<u8>,
22    message_scratch: Vec<u8>,
23}
24
25impl<R: Read + Seek> FileReader<R> {
26    /// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
27    /// # Panic
28    /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
29    pub fn new(
30        reader: R,
31        metadata: FileMetadata,
32        projection: Option<Vec<usize>>,
33        limit: Option<usize>,
34    ) -> Self {
35        let projection =
36            projection.map(|projection| prepare_projection(&metadata.schema, projection));
37        Self {
38            reader,
39            metadata,
40            dictionaries: Default::default(),
41            projection,
42            remaining: limit.unwrap_or(usize::MAX),
43            current_block: 0,
44            data_scratch: Default::default(),
45            message_scratch: Default::default(),
46        }
47    }
48
49    /// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
50    /// # Panic
51    /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
52    pub fn new_with_projection_info(
53        reader: R,
54        metadata: FileMetadata,
55        projection: Option<ProjectionInfo>,
56        limit: Option<usize>,
57    ) -> Self {
58        Self {
59            reader,
60            metadata,
61            dictionaries: Default::default(),
62            projection,
63            remaining: limit.unwrap_or(usize::MAX),
64            current_block: 0,
65            data_scratch: Default::default(),
66            message_scratch: Default::default(),
67        }
68    }
69
70    /// Return the schema of the file
71    pub fn schema(&self) -> &ArrowSchema {
72        self.projection
73            .as_ref()
74            .map(|x| &x.schema)
75            .unwrap_or(&self.metadata.schema)
76    }
77
78    /// Returns the [`FileMetadata`]
79    pub fn metadata(&self) -> &FileMetadata {
80        &self.metadata
81    }
82
83    /// Consumes this FileReader, returning the underlying reader
84    pub fn into_inner(self) -> R {
85        self.reader
86    }
87
88    pub fn set_current_block(&mut self, idx: usize) {
89        self.current_block = idx;
90    }
91
92    pub fn get_current_block(&self) -> usize {
93        self.current_block
94    }
95
96    /// Get the inner memory scratches so they can be reused in a new writer.
97    /// This can be utilized to save memory allocations for performance reasons.
98    pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {
99        std::mem::take(&mut self.projection)
100    }
101
102    /// Get the inner memory scratches so they can be reused in a new writer.
103    /// This can be utilized to save memory allocations for performance reasons.
104    pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {
105        (
106            std::mem::take(&mut self.data_scratch),
107            std::mem::take(&mut self.message_scratch),
108        )
109    }
110
111    /// Set the inner memory scratches so they can be reused in a new writer.
112    /// This can be utilized to save memory allocations for performance reasons.
113    pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {
114        (self.data_scratch, self.message_scratch) = scratches;
115    }
116
117    fn read_dictionaries(&mut self) -> PolarsResult<()> {
118        if self.dictionaries.is_none() {
119            self.dictionaries = Some(read_file_dictionaries(
120                &mut self.reader,
121                &self.metadata,
122                &mut self.data_scratch,
123            )?);
124        };
125        Ok(())
126    }
127
128    /// Skip over blocks until we have seen at most `offset` rows, returning how many rows we are
129    /// still too see.  
130    ///
131    /// This will never go over the `offset`. Meaning that if the `offset < current_block.len()`,
132    /// the block will not be skipped.
133    pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {
134        let mut remaining_offset = offset;
135
136        for (i, block) in self.metadata.blocks.iter().enumerate() {
137            let message =
138                get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;
139            let record_batch = get_record_batch(message)?;
140
141            let length = record_batch.length()?;
142            let length = length as u64;
143
144            if length > remaining_offset {
145                self.current_block = i;
146                return Ok(remaining_offset);
147            }
148
149            remaining_offset -= length;
150        }
151
152        self.current_block = self.metadata.blocks.len();
153        Ok(remaining_offset)
154    }
155
156    pub fn next_record_batch(
157        &mut self,
158    ) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {
159        let block = self.metadata.blocks.get(self.current_block)?;
160        self.current_block += 1;
161        let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);
162        Some(message.and_then(|m| get_record_batch(m)))
163    }
164}
165
166impl<R: Read + Seek> Iterator for FileReader<R> {
167    type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
168
169    fn next(&mut self) -> Option<Self::Item> {
170        // get current block
171        if self.current_block == self.metadata.blocks.len() {
172            return None;
173        }
174
175        match self.read_dictionaries() {
176            Ok(_) => {},
177            Err(e) => return Some(Err(e)),
178        };
179
180        let block = self.current_block;
181        self.current_block += 1;
182
183        let chunk = read_batch(
184            &mut self.reader,
185            self.dictionaries.as_ref().unwrap(),
186            &self.metadata,
187            self.projection.as_ref().map(|x| x.columns.as_ref()),
188            Some(self.remaining),
189            block,
190            &mut self.message_scratch,
191            &mut self.data_scratch,
192        );
193        self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();
194
195        let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {
196            // re-order according to projection
197            chunk.map(|chunk| apply_projection(chunk, map))
198        } else {
199            chunk
200        };
201        Some(chunk)
202    }
203}