남궁성님의 Java의 정석(3rd Edition)을 보고 정리한 글입니다.
1. 쓰레드 동기화
- 두 개 이상의 쓰레드가 공유 데이터에 동시에 접근하게 되면 예상과 벗어난 결과가 나타날 수 있다. 이러한 문제를 해결하는 것이 쓰레드 동기화이다.
- 쓰레드 동기화를 이해하기 위해서는 임계영역과 락에 대한 이해가 필요하다.
- 임계영역(Critical section): 여러 쓰레드가 동시에 접근하면 문제가 발생할 수 있는 부분
- 잠금(Lock): 임계 영역에 대한 접근을 제어하기 위한 도구
2. synchronized를 이용한 동기화
a. 임계 영역을 지정하는 2가지 방법
// 1. 메서드 전체를 임계 영역으로 지정
public synchronized void calcSum() {
// 임계 영역
}
// 2. 특정 영역을 임계 영역으로 지정
synchronized (객체의 참조변수) {
// 임계 영역
}
- 1, 2번 모두 블럭 영역 안에 들어가면서 쓰레드는 lock을 획득하고 블럭을 벗어나면 lock을 반납한다.
- 임계영역은 멀티스레드 프로그램의 성능을 좌우하기 때문에 가능한 임계영역을 최소화하는 것이 좋다.
b. 동기화를 하지 않을 경우 발생하는 코드
public class ThreadEx21 {
public static void main(String[] args) {
Runnable r = new RunnableEx21();
new Thread(r).start();
new Thread(r).start();
}
}
class Account {
private int balance = 1000;
public int getBalance() {
return balance;
}
public void withdraw(int money) {
if(balance >= money) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
balance -= money;
}
}
}
class RunnableEx21 implements Runnable {
Account acc = new Account();
public void run() {
while (acc.getBalance() > 0) {
// 100, 200, 300 중의 한 값을 임의로 선택해서 출금(withdraw)
int money = (int) (Math.random() * 3 + 1) * 100;
acc.withdraw(money);
System.out.println("balance: " + acc.getBalance());
}
}
}
실행결과
balance: 600
balance: 600
balance: 300
balance: 300
balance: 0
balance: -100
- balance가 출금금액(money)보다 작으면 출금을 못하도록 코드를 구성했다.
- 하지만 결과에 음수가 나온다.
- 이유는 한 쓰레드가 if문의 조건식을 통과하고 출금하기 바로 직전에 다른 쓰레드가 끼어들어서출금을 먼저 했기 때문이다.
- 동기화를 적용하면 이러한 문제를 해결할 수 있다.
c. 동기화를 적용한 코드
public class ThreadEx21 {
public static void main(String[] args) {
Runnable r = new RunnableEx21();
new Thread(r).start();
new Thread(r).start();
}
}
class Account {
private int balance = 1000; // private으로 해야 동기화 의미가 있다.
public int getBalance() {
return balance;
}
public synchronized void withdraw(int money) {
if(balance >= money) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
balance -= money;
}
}
}
class RunnableEx21 implements Runnable {
Account acc = new Account();
public void run() {
while (acc.getBalance() > 0) {
// 100, 200, 300 중의 한 값을 임의로 선택해서 출금(withdraw)
int money = (int) (Math.random() * 3 + 1) * 100;
acc.withdraw(money);
System.out.println("balance: " + acc.getBalance());
}
}
}
실행결과
balance: 700
balance: 400
balance: 100
balance: 100
balance: 100
balance: 100
balance: 0
balance: 0
- 음수값이 안 나오는 것을 확인할 수 있다.(동기화 처리 O)
3. wait(), notify(), notifyAll()을 이용한 동기화
- 특정 쓰레드가 객체의 락을 오랜 시간 가지는 하는 상황으로 인해 동기화 효율이 떨어지는 상황을 개선하기 위해 사용
- wait()를 호출하여 쓰레드가 락을 반납하고 대기상태가 되고, 다른 쓰레드가 락을 얻어 해당 객체에 대한 작업을 수행할 수 있게 된다.
a. 동기화를 적용하지 않은 코드
public class ThreadWaitEx {
public static void main(String[] args) throws InterruptedException {
Table table = new Table(); // 여러 쓰레드가 공유하는 객체
new Thread(new Cook(table), "COOK1").start();
new Thread(new Customer(table, "donut"), "CUST1").start();
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(100);
System.exit(0);
}
}
class Customer implements Runnable {
private Table table;
private String food;
Customer(Table table, String food) {
this.table = table;
this.food = food;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
String name = Thread.currentThread().getName();
if (eatFood())
System.out.println(name + " ate a " + food);
else
System.out.println(name + " failed to eat. :(");
}
}
boolean eatFood() {
return table.remove(food);
}
}
class Cook implements Runnable {
private Table table;
Cook(Table table) {
this.table = table;
}
@Override
public void run() {
while (true) {
// 임의의 요리를 하나 선택해서 table에 추가
int idx = (int) (Math.random() * table.dishNum());
table.add(table.dishNames[idx]);
try {
Thread.sleep(1);
} catch (InterruptedException e) {}
}
}
}
class Table {
String[] dishNames = {"donut", "donut", "burger"};
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>(); // 실제 음식을 담는 접시
public void add(String dish) {
if(dishes.size() >= MAX_FOOD)
return;
dishes.add(dish);
System.out.println("Dishes: " + dishes.toString());
}
public boolean remove(String dishName) {
// 지정된 요리와 일치하는 요리를 테이블에서 제거
for(int i=0; i<dishes.size(); i++)
if(dishName.equals(dishes.get(i))) {
dishes.remove(i);
return true;
}
return false;
}
public int dishNum() {
return dishNames.length;
}
}
실행결과
Dishes: [donut, donut, donut, donut, donut, burger]
CUST2 ate a burger
CUST1 ate a donut
CUST2 ate a burger
Exception in thread "COOK1" java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013)
at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967)
at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:456)
at org.example.threading.Table.add(ThreadWaitEx.java:80)
at org.example.threading.Cook.run(ThreadWaitEx.java:62)
at java.base/java.lang.Thread.run(Thread.java:833)
CUST1 ate a donut
CUST2 failed to eat. :(
.. 생략
- 요리사는 1ms 주기로 랜덤으로 음식을 만들어 Table에 놓는다.
- 고객은 10ms 주기로 Table에 있는 지정된 음식을 먹는다.
- 요리사와 고객은 각각 쓰레드로 구현되어 있고, 하나의 Table 객체에 대해서 공유하고 있다.
- 실행 때 마다 서로 다른 결과가 나오는데, 여러개의 쓰레드가 Table 인스턴스를 공유하고 있어서 ConcurrentModificationException 예외나 IndexOutOfBoundsException이 발생한다.
b. synchronized 적용 코드
public class ThreadWaitEx {
public static void main(String[] args) throws InterruptedException {
Table table = new Table(); // 여러 쓰레드가 공유하는 객체
new Thread(new Cook(table), "COOK1").start();
new Thread(new Customer(table, "donut"), "CUST1").start();
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(5000);
System.exit(0);
}
}
class Customer implements Runnable {
private Table table;
private String food;
Customer(Table table, String food) {
this.table = table;
this.food = food;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
String name = Thread.currentThread().getName();
if (eatFood())
System.out.println(name + " ate a " + food);
else
System.out.println(name + " failed to eat. :(");
}
}
boolean eatFood() {
return table.remove(food);
}
}
class Cook implements Runnable {
private Table table;
Cook(Table table) {
this.table = table;
}
@Override
public void run() {
while (true) {
// 임의의 요리를 하나 선택해서 table에 추가
int idx = (int) (Math.random() * table.dishNum());
table.add(table.dishNames[idx]);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
class Table {
String[] dishNames = {"donut", "donut", "burger"};
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>(); // 실제 음식을 담는 접시
public synchronized void add(String dish) {
if (dishes.size() >= MAX_FOOD)
return;
dishes.add(dish);
System.out.println("Dishes: " + dishes.toString());
}
public boolean remove(String dishName) {
synchronized (this) {
while (dishes.size() == 0) {
String name = Thread.currentThread().getName();
System.out.println(name + " is waiting.");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
for (int i = 0; i < dishes.size(); i++)
if (dishName.equals(dishes.get(i))) {
dishes.remove(i);
return true;
}
}
return false;
}
public int dishNum() {
return dishNames.length;
}
}
Dishes: [donut]
CUST2 failed to eat. :(
CUST1 ate a donut
CUST2 is waiting. <- donut이 없어서 테이블에 lock을 건 채로 계속 기다리고 있다.
CUST2 is waiting.
CUST2 is waiting.
생략..
- synchronized로 동기화 했지만, 음식이 없어서 테이블 락을 건채로 계속 기다리게 된다.
- wait()과 notify()을 적용하지 않으면 쓰레드는 락을 오래 시간 가져 프로그램이 원활하게 진행되지 않는다.
c. wait(), notify() 적용코드
import java.util.ArrayList;
public class ThreadWaitEx {
public static void main(String[] args) throws InterruptedException {
Table table = new Table(); // 여러 쓰레드가 공유하는 객체
new Thread(new Cook(table), "COOK").start();
new Thread(new Customer(table, "donut"), "CUST1").start();
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(2000);
System.exit(0);
}
}
class Customer implements Runnable {
private Table table;
private String food;
Customer(Table table, String food) {
this.table = table;
this.food = food;
}
public void run() {
while(true) {
try { Thread.sleep(100);} catch(InterruptedException e) {}
String name = Thread.currentThread().getName();
table.remove(food);
System.out.println(name + " ate a " + food);
} // while
}
}
class Cook implements Runnable {
private Table table;
Cook(Table table) { this.table = table; }
public void run() {
while(true) {
int idx = (int)(Math.random()*table.dishNum());
table.add(table.dishNames[idx]);
try { Thread.sleep(10);} catch(InterruptedException e) {}
} // while
}
}
class Table {
String[] dishNames = { "donut","donut","burger" }; // donut의 확률을 높인다.
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>();
public synchronized void add(String dish) {
while(dishes.size() >= MAX_FOOD) {
String name = Thread.currentThread().getName();
System.out.println(name+" is waiting.");
try {
wait();
Thread.sleep(500);
} catch(InterruptedException e) {}
}
dishes.add(dish);
notify();
System.out.println("Dishes:" + dishes.toString());
}
public void remove(String dishName) {
synchronized(this) {
String name = Thread.currentThread().getName();
while(dishes.size()==0) {
System.out.println(name+" is waiting.");
try {
wait();
Thread.sleep(500);
} catch(InterruptedException e) {}
}
while(true) {
for(int i=0; i<dishes.size();i++) {
if(dishName.equals(dishes.get(i))) {
dishes.remove(i);
notify();
return;
}
} // for문의 끝
try {
System.out.println(name+" is waiting.");
wait();
Thread.sleep(500);
} catch(InterruptedException e) {}
} // while(true)
} // synchronized
}
public int dishNum() { return dishNames.length; }
}
- wait()로 락을 풀고 기다리다가 특정 작업이 끝나면 notify로() 통보를 받고 다시 lock을 얻어서 진행하기 때문에 단순히 synchronized로 동기화 한 코드보다 원활하게 진행된다.
- 하지만 쓰레드 중에서 하나를 임의로 선택해서 통지할 뿐 쓰레드를 선택해서 통지할 수 없다.
- 그로인해 특정 쓰레드가 계속해서 통지받지 못하고 오랫동안 대기하게 되는데 이를 가아(starvation) 현상이라고 한다.
- notifyAll()을 활용하면 모든 쓰레드에게 통지할 수 있어 가아현상에 대비할 수 있는데, 여러 쓰레드가 락을 얻기 위해 서로 경쟁하는데 이를 경쟁상태(Race Condition)이라고 한다.
- 경쟁상태를 개선하기 위해 Look과 Condition을 이용하면 된다.
4. Look, Condition 이용한 동기화
a. ReentrantLock과 Condition 적용한 코드
public class ThreadWaitEx {
public static void main(String[] args) throws InterruptedException {
Table table = new Table(); // 여러 쓰레드가 공유하는 객체
new Thread(new Cook(table), "COOK1").start();
new Thread(new Customer(table, "donut"), "CUST1").start();
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(2000);
System.exit(0);
}
}
class Customer implements Runnable {
private Table table;
private String food;
Customer(Table table, String food) {
this.table = table;
this.food = food;
}
public void run() {
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
String name = Thread.currentThread().getName();
table.remove(food);
System.out.println(name + " ate a " + food);
}
}
}
class Cook implements Runnable {
private Table table;
Cook(Table table) {
this.table = table;
}
public void run() {
while (true) {
int idx = (int) (Math.random() * table.dishNum());
table.add(table.dishNames[idx]);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
} // while
}
}
class Table {
String[] dishNames = {"donut", "donut", "burger"}; // donut의 확률을 높인다.
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>();
private ReentrantLock lock = new ReentrantLock();
private Condition forCook = lock.newCondition();
private Condition forCust = lock.newCondition();
public void add(String dish) {
lock.lock();
try {
while (dishes.size() >= MAX_FOOD) {
String name = Thread.currentThread().getName();
System.out.println(name + " is waiting.");
try {
forCook.await(); // COOK 쓰레드를 기다리게 한다.
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
dishes.add(dish);
forCust.signal(); // 기다리고 있는 CUST를 깨우기 위함.
System.out.println("Dishes: " + dishes.toString());
} finally {
lock.unlock();
}
}
public void remove(String dishName) {
lock.lock();
String name = Thread.currentThread().getName();
try {
while (dishes.size() == 0) {
System.out.println(name + " is waiting.");
try {
forCust.await(); // CUST 쓰레드를 기다리게 한다.
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
while (true) {
for (int i = 0; i < dishes.size(); i++) {
if (dishName.equals(dishes.get(i))) {
dishes.remove(i);
forCook.signal(); // 잠자고 있는 COOK을 깨우기 위함
return;
}
}
try {
System.out.println(name + " is waiting.");
forCust.await(); // 원하는 음식이 없는 CUST 쓰레드를 기다리게 한다.
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
} finally {
lock.unlock();
}
}
public int dishNum() {
return dishNames.length;
}
}
- 쓰레드 종류에 따라 wating pool에서 따로 기다리도록 하여 선별적으로 통지할 수 있다.
- 요리 쓰레드가 통지를 받아야하는 상황에서 손님 쓰레드가 통지를 받는 경우가 없어졌다. ⇒ 기아 현상, 경쟁 상태 개선
- 특정 쓰레드를 선택할 수 없기 때문에 기아현상과 경쟁상태는 여전히 남아있으며, Condition을 조금 더 세분화하면 이를 개선할 수 있다.(고객을 세분화해서 고객이 먹는 음식이 없으면 다른 고객을 깨우는 식으로 수정 가능 )
5. volatile
- 코어마다 별도의 캐시를 가지고 있다.
- 코어는 메모리에서 읽어온 값을 캐시에 저장한다.
- 그로 인해 캐시에 저장된 값이 갱신되지 않아서 메모리에 저장된 값과 다른 경우가 발생한다.
- volatile을 붙혀주면 캐시가 아닌 메모리에서 값을 읽기 때문에 불일치 문제를 해결한다.
volatile long balance;
synchronized int getBalance() {
return balance;
}
synchronized void withdraw(int money) {
if(balance >= money) {
balance -= money;
}
}
그렇다고 해서 위 코드에서 getBalance() 메서드를 동기화 하지 않으면 withdraw()가 호출되어 객체에 락을 걸고 작업중인데 getBalance()가 호출되는 현상이 발생해서 예상치 못한 결과가 나올 수 있다.
6. fork & join 프레임
- jdk1.7부터 추가되었다.
- 하나의 작업을 작은 단위로 나눠 여러 쓰레드가 동시에 처리하는 것을 쉽게 해준다.
- RecursiveAction 또는 RecursiveTask를 상속받아서 구현
- RecursiveAction: 반환값이 없는 작업을구현할 때 사용
- RecursiveTask: 반환값이 있는 작업을 구현할 때 사용
- fork(): 해당 작업을 쓰레드 풀의 작업 큐에 넣는다. 비동기 메서드
- join(): 해당 작업의 수행이 끝날 때까지 기다렸다가, 수행이 끝나면 그 결과를 반환한다. 동기메서드
예제
더할 숫자를 반으로 나눠서 한쪽에는 fork()를 호출해서 작업큐에 저장하고, 하나의 쓰레드는 compute()를 재귀호출 하면서 작업을 계속해서 반으로 나누고, 다른 쓰레드는 fork()에 의해 작업 큐에 추가된 작업을 수행한다.
작업큐가 비어있는 쓰레드가 다른 쓰레드의 작업을 가져와서 작업을 수행하는데 이것을 작업 work stealing이라고 한다.
실행결과를 보면 fork&join을 사용하여 계산한 결과보다 for문으로 작성한 결과가 빨리 구해진다. 항상 멀티쓰레드로 처리하는 것이 빠른 것은 아니며 테스트 해보고 이득이 있을 때 사용하는 것이 좋다.
public class ForkJoinEx {
static final ForkJoinPool pool = new ForkJoinPool(); // 쓰레드 풀 생성
public static void main(String[] args) {
long from = 1L, to = 100_000_000L;
SumTask task = new SumTask(from, to);
long start = System.currentTimeMillis();
Long result = pool.invoke(task);
System.out.println("Elapsed time(4 core) : " + (System.currentTimeMillis() - start));
System.out.printf("sum of %d~%d=%d\\n", from, to, result);
System.out.println();
result = 0L;
start = System.currentTimeMillis();
for (long i = from; i <= to; i++)
result += i;
System.out.println("Elapsed time(1 core): " + (System.currentTimeMillis() - start));
System.out.printf("sum of %d~%d=%d\\n", from, to, result);
}
}
class SumTask extends RecursiveTask<Long> {
long from, to;
SumTask(long from, long to) {
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
long size = to - from + 1;
if (size <= 5)
return sum();
long half = (from + to) / 2;
// 범위를 반으로 나눠서 두 개의 작업을 생성
SumTask leftSum = new SumTask(from, half);
SumTask rightSum = new SumTask(half + 1, to);
leftSum.fork(); // 작업 큐에 넣는다.
return rightSum.compute() + leftSum.join(); // 동기 메서드(호출 결과를 기다린다.)
}
long sum() {
long tmp = 0L;
for (long i = from; i <= to; i++)
tmp += i;
return tmp;
}
}
실행결과
Elapsed time(4 core) : 314
sum of 1~100000000=5000000050000000
Elapsed time(1 core): 264
sum of 1~100000000=5000000050000000
'Programming > Java' 카테고리의 다른 글
[Java] 클래스 생성과 인스턴스 생성 (1) | 2024.01.05 |
---|---|
[Java] 스트림(Stream) (0) | 2023.12.10 |
[Java] 쓰레드(Thread) - 5(쓰레드 상태와 실행제어) (0) | 2023.12.10 |
[Java] 쓰레드(Thread) - 4(데몬 쓰레드) (0) | 2023.12.10 |
[Java] 쓰레드(Thread) - 3(싱글쓰레드 vs. 멀티쓰레드) (0) | 2023.12.10 |