seminar-2020 icon indicating copy to clipboard operation
seminar-2020 copied to clipboard

Concurrent하게 Seminar에 participant 자격으로 join할 때의 문제

Open gyusang opened this issue 3 years ago • 4 comments

우리가 사용하는 테스트 서버 (python manage.py runserver)는 하나의 프로세스로 실행되지만, uWSGI 등을 이용하여 서버를 deploy할 경우 여러 개의 worker를 이용하므로, 여러 개의 장고 프로세스/스레드가 동시에 실행됩니다.

이때 POST /api/v1/seminar/{seminar_id}/user/{"role": "participant"}로 요청하는 것을 생각해봅시다.

프로세스 A, B가 있다고 하고, 각각의 프로세스에서 처리하는 것을 순서대로 A1, A2, ... 으로 표현하겠습니다. capacity가 1인 Seminar 1이 있고, participant가 없다고 합시다. user X와 user Y는 모두 accepted=True인 participant이고, join한 seminar가 없다고 합시다.

participant_count를 확인하는 시점과 UserSeminar를 추가하는 시점에 시간 차이가 있으므로 다음이 발생할 수 있습니다.

A1. X가 Seminar 1에 participant로 가입하겠다는 요청을 받음 B1. Y가 Seminar 1에 participant로 가입하겠다는 요청을 받음 A2. X의 자격을 확인하고, 해당 Seminar에 참여중인 participant 수가 0으로 participant_count+1 <= capacity임을 확인함. B2. Y의 자격을 확인하고, 해당 Seminar에 참여중인 participant 수가 0으로 participant_count+1 <= capacity임을 확인함. A3. X를 Seminar1에 participant 자격으로 가입시키고, 해당 UserSeminar를 생성함. B3. Y를 Seminar1에 participant 자격으로 가입시키고, 해당 UserSeminar를 생성함. A4. X에게 가입에 성공했다는 응답을 보냄 A5. Y에게 가입에 성공했다는 응답을 보냄

이렇게 하면 capacity보다 많은 수의 User가 participant로 가입하게 됩니다.

다음은 이를 표현하기 위해 작성한 테스트입니다.

from django.test import TransactionTestCase, Client
import threading
from rest_framework.authtoken.models import Token


def run_test_concurrently(funcs, args_list, kwargs_list=None):
    exceptions = []
    if args_list is None:
        args_list = [()] * len(funcs)
    if kwargs_list is None:
        kwargs_list = [{}] * len(funcs)

    def call_test_func(f, *args, **kwargs):
        try:
            f(*args, **kwargs)
        except Exception as e:
            exceptions.append(e)
            raise

    threads = []
    for i in range(len(funcs)):
        threads.append(threading.Thread(target=call_test_func, args=(funcs[i],) + args_list[i], kwargs=kwargs_list[i]))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    if exceptions:
        raise Exception(f'run_test_concurrently intercepted {len(exceptions)} exceptions: {exceptions}')


class ConcurrentSeminarJoin(TransactionTestCase):
    client = Client()
    PARTICIPANT_COUNT = 5
    SEMINAR_CAPACITY = 1

    def setUp(self):
        self.participant_tokens = []
        self.participant_usernames = []
        for i in range(self.PARTICIPANT_COUNT):
            participant_payload = {
                'username': f'part{i}',
                "password": "password",
                "email": f'part{i}@test.kr',
                'role': 'participant',
                'accepted': True
            }
            self.client.post(
                '/api/v1/user/',
                participant_payload,
                conent_type='application/json'
            )
            self.participant_usernames.append(f'part{i}')
            self.participant_tokens.append(Token.objects.get(user__username=f'part{i}').key)
        instructor_payload = {
            'username': 'inst',
            "password": "password",
            "email": '[email protected]',
            'role': 'instructor',
        }
        self.client.post(
            '/api/v1/user/',
            instructor_payload,
            content_type='application/json'
        )
        self.instructor_token = Token.objects.get(user__username='inst').key
        seminar_payload = {
            'name': 'seminar',
            'capacity': self.SEMINAR_CAPACITY,
            'count': 1,
            'time': '00:00'
        }
        response = self.client.post(
            '/api/v1/seminar/',
            seminar_payload,
            content_type='application/json',
            HTTP_AUTHORIZATION=f'Token {self.instructor_token}'
        )
        self.seminar_id = response.json()['id']

    def test_concurrent_participant_join(self):
        payload = {
            'role': 'participant'
        }
        path = f'/api/v1/seminar/{self.seminar_id}/user/'

        def participant_join(token):
            client = Client()
            return client.post(
                path,
                payload,
                content_type='application/json',
                HTTP_AUTHORIZATION=f'Token {token}'
            ).json()

        tokens = [(token,) for token in self.participant_tokens]
        run_test_concurrently([participant_join] * self.PARTICIPANT_COUNT, tokens)
        # PARTICIPANT_COUNT users trying to join seminar concurrently

        seminar = self.client.get(
            f'/api/v1/seminar/{self.seminar_id}/'
        ).json()
        self.assertGreaterEqual(seminar['capacity'], len(seminar['participants']))
        # number of participants should not exceed capacity
        self.assertEqual(len(seminar['participants']), min(self.SEMINAR_CAPACITY, self.PARTICIPANT_COUNT))
        # expected that 1 participant was joined.

Concurrency를 고려하지 않은 구현에서

AssertionError: 1 not greater than or equal to 5

가 발생하는 것을 확인할 수 있었습니다.

이 경우에는 participant 수가 조금 많은, 논리적인 문제일 뿐이지만 만약 POST /api/v1/seminar/POST /api/v1/seminar/{seminar_id}/user/가 instructor 자격으로 이루어졌다면 구현상 발생할 수 없는, 한 유저가 두 개의 세미나에 instructor 자격으로 참가하는 현상이 발생할 수 있습니다.

이외에도 PUT /api/v1/seminar/{seminar_id}/에서 capacity를 줄이는 동시에 새로운 유저가 세미나에 참여했다면 capacity가 participant_count보다 작아질 수 있습니다.


질문

  1. 사용자 수가 적은(<1req/s) 서버에서도 여러 개의 Django Process/Thread를 사용하나요?
  2. 한 개의 Django Process만 사용하여 서버를 운영하는 경우, Concurrency를 고려하지 않아도 되나요? 즉, 장고가 한 요청을 모두 처리한 이후에 다음 요청을 처리하기 시작하는 것이 보장되나요?

gyusang avatar Oct 01 '20 04:10 gyusang

좋은 질문이고 답변을 잘 하고 싶은데, 일단 과제 2 피드백 어느 정도 진행해놓고 돌아오겠습니닷...

davin111 avatar Oct 02 '20 04:10 davin111

과제 2 종합 피드백의 1:15:24 경에, 그리고 4번째 세미나의 32:20 경에서 concurrent하게 Seminar에 Participant들이 join하는 내용을 조금씩 다뤘습니다. 추후 더 언급할 예정입니다.

그리고 질문 1.과 2.에 대해서는 4번째 세미나의 1:10:53 경에서 언급했습니다.

davin111 avatar Oct 24 '20 07:10 davin111

@gyusang 씨도 혹시 이에 대해 나는 이렇게 구현해서 해결했다, 같은 내용이나 추가적인 질문을 덧붙이고 싶으신 게 있다면 해주셔도 좋습니다! (당연히 자유롭게)

davin111 avatar Oct 24 '20 07:10 davin111

저는 select_for_update()를 이용하여 이 문제를 해결했습니다.

해결해야 할 문제는 2개입니다.

  1. participant count + 1 > seminar.capacity를 확인한 이후에 participant_count가 수정되어도 user를 seminar에 가입시켜서 지나치게 많은 participant가 가입하는 문제
  2. participant가 이미 가입했으면 400을 반환해야 하는데, user.userseminar_set.filter(seminar=seminar).exists() 등을 확인한 이후에 user가 participant에 join하는 경우 한 user를 한 seminar에 여러 번 join시키는 문제

첫 번째 문제는 participant를 1명씩 join시키는 방법으로 해결할 수 있습니다. SQL SELECT ... FOR UPDATE를 두 개 이상의 프로세스에서 요청하면 먼저 요청한 프로세스의 transaction이 종료될 때까지 나머지 프로세스는 대기하게 됩니다.

이 경우에는 이 세미나를 한 번에 한 명씩 join하도록 하는 것이므로, seminar에 대해 select_for_update를 걸어 선택하면 한 번에 하나의 join만 처리하도록 할 수 있습니다. 다음의 코드와 같이 구현할 수 있습니다.

class UserSeminar(models.Model):
    ...
    @classmethod
    def join_as_participant(cls, user, seminar_id, as_of):
        with transaction.atomic():
            seminar = Seminar.objects.select_for_update().get(pk=seminar_id)
            # Blocks accessing seminar, so that there's at most one join_as_participant running for each seminar

            participant_count = seminar.userseminar_set.filter(role=cls.PARTICIPANT).count()

            if participant_count + 1 > seminar.capacity:
                raise ValidationError({'error': 'This seminar is already full of participants.'})
            user_seminar = cls.objects.create(user=user, seminar=seminar,
                                              joined_at=as_of, is_active=True, role=cls.PARTICIPANT)
            # raises IntegrityError when the relation already exists
            return user_seminar

이미 seminar가 하나씩 처리되는 것을 보장했기 때문에, seminar.userseminar_set.filter(user=user).exists()를 이용해서 이미 가입했는지를 확인해도 됩니다. 하지만 query 수를 줄이기 위해서 ('user', 'seminar') field에 대해 unique_together를 설정하여 이미 세미나에 가입한 경우 IntegrityError가 발생하도록 했습니다.

serializer에 구현할 수도 있지만, 이 코드는 모델의 integrity를 강제하는 목적으로 작성한 것이기 때문에 새로운 serializer를 작성하면 비슷한 코드를 새로 작성해야 할 것입니다. 따라서 재사용성을 고려해 model 내부의 classmethod로 작성했습니다.

gyusang avatar Oct 24 '20 08:10 gyusang