wait()/notify() (2)Revised: May/2nd/2002; Since: Apr./28th/2002
ここではシナリオに基づいて wait()/notify() を実装したプログラムを作ってみましょう。
Warehouse 型の共有オブジェクトを Consumer スレッドと Producer スレッドが利用します。
Producer スレッドが値をセットするのを待って Consumer スレッドが値を取得し、逆に Consumer スレッドが値を取得し終わるのを待って Producer スレッドが新しい値をセットします。
この実行と再開のタイミングがずれると、値がセットされる前に取得してその後の実行に問題が生じたり、値が取得される前に新しい値がセットされて古い値が処理されることなく上書きされてしまったりします。
Warehouse共有オブジェクト。
put()msg に値をセットし、 state に true をセット。get()msg の値を返し、 state に false をセット。Consumer共有オブジェクトから値を取り出すスレッド。一つの Warehouse 型共有オブジェクトに対して、複数の Consumer 型オブジェクトが存在するかもしれない。
run()Warehouse オブジェクトの get() を呼び出す。Producer共有オブジェクトに値をセットするスレッド。一つの Warehouse 型共有オブジェクトに対して、複数の Producer 型オブジェクトが存在するかもしれない。
run()Warehouse オブジェクトの put() を呼び出す。このプログラムでは Consumer オブジェクトと Producer オブジェクトが共有オブジェクトに所定の順番で排他的にアクセスする必要がありますので、共有オブジェクトに対する同期が必要です。
このシナリオで最善の方法は、共有オブジェクト側で自分を利用するオブジェクトの実行を停止させたり実行許可を与えたりすることです。
実行の待機、再開許可は wait() メソッド、 notify()/notifyAll() メソッドを利用します。これらのメソッドは Object クラスで定義されており、任意のクラスで使うことが出来ますが、 synchronized で同期指定されたコードの内部でなければ使えません。
Warehouse共有オブジェクト Warehouse のメンバーは次の4つです。
int msgboolean statemsg が読み込まれて新しい値が書き込み可能になれば false になり、新しい値がセットされて読み込み可能になれば true になります。void put(int val)state が true であればロックを取得しているスレッドを待機させ、 false になればメソッド引数を msg にセットします。このとき state は true に戻します。int get()state が false であればロックを取得しているスレッドを待機させ、 true になれば msg の値を返します。このとき state は false に戻します。
class Warehouse {
private int msg;
private boolean state = false;
public synchronized void put(int val) {
while (state == true) {
// 値がセットされていれば繰り返し
// 読み込まれれば繰り返し終了
try {
wait(); // msg に値がセットされていれば待機
} catch (InterruptedException e) {
System.out.println(e);
}
}
msg = val;
state = true; // msg に値がセットされたら true
notifyAll();
}
public synchronized int get() {
while (state == false) {
// 読み込み済みならば繰り返し
// 新しい値がセットされれば繰り返し終了
try {
wait(); // msg の値を読み込み済みならば待機
} catch (InterruptedException e) {
System.out.println(e);
}
}
state = false; // msg の値を読み込んだら false
notifyAll();
return msg;
}
}
ProducerWarehouse 型オブジェクトに値をセットするスレッド・クラスです。
class Producer implements Runnable { // 共有オブジェクト private Warehouse msg; // コンストラクタ Producer(Warehouse obj) { msg = obj; } // スレッドの run() メソッド public void run() { // 共有オブジェクトに値を5回セット for (int i = 0; i < 5; i++) { msg.put(i); System.out.println("Producer: " + i); // 作為的にタイミングをずらすスリープ try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println(e); } } } }
ConsumerWarehouse 型オブジェクトの値をゲットするスレッド・クラスです。
class Consumer implements Runnable { // 共有オブジェクト private Warehouse msg; // コンストラクタ Consumer(Warehouse obj) { msg = obj; } // スレッドの run() メソッド public void run() { // 共有オブジェクトの値を5回ゲット for (int i = 0; i < 5; i++) { int val = msg.get(); System.out.println("Consumer: " + val); } } }
以上のクラスをインスタンス化して利用するためのテストクラスです。
ConsProdTest.java:
class ConsProdTest {
public static void main(String[] args) {
// 共有オブジェクト
Warehouse share = new Warehouse();
// 読み取りスレッド
Consumer cons = new Consumer(share);
// 書き込みスレッド
Producer prod = new Producer(share);
// ランナブルクラスをスレッドに受け渡し
Thread threCons = new Thread(cons);
Thread threProd = new Thread(prod);
// スレッド開始
threCons.start();
threProd.start();
}
}
C:\java\thread>javac ConsProdTest.java C:\java\thread>java ConsProdTest Consumer: 0 Producer: 0 Consumer: 1 Producer: 1 Consumer: 2 Producer: 2 Producer: 3 Consumer: 3 Consumer: 4 Producer: 4 C:\java\thread>