polars_arrow/ffi/
stream.rs1use std::ffi::{CStr, CString};
2use std::ops::DerefMut;
3
4use polars_error::{polars_bail, polars_err, PolarsError, PolarsResult};
5
6use super::{
7 export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c, ArrowArray,
8 ArrowArrayStream, ArrowSchema,
9};
10use crate::array::Array;
11use crate::datatypes::Field;
12
13impl Drop for ArrowArrayStream {
14 fn drop(&mut self) {
15 match self.release {
16 None => (),
17 Some(release) => unsafe { release(self) },
18 };
19 }
20}
21
22unsafe impl Send for ArrowArrayStream {}
23
24impl ArrowArrayStream {
25 pub fn empty() -> Self {
27 Self {
28 get_schema: None,
29 get_next: None,
30 get_last_error: None,
31 release: None,
32 private_data: std::ptr::null_mut(),
33 }
34 }
35}
36
37unsafe fn handle_error(iter: &mut ArrowArrayStream) -> PolarsError {
38 let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };
39
40 if error.is_null() {
41 return polars_err!(ComputeError: "got unspecified external error");
42 }
43
44 let error = unsafe { CStr::from_ptr(error) };
45 polars_err!(ComputeError: "got external error: {}", error.to_str().unwrap())
46}
47
48pub struct ArrowArrayStreamReader<Iter: DerefMut<Target = ArrowArrayStream>> {
50 iter: Iter,
51 field: Field,
52}
53
54impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
55 pub unsafe fn try_new(mut iter: Iter) -> PolarsResult<Self> {
67 if iter.release.is_none() {
68 polars_bail!(InvalidOperation: "the C stream was already released")
69 };
70
71 if iter.get_next.is_none() {
72 polars_bail!(InvalidOperation: "the C stream must contain a non-null get_next")
73 };
74
75 if iter.get_last_error.is_none() {
76 polars_bail!(InvalidOperation: "The C stream MUST contain a non-null get_last_error")
77 };
78
79 let mut field = ArrowSchema::empty();
80 let status = if let Some(f) = iter.get_schema {
81 unsafe { (f)(&mut *iter, &mut field) }
82 } else {
83 polars_bail!(InvalidOperation:
84 "The C stream MUST contain a non-null get_schema"
85 )
86 };
87
88 if status != 0 {
89 return Err(unsafe { handle_error(&mut iter) });
90 }
91
92 let field = unsafe { import_field_from_c(&field)? };
93
94 Ok(Self { iter, field })
95 }
96
97 pub fn field(&self) -> &Field {
99 &self.field
100 }
101
102 pub unsafe fn next(&mut self) -> Option<PolarsResult<Box<dyn Array>>> {
112 let mut array = ArrowArray::empty();
113 let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };
114
115 if status != 0 {
116 return Some(Err(unsafe { handle_error(&mut self.iter) }));
117 }
118
119 array.release?;
121
122 unsafe { import_array_from_c(array, self.field.dtype.clone()) }
124 .map(Some)
125 .transpose()
126 }
127}
128
/// State owned by a stream created by `export_iterator`, reached through the
/// stream's `private_data` pointer and freed by its `release` callback.
// NOTE: fields drop in declaration order, so the iterator is dropped first.
struct PrivateData {
    // Source of the arrays handed out by `get_next`.
    iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
    // Schema reported by `get_schema`; every exported item must match its dtype.
    field: Field,
    // Message for the most recent failed `get_next` call, returned by
    // `get_last_error`; reset to `None` after a successful call.
    error: Option<CString>,
}
134
135unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
136 if iter.is_null() {
137 return 2001;
138 }
139 let private = &mut *((*iter).private_data as *mut PrivateData);
140
141 match private.iter.next() {
142 Some(Ok(item)) => {
143 let item_dt = item.dtype();
145 let expected_dt = private.field.dtype();
146 if item_dt != expected_dt {
147 private.error = Some(CString::new(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}").as_bytes().to_vec()).unwrap());
148 return 2001; }
150
151 std::ptr::write(array, export_array_to_c(item));
152
153 private.error = None;
154 0
155 },
156 Some(Err(err)) => {
157 private.error = Some(CString::new(err.to_string().as_bytes().to_vec()).unwrap());
158 2001 },
160 None => {
161 let a = ArrowArray::empty();
162 std::ptr::write_unaligned(array, a);
163 private.error = None;
164 0
165 },
166 }
167}
168
169unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
170 if iter.is_null() {
171 return 2001;
172 }
173 let private = &mut *((*iter).private_data as *mut PrivateData);
174
175 std::ptr::write(schema, export_field_to_c(&private.field));
176 0
177}
178
179unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
180 if iter.is_null() {
181 return std::ptr::null();
182 }
183 let private = &mut *((*iter).private_data as *mut PrivateData);
184
185 private
186 .error
187 .as_ref()
188 .map(|x| x.as_ptr())
189 .unwrap_or(std::ptr::null())
190}
191
192unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
193 if iter.is_null() {
194 return;
195 }
196 let _ = Box::from_raw((*iter).private_data as *mut PrivateData);
197 (*iter).release = None;
198 }
200
201pub fn export_iterator(
203 iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
204 field: Field,
205) -> ArrowArrayStream {
206 let private_data = Box::new(PrivateData {
207 iter,
208 field,
209 error: None,
210 });
211
212 ArrowArrayStream {
213 get_schema: Some(get_schema),
214 get_next: Some(get_next),
215 get_last_error: Some(get_last_error),
216 release: Some(release),
217 private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
218 }
219}