Skip to content

Commit c51f12b

Browse files
authored
Merge pull request #89 from orxfun/into-iter-for-concurrent-pinned-vec
Into iter for concurrent pinned vec
2 parents 16d0ec3 + b801ef7 commit c51f12b

File tree

12 files changed

+539
-12
lines changed

12 files changed

+539
-12
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "orx-split-vec"
3-
version = "3.20.0"
3+
version = "3.21.0"
44
edition = "2024"
55
authors = ["orxfun <orx.ugur.arikan@gmail.com>"]
66
description = "An efficient dynamic capacity vector with pinned element guarantees."
@@ -12,7 +12,7 @@ categories = ["data-structures", "rust-patterns", "no-std"]
1212
[dependencies]
1313
orx-iterable = { version = "1.3.0", default-features = false }
1414
orx-pseudo-default = { version = "2.1.0", default-features = false }
15-
orx-pinned-vec = { version = "3.18.0", default-features = false }
15+
orx-pinned-vec = { version = "3.20.0", default-features = false }
1616
orx-concurrent-iter = { version = "3.1.0", default-features = false }
1717

1818
[[bench]]
@@ -24,4 +24,4 @@ criterion = "0.7.0"
2424
rand = { version = "0.9.2", default-features = false }
2525
rand_chacha = { version = "0.9", default-features = false }
2626
test-case = "3.3.1"
27-
orx-concurrent-bag = "3.1.0"
27+
# orx-concurrent-bag = "3.1.0"
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod con_iter;
2-
mod into;
1+
// mod con_iter;
2+
// mod into;
33
mod par;
44
mod transformations;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod con_iter;
2-
mod into;
1+
// mod con_iter;
2+
// mod into;
33
mod par;
44
mod transformations;

src/concurrent_pinned_vec/con_pinvec.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
33
common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4-
concurrent_pinned_vec::iter_ptr::IterPtrOfCon,
4+
concurrent_pinned_vec::{into_iter::ConcurrentSplitVecIntoIter, iter_ptr::IterPtrOfCon},
55
fragment::transformations::{fragment_from_raw, fragment_into_raw},
66
};
77
use alloc::vec::Vec;
@@ -10,10 +10,10 @@ use core::sync::atomic::{AtomicUsize, Ordering};
1010
use core::{cell::UnsafeCell, ops::Range};
1111
use orx_pinned_vec::ConcurrentPinnedVec;
1212

13-
struct FragmentData {
14-
f: usize,
15-
len: usize,
16-
capacity: usize,
13+
pub struct FragmentData {
14+
pub f: usize,
15+
pub len: usize,
16+
pub capacity: usize,
1717
}
1818

1919
/// Concurrent wrapper ([`orx_pinned_vec::ConcurrentPinnedVec`]) for the `SplitVec`.
@@ -35,6 +35,15 @@ impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
3535
}
3636

3737
impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
38+
pub(super) fn destruct(mut self) -> (G, Vec<UnsafeCell<*mut T>>, usize) {
39+
let mut data = Vec::new();
40+
core::mem::swap(&mut self.data, &mut data);
41+
let capacity = self.capacity.load(Ordering::Relaxed);
42+
let growth = self.growth.clone();
43+
self.zero();
44+
(growth, data, capacity)
45+
}
46+
3847
unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
3948
let p = unsafe { *self.data[f].get() };
4049
unsafe { p.add(i) }
@@ -195,6 +204,8 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
195204
where
196205
Self: 'a;
197206

207+
type IntoIter = ConcurrentSplitVecIntoIter<T, G>;
208+
198209
unsafe fn into_inner(mut self, len: usize) -> Self::P {
199210
let mut fragments = Vec::with_capacity(self.max_num_fragments);
200211
let mut take_fragment = |fragment| fragments.push(fragment);
@@ -434,4 +445,9 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
434445
unsafe fn ptr_iter_unchecked(&self, range: Range<usize>) -> Self::PtrIter<'_> {
435446
IterPtrOfCon::new(self.capacity(), &self.data, self.growth.clone(), range)
436447
}
448+
449+
unsafe fn into_iter(self, range: Range<usize>) -> Self::IntoIter {
450+
let (growth, data, capacity) = self.destruct();
451+
ConcurrentSplitVecIntoIter::new(capacity, data, growth, range)
452+
}
437453
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
use crate::{
2+
GrowthWithConstantTimeAccess,
3+
concurrent_pinned_vec::into_iter_ptr_slices::IntoIterPtrOfConSlices,
4+
};
5+
use alloc::vec::Vec;
6+
use core::{cell::UnsafeCell, ops::Range};
7+
8+
pub struct ConcurrentSplitVecIntoIter<T, G>
9+
where
10+
G: GrowthWithConstantTimeAccess,
11+
{
12+
slices: IntoIterPtrOfConSlices<T, G>,
13+
len_of_remaining_slices: usize,
14+
current_ptr: *const T,
15+
current_last: *const T,
16+
}
17+
18+
impl<T, G> ConcurrentSplitVecIntoIter<T, G>
19+
where
20+
G: GrowthWithConstantTimeAccess,
21+
{
22+
pub fn new(
23+
capacity: usize,
24+
fragments: Vec<UnsafeCell<*mut T>>,
25+
growth: G,
26+
range: Range<usize>,
27+
) -> Self {
28+
let len_of_remaining_slices = range.len();
29+
let slices = IntoIterPtrOfConSlices::new(capacity, fragments, growth, range);
30+
Self {
31+
slices,
32+
len_of_remaining_slices,
33+
current_ptr: core::ptr::null(),
34+
current_last: core::ptr::null(),
35+
}
36+
}
37+
38+
fn remaining(&self) -> usize {
39+
let remaining_current = match self.current_ptr.is_null() {
40+
true => 0,
41+
// SAFETY: whenever current_ptr is not null, we know that current_last is also not
42+
// null which is >= current_ptr.
43+
false => unsafe { self.current_last.offset_from(self.current_ptr) as usize + 1 },
44+
};
45+
46+
self.len_of_remaining_slices + remaining_current
47+
}
48+
49+
fn next_ptr(&mut self) -> Option<*mut T> {
50+
match self.current_ptr {
51+
ptr if ptr.is_null() => self.next_slice(),
52+
ptr if ptr == self.current_last => {
53+
self.current_ptr = core::ptr::null_mut();
54+
Some(ptr as *mut T)
55+
}
56+
ptr => {
57+
// SAFETY: current_ptr is not the last element, hance current_ptr+1 is in bounds
58+
self.current_ptr = unsafe { self.current_ptr.add(1) };
59+
60+
// SAFETY: ptr is valid and its value can be taken.
61+
// Drop will skip this position which is now uninitialized.
62+
Some(ptr as *mut T)
63+
}
64+
}
65+
}
66+
67+
fn next_slice(&mut self) -> Option<*mut T> {
68+
self.slices.next().and_then(|(ptr, len)| {
69+
debug_assert!(len > 0);
70+
self.len_of_remaining_slices -= len;
71+
// SAFETY: pointers are not null since slice is not empty
72+
self.current_ptr = ptr;
73+
self.current_last = unsafe { ptr.add(len - 1) };
74+
self.next_ptr()
75+
})
76+
}
77+
}
78+
79+
impl<T, G> Iterator for ConcurrentSplitVecIntoIter<T, G>
80+
where
81+
G: GrowthWithConstantTimeAccess,
82+
{
83+
type Item = T;
84+
85+
#[inline(always)]
86+
fn next(&mut self) -> Option<Self::Item> {
87+
self.next_ptr().map(|ptr| unsafe { ptr.read() })
88+
}
89+
90+
fn size_hint(&self) -> (usize, Option<usize>) {
91+
let len = self.remaining();
92+
(len, Some(len))
93+
}
94+
}
95+
96+
impl<T, G> ExactSizeIterator for ConcurrentSplitVecIntoIter<T, G>
97+
where
98+
G: GrowthWithConstantTimeAccess,
99+
{
100+
fn len(&self) -> usize {
101+
self.remaining()
102+
}
103+
}
104+
105+
impl<T, G> Drop for ConcurrentSplitVecIntoIter<T, G>
106+
where
107+
G: GrowthWithConstantTimeAccess,
108+
{
109+
fn drop(&mut self) {
110+
if core::mem::needs_drop::<T>() {
111+
while let Some(ptr) = self.next_ptr() {
112+
// SAFETY: ptr is in bounds and have not been dropped yet
113+
unsafe { ptr.drop_in_place() };
114+
}
115+
}
116+
}
117+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
use crate::{
2+
GrowthWithConstantTimeAccess,
3+
fragment::transformations::fragment_from_raw,
4+
range_helpers::{range_end, range_start},
5+
};
6+
use alloc::vec::Vec;
7+
use core::cmp::min;
8+
use core::{cell::UnsafeCell, iter::FusedIterator, ops::Range};
9+
10+
#[derive(Default)]
11+
pub(super) struct IntoIterPtrOfConSlices<T, G>
12+
where
13+
G: GrowthWithConstantTimeAccess,
14+
{
15+
fragments: Vec<UnsafeCell<*mut T>>,
16+
growth: G,
17+
sf: usize,
18+
si: usize,
19+
si_end: usize,
20+
ef: usize,
21+
ei: usize,
22+
f: usize,
23+
}
24+
25+
impl<T, G> Drop for IntoIterPtrOfConSlices<T, G>
26+
where
27+
G: GrowthWithConstantTimeAccess,
28+
{
29+
fn drop(&mut self) {
30+
Self::drop_fragments(&self.growth, &mut self.fragments);
31+
}
32+
}
33+
34+
impl<T, G> IntoIterPtrOfConSlices<T, G>
35+
where
36+
G: GrowthWithConstantTimeAccess,
37+
{
38+
fn empty() -> Self {
39+
Self {
40+
fragments: Default::default(),
41+
growth: G::pseudo_default(),
42+
sf: 0,
43+
si: 0,
44+
si_end: 0,
45+
ef: 0,
46+
ei: 0,
47+
f: 1,
48+
}
49+
}
50+
51+
fn single_slice(
52+
fragments: Vec<UnsafeCell<*mut T>>,
53+
growth: G,
54+
f: usize,
55+
begin: usize,
56+
end: usize,
57+
) -> Self {
58+
Self {
59+
fragments,
60+
growth,
61+
sf: f,
62+
si: begin,
63+
si_end: end,
64+
ef: f,
65+
ei: 0,
66+
f,
67+
}
68+
}
69+
70+
pub fn new(
71+
capacity: usize,
72+
mut fragments: Vec<UnsafeCell<*mut T>>,
73+
growth: G,
74+
range: Range<usize>,
75+
) -> Self {
76+
let fragment_and_inner_indices = |i| growth.get_fragment_and_inner_indices_unchecked(i);
77+
78+
let a = range_start(&range);
79+
let b = min(capacity, range_end(&range, capacity));
80+
81+
match b.saturating_sub(a) {
82+
0 => {
83+
Self::drop_fragments(&growth, &mut fragments);
84+
Self::empty()
85+
}
86+
_ => {
87+
let (sf, si) = fragment_and_inner_indices(a);
88+
let (ef, ei) = fragment_and_inner_indices(b - 1);
89+
90+
match sf == ef {
91+
true => Self::single_slice(fragments, growth, sf, si, ei + 1),
92+
false => {
93+
let si_end = growth.fragment_capacity_of(sf);
94+
Self {
95+
fragments,
96+
growth,
97+
sf,
98+
si,
99+
si_end,
100+
ef,
101+
ei,
102+
f: sf,
103+
}
104+
}
105+
}
106+
}
107+
}
108+
}
109+
110+
fn drop_fragments(growth: &G, fragments: &mut [UnsafeCell<*mut T>]) {
111+
for (f, cell) in fragments.iter().enumerate() {
112+
let ptr = unsafe { *cell.get() };
113+
match ptr.is_null() {
114+
true => continue,
115+
false => {
116+
let capacity = growth.fragment_capacity_of(f);
117+
let _fragment_to_drop = unsafe { fragment_from_raw(ptr, 0, capacity) };
118+
}
119+
}
120+
}
121+
}
122+
123+
#[inline(always)]
124+
fn remaining_len(&self) -> usize {
125+
(1 + self.ef).saturating_sub(self.f)
126+
}
127+
128+
#[inline(always)]
129+
fn get_ptr_fi(&self, f: usize, i: usize) -> *mut T {
130+
let p = unsafe { *self.fragments[f].get() };
131+
unsafe { p.add(i) }
132+
}
133+
134+
#[inline(always)]
135+
fn capacity_of(&self, f: usize) -> usize {
136+
self.growth.fragment_capacity_of(f)
137+
}
138+
}
139+
140+
impl<T, G> Iterator for IntoIterPtrOfConSlices<T, G>
141+
where
142+
G: GrowthWithConstantTimeAccess,
143+
{
144+
type Item = (*mut T, usize);
145+
146+
fn next(&mut self) -> Option<Self::Item> {
147+
match self.f {
148+
f if f == self.sf => {
149+
self.f += 1;
150+
let len = self.si_end - self.si;
151+
let p = self.get_ptr_fi(self.sf, self.si);
152+
Some((p, len))
153+
}
154+
f if f < self.ef => {
155+
self.f += 1;
156+
let len = self.capacity_of(f);
157+
let p = self.get_ptr_fi(f, 0);
158+
Some((p, len))
159+
}
160+
f if f == self.ef => {
161+
self.f += 1;
162+
let len = self.ei + 1;
163+
let p = self.get_ptr_fi(self.ef, 0);
164+
Some((p, len))
165+
}
166+
_ => None,
167+
}
168+
}
169+
170+
fn size_hint(&self) -> (usize, Option<usize>) {
171+
let len = self.remaining_len();
172+
(len, Some(len))
173+
}
174+
}
175+
176+
impl<T, G> FusedIterator for IntoIterPtrOfConSlices<T, G> where G: GrowthWithConstantTimeAccess {}
177+
178+
impl<T, G> ExactSizeIterator for IntoIterPtrOfConSlices<T, G>
179+
where
180+
G: GrowthWithConstantTimeAccess,
181+
{
182+
fn len(&self) -> usize {
183+
self.remaining_len()
184+
}
185+
}

0 commit comments

Comments
 (0)