Producer Consumer using Custom BlockingQueue

public class MyBlockingQueue { //Instead use internal BlockingQueue of concurrent package
  
    private Object lock = new Object();
    private List queue = new LinkedList();
    private int limit = 10;

    public MyBlockingQueue(int limit) {
        this.limit = limit;
    }

    public void put(Object item) throws InterruptedException {    // Synchronize with a LOCK -> wait while queueSize == queueLimit
        synchronized (lock) {
            while (this.queue.size() == this.limit) {
                lock.wait();
            }
            if (this.queue.size() == 0) { //if while putting queue is EMPTY -> NotifyAll Producer Threds
                lock.notifyAll();
            }
            this.queue.add(item);
        }
    }

    public Object take() throws InterruptedException {
        synchronized (lock) {
            while (this.queue.size() == 0) {//while removing if size=0 -> lock.wait
                lock.wait();
            }
            if (this.queue.size() == this.limit) {//while removing size = limit -> Notify all Threds
                lock.notifyAll();
            }

            return this.queue.remove(0);
        }
    }
}



public class Producer implements Runnable{
   
   private final MyBlockingQueue sharedQueue;

   public Producer(MyBlockingQueue sharedQueue) {
       this.sharedQueue = sharedQueue;
   }

   @Override
   public void run() {
       for(int i=1; i<11 b="" i="">
           try {
               System.out.println("Produced: " + i);
               sharedQueue.put(i);
           } catch (InterruptedException ex) {
               System.out.println("InterruptedException Occured inside Producer");
           }
       }
   }

}



public class Consumer implements Runnable{
   
    private final MyBlockingQueue sharedQueue;
   
    public Consumer(MyBlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("~~~~~~~Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                 System.out.println("InterruptedException Occured inside Consumer");
            }
        }
      
    }

}




public class ProducerConsumerTest {

    public static void main(String[] args) {
      
          //Either use internal BlockingQueue or write a custom MyBlockingQueue
         BlockingQueue sharedQueue = new LinkedBlockingQueue();
       
         MyBlockingQueue  myBlockingQueue = new MyBlockingQueue(10);

         


         Thread producerThread = new Thread(new Producer(myBlockingQueue));
       
         Thread consumerThread = new Thread(new Consumer(myBlockingQueue));
                
       
         consumerThread.start();
         producerThread.start();



    }

}

0 comments: