In our ongoing petrol station simulator project, we previously used threads, mutexes, and condition variables to manage the concurrent operations. This approach, while effective, can be complex and error-prone. Today, we’ll rewrite our simulator to use the Async gem and Thread::Queue, which provide a more straightforward and safer way to handle concurrency.
Setting Up the Basics
First, let’s set up the basic structure of our program. We’ll need to require the necessary gems and initialize some shared data structures.
require 'async'
require 'async/barrier'
require 'thread'
require 'securerandom'
queue = Thread::Queue.new
barrier = Async::Barrier.new
cars_count = 10
pumps_count = 3
cars_waiting = true
start = Time.now
Here, we’ve required the async and thread libraries, set up a Thread::Queue to manage our cars waiting to be fueled, and an Async::Barrier to synchronize our tasks.
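To make the role of the queue concrete, here is a minimal sketch of the Thread::Queue operations the simulator relies on. The variable names and the two sample cars are made up for illustration; only the queue calls themselves come from Ruby's standard library.

```ruby
require 'thread' # matches the article's setup

# Hypothetical sample data: [car name, fueling time in seconds]
demo_queue = Thread::Queue.new
demo_queue << ["Car0", 1.5]
demo_queue << ["Car1", 0.7]

size_before = demo_queue.size              # two cars are waiting
first_car, first_time = demo_queue.shift   # FIFO: the first car pushed leaves first

demo_queue.close                           # declare that no more cars will arrive
second_car, _second_time = demo_queue.shift # items already queued can still be taken
drained = demo_queue.shift                 # closed and empty: returns nil instead of blocking

puts "#{first_car} (#{first_time}s), then #{second_car}; drained=#{drained.inspect}"
```

On an open, empty queue, shift would block the caller until an item arrives, which is what lets an idle pump simply wait for the next car.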
Generating Random Car Arrivals
Next, we’ll define a method to simulate cars arriving at random intervals. This method will use an enumerator to yield delays between car arrivals.
def random_interval_enumerator(cars_count)
  Enumerator.new do |enum|
    cars_count.times do
      delay = SecureRandom.random_number # random Float in [0, 1)
      sleep(delay)                       # wait before the next car arrives
      enum.yield(delay)
    end
  end
end
This method simulates cars_count cars (10 in our setup, though you can run it with millions as well), each arriving after a random delay of up to one second.
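To see what the enumerator produces without waiting for the sleeps, here is a small sketch of a variant. The method name delay_enumerator and its rng parameter are assumptions introduced for this example, not part of the simulator; it swaps SecureRandom for a seedable Random so that runs are repeatable.

```ruby
# Hypothetical variant of the article's enumerator: it takes a random source
# and skips the sleep, so the delays can be inspected instantly.
def delay_enumerator(count, rng: Random.new)
  Enumerator.new do |enum|
    count.times { enum.yield(rng.rand) } # rng.rand returns a Float in [0, 1)
  end
end

delays = delay_enumerator(5, rng: Random.new(42)).to_a

delays.each_with_index do |delay, i|
  # Same derivation as the simulator: fueling time is 20 times the arrival delay
  puts "Car#{i}: arrives after #{delay.round(2)}s, fuels for #{(delay * 20).round(2)}s"
end
```

Because the fueling time is derived from the arrival delay, a car never needs more than 20 seconds at the pump.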
Simulating Car Arrivals and Pump Operations
Now, we’ll create the main Async block where our asynchronous tasks will run. First, we’ll simulate car arrivals and add them to the queue.
Async do |task|
  barrier.async do
    random_interval_enumerator(cars_count).each_with_index do |delay, i|
      car, fueling_time = "Car#{i}", (delay * 20).round(2)
      queue << [car, fueling_time]
      puts "#{car} is #{queue.size} in line"
    end
  end
Within this block, we use the barrier to create an asynchronous task that generates cars at random intervals and adds them to the queue with their respective fueling times.
Creating the Fuel Pumps
Next, we’ll create the tasks that simulate the fuel pumps. Each pump will continuously check the queue for cars to fuel.
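  pumps_count.times do |i|
    task.async do
      pump = "Pump#{i}"
      while cars_waiting || !queue.empty?
        job = queue.shift
        # nil means the queue was closed while this pump was idle
        break if job.nil?
        car, fueling_time = job
        puts "#{pump} fueling #{car}..."
        sleep(fueling_time)
        puts "#{pump} fueled #{car} in #{fueling_time} seconds."
      end
    end
  end
  barrier.wait
  cars_waiting = false
  # Release any pump still blocked on an empty queue so the program can exit
  queue.close
end
p Time.now - start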
Here, we create three asynchronous tasks (one per pump, as set by pumps_count), each representing a fuel pump. Each pump keeps running as long as cars are still arriving or already waiting in the queue.
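One hazard with a blocking queue.shift is a pump left waiting on an empty queue after the last car has arrived. A common way to handle this is to close the queue once production ends. The sketch below shows the pattern with plain threads rather than Async, so it runs with the standard library alone; run_station and its arguments are hypothetical names for this example.

```ruby
# Sketch with plain threads (not Async) so it needs only the standard library.
def run_station(car_names, pumps_count)
  queue = Thread::Queue.new
  served = Thread::Queue.new # thread-safe collector for [pump, car] pairs

  pumps = pumps_count.times.map do |i|
    Thread.new do
      # shift blocks while the queue is open and empty,
      # and returns nil once the queue is closed and drained
      while (car = queue.shift)
        served << ["Pump#{i}", car]
      end
    end
  end

  car_names.each { |car| queue << car }
  queue.close          # signal "no more cars"; idle pumps wake up and exit
  pumps.each(&:join)   # every pump has left its loop at this point

  Array.new(served.size) { served.shift }
end

result = run_station(%w[Car0 Car1 Car2 Car3 Car4], 3)
puts result.map { |pump, car| "#{pump} served #{car}" }
```

With this pattern the loop condition is the shift itself, so no separate flag is needed to tell the pumps when to stop.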
Output
Car0 is 1 in line
Pump0 fueling Car0...
Car1 is 1 in line
Pump1 fueling Car1...
Car2 is 1 in line
Pump2 fueling Car2... # all pumps busy
Car3 is 1 in line # queue is growing
Car4 is 2 in line
Car5 is 3 in line
Car6 is 4 in line
Pump0 fueled Car0 in 4.84 seconds.
Pump0 fueling Car3... # first available pump takes a car
Car7 is 4 in line
Car8 is 5 in line
Car9 is 6 in line
Pump1 fueled Car1 in 5.35 seconds.
Pump1 fueling Car4...
Pump0 fueled Car3 in 8.63 seconds.
Pump0 fueling Car5...
Pump2 fueled Car2 in 12.08 seconds.
Pump2 fueling Car6...
Pump0 fueled Car5 in 1.78 seconds.
Pump0 fueling Car7...
Pump2 fueled Car6 in 2.22 seconds.
Pump2 fueling Car8...
Pump1 fueled Car4 in 13.44 seconds.
Pump1 fueling Car9...
Pump2 fueled Car8 in 3.64 seconds.
Pump0 fueled Car7 in 5.56 seconds.
Pump1 fueled Car9 in 4.9 seconds.
24.716897 # total time
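As a sanity check on the total, the fueling times in the output above can be grouped by pump and summed. This is a small sketch; the FUELING_TIMES constant is introduced here only to hold the numbers copied from the sample run.

```ruby
# Fueling times copied from the sample output above, grouped by pump.
FUELING_TIMES = {
  "Pump0" => [4.84, 8.63, 1.78, 5.56],
  "Pump1" => [5.35, 13.44, 4.9],
  "Pump2" => [12.08, 2.22, 3.64]
}.freeze

# Seconds each pump spent fueling
busy = FUELING_TIMES.transform_values { |times| times.sum.round(2) }

# Total work, and the time needed if it were split perfectly across pumps
total_fueling = FUELING_TIMES.values.flatten.sum.round(2)
ideal_split = (total_fueling / FUELING_TIMES.size).round(2)

busy.each { |pump, seconds| puts "#{pump} was busy for #{seconds}s" }
puts "Total fueling: #{total_fueling}s, ideal split: #{ideal_split}s"
```

The busiest pump, Pump1, spent 23.69 seconds fueling, which is consistent with the measured total of about 24.7 seconds, since Pump1 could not start until Car1 had arrived.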
Conclusion
By rewriting our petrol station simulator using the Async gem and Thread::Queue, we’ve probably created a more maintainable and safer concurrent program. The Async gem simplifies the creation and synchronization of asynchronous tasks, while Thread::Queue provides a thread-safe way to manage our cars waiting to be fueled.
I haven’t applied this approach to my Petrol Station Simulator yet, but it may be the right way. I have lately reduced the number of threads it generates, though. Maybe switching to Fibers and the Async library will prove better. Here are some links to learn more about Async:
- https://socketry.github.io/async/guides/
- https://brunosutic.com/
- https://topenddevs.com/podcasts/ruby-rogues/episodes/revolutionizing-ruby-deployment-with-falcon-web-server-and-async-concurrency-framework-ruby-627#player1