diff --git a/src/main.rs b/src/main.rs index e521549..e465ed1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,21 +66,16 @@ fn main() { }; thread::scope(|s| { - let (mut dispatcher, scanline_receiver) = thread_utils::Dispatcher::new(&small_rng); + let (mut dispatcher, scanline_receiver) = thread_utils::Dispatcher::new(&small_rng, 4); s.spawn(move || { for y in (0..image.1).rev() { eprintln!("Submitting scanline: {}", y); let job = RenderCommand::Line { line_num: y, context: context.clone() }; - dispatcher.submit_job(job); + dispatcher.submit_job(job).unwrap(); } - //TODO: Dispatcher shutdown mechanism - // Just gonna take advantage of the round-robin dispatching to - // get a stop command to each thread - dispatcher.submit_job(RenderCommand::Stop); - dispatcher.submit_job(RenderCommand::Stop); - dispatcher.submit_job(RenderCommand::Stop); - dispatcher.submit_job(RenderCommand::Stop); + + dispatcher.submit_job(RenderCommand::Stop).unwrap(); // ... also I happen to know there are 4 threads. }); diff --git a/src/thread_utils.rs b/src/thread_utils.rs index d2237e8..6156828 100644 --- a/src/thread_utils.rs +++ b/src/thread_utils.rs @@ -8,6 +8,8 @@ use std::thread; use std::sync::mpsc; use rand::rngs::SmallRng; + +#[derive (Clone)] pub enum RenderCommand{ Stop, Line { line_num: i32, context: RenderContext }, @@ -60,13 +62,13 @@ pub struct Dispatcher{ } impl Dispatcher { - pub fn new(srng: &SmallRng) -> (Dispatcher, mpsc::Receiver ) { + pub fn new(srng: &SmallRng, num_threads: usize) -> (Dispatcher, mpsc::Receiver ) { let mut handles = Vec::new(); let mut command_transmitters = Vec::>::new(); let (render_tx, render_rx) = mpsc::sync_channel::(1); - for _ in 0..4 { + for _ in 0..num_threads { // create new command tx/rx pairs. Store tx in the list, give rx to the thread. let (command_tx, command_rx) = mpsc::sync_channel::(1); // TODO: Pick appropriate command queue depth (or make it controllable, even) @@ -107,7 +109,14 @@ impl Dispatcher { // When passing the message to threads which are still busy, this function // will block (it's a sync_channel). While blocked, other threads could // become available and left idle. - pub fn submit_job(&mut self, command: RenderCommand) { + pub fn submit_job(&mut self, command: RenderCommand) -> Result<(), mpsc::SendError> { + // Stop command is special. We'll broadcast it to all threads. + if let RenderCommand::Stop = command { + for channel in &self.command_transmitters { + return channel.send(command.clone()); + } + } + // Check that `next_to_feed` is in-bounds, and then insert. // index is post-incremented with this function call. @@ -117,12 +126,13 @@ impl Dispatcher { } else if self.next_to_feed > self.handles.len() { panic!("How the hell did a +=1 skip past the maximum allowed size?"); } - + match self.command_transmitters.get(self.next_to_feed){ Some(target) => target.send(command).unwrap(), None => panic!("oh god oh fuck"), } self.next_to_feed += 1; + Ok(()) } }