diff --git a/src/threads.zig b/src/threads.zig index 527df87..bdbf599 100644 --- a/src/threads.zig +++ b/src/threads.zig @@ -9,6 +9,7 @@ const AtomicU32 = std.atomic.Atomic(u32); const Signals = struct { quit: AtomicBool, processed: AtomicU32, + eof_block: AtomicU32, const Self = @This(); @@ -16,6 +17,7 @@ const Signals = struct { return Self{ .quit = AtomicBool.init(false), .processed = AtomicU32.init(0), + .eof_block = AtomicU32.init(0), }; } }; @@ -69,6 +71,21 @@ pub const ThreadManager = struct { self.arena.deinit(); self.job_pool.deinit(); } + + pub fn quit(self: *Self) void { + self.signals.quit.store(true, .Release); + self.unblock(); + for (self.threads.items) |thread| { + thread.join(); + } + } + + pub fn eof(self: *Self) void { + self.signals.eof_block.store(1, .Release); + } + pub fn unblock(self: *Self) void { + self.signals.eof_block.store(0, .Release); + } }; fn quantize_loop(queue: *util.JobQueue, signals: *Signals, Q_Lum: *util.QTable, Q_Chrom: *util.QTable) void { @@ -76,5 +93,8 @@ fn quantize_loop(queue: *util.JobQueue, signals: *Signals, Q_Lum: *util.QTable, const job = queue.pop() orelse continue; transform.quantize(job.source, job.target, if (job.is_lum) Q_Lum else Q_Chrom); _ = @atomicRmw(u32, &signals.processed.value, .Add, 1, .SeqCst); + if (signals.eof_block.load(.Acquire) == 1) { + std.Thread.Futex.wait(&signals.eof_block, 1); + } } }