polars_arrow/ffi/
stream.rs

1use std::ffi::{CStr, CString};
2use std::ops::DerefMut;
3
4use polars_error::{polars_bail, polars_err, PolarsError, PolarsResult};
5
6use super::{
7    export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c, ArrowArray,
8    ArrowArrayStream, ArrowSchema,
9};
10use crate::array::Array;
11use crate::datatypes::Field;
12
13impl Drop for ArrowArrayStream {
14    fn drop(&mut self) {
15        match self.release {
16            None => (),
17            Some(release) => unsafe { release(self) },
18        };
19    }
20}
21
// NOTE(review): this asserts that a stream may be moved to another thread.
// Soundness depends on the producer's callbacks and `private_data` being usable
// from a different thread, which cannot be verified from this file — confirm.
unsafe impl Send for ArrowArrayStream {}
23
24impl ArrowArrayStream {
25    /// Creates an empty [`ArrowArrayStream`] used to import from a producer.
26    pub fn empty() -> Self {
27        Self {
28            get_schema: None,
29            get_next: None,
30            get_last_error: None,
31            release: None,
32            private_data: std::ptr::null_mut(),
33        }
34    }
35}
36
37unsafe fn handle_error(iter: &mut ArrowArrayStream) -> PolarsError {
38    let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };
39
40    if error.is_null() {
41        return polars_err!(ComputeError: "got unspecified external error");
42    }
43
44    let error = unsafe { CStr::from_ptr(error) };
45    polars_err!(ComputeError: "got external error: {}", error.to_str().unwrap())
46}
47
48/// Implements an iterator of [`Array`] consumed from the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
49pub struct ArrowArrayStreamReader<Iter: DerefMut<Target = ArrowArrayStream>> {
50    iter: Iter,
51    field: Field,
52}
53
54impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
55    /// Returns a new [`ArrowArrayStreamReader`]
56    /// # Error
57    /// Errors iff the [`ArrowArrayStream`] is out of specification,
58    /// or was already released prior to calling this function.
59    ///
60    /// # Safety
61    /// This method is intrinsically `unsafe` since it assumes that the `ArrowArrayStream`
62    /// contains a valid Arrow C stream interface.
63    /// In particular:
64    /// * The `ArrowArrayStream` fulfills the invariants of the C stream interface
65    /// * The schema `get_schema` produces fulfills the C data interface
66    pub unsafe fn try_new(mut iter: Iter) -> PolarsResult<Self> {
67        if iter.release.is_none() {
68            polars_bail!(InvalidOperation: "the C stream was already released")
69        };
70
71        if iter.get_next.is_none() {
72            polars_bail!(InvalidOperation: "the C stream must contain a non-null get_next")
73        };
74
75        if iter.get_last_error.is_none() {
76            polars_bail!(InvalidOperation: "The C stream MUST contain a non-null get_last_error")
77        };
78
79        let mut field = ArrowSchema::empty();
80        let status = if let Some(f) = iter.get_schema {
81            unsafe { (f)(&mut *iter, &mut field) }
82        } else {
83            polars_bail!(InvalidOperation:
84                            "The C stream MUST contain a non-null get_schema"
85            )
86        };
87
88        if status != 0 {
89            return Err(unsafe { handle_error(&mut iter) });
90        }
91
92        let field = unsafe { import_field_from_c(&field)? };
93
94        Ok(Self { iter, field })
95    }
96
97    /// Returns the field provided by the stream
98    pub fn field(&self) -> &Field {
99        &self.field
100    }
101
102    /// Advances this iterator by one array
103    /// # Error
104    /// Errors iff:
105    /// * The C stream interface returns an error
106    /// * The C stream interface returns an invalid array (that we can identify, see Safety below)
107    ///
108    /// # Safety
109    /// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays
110    /// that fulfill the C data interface
111    pub unsafe fn next(&mut self) -> Option<PolarsResult<Box<dyn Array>>> {
112        let mut array = ArrowArray::empty();
113        let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };
114
115        if status != 0 {
116            return Some(Err(unsafe { handle_error(&mut self.iter) }));
117        }
118
119        // last paragraph of https://arrow.apache.org/docs/format/CStreamInterface.html#c.ArrowArrayStream.get_next
120        array.release?;
121
122        // SAFETY: assumed from the C stream interface
123        unsafe { import_array_from_c(array, self.field.dtype.clone()) }
124            .map(Some)
125            .transpose()
126    }
127}
128
/// State stored behind `ArrowArrayStream::private_data` for streams created by
/// `export_iterator`.
struct PrivateData {
    /// Source of the arrays handed out by the `get_next` callback.
    iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
    /// Field exported by the `get_schema` callback; `get_next` also compares each
    /// produced array's data type against it.
    field: Field,
    /// Message of the most recent failure, exposed through the `get_last_error`
    /// callback. `None` when the last `get_next` call succeeded.
    error: Option<CString>,
}
134
135unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
136    if iter.is_null() {
137        return 2001;
138    }
139    let private = &mut *((*iter).private_data as *mut PrivateData);
140
141    match private.iter.next() {
142        Some(Ok(item)) => {
143            // check that the array has the same dtype as field
144            let item_dt = item.dtype();
145            let expected_dt = private.field.dtype();
146            if item_dt != expected_dt {
147                private.error = Some(CString::new(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}").as_bytes().to_vec()).unwrap());
148                return 2001; // custom application specific error (since this is never a result of this interface)
149            }
150
151            std::ptr::write(array, export_array_to_c(item));
152
153            private.error = None;
154            0
155        },
156        Some(Err(err)) => {
157            private.error = Some(CString::new(err.to_string().as_bytes().to_vec()).unwrap());
158            2001 // custom application specific error (since this is never a result of this interface)
159        },
160        None => {
161            let a = ArrowArray::empty();
162            std::ptr::write_unaligned(array, a);
163            private.error = None;
164            0
165        },
166    }
167}
168
169unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
170    if iter.is_null() {
171        return 2001;
172    }
173    let private = &mut *((*iter).private_data as *mut PrivateData);
174
175    std::ptr::write(schema, export_field_to_c(&private.field));
176    0
177}
178
179unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
180    if iter.is_null() {
181        return std::ptr::null();
182    }
183    let private = &mut *((*iter).private_data as *mut PrivateData);
184
185    private
186        .error
187        .as_ref()
188        .map(|x| x.as_ptr())
189        .unwrap_or(std::ptr::null())
190}
191
192unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
193    if iter.is_null() {
194        return;
195    }
196    let _ = Box::from_raw((*iter).private_data as *mut PrivateData);
197    (*iter).release = None;
198    // private drops automatically
199}
200
201/// Exports an iterator to the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html)
202pub fn export_iterator(
203    iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
204    field: Field,
205) -> ArrowArrayStream {
206    let private_data = Box::new(PrivateData {
207        iter,
208        field,
209        error: None,
210    });
211
212    ArrowArrayStream {
213        get_schema: Some(get_schema),
214        get_next: Some(get_next),
215        get_last_error: Some(get_last_error),
216        release: Some(release),
217        private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
218    }
219}