From a4a389c10d11cd8089430b972b6088ce4da3c84e Mon Sep 17 00:00:00 2001 From: Robert Garrett Date: Sat, 24 Jun 2023 20:56:28 -0500 Subject: [PATCH] feat: Job dispatcher hooked up* The job dispatcher has been hooked up and four threads are rendering the scene. There's a super important caveat, though: The job submission is blocking and will prevent the main thread from continuing to the result collection. The threads will be unable to contiinue without having output space, though. Since the buffers are only size 1, this will cause 4 scanlines to be rendered, and then nothing else to be done. Deadlock. Bumping the input buffer to 100 lets the submission loop fill up the workload and then move on to collecting. There's also no scanline sorting, so everything gets <>. --- src/main.rs | 61 ++++++++++++++++----------------------------- src/thread_utils.rs | 4 +-- 2 files changed, 23 insertions(+), 42 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6a8f757..3973ca0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,21 +3,21 @@ mod vec3; mod ray; mod camera; -mod material; mod hittable; +mod material; +mod hittable; +mod thread_utils; use crate::vec3::Vec3; use crate::ray::Ray; use crate::hittable::Hittable; use crate::material::Material; - use crate::camera::Camera; +use crate::thread_utils::RenderCommand; + use rand::{Rng, SeedableRng}; use rand::rngs::SmallRng; use rand::distributions::Uniform; -use std::thread; -use std::sync::mpsc; - fn main() { // image let aspect_ratio = 3.0 / 2.0; @@ -50,28 +50,6 @@ fn main() { aperture, dist_to_focus ); - - // thread messaging channels - // Render output pipe endpoints - let (render_tx, render_rx) = mpsc::sync_channel::<(i32, Vec)>(1); // TODO: Figure out good names for the ends of the output pipe - let (job_tx, job_rx) = mpsc::channel::(); - - // Threads exist for the whole duration of the (main function) program. - let thread_handle = thread::spawn(move || { - let mut srng = small_rng.clone(); - while let Ok(job) = job_rx.recv() { - match job { - RenderCommand::Stop => { - break; - } - RenderCommand::Line { line_num, context } => { - let line = render_line(line_num, &mut srng, context); - let result = (line_num, line); - render_tx.send(result).unwrap(); - } - } - } - }); // render // The render loop should now be a job submission mechanism @@ -84,27 +62,35 @@ fn main() { samples_per_pixel, world, }; + let mut dispatcher = thread_utils::Dispatcher::new(&small_rng); + for y in (0..image.1).rev() { eprintln!("Submitting scanline: {}", y); let job = RenderCommand::Line { line_num: y, context: context.clone() }; - job_tx.send(job).unwrap(); + dispatcher.submit_job(job); } - job_tx.send(RenderCommand::Stop).unwrap(); + //TODO: Dispatcher shutdown mechanism + // Just gonna take advantage of the round-robin dispatching to + // get a stop command to each thread + dispatcher.submit_job(RenderCommand::Stop); + dispatcher.submit_job(RenderCommand::Stop); + dispatcher.submit_job(RenderCommand::Stop); + dispatcher.submit_job(RenderCommand::Stop); + // ... also I happen to know there are 4 threads. - while let Ok(line) = render_rx.recv() { + while let Ok(scanline) = dispatcher.render_rx.recv() { //TODO: sort results once multiple threads are introduced. - let (linenum, colors) = line; - eprintln!("Received scanline: {}", linenum); - for color in colors { + eprintln!("Received scanline: {}", scanline.line_num); + for color in scanline.line { println!("{}", color.print_ppm(samples_per_pixel)); } } - thread_handle.join().unwrap(); + // TODO: Dispatcher shutdown mechanism. Right now, we might technically be leaking threads. eprintln!("Done!"); } #[derive (Clone)] -struct RenderContext{ +pub struct RenderContext{ image: (i32, i32), samples_per_pixel: u32, max_depth: u32, @@ -112,11 +98,6 @@ struct RenderContext{ camera: Camera, } -enum RenderCommand{ - Stop, - Line { line_num: i32, context: RenderContext }, -} - fn render_line(y: i32, small_rng: &mut SmallRng, context: RenderContext ) -> Vec { let distrib_zero_one = Uniform::new(0.0, 1.0); let distrib_plusminus_one = Uniform::new(-1.0, 1.0); diff --git a/src/thread_utils.rs b/src/thread_utils.rs index eeb6d8f..6dc2977 100644 --- a/src/thread_utils.rs +++ b/src/thread_utils.rs @@ -42,7 +42,7 @@ impl Dispatcher { for _ in 0..4 { // create new command tx/rx pairs. Store tx in the list, give rx to the thread. - let (command_tx, command_rx) = mpsc::sync_channel::(1); + let (command_tx, command_rx) = mpsc::sync_channel::(100); // TODO: Pick appropriate command queue depth (or make it controllable, even) @@ -92,7 +92,7 @@ impl Dispatcher { } match self.command_transmitters.get(self.next_to_feed){ - Some(target) => target.send(command), + Some(target) => target.send(command).unwrap(), None => panic!("oh god oh fuck"), } self.next_to_feed += 1;