Skip to content

Commit acc12cd

Browse files
authored
Make Input threadsafe (optionally) (hydro-project#8)
1 parent 5162821 commit acc12cd

File tree

3 files changed

+113
-53
lines changed

3 files changed

+113
-53
lines changed

covid_tracing/src/main.rs

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
#![feature(never_type)]
22

3-
use std::sync::mpsc;
43
use std::time::Duration;
54

65
use hydroflow::compiled::pull::SymmetricHashJoin;
76
use hydroflow::compiled::{ForEach, Pivot, Tee};
87
use hydroflow::scheduled::collections::Iter;
9-
use hydroflow::scheduled::ctx::SendCtx;
108
use hydroflow::scheduled::handoff::VecHandoff;
119
use hydroflow::scheduled::Hydroflow;
1210
use hydroflow::{tl, tlt};
@@ -23,26 +21,11 @@ fn main() {
2321
type Phone = &'static str;
2422
type DateTime = usize;
2523

26-
let (contacts_send, contacts_recv) = mpsc::channel::<(Pid, Pid, DateTime)>();
27-
let (diagnosed_send, diagnosed_recv) = mpsc::channel::<(Pid, (DateTime, DateTime))>();
28-
let (people_send, people_recv) = mpsc::channel::<(Pid, (Name, Phone))>();
29-
3024
let mut df = Hydroflow::new();
3125

32-
let contacts_out = df.add_source(move |send: &mut SendCtx<VecHandoff<_>>| {
33-
send.give(Iter(contacts_recv.try_iter()));
34-
});
35-
let contacts_op_id = contacts_out.op_id();
36-
37-
let diagnosed_out = df.add_source(move |send: &mut SendCtx<VecHandoff<_>>| {
38-
send.give(Iter(diagnosed_recv.try_iter()));
39-
});
40-
let diagnosed_op_id = diagnosed_out.op_id();
41-
42-
let people_out = df.add_source(move |send: &mut SendCtx<VecHandoff<_>>| {
43-
send.give(Iter(people_recv.try_iter()));
44-
});
45-
let people_op_id = people_out.op_id();
26+
let (contacts_send, contacts_out) = df.add_channel_input();
27+
let (diagnosed_send, diagnosed_out) = df.add_channel_input();
28+
let (people_send, people_out) = df.add_channel_input();
4629

4730
let mut exposed_contacts = Default::default();
4831

@@ -127,17 +110,12 @@ fn main() {
127110
df.add_edge(people_out, people_in);
128111
df.add_edge(notifs_out, notifs_in);
129112

130-
let reactor = df.reactor();
131-
132113
let all_people = people::get_people();
133114

134115
let inner = all_people.clone();
135-
let inner_reactor = reactor.clone();
136116
std::thread::spawn(move || {
137-
for person in inner {
138-
people_send.send(person).unwrap();
139-
}
140-
inner_reactor.trigger(people_op_id).unwrap();
117+
people_send.give(Iter(inner.into_iter()));
118+
people_send.flush();
141119
});
142120

143121
std::thread::spawn(move || {
@@ -152,10 +130,8 @@ fn main() {
152130
let p1 = rng.gen_range(0..all_people.len());
153131
let p2 = rng.gen_range(0..all_people.len());
154132
if p1 != p2 {
155-
contacts_send
156-
.send((all_people[p1].0, all_people[p2].0, t))
157-
.unwrap();
158-
reactor.trigger(contacts_op_id).unwrap();
133+
contacts_send.give(Some((all_people[p1].0, all_people[p2].0, t)));
134+
contacts_send.flush();
159135
}
160136
}
161137
}
@@ -164,9 +140,8 @@ fn main() {
164140
if !all_people.is_empty() {
165141
let p = rng.gen_range(0..all_people.len());
166142
diagnosed_send
167-
.send((all_people[p].0, (t, t + TRANSMISSIBLE_DURATION)))
168-
.unwrap();
169-
reactor.trigger(diagnosed_op_id).unwrap();
143+
.give(Some((all_people[p].0, (t, t + TRANSMISSIBLE_DURATION))));
144+
diagnosed_send.flush();
170145
}
171146
}
172147
_ => unreachable!(),

hydroflow/src/scheduled/input.rs

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,67 @@
1-
use std::{cell::RefCell, rc::Rc};
1+
use std::{cell::RefCell, marker::PhantomData, rc::Rc, sync::mpsc::SyncSender};
22

33
use super::{OpId, Reactor};
44

5-
pub struct Input<T> {
5+
pub trait Give<T> {
6+
fn give(&self, t: T) -> bool;
7+
}
8+
9+
pub struct Buffer<T>(pub(crate) Rc<RefCell<Vec<T>>>);
10+
impl<T> Give<T> for Buffer<T> {
11+
fn give(&self, t: T) -> bool {
12+
(*self.0).borrow_mut().push(t);
13+
true
14+
}
15+
}
16+
17+
impl<T> Default for Buffer<T> {
18+
fn default() -> Self {
19+
Buffer(Rc::new(RefCell::new(Vec::new())))
20+
}
21+
}
22+
23+
impl<T> Clone for Buffer<T> {
24+
fn clone(&self) -> Self {
25+
Buffer(self.0.clone())
26+
}
27+
}
28+
29+
impl<T> Give<T> for SyncSender<T> {
30+
fn give(&self, t: T) -> bool {
31+
matches!(self.send(t), Ok(_))
32+
}
33+
}
34+
35+
// TODO(justin): this thing should probably give Vecs to the Givable, and buffer
36+
// stuff up and automatically flush, but postponing that until we have occasion
37+
// to benchmark it.
38+
pub struct Input<T, G>
39+
where
40+
G: Give<T>,
41+
{
642
reactor: Reactor,
743
op_id: OpId,
8-
data: Rc<RefCell<Vec<T>>>,
44+
givable: G,
45+
_marker: PhantomData<T>,
946
}
10-
impl<T> Input<T> {
11-
pub fn new(reactor: Reactor, op_id: OpId, data: Rc<RefCell<Vec<T>>>) -> Self {
47+
impl<T, G> Input<T, G>
48+
where
49+
G: Give<T>,
50+
{
51+
pub fn new(reactor: Reactor, op_id: OpId, givable: G) -> Self {
1252
Input {
1353
reactor,
1454
op_id,
15-
data,
55+
givable,
56+
_marker: PhantomData,
1657
}
1758
}
1859

1960
pub fn give(&self, t: T) {
20-
(*self.data).borrow_mut().push(t);
61+
self.givable.give(t);
2162
}
2263

2364
pub fn flush(&self) {
2465
self.reactor.trigger(self.op_id).unwrap(/* TODO(justin) */);
2566
}
26-
27-
pub fn give_vec(&self, t: &mut Vec<T>) {
28-
(*self.data).borrow_mut().extend(t.drain(..));
29-
self.flush();
30-
}
3167
}

hydroflow/src/scheduled/mod.rs

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use handoff::{Handoff, HandoffList, HandoffMeta, NullHandoff, VecHandoff};
1717
use subgraph::{NtoMClosureSubgraph, Subgraph, VariadicClosureSubgraph};
1818

1919
use self::handoff::CanReceive;
20-
use self::input::Input;
20+
use self::input::{Buffer, Input};
2121

2222
pub type OpId = usize;
2323
pub type HandoffId = usize;
@@ -286,25 +286,41 @@ impl Hydroflow {
286286

287287
/**
288288
* Adds an "input" operator, along with a handle to insert data into it.
289-
* TODO(justin): this thing currently cannot be passed across threads, but I
290-
* think if we replaced the Vec with a channel it could.
291289
*/
292-
pub fn add_input<T, W>(&mut self) -> (Input<T>, OutputPort<W>)
290+
pub fn add_input<T, W>(&mut self) -> (Input<T, Buffer<T>>, OutputPort<W>)
293291
where
294292
T: 'static,
295293
W: 'static + Handoff + CanReceive<T>,
296294
{
297-
let input = Rc::new(RefCell::new(Vec::new()));
295+
let input = Buffer::default();
298296
let inner_input = input.clone();
299297
let output_port = self.add_source::<_, W>(move |send| {
300-
for x in (*inner_input).borrow_mut().drain(..) {
298+
for x in (*inner_input.0).borrow_mut().drain(..) {
301299
send.give(x);
302300
}
303301
});
304302
let id = output_port.op_id;
305303
(Input::new(self.reactor(), id, input), output_port)
306304
}
307305

306+
/**
307+
* Adds a threadsafe "input" operator, along with a handle to insert data into it.
308+
*/
309+
pub fn add_channel_input<T, W>(&mut self) -> (Input<T, SyncSender<T>>, OutputPort<W>)
310+
where
311+
T: 'static,
312+
W: 'static + Handoff + CanReceive<T>,
313+
{
314+
let (sender, receiver) = mpsc::sync_channel(8000);
315+
let output_port = self.add_source::<_, W>(move |send| {
316+
for x in receiver.try_iter() {
317+
send.give(x);
318+
}
319+
});
320+
let id = output_port.op_id;
321+
(Input::new(self.reactor(), id, sender), output_port)
322+
}
323+
308324
/**
309325
* Adds a new compiled subgraph with no inputs and one output.
310326
*/
@@ -351,7 +367,7 @@ impl Hydroflow {
351367
}
352368

353369
/**
354-
* A handle into a specific [Hydroflow] instance for triggering operators to run, possibly from another thread.Default
370+
* A handle into a specific [Hydroflow] instance for triggering operators to run, possibly from another thread.
355371
*/
356372
#[derive(Clone)]
357373
pub struct Reactor {
@@ -650,3 +666,36 @@ fn test_input_handle() {
650666

651667
assert_eq!((*vec).borrow().clone(), vec![1, 2, 3, 4, 5, 6]);
652668
}
669+
670+
#[test]
671+
fn test_input_handle_thread() {
672+
let mut df = Hydroflow::new();
673+
674+
let (input, output_port) = df.add_channel_input();
675+
676+
let vec = Rc::new(RefCell::new(Vec::new()));
677+
let inner_vec = vec.clone();
678+
let input_port = df.add_sink(move |recv: &mut RecvCtx<VecHandoff<usize>>| {
679+
for v in recv.take_inner() {
680+
(*inner_vec).borrow_mut().push(v);
681+
}
682+
});
683+
684+
df.add_edge(output_port, input_port);
685+
686+
let (done, wait) = mpsc::channel();
687+
688+
std::thread::spawn(move || {
689+
input.give(Some(1));
690+
input.give(Some(2));
691+
input.give(Some(3));
692+
input.flush();
693+
done.send(()).unwrap();
694+
});
695+
696+
wait.recv().unwrap();
697+
698+
df.tick();
699+
700+
assert_eq!((*vec).borrow().clone(), vec![1, 2, 3]);
701+
}

0 commit comments

Comments
 (0)