// polars_arrow/io/ipc/read/reader.rs
use std::io::{Read, Seek};

use polars_error::PolarsResult;

use super::common::*;
use super::file::{get_message_from_block, get_record_batch};
use super::{read_batch, read_file_dictionaries, Dictionaries, FileMetadata};
use crate::array::Array;
use crate::datatypes::ArrowSchema;
use crate::record_batch::RecordBatchT;

12pub struct FileReader<R: Read + Seek> {
14 reader: R,
15 metadata: FileMetadata,
16 dictionaries: Option<Dictionaries>,
18 current_block: usize,
19 projection: Option<ProjectionInfo>,
20 remaining: usize,
21 data_scratch: Vec<u8>,
22 message_scratch: Vec<u8>,
23}
24
25impl<R: Read + Seek> FileReader<R> {
26 pub fn new(
30 reader: R,
31 metadata: FileMetadata,
32 projection: Option<Vec<usize>>,
33 limit: Option<usize>,
34 ) -> Self {
35 let projection =
36 projection.map(|projection| prepare_projection(&metadata.schema, projection));
37 Self {
38 reader,
39 metadata,
40 dictionaries: Default::default(),
41 projection,
42 remaining: limit.unwrap_or(usize::MAX),
43 current_block: 0,
44 data_scratch: Default::default(),
45 message_scratch: Default::default(),
46 }
47 }
48
49 pub fn new_with_projection_info(
53 reader: R,
54 metadata: FileMetadata,
55 projection: Option<ProjectionInfo>,
56 limit: Option<usize>,
57 ) -> Self {
58 Self {
59 reader,
60 metadata,
61 dictionaries: Default::default(),
62 projection,
63 remaining: limit.unwrap_or(usize::MAX),
64 current_block: 0,
65 data_scratch: Default::default(),
66 message_scratch: Default::default(),
67 }
68 }
69
70 pub fn schema(&self) -> &ArrowSchema {
72 self.projection
73 .as_ref()
74 .map(|x| &x.schema)
75 .unwrap_or(&self.metadata.schema)
76 }
77
78 pub fn metadata(&self) -> &FileMetadata {
80 &self.metadata
81 }
82
83 pub fn into_inner(self) -> R {
85 self.reader
86 }
87
88 pub fn set_current_block(&mut self, idx: usize) {
89 self.current_block = idx;
90 }
91
92 pub fn get_current_block(&self) -> usize {
93 self.current_block
94 }
95
96 pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {
99 std::mem::take(&mut self.projection)
100 }
101
102 pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {
105 (
106 std::mem::take(&mut self.data_scratch),
107 std::mem::take(&mut self.message_scratch),
108 )
109 }
110
111 pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {
114 (self.data_scratch, self.message_scratch) = scratches;
115 }
116
117 fn read_dictionaries(&mut self) -> PolarsResult<()> {
118 if self.dictionaries.is_none() {
119 self.dictionaries = Some(read_file_dictionaries(
120 &mut self.reader,
121 &self.metadata,
122 &mut self.data_scratch,
123 )?);
124 };
125 Ok(())
126 }
127
128 pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {
134 let mut remaining_offset = offset;
135
136 for (i, block) in self.metadata.blocks.iter().enumerate() {
137 let message =
138 get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;
139 let record_batch = get_record_batch(message)?;
140
141 let length = record_batch.length()?;
142 let length = length as u64;
143
144 if length > remaining_offset {
145 self.current_block = i;
146 return Ok(remaining_offset);
147 }
148
149 remaining_offset -= length;
150 }
151
152 self.current_block = self.metadata.blocks.len();
153 Ok(remaining_offset)
154 }
155
156 pub fn next_record_batch(
157 &mut self,
158 ) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {
159 let block = self.metadata.blocks.get(self.current_block)?;
160 self.current_block += 1;
161 let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);
162 Some(message.and_then(|m| get_record_batch(m)))
163 }
164}
165
166impl<R: Read + Seek> Iterator for FileReader<R> {
167 type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
168
169 fn next(&mut self) -> Option<Self::Item> {
170 if self.current_block == self.metadata.blocks.len() {
172 return None;
173 }
174
175 match self.read_dictionaries() {
176 Ok(_) => {},
177 Err(e) => return Some(Err(e)),
178 };
179
180 let block = self.current_block;
181 self.current_block += 1;
182
183 let chunk = read_batch(
184 &mut self.reader,
185 self.dictionaries.as_ref().unwrap(),
186 &self.metadata,
187 self.projection.as_ref().map(|x| x.columns.as_ref()),
188 Some(self.remaining),
189 block,
190 &mut self.message_scratch,
191 &mut self.data_scratch,
192 );
193 self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();
194
195 let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {
196 chunk.map(|chunk| apply_projection(chunk, map))
198 } else {
199 chunk
200 };
201 Some(chunk)
202 }
203}