Mutex and conditional variable

So in a couple of months I’m about to present this thing on petrol station simulator. And it’s better to know what I’ll be talking about beforehand. I mean, that’s the right attitude and order. Therefore I came back to my little hobby project and decided to understand it better.

The issue I was having was about the pumps. When there’s a station with a few pumps, I want to order all cars in one line and let them be served as soon as a pump is free. I could do a line for each pump but my first attempt was to have one line with many pumps.

It turned out that synchronising mutexes is not enough. Because if you do so, the cars may wait at the wrong pump (as some other pump is available). And checking for pumps’ availability also turned out to be a bad idea, because if you’re a Ruby thread you never know when you’ll be deprived of a processor. Some other thread my start just after you checked for the pump but before you took it.

And also, who should manage the line of cars? Should there be some entity between a pump and a car that knows when to allocate a car at the pump? It turns out that nothing like that is necessary.

What worked was a conditional variable. Two good reads on them are here:

  • https://workingwithruby.com/wwrt/condvars/
  • https://vaneyckt.io/posts/ruby_concurrency_in_praise_of_condition_variables/

This, in short, makes all the producer/consumer thing happy. Here’s the code:

lock = Mutex.new
cond_var = ConditionVariable.new
queue = []

cars = 10.times.map do |i|
  Thread.new do
    sleep(i)
    car, fueling_time = "Car#{i}", rand(1..10)
    puts "#{car} is coming..."

    lock.synchronize do
      queue << [car, fueling_time]
      cond_var.signal
    end
  end
end

pumps = 3.times.map do |i|
  Thread.new do
    pump = "Pump#{i}"

    while true
      car, fueling_time = nil

      lock.synchronize do
        while queue.empty?
          cond_var.wait(lock)
        end

        car, fueling_time = queue.shift
      end

      puts "#{pump} fueling #{car}..."
      sleep(fueling_time)
      puts "#{car} fueled in #{fueling_time} seconds."
    end
  end
end

([Thread.new { sleep }] + cars + pumps).each(&:join)

This way the cars will be taken by the pumps, and that’s the right way to do it. At the same time pump waits for the signal from the “manager” that a new car is in the queue.

Also there is a separation of managing the queue and pumping the gas. Taking a car for pumping gas doesn’t lock the possibility for new cars coming into a queue. Taking them from the queue does. This example uses thread-unsafe structures like Array, so that’s why these append/shift operations are in the sync block.

The output is like this:

Car0 is coming...
Pump0 fueling Car0...
Car1 is coming...
Pump1 fueling Car1...
Car2 is coming...
Pump2 fueling Car2...      # All pumps are busy now
Car3 is coming...          # Car3 is first in queue
Car4 is coming...
Car5 is coming...
Car1 fueled in 4 seconds.  # Car1 was at Pump1
Pump1 fueling Car3...      # As soon as Pump1 got released, it takes Car3
Car6 is coming...
Car7 is coming...
Car2 fueled in 5 seconds.
Pump2 fueling Car4...
Car8 is coming...
Car4 fueled in 1 seconds.
Pump2 fueling Car5...
Car9 is coming...
Car0 fueled in 10 seconds.
Pump0 fueling Car6...
Car6 fueled in 1 seconds.
Pump0 fueling Car7...
Car3 fueled in 9 seconds.
Pump1 fueling Car8...
Car7 fueled in 3 seconds.
Pump0 fueling Car9...
Car5 fueled in 8 seconds.   # Last three pumpings
Car8 fueled in 4 seconds.
Car9 fueled in 6 seconds.

An idea for playing with it is to make the cars come infinitely. Or making a separate line for each pump and also helping the car to know in which line it should wait. Have fun!

Leave a Reply

Your email address will not be published. Required fields are marked *