From Threads to Async gem

In our ongoing petrol station simulator project, we previously used threads, mutexes, and condition variables to manage the concurrent operations. This approach, while effective, can be complex and error-prone. Today, we’ll rewrite our simulator to use the Async gem and Thread::Queue, which provide a more straightforward and safer way to handle concurrency.

Setting Up the Basics

First, let’s set up the basic structure of our program. We’ll require the libraries we need and initialize some shared state.

require 'async'
require 'async/barrier'
require 'thread'
require 'securerandom'

queue = Thread::Queue.new
barrier = Async::Barrier.new

cars_count = 10
pumps_count = 3
cars_waiting = true

start = Time.now

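Here, we’ve required the async gem along with securerandom for the random delays, created a Thread::Queue to hold the cars waiting to be fueled, and created an Async::Barrier so we can wait for tasks to finish. The cars_waiting flag tells the pumps whether more cars may still arrive, and start records when the simulation began.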
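To get a feel for the queue before wiring it into tasks, here is a minimal sketch that uses only Thread::Queue from Ruby's core library. The car names and fueling times are illustrative values. The last part shows Queue#close, which is one way to tell consumers that no more cars will arrive.

```ruby
# Thread::Queue is part of Ruby itself; no gem is needed for this sketch.
queue = Thread::Queue.new

# Push [car, fueling_time] pairs, the same shape the simulator uses.
queue << ["Car0", 4.84]
queue << ["Car1", 5.35]
size_after_push = queue.size # two cars are waiting

# shift (an alias of pop) returns items in arrival order.
car, fueling_time = queue.shift

# Closing marks the queue as finished: remaining items can still be taken,
# and once it is empty shift returns nil instead of blocking.
queue.close
second  = queue.shift
drained = queue.shift

puts "first: #{car} (#{fueling_time}s), second: #{second.inspect}, then: #{drained.inspect}"
```

Because shift blocks while an open queue is empty, a consumer simply waits for the next car rather than polling.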

Generating Random Car Arrivals

Next, we’ll define a method to simulate cars arriving at random intervals. This method will use an enumerator to yield delays between car arrivals.

def random_interval_enumerator(cars_count)
  Enumerator.new do |enum|
    cars_count.times do
      delay = SecureRandom.random_number # random Float in [0, 1)
      sleep(delay)                       # wait until the next car arrives
      enum.yield(delay)
    end
  end
end

With cars_count set to 10, the enumerator yields ten delays, each a random fraction of a second, and sleeps for that long before yielding. The same method works unchanged for much larger counts, even millions of cars.
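The enumerator is easier to experiment with when the sleep can be swapped out. The sketch below is a hypothetical variant, not the method from this post: the name interval_enumerator and the pause: parameter are assumptions added so the example runs instantly.

```ruby
require 'securerandom'

# Hypothetical variant of random_interval_enumerator. The `pause` callable
# is an assumption for illustration; by default it really sleeps.
def interval_enumerator(count, pause: ->(seconds) { sleep(seconds) })
  Enumerator.new do |enum|
    count.times do
      delay = SecureRandom.random_number # random Float in [0, 1)
      pause.call(delay)
      enum.yield(delay)
    end
  end
end

# Record the requested pauses instead of sleeping.
slept = []
delays = interval_enumerator(5, pause: ->(seconds) { slept << seconds }).to_a

# The simulator derives each fueling time from the arrival delay.
fueling_times = delays.map { |delay| (delay * 20).round(2) }

puts "delays:        #{delays.map { |d| d.round(2) }.inspect}"
puts "fueling times: #{fueling_times.inspect}"
```

Nothing happens until the enumerator is consumed, so each delay is generated and waited out only when the caller asks for the next car.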

Simulating Car Arrivals and Pump Operations

Now, we’ll create the main Async block where our asynchronous tasks will run. First, we’ll simulate car arrivals and add them to the queue.

Async do |task|
  barrier.async do
    random_interval_enumerator(cars_count).each_with_index do |delay, i|
      car, fueling_time = "Car#{i}", (delay * 20).round(2)
      queue << [car, fueling_time]

      puts "#{car} is #{queue.size} in line"
    end
  end

Within this block, we use the barrier to create an asynchronous task that generates cars at random intervals and adds them to the queue with their respective fueling times. Each fueling time is the arrival delay multiplied by 20, so it falls between 0 and 20 seconds.

Creating the Fuel Pumps

Next, we’ll create the tasks that simulate the fuel pumps. Each pump repeatedly takes the next car from the queue and waits whenever the queue is empty.

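  pumps_count.times do |i|
    task.async do
      pump = "Pump#{i}"

      while cars_waiting || !queue.empty?
        car, fueling_time = queue.shift
        break if car.nil? # queue was closed while this pump was waiting

        puts "#{pump} fueling #{car}..."
        sleep(fueling_time)
        puts "#{pump} fueled #{car} in #{fueling_time} seconds."
      end
    end
  end

  barrier.wait         # all cars have arrived
  cars_waiting = false
  queue.close          # wake idle pumps so they stop instead of waiting forever
end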

p Time.now - start

Here, we create three asynchronous tasks (one per pump, as set by pumps_count), each representing a fuel pump. Each pump will continue to operate as long as there are cars either waiting to be added to the queue or already in the queue.

Output

Car0 is 1 in line
Pump0 fueling Car0...
Car1 is 1 in line
Pump1 fueling Car1...
Car2 is 1 in line
Pump2 fueling Car2... # all pumps busy
Car3 is 1 in line     # queue is growing
Car4 is 2 in line
Car5 is 3 in line
Car6 is 4 in line
Pump0 fueled Car0 in 4.84 seconds.
Pump0 fueling Car3... # first available pump takes a car
Car7 is 4 in line
Car8 is 5 in line
Car9 is 6 in line
Pump1 fueled Car1 in 5.35 seconds.
Pump1 fueling Car4...
Pump0 fueled Car3 in 8.63 seconds.
Pump0 fueling Car5...
Pump2 fueled Car2 in 12.08 seconds.
Pump2 fueling Car6...
Pump0 fueled Car5 in 1.78 seconds.
Pump0 fueling Car7...
Pump2 fueled Car6 in 2.22 seconds.
Pump2 fueling Car8...
Pump1 fueled Car4 in 13.44 seconds.
Pump1 fueling Car9...
Pump2 fueled Car8 in 3.64 seconds.
Pump0 fueled Car7 in 5.56 seconds.
Pump1 fueled Car9 in 4.9 seconds.
24.716897             # total time 
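As a quick sanity check on this run, we can add up how long each pump was busy. The sketch below copies the ten "fueled" lines from the output above and sums them per pump; the parsing pattern is simply an assumption that matches this log format.

```ruby
# Completion lines copied from the output above.
log = <<~LOG
  Pump0 fueled Car0 in 4.84 seconds.
  Pump1 fueled Car1 in 5.35 seconds.
  Pump0 fueled Car3 in 8.63 seconds.
  Pump2 fueled Car2 in 12.08 seconds.
  Pump0 fueled Car5 in 1.78 seconds.
  Pump2 fueled Car6 in 2.22 seconds.
  Pump1 fueled Car4 in 13.44 seconds.
  Pump2 fueled Car8 in 3.64 seconds.
  Pump0 fueled Car7 in 5.56 seconds.
  Pump1 fueled Car9 in 4.9 seconds.
LOG

# Sum the fueling time per pump.
busy = Hash.new(0.0)
log.each_line do |line|
  match = line.match(/\A(Pump\d+) fueled Car\d+ in ([\d.]+) seconds\./)
  next unless match

  busy[match[1]] += match[2].to_f
end

busy  = busy.transform_values { |seconds| seconds.round(2) }
total = busy.values.sum.round(2)

busy.sort.each { |pump, seconds| puts "#{pump}: #{seconds} s busy" }
puts "All pumps: #{total} s of fueling"
```

For this run the pumps were busy for 20.81, 23.69 and 17.94 seconds, which is 62.44 seconds of fueling completed in about 24.7 seconds of wall-clock time.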

Conclusion

By rewriting our petrol station simulator using the Async gem and Thread::Queue, we end up with a concurrent program that is likely easier to maintain and less error-prone. The Async gem simplifies the creation and synchronization of asynchronous tasks, while Thread::Queue provides a thread-safe way to manage our cars waiting to be fueled.

I haven’t applied this approach to my Petrol Station Simulator yet, but it may be the right way. I have lately reduced the number of threads it generates, though. Maybe switching to Fibers and the Async library will prove better. Here are some links to learn more about Async:

  • https://socketry.github.io/async/guides/
  • https://brunosutic.com/
  • https://topenddevs.com/podcasts/ruby-rogues/episodes/revolutionizing-ruby-deployment-with-falcon-web-server-and-async-concurrency-framework-ruby-627#player1
