스레드의 동작 (feat. Python)

7 분 소요

Process vs Thread

  1. 프로세스
    • 운영체제로부터 할당받는 자원의 단위이다.
    • 실행 중인 프로그램이다. (vscode, pycharm, chrome…)
    • 각 프로세스마다 cpu와 메모리가 독립적이다.
    • code, data, stack, heap이 각 프로세스마다 전부 독립적으로 사용된다.
    • 최소 1개의 메인 스레드를 보유한다.
    • 파이프, 소켓, 파일 등을 사용해서 프로세스간 통신이 가능하다. (context switching)
  2. 스레드

    • 프로세스 내에서 실행되는 흐름의 단위이다.
    • 프로세스의 자원을 사용한다.
    • stack만 별도로 스레드마다 할당하고 나머지(code, data, heap)은 공유한다.
    • 메모리를 공유한다. (변수 공유)
    • 한 스레드의 결과가 다른 스레드에 영향을 끼친다.
    • 동기화 문제는 정말 주의해야한다. (디버깅의 어려움)
  3. 멀티 스레드
  • 한 개의 단일 어플리케이션(하나의 프로세스)이 여러 스레드로 구성되어 작업을 처리
  • 자원 공유 문제 (교착 상태)
  • cost 비용이 낮음 (하나의 프로세스의 자원을 공유하기 때문에)
  • 스레드간 통신이 쉬움
  1. 멀티 프로세스
  • 한 개의 단일 어플리케이션(하나의 프로세스)이 여러 프로세스로 구성되어 작업을 처리
  • 한 개의 프로세스 문제 발생은 확산이 없음 (각각의 프로세스가 독립된 자원을 가지기 때문에
  • 캐시 체인지, cost 비용이 매우 높음 (오버헤드)
  • 프로세스간 통신이 어려움

https://kamang-it.tistory.com/entry/Go18고루틴과-동시성-프로그래밍쓰레드를-대체한다


GIL (Global Interpreter Lock)

  • 파이썬 고유의 멀티스레드 방식 (결론은 싱글 스레드로 동작하는게 제일 좋다….)
  • python코드를 cpython이라는 컴파일러가 해석을 한다? → 바이트 코드로 바뀐다.
  • GIL
    • 단일 스레드만이 python object에 접근하게 제한하는 mutex (하나의 작업을 무조건 하나의 스레드로만 하는 것 - 싱글 스레드)
    • 멀티 스레드로 하게 되면 cpython 메모리 관리가 취약하다 (not thread-safe)
    • 멀티 스레드로 해야하는 작업은 거의 I/O
    • 멀티 프로세스를 하면 된다.
    • 병렬 처리는 멀티 프로세싱과 asyncio 등을 사용하면 된다.

Image


기본 Thread 기본동작

def sub_thread(name):
    logging.info('%s start!!!', name)
    time.sleep(3)
    logging.info('%s finish!!!', name)

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO, datefmt='%H:%M:%S')
    logging.info('Main-thread : before creating thread')

    x = threading.Thread(target=sub_thread, args=('Sub Thread',))

    logging.info('Main-thread : after creating thread')

    x.start()

    # 자식 스레드의 작업을 기다려준다.
    # x.join()

    logging.info('Main-thread : end')
22:33:18 - Main-thread : before creating thread
22:33:18 - Main-thread : after creating thread
22:33:18 - First Sub Thread start!!!
22:33:18 - Main-thread : end
22:33:21 - First Sub Thread finish!!!

Process finished with exit code 0

Daemon Thread

  • 백그라운드에서 실행
  • 자식 스레드가 아무리 작업을 하고 있어도 메인 스레드가 종료되면 즉시 모든 이외의 스레드들을 종료
  • 백그라운드 무한 대기 이벤트 발생 실행하는 부분을 담당 (jetbrain의 자동저장, jvm의 gc, 크롬의 탭들…)
    • 메인 스레드는 문서를 작성하는 작업이고
    • 데몬 스레드는 자동저장하는 작업
import logging
import threading

def sub_thread(name, r):
    logging.info('%s start!!!', name)

    for i in r:
        logging.info(i)

    logging.info('%s finish!!!', name)

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO, datefmt='%H:%M:%S')
    logging.info('Main-thread : before creating thread')

    x = threading.Thread(target=sub_thread, args=('First Sub Thread', range(2000)), daemon=True)
    y = threading.Thread(target=sub_thread, args=('Second Sub Thread', range(1000)), daemon=True)
    logging.info('First Sub Thread is daemon? : %r', x.isDaemon())
    logging.info('Second Sub Thread is daemon? : %r', y.isDaemon())

    logging.info('Main-thread : after creating thread')

    x.start()
    y.start()

    # 메인스레드가 종료되자마자 sub스레드들 즉시 종료된다. (why? 데몬스레드라서)
    logging.info('Main-thread : end')

ThreadPoolExecutor

  • python 3.2 이상 내장 라이브러리
  • from concurrent.futures import ThreadPoolExecutor
  • with 사용으로 스레드 생성, 라이프사이클 관리 용이
  • 대기중인 작업 → queue → 완료 상태 조사 → 결과 또는 예외 → 단일화, 캡슐화
from concurrent.futures import ThreadPoolExecutor
import logging

def task(name):
    logging.info('%s Sub Thread start!!!', name)
    result = 0
    for i in range(10001):
        result += i
		time.sleep(2)
    logging.info('%s Sub Thread finish!!! - result : %d', name, result)
    return result

def main():
    logging.info('Main Thread start!!!')

    with ThreadPoolExecutor(max_workers=2) as executor:
        tasks = executor.map(task, ['First', 'Second', 'Third'])
        logging.info(list(tasks))

    logging.info('Main Thread finish!!!')

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO, datefmt='%H:%M:%S')
    main()
22:40:48 - Main Thread start!!!
22:40:48 - First Sub Thread start!!!
22:40:48 - Second Sub Thread start!!!
22:40:50 - First Sub Thread finish!!! - result : 50005000
22:40:50 - Third Sub Thread start!!!
22:40:50 - Second Sub Thread finish!!! - result : 50005000
22:40:52 - Third Sub Thread finish!!! - result : 50005000
22:40:52 - [50005000, 50005000, 50005000]
22:40:52 - Main Thread finish!!!

Process finished with exit code 0

Lock, Deadlock

  • 세마포어(semaphore) : 프로세스간 공유된 자원에 접근 시 문제가 발생할 수 있어서 한 개의 프로세스만 접근 처리를 고안 → 경쟁 상태 예방
  • 뮤텍스(mutex) : 공유된 자원을 여러 스레드가 접근하는 것을 막는 것 → 경쟁 상태 예방
  • Lock : 상호 배제를 위한 잠금 처리 → 데이터 경쟁
  • DeadLock : 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상태 (교착 상태)
  • 스레드 동기화(thread synchronization) : 안정적으로 동작하도록 처리
  • 세마포어 vs 뮤텍스 (https://velog.io/@conatuseus/OS-세마포어와-뮤텍스)
    • 세마포어는 여러개의 화장실을 여러명의 사람들이 한 화장실에 한명이 들어가서 작업이 끝나면 나오고 하는 과정이고
    • 뮤텍스는 무조건 하나의 화장실에 여러명이 계속 기다리며 작업이 끝나면 다음 사람이 들어가는 과정이다.
    • 세마포어는 뮤텍스가 될 수 있지만, 뮤텍스는 세마포어가 될 수 없다.
  • 아래 코드는 공유된 자원에 다수의 스레드가 동시에 접근하여 조작하여 값이 원하는대로 떨어지지 않는 동기화가 되지않은 문제점을 보여주는 코드이다.
import time
from concurrent.futures import ThreadPoolExecutor
import logging

class FakeDataStore:
    def __init__(self):
        # 공유된 자원 (heap)
        self.value = 0

    def update(self, s):
        logging.info('%s Sub Thread start!!!', s)

        # 스레드마다 갖고 있는 stack영역
        # mutex, Lock등 동기화가 필요 (thread synchronization)
        stack_value = self.value
        stack_value += 1
        time.sleep(0.1)
        self.value = stack_value

        logging.info('%s Sub Thread finish!!!', s)

def main():
    logging.info('Main Thread start!!!')

    store = FakeDataStore()

    logging.info('FakeDataStore update method test start!!! // value : %d', store.value)

    with ThreadPoolExecutor(max_workers=2) as executor:
        for v in ['First', 'Second', 'Third']:
            executor.submit(store.update, v)

    logging.info('FakeDataStore update method test finish!!! // value : %d', store.value)
    logging.info('Main Thread finish!!!')

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO, datefmt='%H:%M:%S')
    main()
17:59:22 - Main Thread start!!!
17:59:22 - FakeDataStore update method test start!!! // value : 0
17:59:22 - First Sub Thread start!!!
17:59:22 - Second Sub Thread start!!!
17:59:22 - First Sub Thread finish!!!
17:59:22 - Third Sub Thread start!!!
17:59:22 - Second Sub Thread finish!!!
17:59:23 - Third Sub Thread finish!!!
17:59:23 - FakeDataStore update method test finish!!! // value : 2
17:59:23 - Main Thread finish!!!
  • 위 코드의 문제점을 Lock을 통해 해결한 것이 아래 코드이다.
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import logging

class FakeDataStore:
    def __init__(self):
        # 공유된 자원 (heap)
        self.value = 0
        # 스레마다 독립적으로 가지고 있는 자원 (stack)
        self._lock = threading.Lock()

    def update(self, s):
        logging.info('%s Sub Thread start!!!', s)

        # 스레드마다 락이 걸려서 다른 스레드가 해당 작업을 할 때는 릴리즈가 되지않은 이상 작업을 하지 못하고 기다려야 한다.
        with self._lock:
            logging.info('%s Sub Thread has Lock', s)
            stack_value = self.value
            stack_value += 1
            time.sleep(0.1)
            self.value = stack_value
            logging.info('%s Sub Thread release Lock', s)

        logging.info('%s Sub Thread finish!!!', s)

def main():
    logging.info('Main Thread start!!!')

    store = FakeDataStore()

    logging.info('FakeDataStore update method test start!!! // value : %d', store.value)

    with ThreadPoolExecutor(max_workers=2) as executor:
        for v in ['First', 'Second', 'Third']:
            executor.submit(store.update, v)

    logging.info('FakeDataStore update method test finish!!! // value : %d', store.value)
    logging.info('Main Thread finish!!!')

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO, datefmt='%H:%M:%S')
    main()
18:08:45 - Main Thread start!!!
18:08:45 - FakeDataStore update method test start!!! // value : 0
18:08:45 - First Sub Thread start!!!
18:08:45 - First Sub Thread has Lock
18:08:45 - Second Sub Thread start!!!
18:08:45 - First Sub Thread release Lock
18:08:45 - First Sub Thread finish!!!
18:08:45 - Third Sub Thread start!!!
18:08:45 - Second Sub Thread has Lock
18:08:45 - Second Sub Thread release Lock
18:08:45 - Second Sub Thread finish!!!
18:08:45 - Third Sub Thread has Lock
18:08:45 - Third Sub Thread release Lock
18:08:45 - Third Sub Thread finish!!!
18:08:45 - FakeDataStore update method test finish!!! // value : 3
18:08:45 - Main Thread finish!!!

Producer-Consumer Queue

  • Producer-Consumer Pattern
    • 멀티스레드 디자인 패턴의 정석
    • 서버 프로그래밍의 핵심 (데이터 생산자와 데이터 소비자)
    • 주로 허리역할
  • Python Event 객체
    • Flag 초기값 0
    • Set() → 1, Clear() → 0, Wait(1→리턴, 0→대기), isSet() → 현 플래그 상태 조회
import concurrent.futures
import logging

# 생산자
import queue
import random
import threading
import time

def producer(pipeline, event):
    # 네트워크 대기 상태라 가정 (서버)
    while not event.is_set():
        message = random.randint(1, 11)
        logging.info('Producer produce message %s', message)
        pipeline.put(message)
    logging.info('Producer got event set... producer exit...')

# 소비자
def consumer(pipeline, event):
    # 응답 받고 소비하는 것으로 가정 or db저장
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get()
        logging.info('Consumer got message %s from pipeline queue (current queue size : %d)', message, pipeline.qsize())
    logging.info('Consumer got event set... consumer exit...')

def main():
    logging.info('Main Thread start!!!')

    # queue 생성 (사이즈 중요)
    pipeline = queue.Queue(maxsize=10)

    # event 생성
    event = threading.Event()

    # 멀티 스레딩
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        # 실행시간 조정
        time.sleep(0.1)

        # producer, consumer의 상태를 종료시킴
        event.set()

    # 프로그램 종료
    logging.info('Main Thread finish!!!')

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO, datefmt='%H:%M:%S')
    main()
23:05:14 - Main Thread start!!!
23:05:14 - Producer produce message 1
23:05:14 - Producer produce message 8
23:05:14 - Producer produce message 6
23:05:14 - Producer produce message 5
23:05:14 - Producer produce message 10
23:05:14 - Producer produce message 4
23:05:14 - Producer produce message 11
23:05:14 - Producer produce message 6
23:05:14 - Producer produce message 9
23:05:14 - Producer produce message 1
23:05:14 - Producer produce message 8
23:05:14 - Consumer got message 1 from pipeline queue (current queue size : 9)
23:05:14 - Consumer got message 8 from pipeline queue (current queue size : 8)
23:05:14 - Consumer got message 6 from pipeline queue (current queue size : 7)
23:05:14 - Consumer got message 5 from pipeline queue (current queue size : 6)
23:05:14 - Consumer got message 10 from pipeline queue (current queue size : 5)
23:05:14 - Consumer got message 4 from pipeline queue (current queue size : 4)
23:05:14 - Consumer got message 11 from pipeline queue (current queue size : 3)
23:05:14 - Consumer got message 6 from pipeline queue (current queue size : 2)
23:05:14 - Consumer got message 9 from pipeline queue (current queue size : 1)
23:05:14 - Consumer got message 1 from pipeline queue (current queue size : 0)
23:05:14 - Consumer got event set... consumer exit...
23:05:14 - Producer got event set... producer exit...
23:05:14 - Main Thread finish!!!

카테고리:

업데이트:

댓글남기기