Skip to content

Producer-Consumer problem

Updated: at 10:13 AM (7 min read)

Overview

Producer-Consumer problem is a classical synchronization problem in the operating system. With the presence of more than one process and limited resources in the system the synchronization problem arises. If one resource is shared between more than one process at the same time then it can lead to data inconsistency. In the producer-consumer problem, the producer produces an item and the consumer consumes the item produced by the producer.

Problem

// Shared buffer without synchronization
class SharedBuffer {
    private val items = mutableListOf<Int>()

    // No synchronization around items.add(...)
    fun produce(item: Int) {
        items.add(item)
    }

    // No synchronization around items.removeAt(...)
    fun consume(): Int? {
        return if (items.isNotEmpty()) {
            // Potential race condition, might throw an exception or return incorrect data
            items.removeAt(0)
        } else {
            null
        }
    }
}

Image

Assumptions

At later point of time, we would be doing limited buffer + more than 1 producer + more than 1 consumer

The First Attempt


fun main() {
    val buffer = SharedBuffer()
    val totalItems = 1000

    // Producer Thread
    val producer = thread {
        for (i in 1..totalItems) {
            buffer.produce(i)
            println("produced item $i")
            // Simulate some processing delay
            Thread.sleep(1)
        }
        println("Producer finished producing $totalItems items.")
    }

    // Consumer Thread
    val consumer = thread {
        var consumedCount = 0
        while (consumedCount < totalItems) {
            val item = buffer.consume()
            if (item != null) {
                println("Consumed: $item")
                consumedCount++
            } else {
                // Race condition: Consumer finds the buffer empty even though more items might be coming
                println("Buffer was empty unexpectedly!")
            }
            Thread.sleep(2)
        }
        println("Consumer finished consuming $consumedCount items.")
    }

    // Wait for both threads to finish
    producer.join()
    consumer.join()

    println("Main thread finished.")
}

Outputs Problem 1

Consumed: 689
Consumed: 690
Consumed: 692
Consumed: 693

in above output 691 was missing, WHY?

Outputs Problem 2

Consumed: 995
Consumed: 996
Consumed: 997
Consumed: 998
Consumed: 999
Consumed: 1000
Buffer was empty unexpectedly!
Buffer was empty unexpectedly!
Buffer was empty unexpectedly!

why did we get such outputs

Reason for above 2 outputs

It can be said that 2 threads are competing against each other, one is adding item and one is removing item, now this can lead to race condition and inconsistent results.

The Solution?

Make this ++ atomic, lets see

No improvements, as explained below

consumedCount++ translates into multiple steps internally (read → increment → write). If two threads call this simultaneously, one update could “overwrite” the other, resulting in a lost increment. An AtomicInteger ensures that all increments happen atomically (in a thread-safe manner), so your final count will always be accurate.

But it was just one thread in above case

Second Attempt

Assumption, buffer is too large, no constraint on its size

Mutex api’s are coroutine/suspend types, so lets use ReentrantLock

Modifying SharedBuffer Code

// Shared buffer without synchronization
class SharedBuffer {
    private val items = mutableListOf<Int>()
    private val lock = ReentrantLock()

    // No synchronization around items.add(...)
    fun produce(item: Int) {
        lock.withLock {
            items.add(item)
        }
    }

    // No synchronization around items.removeAt(...)
    fun consume(): Int? {
        lock.withLock {
            return if (items.isNotEmpty()) {
                // Potential race condition, might throw an exception or return incorrect data
                items.removeAt(0)
            } else {
                null
            }
        }
    }
}

Output

Consumed: 994
Consumed: 995
Consumed: 996
Consumed: 997
Consumed: 998
Consumed: 999
Consumed: 1000
Consumer finished consuming 1000 items.
Main thread finished.

So that Buffer was empty unexpectedly! problem is solved.

The earlier problems happened because of race condition and now with locks its solved.

Fixed sized buffer

Now, we have to create a signalling mechanism to tell another thread that unblock itself as condition is true

A signaling mechanism (e.g., condition variables) allows threads to:

// Shared buffer without synchronization
class FixedBuffer(private val capacity: Int = 10) {
    private val items = mutableListOf<Int>()

    // Mutex/Lock
    private val lock = ReentrantLock()

    // Condition variables
    private val notFull: Condition = lock.newCondition()
    private val notEmpty: Condition = lock.newCondition()

    fun produce(item: Int) {
        lock.withLock {
            // If the buffer is full, wait until there's space
            while (items.size == capacity) {
                println("Producer waiting... (buffer full with ${items.size} items)")
                notFull.await()
            }

            // Now we can safely add
            items.add(item)
            println("Produced: $item | Buffer size: ${items.size}")

            // Signal that there's now something to consume
            notEmpty.signal()
        }
    }

    fun consume(): Int {
        lock.withLock {
            // If the buffer is empty, wait until there's an item
            while (items.isEmpty()) {
                println("Consumer waiting... (buffer empty)")
                notEmpty.await()
            }

            // Remove item
            val removed = items.removeAt(0)
            println("Consumed: $removed | Buffer size: ${items.size}")

            // Signal producer that there's space now
            notFull.signal()

            return removed
        }
    }
}

Other Signalling mechanism

BlockingQueue

Specially designed for this problem, it blocks while putting if buffer is full, and similarly it blocks when taking data out if buffer is already empty.

// Shared buffer without synchronization
class FixedBuffer(private val capacity: Int = 10) {
    // Create a blocking queue with fixed capacity.
    private val queue: BlockingQueue<Int> = ArrayBlockingQueue(capacity)

    // Produces an item by putting it into the queue. Blocks if full.
    fun produce(item: Int) {
        queue.put(item)  // Blocks if the queue is full.
        println("Produced: $item | Buffer size: ${queue.size}")
    }

    // Consumes an item by taking it from the queue. Blocks if empty.
    fun consume(): Int {
        val item = queue.take()  // Blocks if the queue is empty.
        println("Consumed: $item | Buffer size: ${queue.size}")
        return item
    }
}

Channels

// FixedBuffer class that uses a Channel with fixed capacity.
class FixedBuffer<T>(capacity: Int = 10) {
    private val channel = Channel<T>(capacity)

    // Produces an item by sending it to the channel.
    // If the channel (buffer) is full, this suspends until space is available.    suspend fun produce(item: T) {
        channel.send(item)
        // remainingCapacity tells us how many more items can be added before blocking.
        println("Produced: $item")
    }

    // Consumes an item by receiving it from the channel.
    // If the channel is empty, this suspends until an item is available.    suspend fun consume(): T {
        val item = channel.receive()
        println("Consumed: $item")
        return item
    }
}

fun main() = runBlocking {
    val totalItems = 100
    val buffer = FixedBuffer<Int>(10) // Fixed buffer with capacity 10

    // Launch a coroutine for the producer.    val producer = launch {
        for (i in 1..totalItems) {
            buffer.produce(i)
            delay(50)  // Simulate a fast production pace
        }
        println("Producer finished producing $totalItems items.")
    }

    // Launch a coroutine for the consumer.
    val consumer = launch {
        repeat(totalItems) {
            buffer.consume()
            delay(150) // Simulate a slower consumption pace
        }
        println("Consumer finished consuming $totalItems items.")
    }

    producer.join()
    consumer.join()
    println("Main coroutine finished.")
}

Channels were introduced as a high-level, coroutine-friendly abstraction for communication and synchronization between concurrent tasks. Rather than manually handling low-level thread synchronization primitives like locks and condition variables, channels provide a way to safely pass messages (or data) between producers and consumers.

More than 1 producer and more than 1 consumer

Image

fun main() {
    val buffer = FixedBuffer(10)

    // Settings: 3 producers and 3 consumers.
    val totalProducers = 3
    val totalConsumers = 3
    // Each producer produces 20 items (total 60 items).
    val itemsPerProducer = 20
    // Each consumer consumes 20 items (total 60 items).
    val itemsPerConsumer = 20

    // List to keep track of producer and consumer threads.
    val producers = mutableListOf<Thread>()
    val consumers = mutableListOf<Thread>()

    // Launch multiple producers.
    for (p in 1..totalProducers) {
        val producerThread = thread(start = true) {
            for (i in 1..itemsPerProducer) {
                // Unique item number: calculates an overall sequence.
                val item = (p - 1) * itemsPerProducer + i
                buffer.produce(item)
                Thread.sleep(50) // Fast production pace.
            }
            println("Producer $p finished producing $itemsPerProducer items.")
        }
        producers.add(producerThread)
    }

    // Launch multiple consumers.
    for (c in 1..totalConsumers) {
        val consumerThread = thread(start = true) {
            for (i in 1..itemsPerConsumer) {
                buffer.consume()
                Thread.sleep(100) // Slower consumption pace.
            }
            println("Consumer $c finished consuming $itemsPerConsumer items.")
        }
        consumers.add(consumerThread)
    }

    // Wait for all threads to complete.
    producers.forEach { it.join() }
    consumers.forEach { it.join() }

    println("Main thread finished.")
}

That is it!