feat: Shutdown propogation and thread count arg

The Stop command will be handled specially by the Dispatcher to act as a
sort of broadcast into the thread pool. Instead of asking the caller to
feed in the appropriate number of stop commands, a single one will queue
up a stop for all threads. Important for the ergonomics of having a
variable number of threads (instead of the earlier magic constant).
Moreover, it's necessary if the dispatcher stops using the round-robin
style assignment.

The pool size is specifiable as an argument to Dispatcher::new().
This commit is contained in:
2023-06-25 14:29:35 -05:00
parent 9873c5596d
commit adaf277cba
2 changed files with 18 additions and 13 deletions

View File

@@ -66,21 +66,16 @@ fn main() {
};
thread::scope(|s| {
let (mut dispatcher, scanline_receiver) = thread_utils::Dispatcher::new(&small_rng);
let (mut dispatcher, scanline_receiver) = thread_utils::Dispatcher::new(&small_rng, 4);
s.spawn(move || {
for y in (0..image.1).rev() {
eprintln!("Submitting scanline: {}", y);
let job = RenderCommand::Line { line_num: y, context: context.clone() };
dispatcher.submit_job(job);
dispatcher.submit_job(job).unwrap();
}
//TODO: Dispatcher shutdown mechanism
// Just gonna take advantage of the round-robin dispatching to
// get a stop command to each thread
dispatcher.submit_job(RenderCommand::Stop);
dispatcher.submit_job(RenderCommand::Stop);
dispatcher.submit_job(RenderCommand::Stop);
dispatcher.submit_job(RenderCommand::Stop);
dispatcher.submit_job(RenderCommand::Stop).unwrap();
// ... also I happen to know there are 4 threads.
});

View File

@@ -8,6 +8,8 @@ use std::thread;
use std::sync::mpsc;
use rand::rngs::SmallRng;
#[derive (Clone)]
pub enum RenderCommand{
Stop,
Line { line_num: i32, context: RenderContext },
@@ -60,13 +62,13 @@ pub struct Dispatcher{
}
impl Dispatcher {
pub fn new(srng: &SmallRng) -> (Dispatcher, mpsc::Receiver<RenderResult> ) {
pub fn new(srng: &SmallRng, num_threads: usize) -> (Dispatcher, mpsc::Receiver<RenderResult> ) {
let mut handles = Vec::new();
let mut command_transmitters = Vec::<mpsc::SyncSender<RenderCommand>>::new();
let (render_tx, render_rx) = mpsc::sync_channel::<RenderResult>(1);
for _ in 0..4 {
for _ in 0..num_threads {
// create new command tx/rx pairs. Store tx in the list, give rx to the thread.
let (command_tx, command_rx) = mpsc::sync_channel::<RenderCommand>(1);
// TODO: Pick appropriate command queue depth (or make it controllable, even)
@@ -107,7 +109,14 @@ impl Dispatcher {
// When passing the message to threads which are still busy, this function
// will block (it's a sync_channel). While blocked, other threads could
// become available and left idle.
pub fn submit_job(&mut self, command: RenderCommand) {
pub fn submit_job(&mut self, command: RenderCommand) -> Result<(), mpsc::SendError<RenderCommand>> {
// Stop command is special. We'll broadcast it to all threads.
if let RenderCommand::Stop = command {
for channel in &self.command_transmitters {
return channel.send(command.clone());
}
}
// Check that `next_to_feed` is in-bounds, and then insert.
// index is post-incremented with this function call.
@@ -117,12 +126,13 @@ impl Dispatcher {
} else if self.next_to_feed > self.handles.len() {
panic!("How the hell did a +=1 skip past the maximum allowed size?");
}
match self.command_transmitters.get(self.next_to_feed){
Some(target) => target.send(command).unwrap(),
None => panic!("oh god oh fuck"),
}
self.next_to_feed += 1;
Ok(())
}
}