1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
use std::time::Duration;
use crate::ffi::*;
use crate::{context_and_function, time_after_delay, WaitTimeout};
use crate::queue::Queue;
/// A Grand Central Dispatch group.
///
/// A `Group` is a mechanism for grouping closures and monitoring them. This
/// allows for aggregate synchronization, so you can track when all the
/// closures complete, even if they are running on different queues.
#[derive(Debug)]
pub struct Group {
ptr: dispatch_group_t,
}
impl Group {
/// Creates a new dispatch `Group`.
pub fn create() -> Group {
unsafe {
Group { ptr: dispatch_group_create() }
}
}
/// Indicates that a closure has entered self, and increments the current
/// count of outstanding tasks. Returns a `GroupGuard` that should be
/// dropped when the closure leaves self, decrementing the count.
pub fn enter(&self) -> GroupGuard {
GroupGuard::new(self)
}
/// Submits a closure asynchronously to the given `Queue` and associates it
/// with self.
pub fn exec_async<F>(&self, queue: &Queue, work: F)
where F: 'static + Send + FnOnce() {
let (context, work) = context_and_function(work);
unsafe {
dispatch_group_async_f(self.ptr, queue.ptr, context, work);
}
}
/// Schedules a closure to be submitted to the given `Queue` when all tasks
/// associated with self have completed.
/// If self is empty, the closure is submitted immediately.
pub fn notify<F>(&self, queue: &Queue, work: F)
where F: 'static + Send + FnOnce() {
let (context, work) = context_and_function(work);
unsafe {
dispatch_group_notify_f(self.ptr, queue.ptr, context, work);
}
}
/// Waits synchronously for all tasks associated with self to complete.
pub fn wait(&self) {
let result = unsafe {
dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER)
};
assert!(result == 0, "Dispatch group wait errored");
}
/// Waits for all tasks associated with self to complete within the
/// specified duration.
/// Returns true if the tasks completed or false if the timeout elapsed.
pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> {
let when = time_after_delay(timeout);
let result = unsafe {
dispatch_group_wait(self.ptr, when)
};
if result == 0 {
Ok(())
} else {
Err(WaitTimeout { duration: timeout })
}
}
/// Returns whether self is currently empty.
pub fn is_empty(&self) -> bool {
let result = unsafe {
dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW)
};
result == 0
}
}
unsafe impl Sync for Group { }
unsafe impl Send for Group { }
impl Clone for Group {
fn clone(&self) -> Self {
unsafe {
dispatch_retain(self.ptr);
}
Group { ptr: self.ptr }
}
}
impl Drop for Group {
fn drop(&mut self) {
unsafe {
dispatch_release(self.ptr);
}
}
}
/// An RAII guard which will leave a `Group` when dropped.
#[derive(Debug)]
pub struct GroupGuard {
group: Group,
}
impl GroupGuard {
fn new(group: &Group) -> GroupGuard {
unsafe {
dispatch_group_enter(group.ptr);
}
GroupGuard { group: group.clone() }
}
/// Drops self, leaving the `Group`.
pub fn leave(self) { }
}
impl Clone for GroupGuard {
fn clone(&self) -> Self {
GroupGuard::new(&self.group)
}
}
impl Drop for GroupGuard {
fn drop(&mut self) {
unsafe {
dispatch_group_leave(self.group.ptr);
}
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use crate::{Queue, QueueAttribute};
use super::Group;
#[test]
fn test_group() {
let group = Group::create();
let q = Queue::create("", QueueAttribute::Serial);
let num = Arc::new(Mutex::new(0));
let num2 = num.clone();
group.exec_async(&q, move || {
let mut num = num2.lock().unwrap();
*num += 1;
});
let guard = group.enter();
assert!(!group.is_empty());
let num3 = num.clone();
q.exec_async(move || {
let mut num = num3.lock().unwrap();
*num += 1;
guard.leave();
});
let notify_group = Group::create();
let guard = notify_group.enter();
let num4 = num.clone();
group.notify(&q, move || {
let mut num = num4.lock().unwrap();
*num *= 5;
guard.leave();
});
// Wait for the notify block to finish
notify_group.wait();
// If the notify ran, the group should be empty
assert!(group.is_empty());
// The notify must have run after the two blocks of the group
assert_eq!(*num.lock().unwrap(), 10);
}
}