From 1d7f075e0de96f344e8b2c26ede4121f4b9ae53d Mon Sep 17 00:00:00 2001 From: Robert Garrett Date: Sat, 24 Jun 2023 20:05:11 -0500 Subject: [PATCH] feat: Dispatcher struct for handling jobs It's getting fiddly and awful keeping all the thread control pieces all over the place. Existing ThreadPool implementations seem a little weird, and this is a learning experience. I'll just make my own! The dispatcher constructor only creates the threads and sets up their IO channels. It has another method for serial submission of jobs. The job allocation follows a round-robin selection, but I have some concerns about starvation doing this -- consider a long running scanline render. It would make the submission call block and other threads could become available for that job. But they'll be ignored, since the round-robin assignment doesn't have any skip mechanisms. --- src/thread_utils.rs | 100 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 src/thread_utils.rs diff --git a/src/thread_utils.rs b/src/thread_utils.rs new file mode 100644 index 0000000..0f84be7 --- /dev/null +++ b/src/thread_utils.rs @@ -0,0 +1,100 @@ + +use crate::RenderContext; +use crate::Vec3; +use crate::render_line; + +use std::thread; +use std::sync::mpsc; +use rand::rngs::SmallRng; + +pub enum RenderCommand{ + Stop, + Line { line_num: i32, context: RenderContext }, +} + +pub struct RenderResult { + pub line_num: i32, + pub line: Vec, +} + +/* + * The dispatcher will hold a list of threads, and a list of command input channels to match. + * Helper functions exist to input jobs serially, and then dispatch them to an open thread. + * + * Since receivers can be matched to several senders, the input end of the result channel will + * be cloned and given to each of the threads. + * TODO: Consider holding a copy of the render_tx end in case threads exit early and need to + * be restored. + */ +pub struct Dispatcher{ + handles: Vec>, + command_transmitters: Vec>, + next_to_feed: usize, // gonna do a round-robin style dispatch, ig. + pub render_rx: mpsc::Receiver, +} + +impl Dispatcher { + pub fn new(srng: &SmallRng) -> Dispatcher { + let mut handles = Vec::new(); + let mut command_transmitters = Vec::>::new(); + + let (render_tx, render_rx) = mpsc::sync_channel::(1); + + for _ in 0..4 { + // create new command tx/rx pairs. Store tx in the list, give rx to the thread. + let (command_tx, command_rx) = mpsc::sync_channel::(1); + // TODO: Pick appropriate command queue depth (or make it controllable, even) + + + let mut srng = srng.clone(); + let threads_result_tx = render_tx.clone(); + + let thread_handle = thread::spawn(move || { + while let Ok(job) = command_rx.recv() { + match job { + RenderCommand::Stop => { + break; + } + RenderCommand::Line { line_num, context } => { + let line = render_line(line_num, &mut srng, context); + let result = RenderResult { line_num, line }; + threads_result_tx.send(result).unwrap(); + } + } + } + }); + handles.push(thread_handle); + command_transmitters.push(command_tx); + } + // finally, stash everything in the Dispatcher struct and return. + + Dispatcher{ + handles, + command_transmitters, + next_to_feed: 0, + render_rx, + } + } + + //TODO: Reconsider round-robin dispatch + // When passing the message to threads which are still busy, this function + // will block (it's a sync_channel). While blocked, other threads could + // become available and left idle. + pub fn submit_job(&mut self, command: RenderCommand) { + // Check that `next_to_feed` is in-bounds, and then insert. + // index is post-incremented with this function call. + + // wrap when at length (0-indexed so last valid index is len-1) + if self.next_to_feed == self.handles.len() { + self.next_to_feed = 0; + } else if self.next_to_feed > self.handles.len() { + panic!("How the hell did a +=1 skip past the maximum allowed size?"); + } + + match self.command_transmitters.get(self.next_to_feed){ + Some(target) => target.send(command), + None => panic!("oh god oh fuck"), + }; + } +} +