/**
 * A minimal bounded FIFO queue whose {@link #put(Object)} blocks while the queue is full and
 * whose {@link #take()} blocks while it is empty, built on {@code synchronized} with
 * {@code wait}/{@code notifyAll}.
 *
 * <p>This is a teaching implementation; real code should prefer the
 * {@code java.util.concurrent.BlockingQueue} implementations.
 */
public class MyBlockingQueue {

    /** Monitor guarding {@code queue}; final so every thread always locks the same object. */
    private final Object lock = new Object();

    /** Buffered items, oldest first. Only accessed while holding {@code lock}. */
    private final List<Object> queue = new LinkedList<Object>();

    /** Maximum number of items the queue may hold at once. */
    private final int limit;

    /**
     * Creates a queue that holds at most {@code limit} items.
     *
     * @param limit maximum number of buffered items; must be at least 1
     * @throws IllegalArgumentException if {@code limit} is less than 1 (a limit of 0 would make
     *     every {@code put} block forever, and a negative limit would never block at all)
     */
    public MyBlockingQueue(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be >= 1, got " + limit);
        }
        this.limit = limit;
    }

    /**
     * Appends {@code item} to the tail of the queue, waiting while the queue is full.
     *
     * @param item the element to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void put(Object item) throws InterruptedException {
        synchronized (lock) {
            // Loop (not "if") so the condition is re-checked after every wake-up.
            while (queue.size() == limit) {
                lock.wait();
            }
            // Threads blocked in take() can only be waiting while the queue is empty, so the
            // empty -> non-empty transition is the only one that needs to wake them.
            if (queue.isEmpty()) {
                lock.notifyAll();
            }
            queue.add(item);
        }
    }

    /**
     * Removes and returns the item at the head of the queue, waiting while the queue is empty.
     *
     * @return the oldest buffered item
     * @throws InterruptedException if the calling thread is interrupted while waiting for an item
     */
    public Object take() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                lock.wait();
            }
            // Threads blocked in put() can only be waiting while the queue is full, so the
            // full -> not-full transition is the only one that needs to wake them.
            if (queue.size() == limit) {
                lock.notifyAll();
            }
            return queue.remove(0);
        }
    }
}
public class Producer implements Runnable{
private final MyBlockingQueue sharedQueue;
public Producer(MyBlockingQueue sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<11 b="" i="">
try {
System.out.println("Produced: " + i);
sharedQueue.put(i);
} catch (InterruptedException ex) {
System.out.println("InterruptedException Occured inside Producer");
}
}
}
}11>
/**
 * Runnable that repeatedly takes items from a shared {@code MyBlockingQueue} and prints them.
 */
public class Consumer implements Runnable {
    private final MyBlockingQueue sharedQueue;

    /**
     * @param sharedQueue the queue this consumer takes its items from
     */
    public Consumer(MyBlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    /**
     * Consumes items until the thread is interrupted; interruption is the only way this loop
     * ends.
     */
    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("~~~~~~~Consumed: " + sharedQueue.take());
            } catch (InterruptedException ex) {
                System.out.println("InterruptedException Occured inside Consumer");
                // Swallowing the interrupt here would make the thread impossible to stop;
                // restore the flag and leave the loop instead.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Demo entry point: wires one {@code Producer} and one {@code Consumer} to the same
 * {@code MyBlockingQueue} and starts both threads.
 */
public class ProducerConsumerTest {
    public static void main(String[] args) {
        // A ready-made java.util.concurrent.BlockingQueue (e.g. LinkedBlockingQueue) could be
        // used instead; this demo deliberately exercises the hand-written MyBlockingQueue.
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
        Thread producerThread = new Thread(new Producer(myBlockingQueue));
        Thread consumerThread = new Thread(new Consumer(myBlockingQueue));
        // The consumer is started first; it simply blocks in take() until items arrive.
        consumerThread.start();
        producerThread.start();
    }
}
// Monitor guarding the queue; final so every thread always synchronizes on the same object.
private final Object lock = new Object();
// Buffered items, oldest first (items are appended at the end and removed from index 0).
private final List<Object> queue = new LinkedList<Object>();
// Maximum number of buffered items; the constructor overwrites this default.
private int limit = 10;
/**
 * Creates a queue that holds at most {@code limit} items.
 *
 * @param limit maximum number of buffered items; must be at least 1
 * @throws IllegalArgumentException if {@code limit} is less than 1 (a limit of 0 would make
 *     every {@code put} block forever, and a negative limit would never block at all)
 */
public MyBlockingQueue(int limit) {
    if (limit < 1) {
        throw new IllegalArgumentException("limit must be >= 1, got " + limit);
    }
    this.limit = limit;
}
/**
 * Appends {@code item} to the queue, waiting on the lock while the queue is at its limit.
 *
 * @param item the element to enqueue
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void put(Object item) throws InterruptedException {
    synchronized (lock) {
        // Re-check the condition after every wake-up before proceeding.
        while (queue.size() == limit) {
            lock.wait();
        }
        // Remember whether this insert takes the queue from empty to non-empty; that is the
        // case in which threads waiting in take() need to be woken.
        boolean wasEmpty = queue.isEmpty();
        queue.add(item);
        if (wasEmpty) {
            lock.notifyAll();
        }
    }
}
/**
 * Removes and returns the element at index 0, waiting on the lock while the queue is empty.
 *
 * @return the element that was at the head of the queue
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public Object take() throws InterruptedException {
    synchronized (lock) {
        // Re-check the condition after every wake-up before proceeding.
        while (queue.isEmpty()) {
            lock.wait();
        }
        // Remember whether this removal takes the queue from full to not-full; that is the
        // case in which threads waiting in put() need to be woken.
        boolean wasFull = queue.size() == limit;
        Object head = queue.remove(0);
        if (wasFull) {
            lock.notifyAll();
        }
        return head;
    }
}
}
public class Producer implements Runnable{
private final MyBlockingQueue sharedQueue;
public Producer(MyBlockingQueue sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<11 b="" i="">
try {
System.out.println("Produced: " + i);
sharedQueue.put(i);
} catch (InterruptedException ex) {
System.out.println("InterruptedException Occured inside Producer");
}
}
}
}11>
/**
 * Runnable that repeatedly takes items from a shared {@code MyBlockingQueue} and prints them.
 */
public class Consumer implements Runnable {
    private final MyBlockingQueue sharedQueue;

    /**
     * @param sharedQueue the queue this consumer takes its items from
     */
    public Consumer(MyBlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    /**
     * Consumes items until the thread is interrupted; interruption is the only way this loop
     * ends.
     */
    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("~~~~~~~Consumed: " + sharedQueue.take());
            } catch (InterruptedException ex) {
                System.out.println("InterruptedException Occured inside Consumer");
                // Swallowing the interrupt here would make the thread impossible to stop;
                // restore the flag and leave the loop instead.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Demo entry point: wires one {@code Producer} and one {@code Consumer} to the same
 * {@code MyBlockingQueue} and starts both threads.
 */
public class ProducerConsumerTest {
    public static void main(String[] args) {
        // A ready-made java.util.concurrent.BlockingQueue (e.g. LinkedBlockingQueue) could be
        // used instead; this demo deliberately exercises the hand-written MyBlockingQueue.
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
        Thread producerThread = new Thread(new Producer(myBlockingQueue));
        Thread consumerThread = new Thread(new Consumer(myBlockingQueue));
        // The consumer is started first; it simply blocks in take() until items arrive.
        consumerThread.start();
        producerThread.start();
    }
}
0 comments:
Post a Comment