Compare commits

...

10 Commits

Author SHA1 Message Date
fc8f9e0e15 Starting to get a hold of the Tile struct
Iterators are turning my brain to mush. I'm trying far too hard to
leverage existing iterator tooling. The closures all over the place make
writing out the type names basically impossible. For chaining in the middle
of a function, this is fine. But I need to stick it in a `struct Tile{}`
and move it between threads.

This commit is a save point for my own sanity, more than anything else.
The tile struct exists and compiles. The only changed part is the
extraction of the pixel sampling loop into a named function (instead of
a nameless closuuuruurreeee).
2023-08-19 17:12:28 -05:00
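For context on the type-naming problem: an iterator chain built from closures has an anonymous type that can't be written down as a struct field, whereas `itertools::Product<Range<i32>, Range<i32>>` (what the diff below aliases as `TileCursorIter`) can. A minimal sketch of that idea; `Cursor` here is an illustrative stand-in for the real `Tile`:

```rust
use std::ops::Range;
use itertools::Itertools; // for cartesian_product

// The closure-free cursor type is fully nameable, so it can be stored in a
// struct, and the struct can be moved to another thread.
struct Cursor {
    inner: itertools::Product<Range<i32>, Range<i32>>,
}

impl Cursor {
    fn new(w: i32, h: i32) -> Self {
        Cursor { inner: (0..w).cartesian_product(0..h) }
    }
}

impl Iterator for Cursor {
    type Item = (i32, i32);
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

fn main() {
    let cursor = Cursor::new(2, 2);
    // Moving the struct into a thread works because its type is concrete and Send.
    std::thread::spawn(move || {
        for (x, y) in cursor {
            println!("({x}, {y})");
        }
    })
    .join()
    .unwrap();
}
```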
601beb10a0 Control those Uniform distributions
I wanted to make the Uniforms into `const`s that live at the global
scope. Rust can do global `const`s, but the `Uniform::new()` function
can't be used in that context.

As an intermediate step toward a *helpful* solution, I've just pushed them into a
struct. One less parameter to name, even though it's the same stuff. The
compiler should be smart enough to hoist the initialization outside the
function and leave them around, but maybe not. After all, the function
isn't defined to work in such a way with the `const` keyword :v
2023-08-19 08:55:29 -05:00
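For reference, `Uniform::new()` is not a `const fn` in rand 0.8, which is what blocks the global-`const` route. A minimal sketch of the struct workaround described above, with an illustrative name (the diff below spells the real type `DistributionContianer`):

```rust
use rand::distributions::Uniform;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};

// Build the distributions once and hand the bundle around by reference,
// instead of recreating them inside every render call.
struct Distributions {
    zero_one: Uniform<f32>,
    plusminus_one: Uniform<f32>,
}

impl Distributions {
    fn new() -> Self {
        Distributions {
            zero_one: Uniform::new(0.0, 1.0),
            plusminus_one: Uniform::new(-1.0, 1.0),
        }
    }
}

fn main() {
    let distr = Distributions::new();
    let mut rng = SmallRng::seed_from_u64(0);
    let jitter: f32 = rng.sample(distr.zero_one);
    let scatter: f32 = rng.sample(distr.plusminus_one);
    println!("{jitter} {scatter}");
}
```

If a genuinely global solution is wanted later, a lazily initialised static (for example `std::sync::OnceLock<Uniform<f32>>`) would sidestep the `const` restriction, though that isn't what this commit does.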
4430b7c0bf Render frame function does the iterator thing!
The many, many nested for loops don't feel right in a language where an
iterator chain is an expression you can assign directly to a variable
(`let x = ...collect()`). The logic hasn't changed (and I'm pretty sure the
compiler emits the same code), but it feels better now.

I'm now equipped to go over the rest of the project and rewrite the
loops. Hopefully a more ergonomic way to dispatch to the threads arises
as a result. I shall see.
2023-08-19 08:55:27 -05:00
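The shape of that rewrite, sketched on a toy payload rather than the real sampling code; the diff below does the same thing in `render_line`/`sample_pixel` with `map`, `fold`, and `collect`:

```rust
// Before: an accumulator mutated inside nested for loops.
fn render_line_loops(width: u32, samples: u32) -> Vec<f32> {
    let mut line = Vec::new();
    for x in 0..width {
        let mut color = 0.0;
        for s in 0..samples {
            color += (x + s) as f32; // stand-in for the real per-sample work
        }
        line.push(color);
    }
    line
}

// After: the same logic as an expression, assignable directly to a variable.
fn render_line_iter(width: u32, samples: u32) -> Vec<f32> {
    (0..width)
        .map(|x| (0..samples).fold(0.0, |color, s| color + (x + s) as f32))
        .collect()
}

fn main() {
    assert_eq!(render_line_loops(4, 3), render_line_iter(4, 3));
}
```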
adaf277cba feat: Shutdown propagation and thread count arg
The Stop command will be handled specially by the Dispatcher to act as a
sort of broadcast into the thread pool. Instead of asking the caller to
feed in the appropriate number of stop commands, a single one will queue
up a stop for all threads. This is important for the ergonomics of having a
variable number of threads (instead of the earlier magic constant).
Moreover, it's necessary if the dispatcher stops using the round-robin
style assignment.

The pool size is specifiable as an argument to Dispatcher::new().
2023-06-25 14:29:35 -05:00
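A hedged sketch of the fan-out described above, with trivial worker bodies standing in for the real render code: each worker has its own command channel, and one Stop request becomes one queued Stop per worker.

```rust
use std::sync::mpsc;
use std::thread;

enum RenderCommand {
    Stop,
    Line { line_num: i32 },
}

fn main() {
    let num_threads: usize = 4; // now a parameter instead of a magic constant
    let mut command_txs = Vec::new();
    let mut handles = Vec::new();

    for id in 0..num_threads {
        let (tx, rx) = mpsc::sync_channel::<RenderCommand>(1);
        command_txs.push(tx);
        handles.push(thread::spawn(move || {
            while let Ok(job) = rx.recv() {
                match job {
                    RenderCommand::Stop => break,
                    RenderCommand::Line { line_num } => eprintln!("worker {id}: line {line_num}"),
                }
            }
        }));
    }

    // Round-robin the real jobs...
    for line_num in 0..8 {
        command_txs[(line_num as usize) % num_threads]
            .send(RenderCommand::Line { line_num })
            .unwrap();
    }
    // ...then broadcast: a single Stop request queues one Stop per worker.
    for tx in &command_txs {
        tx.send(RenderCommand::Stop).unwrap();
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
```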
9873c5596d feat: Output ordering!
As results come from the dispatcher('s return channel), they are pushed
into a vector to be reordered. They're sorted in reverse order so that
they can be popped from the vector. Upon receipt and buffering of a
scanline, a loop checks the tail of the buffer to see if it's the
next-to-write element. Since the tail is popped off each time, the loop
repeats until that condition no longer holds.
2023-06-25 12:11:30 -05:00
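A minimal sketch of that reorder buffer with the pixel payload stripped out; the real version in the diff below works on `thread_utils::RenderResult` and lives in `main`:

```rust
use std::sync::mpsc;

struct Scanline {
    line_num: i32,
    // pixel data omitted in this sketch
}

// Scanlines are submitted from y = height - 1 down to 0, but arrive out of
// order. Buffer each arrival, keep the buffer sorted so the next line to
// write sits at the tail, and pop/emit while the tail is the one we want.
fn drain_in_order(rx: mpsc::Receiver<Scanline>, height: i32) {
    let mut buffer: Vec<Scanline> = Vec::new();
    let mut next_to_write = height - 1;

    while let Ok(scanline) = rx.recv() {
        buffer.push(scanline);
        // Ascending by line number, so the highest remaining line is the tail.
        buffer.sort_by(|a, b| a.line_num.cmp(&b.line_num));
        while buffer.last().map(|s| s.line_num) == Some(next_to_write) {
            let ready = buffer.pop().unwrap();
            println!("write scanline {}", ready.line_num);
            next_to_write -= 1;
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for line_num in [2, 3, 0, 1] {
        tx.send(Scanline { line_num }).unwrap(); // simulated out-of-order arrival
    }
    drop(tx);
    drain_in_order(rx, 4); // prints 3, 2, 1, 0
}
```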
995cfdf391 feat: Dispatcher constructor separates render_rx
The dispatcher no longer owns the render results message channel, and
instead passes it out as a separate item during construction.
2023-06-25 09:14:41 -05:00
65185c7996 fail: Threads want full ownership, do another way
Saving for reference more than anything. The threads take ownership of
the data (the closures do, but whatever). Moving the return channel out
of the dispatcher means the dispatcher can't be moved into the feeder
thread.

I see a few solutions from here:
1. Proxy the return channel with another channel. Give the whole
   dispatcher to the feeder thread and hook up another output channel.
   Have the feeder unload the return and pass it through.
2. Rewrite the dispatcher constructor to pass a tuple of the dispatcher
   minus its return channel, and the return channel now as a separate
   object. This could let them have independent lifetimes and then I can
   pass them around like I'm trying to do.
3. Have main do all the job feeding, result unloading, and
   recompositing. Don't have a feeder and collector thread, and just
   have main bounce between loading a few, and unloading a few.
2023-06-25 09:00:47 -05:00
a4a389c10d feat: Job dispatcher hooked up*
The job dispatcher has been hooked up and four threads are rendering the
scene. There's a super important caveat, though: The job submission is
blocking and will prevent the main thread from continuing to the result
collection. The threads will be unable to continue without having
output space, though. Since the buffers are only size 1, this will cause
4 scanlines to be rendered, and then nothing else to be done. Deadlock.
Bumping the input buffer to 100 lets the submission loop fill up the
workload and then move on to collecting.

There's also no scanline sorting, so everything gets <<wiggly>>.
2023-06-24 20:56:28 -05:00
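A tiny illustration of the `sync_channel` capacity behaviour behind that deadlock; `try_send` is used so the demo reports fullness instead of actually hanging:

```rust
use std::sync::mpsc;

fn main() {
    // A sync_channel(1) holds at most one unreceived message.
    let (tx, _rx) = mpsc::sync_channel::<i32>(1);
    tx.send(0).unwrap(); // fills the single buffer slot
    // A blocking send() here would never return while nobody drains _rx.
    // With size-1 buffers everywhere and main still busy submitting jobs,
    // every worker ends up parked on a full result channel: deadlock.
    assert!(tx.try_send(1).is_err());
    println!("second send would block: buffer is full");
}
```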
d77655af12 fix: Increment submit_job counter
Gotta increment that counter to submit jobs to each thread. Big dummy.
2023-06-24 20:43:53 -05:00
1d7f075e0d feat: Dispatcher struct for handling jobs
It's getting fiddly and awful keeping the thread-control pieces scattered
all over the place. Existing ThreadPool implementations seem a little weird,
and this is a learning experience. I'll just make my own!

The dispatcher constructor only creates the threads and sets up their IO
channels. It has another method for serial submission of jobs.

The job allocation follows a round-robin selection, but I have some
concerns about starvation with this approach -- consider a long-running
scanline render. It would make the submission call block while other
threads become available for that job. But they'll be ignored, since the
round-robin assignment doesn't have any skip mechanism.
2023-06-24 20:05:11 -05:00
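For contrast, a hedged sketch of the alternative the starvation concern points toward (not what this commit implements): a single shared job queue that any idle worker pulls from, so one long-running scanline can't leave free workers ignored.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // None acts as a stop marker; one is queued per worker at the end.
    let (job_tx, job_rx) = mpsc::channel::<Option<i32>>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    let mut handles = Vec::new();
    for id in 0..4 {
        let job_rx = Arc::clone(&job_rx);
        handles.push(thread::spawn(move || loop {
            // The guard is dropped at the end of this statement, so the lock
            // is released before the actual work happens.
            let job = job_rx.lock().unwrap().recv();
            match job {
                Ok(Some(line_num)) => eprintln!("worker {id} rendering line {line_num}"),
                Ok(None) | Err(_) => break,
            }
        }));
    }

    for line_num in 0..8 {
        job_tx.send(Some(line_num)).unwrap();
    }
    for _ in 0..4 {
        job_tx.send(None).unwrap();
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
```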
3 changed files with 314 additions and 59 deletions

Cargo.toml
View File

@@ -7,4 +7,4 @@ edition = "2021"
[dependencies]
rand = { version = "0.8.5", features = ["small_rng"] }
itertools = { version = "0.11.0" }

src/main.rs
View File

@@ -3,27 +3,33 @@
mod vec3;
mod ray;
mod camera;
mod material; mod hittable;
mod material;
mod hittable;
mod thread_utils;
use crate::vec3::Vec3;
use crate::ray::Ray;
use crate::hittable::Hittable;
use crate::material::Material;
use crate::camera::Camera;
use crate::thread_utils::RenderCommand;
use rand::{Rng, SeedableRng};
use rand::rngs::SmallRng;
use rand::distributions::Uniform;
use itertools;
use itertools::Itertools;
use std::ops;
use std::thread;
use std::sync::mpsc;
fn main() {
// image
let aspect_ratio = 3.0 / 2.0;
let image = (
400,
(400.0 / aspect_ratio) as i32
1920,
(1920.0 / aspect_ratio) as i32
);
let samples_per_pixel: u32 = 10;
let max_depth = 50;
@@ -50,28 +56,6 @@ fn main() {
aperture,
dist_to_focus
);
// thread messaging channels
// Render output pipe endpoints
let (render_tx, render_rx) = mpsc::sync_channel::<(i32, Vec<Vec3>)>(1); // TODO: Figure out good names for the ends of the output pipe
let (job_tx, job_rx) = mpsc::channel::<RenderCommand>();
// Threads exist for the whole duration of the (main function) program.
let thread_handle = thread::spawn(move || {
let mut srng = small_rng.clone();
while let Ok(job) = job_rx.recv() {
match job {
RenderCommand::Stop => {
break;
}
RenderCommand::Line { line_num, context } => {
let line = render_line(line_num, &mut srng, context);
let result = (line_num, line);
render_tx.send(result).unwrap();
}
}
}
});
// render
// The render loop should now be a job submission mechanism
@@ -84,27 +68,82 @@ fn main() {
samples_per_pixel,
world,
};
for y in (0..image.1).rev() {
eprintln!("Submitting scanline: {}", y);
let job = RenderCommand::Line { line_num: y, context: context.clone() };
job_tx.send(job).unwrap();
}
job_tx.send(RenderCommand::Stop).unwrap();
while let Ok(line) = render_rx.recv() {
//TODO: sort results once multiple threads are introduced.
let (linenum, colors) = line;
eprintln!("Received scanline: {}", linenum);
for color in colors {
println!("{}", color.print_ppm(samples_per_pixel));
thread::scope(|s| {
let (mut dispatcher, scanline_receiver) = thread_utils::Dispatcher::new(&small_rng, 12);
s.spawn(move || {
for y in (0..image.1).rev() {
eprintln!("Submitting scanline: {}", y);
let job = RenderCommand::Line { line_num: y, context: context.clone() };
dispatcher.submit_job(job).unwrap();
}
dispatcher.submit_job(RenderCommand::Stop).unwrap();
// ... also I happen to know there are 4 threads.
});
/*
* Store received results in the segments buffer.
* Some will land before their previous segments and will need to be held
* until the next-to-write arrives.
*
* Elements are sorted in reverse order so that they can be popped from the
* Vec quickly.
*
* The queue is scanned every single time a new item is received. In the
* happy path where the received item is next-up, it'll be buffered, checked
* and then printed. In the case where it isn't, it'll get buffered and
* stick around for more loops. When the next-to-write finally lands, it
* means the n+1 element is up, now. If that element is already in the buffer
* we want to write it out. Hence the loop that scans the whole buffer each
* receive.
*
* TODO: There could be an up-front conditional that checks to see if the
* received item *is* the next-to-write and skip the buffering step.
* But I need to make the concept work at all, first.
*/
let mut raster_segments = Vec::<thread_utils::RenderResult>::new();
let mut sl_output_index = image.1-1; // scanlines count down, start at image height.
while let Ok(scanline) = scanline_receiver.recv() {
eprintln!("Received scanline: {}", scanline.line_num);
raster_segments.push(scanline);
raster_segments.sort_by( |a, b| b.cmp(a) );
loop {
if raster_segments.len() == 0 { break; } // can this ever happen? Not while every
// single element gets pushed to the
// buffer first. With the happy path
// short-circuit noted above, it could.
let last_ind = raster_segments.len() - 1;
if raster_segments[last_ind].line_num == sl_output_index{
let scanline = raster_segments.pop().unwrap();
print_scanline(scanline, samples_per_pixel);
sl_output_index -= 1;
} else {
break;
}
}
}
}
thread_handle.join().unwrap();
eprintln!("Size of raster_segments at finish: {}", raster_segments.len());
});
// TODO: Dispatcher shutdown mechanism. Right now, we might technically be leaking threads.
eprintln!("Done!");
}
fn print_scanline(scanline: thread_utils::RenderResult, samples_per_pixel: u32){
eprintln!("Printing scanline num: {}", scanline.line_num);
for color in &scanline.line {
println!("{}", color.print_ppm(samples_per_pixel));
}
}
#[derive (Clone)]
struct RenderContext{
pub struct RenderContext{
image: (i32, i32),
samples_per_pixel: u32,
max_depth: u32,
@@ -112,26 +151,104 @@ struct RenderContext{
camera: Camera,
}
enum RenderCommand{
Stop,
Line { line_num: i32, context: RenderContext },
pub struct DistributionContianer {
distrib_zero_one: Uniform<f32>,
distrib_plusminus_one: Uniform<f32>,
}
fn render_line(y: i32, small_rng: &mut SmallRng, context: RenderContext ) -> Vec<Vec3> {
let distrib_zero_one = Uniform::new(0.0, 1.0);
let distrib_plusminus_one = Uniform::new(-1.0, 1.0);
let mut line = Vec::<Vec3>::new();
for x in 0..context.image.0 {
let mut color = Vec3::zero();
for _ in 0..context.samples_per_pixel {
let u = ((x as f32) + small_rng.sample(distrib_zero_one)) / ((context.image.0 - 1) as f32);
let v = ((y as f32) + small_rng.sample(distrib_zero_one)) / ((context.image.1 - 1) as f32);
let ray = context.camera.get_ray(u, v, small_rng);
color+= ray_color(ray, &context.world, context.max_depth, small_rng, distrib_plusminus_one);
impl DistributionContianer {
fn new() -> Self {
DistributionContianer {
distrib_zero_one: Uniform::new(0.0, 1.0),
distrib_plusminus_one: Uniform::new(-1.0, 1.0),
}
}
}
fn render_line(y: i32, small_rng: &mut SmallRng, context: RenderContext, distr: &DistributionContianer) -> Vec<Vec3> {
//TODO: Ensure that the compiler hoists the distributions out as constants
// else, do so manually
(0..context.image.0).map(|x| {
sample_pixel(x, y, small_rng, &context, distr)
}).collect()
}
fn sample_pixel(x: i32, y: i32, small_rng: &mut SmallRng, context: &RenderContext, distr: &DistributionContianer) -> Vec3{
(0..context.samples_per_pixel).into_iter().fold(
Vec3::zero(),
|color, _sample| {
let u = ((x as f32) + small_rng.sample(distr.distrib_zero_one)) / ((context.image.0 - 1) as f32);
let v = ((y as f32) + small_rng.sample(distr.distrib_zero_one)) / ((context.image.1 - 1) as f32);
let ray = context.camera.get_ray(u, v, small_rng);
color + ray_color(ray, &context.world, context.max_depth, small_rng, distr.distrib_plusminus_one)
}
)
}
fn range2d(bounds: (i32, i32, i32, i32)) -> impl Iterator<Item = (i32, i32)> {
let rheight = bounds.1..(bounds.1+bounds.3);
rheight.flat_map(move |y| {
let rwidth = bounds.0..(bounds.0+bounds.2);
rwidth.map( move |x| {
(x, y)
})
})
}
#[derive (Copy, Clone)]
struct Rect {
x: i32,
y: i32,
w: i32,
h: i32,
}
/* Iterable that produces pixels left-to-right, top-to-bottom.
* `Tile`s represent the render space, not the finished image.
* There is no internal pixel buffer
*/
type TileCursorIter = itertools::Product<ops::Range<i32>, ops::Range<i32>>;
struct Tile {
bounds: Rect,
context: RenderContext,
small_rng: SmallRng,
rand_distr: DistributionContianer,
cursor: TileCursorIter,
}
impl Tile{
fn new(
bounds: Rect,
context: RenderContext,
small_rng: SmallRng,
rand_distr: DistributionContianer
) -> Self
{
Tile { bounds, context, small_rng, rand_distr,
cursor: (bounds.x..(bounds.x + bounds.w))
.cartesian_product(bounds.y..(bounds.y + bounds.h)
)
}
}
}
impl Iterator for Tile {
type Item = Vec3;
fn next(&mut self) -> Option<Self::Item> {
if let Some((x, y)) = self.cursor.next(){
Some(sample_pixel(
x, y,
&mut self.small_rng,
&self.context,
&self.rand_distr,
))
} else {
None
}
line.push(color);
}
return line;
}
fn ray_color(r: Ray, world: &Hittable, depth: u32, srng: &mut SmallRng, distrib: Uniform<f32> ) -> Vec3 {

138
src/thread_utils.rs Normal file
View File

@@ -0,0 +1,138 @@
use crate::RenderContext;
use crate::Vec3;
use crate::{render_line, DistributionContianer};
use core::cmp::Ordering;
use std::thread;
use std::sync::mpsc;
use rand::rngs::SmallRng;
#[derive (Clone)]
pub enum RenderCommand{
Stop,
Line { line_num: i32, context: RenderContext },
}
pub struct RenderResult {
pub line_num: i32,
pub line: Vec<Vec3>,
}
impl Ord for RenderResult {
fn cmp(&self, other: &Self) -> Ordering {
if self.line_num > other.line_num {
Ordering::Less
} else if self.line_num < other.line_num {
Ordering::Greater
} else {
Ordering::Equal
}
}
}
impl PartialOrd for RenderResult {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for RenderResult {
fn eq(&self, other: &Self) -> bool {
self.line_num == other.line_num
}
}
impl Eq for RenderResult {}
/*
* The dispatcher will hold a list of threads, and a list of command input channels to match.
* Helper functions exist to input jobs serially, and then dispatch them to an open thread.
*
* Since receivers can be matched to several senders, the input end of the result channel will
* be cloned and given to each of the threads.
* TODO: Consider holding a copy of the render_tx end in case threads exit early and need to
* be restored.
*/
pub struct Dispatcher{
handles: Vec<thread::JoinHandle<()>>,
command_transmitters: Vec<mpsc::SyncSender<RenderCommand>>,
next_to_feed: usize, // gonna do a round-robin style dispatch, ig.
}
impl Dispatcher {
pub fn new(srng: &SmallRng, num_threads: usize) -> (Dispatcher, mpsc::Receiver<RenderResult> ) {
let mut handles = Vec::new();
let mut command_transmitters = Vec::<mpsc::SyncSender<RenderCommand>>::new();
let (render_tx, render_rx) = mpsc::sync_channel::<RenderResult>(1);
for _ in 0..num_threads {
// create new command tx/rx pairs. Store tx in the list, give rx to the thread.
let (command_tx, command_rx) = mpsc::sync_channel::<RenderCommand>(1);
// TODO: Pick appropriate command queue depth (or make it controllable, even)
let mut srng = srng.clone();
let threads_result_tx = render_tx.clone();
let distribs = DistributionContianer::new();
let thread_handle = thread::spawn(move || {
while let Ok(job) = command_rx.recv() {
match job {
RenderCommand::Stop => {
break;
}
RenderCommand::Line { line_num, context } => {
let line = render_line(line_num, &mut srng, context, &distribs);
let result = RenderResult { line_num, line };
threads_result_tx.send(result).unwrap();
}
}
}
});
handles.push(thread_handle);
command_transmitters.push(command_tx);
}
// finally, stash everything in the Dispatcher struct and return.
(
Dispatcher{
handles,
command_transmitters,
next_to_feed: 0,
},
render_rx
)
}
//TODO: Reconsider round-robin dispatch
// When passing the message to threads which are still busy, this function
// will block (it's a sync_channel). While blocked, other threads could
// become available and left idle.
pub fn submit_job(&mut self, command: RenderCommand) -> Result<(), mpsc::SendError<RenderCommand>> {
// Stop command is special. We'll broadcast it to all threads.
if let RenderCommand::Stop = command {
// Broadcast: queue one Stop on every worker's command channel.
for channel in &self.command_transmitters {
channel.send(command.clone())?;
}
return Ok(());
}
// Check that `next_to_feed` is in-bounds, and then insert.
// index is post-incremented with this function call.
// wrap when at length (0-indexed so last valid index is len-1)
if self.next_to_feed == self.handles.len() {
self.next_to_feed = 0;
} else if self.next_to_feed > self.handles.len() {
panic!("How the hell did a +=1 skip past the maximum allowed size?");
}
match self.command_transmitters.get(self.next_to_feed){
Some(target) => target.send(command).unwrap(),
None => panic!("oh god oh fuck"),
}
self.next_to_feed += 1;
Ok(())
}
}