feat: Job dispatcher hooked up*

The job dispatcher has been hooked up and four threads are rendering the
scene. There's a super important caveat, though: The job submission is
blocking and will prevent the main thread from continuing to the result
collection. The threads will be unable to contiinue without having
output space, though. Since the buffers are only size 1, this will cause
4 scanlines to be rendered, and then nothing else to be done. Deadlock.
Bumping the input buffer to 100 lets the submission loop fill up the
workload and then move on to collecting.

There's also no scanline sorting, so everything gets <<wiggly>>.
This commit is contained in:
2023-06-24 20:56:28 -05:00
parent d77655af12
commit a4a389c10d
2 changed files with 23 additions and 42 deletions

View File

@@ -3,21 +3,21 @@
mod vec3;
mod ray;
mod camera;
mod material; mod hittable;
mod material;
mod hittable;
mod thread_utils;
use crate::vec3::Vec3;
use crate::ray::Ray;
use crate::hittable::Hittable;
use crate::material::Material;
use crate::camera::Camera;
use crate::thread_utils::RenderCommand;
use rand::{Rng, SeedableRng};
use rand::rngs::SmallRng;
use rand::distributions::Uniform;
use std::thread;
use std::sync::mpsc;
fn main() {
// image
let aspect_ratio = 3.0 / 2.0;
@@ -50,28 +50,6 @@ fn main() {
aperture,
dist_to_focus
);
// thread messaging channels
// Render output pipe endpoints
let (render_tx, render_rx) = mpsc::sync_channel::<(i32, Vec<Vec3>)>(1); // TODO: Figure out good names for the ends of the output pipe
let (job_tx, job_rx) = mpsc::channel::<RenderCommand>();
// Threads exist for the whole duration of the (main function) program.
let thread_handle = thread::spawn(move || {
let mut srng = small_rng.clone();
while let Ok(job) = job_rx.recv() {
match job {
RenderCommand::Stop => {
break;
}
RenderCommand::Line { line_num, context } => {
let line = render_line(line_num, &mut srng, context);
let result = (line_num, line);
render_tx.send(result).unwrap();
}
}
}
});
// render
// The render loop should now be a job submission mechanism
@@ -84,27 +62,35 @@ fn main() {
samples_per_pixel,
world,
};
let mut dispatcher = thread_utils::Dispatcher::new(&small_rng);
for y in (0..image.1).rev() {
eprintln!("Submitting scanline: {}", y);
let job = RenderCommand::Line { line_num: y, context: context.clone() };
job_tx.send(job).unwrap();
dispatcher.submit_job(job);
}
job_tx.send(RenderCommand::Stop).unwrap();
//TODO: Dispatcher shutdown mechanism
// Just gonna take advantage of the round-robin dispatching to
// get a stop command to each thread
dispatcher.submit_job(RenderCommand::Stop);
dispatcher.submit_job(RenderCommand::Stop);
dispatcher.submit_job(RenderCommand::Stop);
dispatcher.submit_job(RenderCommand::Stop);
// ... also I happen to know there are 4 threads.
while let Ok(line) = render_rx.recv() {
while let Ok(scanline) = dispatcher.render_rx.recv() {
//TODO: sort results once multiple threads are introduced.
let (linenum, colors) = line;
eprintln!("Received scanline: {}", linenum);
for color in colors {
eprintln!("Received scanline: {}", scanline.line_num);
for color in scanline.line {
println!("{}", color.print_ppm(samples_per_pixel));
}
}
thread_handle.join().unwrap();
// TODO: Dispatcher shutdown mechanism. Right now, we might technically be leaking threads.
eprintln!("Done!");
}
#[derive (Clone)]
struct RenderContext{
pub struct RenderContext{
image: (i32, i32),
samples_per_pixel: u32,
max_depth: u32,
@@ -112,11 +98,6 @@ struct RenderContext{
camera: Camera,
}
enum RenderCommand{
Stop,
Line { line_num: i32, context: RenderContext },
}
fn render_line(y: i32, small_rng: &mut SmallRng, context: RenderContext ) -> Vec<Vec3> {
let distrib_zero_one = Uniform::new(0.0, 1.0);
let distrib_plusminus_one = Uniform::new(-1.0, 1.0);

View File

@@ -42,7 +42,7 @@ impl Dispatcher {
for _ in 0..4 {
// create new command tx/rx pairs. Store tx in the list, give rx to the thread.
let (command_tx, command_rx) = mpsc::sync_channel::<RenderCommand>(1);
let (command_tx, command_rx) = mpsc::sync_channel::<RenderCommand>(100);
// TODO: Pick appropriate command queue depth (or make it controllable, even)
@@ -92,7 +92,7 @@ impl Dispatcher {
}
match self.command_transmitters.get(self.next_to_feed){
Some(target) => target.send(command),
Some(target) => target.send(command).unwrap(),
None => panic!("oh god oh fuck"),
}
self.next_to_feed += 1;