[ 원티드 프리온보딩 ] Human Scape - 공공 API 활용 임상정보 Batch Task Server
💪 Study 참여/원티드 프리온보딩 백엔드

[ 원티드 프리온보딩 ] Human Scape - 공공 API 활용 임상정보 Batch Task Server

반응형

 

이번에는 휴먼 스케이프의 프로젝트를 진행하면서 배운 내용과 이슈들을 기록해 보려고 한다.

지난번 프로젝트와 조금은 달라진 점이 있다.

그것은 바로 Github의 Branch 전략을 좀 수정하였다.

이전 프로젝트들에서는 팀원들과 브랜치를 파서 작업을 하는데 브랜치의 이름을 아래와 같이 생성하고 진행하였다.

Before

위처럼 작성을 하니, 팀원끼리 소통을 할 때 어느 브랜치인지 이름을 말해야 하고, 진행상황도 마찬가지로 알기 힘들었다.

그래서 이번 프로젝트부터는 아래와 같이 브랜치 이름 앞에 접두사로 #번호_상세 기능 이런 식으로 네이밍 방식 규칙을 변경해 주었다.

After

위처럼 작성하니 팀원과 의사소통할 때 보다 편해졌다. 몇 번 브랜치에 ~~ 부분 이런 식으로 빠르게 진행상황을 공유할 수 있었고,

막히는 부분이나 이슈 같은 것들을 금방 파악할 수 있었다.

게다가 Notion의 칸반 보드를 활용해서 작업 현황 보드를 작성할 때도 마찬가지로 아래와 같이 브랜 치명과 동일하게 하여, 진행 중인 작업에 대한 것은 진행중인 영역에, 완료된 부분은 완료 영역에 옮기면서 시각화를 하였다.

 

노션 칸반보드를 활용한 작업 진행 상황 공유

 

작업 중인 Github 브랜치에 대해서는 같은 이름의 Notion Block으로 통일하였다.

작업 담당자를 두어 누가 진행하고 있는지에 대한 정보들도 간략하게 알 수 있어 좋았다.

 

이렇게 규칙들을 정해두고, 지난번과 지지난번의 경험을 토대로 마찬가지로 이번 프로젝트에서도 요구사항 분석을 각자 진행하고 이후 서로의 분석 내용을 공유한 뒤, 역할 분담을 진행하고 브랜치의 번호까지 약속한 뒤 작업에 착수하였다.

확실히 진행하면서 어느 정도 노하우가 생겨서 그런지는 몰라도 빠르게 진행되었던 것 같았다.

필자가 이번 프로젝트에 기여한 PR 내용은 아래와 같다.

작성한 PR

이번 핵심 부분이었던 공공 API를 호출하여 일정 주기별로 데이터를 가져오는 Batch Task를 작성하였다.

데이터를 가져온 후, 수정된 부분이 있는지 확인 및 존재 여부를 확인하여 DB에 적재하는 로직을 작성하였고,

몇 개의 데이터가 성공했는지 실패했는지에 대한 로깅을 할 수 있도록 기능을 추가하였다.

우리 팀은 Apscheduler라는 python 모듈을 사용하였다. 다른 팀들은 crontab을 사용하였다.

아래의 링크는 해당 모듈에 대한 공식문서이다.

 

 

User guide — APScheduler 3.9.0.post1.post1 documentation

Sometimes the scheduler may be unable to execute a scheduled job at the time it was scheduled to run. The most common case is when a job is scheduled in a persistent job store and the scheduler is shut down and restarted after the job was supposed to execu

apscheduler.readthedocs.io

 

모델링 같은 경우는 공공 API의 문서를 보면서 그대로 작성하였고, 시간이 얼마 들지 않았다. 따로 테이블을 나누지 않고 그대로 모델링 한 뒤, 데이터들을 가져올 수 있도록 모델링 작업을 하였다.

그리고 core라는 app을 추가해주어 core app 모델 내에서는 TimeStampModel을 정의해주었다.

from django.db import models

class TimeStampModel(models.Model):
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

위의 모델은 데이터가 생성된 시간과, 수정된 시간에 대한 필드를 가지고 있으며 아래와 같이 상속받아 사용하였다.

from django.db import models
from core.models import TimeStampModel

class Research(TimeStampModel):
    number      = models.CharField(max_length=100, unique=True, verbose_name='과제 번호')
    title       = models.CharField(max_length=300, verbose_name='과제명')
    department  = models.CharField(max_length=100, null=True, verbose_name='진료과')
    institute   = models.CharField(max_length=100, null=True, verbose_name='연구 책임 기관')
    # 생략

위처럼 Research라는 모델은 TimeStampModel을 상속받았기 때문에 추가로 등록일자와 수정 일자에 대한 필드를 가지게 된다.

 

프로젝트 구조

프로젝트 구조에 대해서 간단하게 살펴보자면 아래와 같다.

.
├── README.md
├── batch_task.log
├── core
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── logger.py
│   ├── migrations
│   ├── models.py
│   ├── tests.py
│   └── views.py
├── dockerfile
├── human
│   ├── __init__.py
│   ├── asgi.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
├── my_settings.py
├── requirements.txt
└── research
    ├── __init__.py
    ├── admin.py
    ├── apps.py
    ├── migrations
    ├── models.py
    ├── scheduler.py
    ├── tests.py
    ├── updater.py
    ├── urls.py
    └── views.py

5 directories, 27 files

human은 프로젝트의 기본 설정 app이고

core app는 위에서 TimeStamp에 대한 모델이 정의되어 있으며

logger라는 파일이 있어 배치 스케쥴러에 의한 로깅을 위한 파일이 있다.

research app은 공공 API 데이터를 가져오기 위한 배치 스케쥴러에 대한 파일(scheduler.py, updater.py)이 있고 하위 view에는 RESTful 한 API가 작성되어 있다.

 

Batch Task 관련 내용

research app에서 배치 Task를 정의해주었다고 잠깐 언급했었는데 어떤 식으로 동작되는지 기록해보려고 한다.

우선, research app의 app.py에서 아래의 함수를 추가해 준다.

# research/apps.py

def ready(self):
	from . import scheduler
	scheduler.start()

같은 경로에 있는 scheduler를 import 하고 스케쥴러를 실행할 것이라는 것을 알려주도록 하는 부분이다.

그렇다면 실제로 scheduler는 어떻게 되어있을까?

# research/scheduler.py

from apscheduler.schedulers.background import BackgroundScheduler
from .updater import batch_task

def start():
    scheduler = BackgroundScheduler()
    # test code
    # scheduler.add_job(batch_task, 'interval', seconds=5) 
    scheduler.add_job(batch_task, 'interval', days=1)
    scheduler.start()

우선 주석 처리된 부분은 5초에 한 번씩 실행하도록 하기 위한 테스트 코드이다.

위의 apps.py에서 scheduler의 start를 호출하였으므로 위의 scheduler에 대한 인스턴스가 만들어지고. add_job 함수를 통해서 어떤 작업을 실행할지 그리고 주기는 어떻게 할지를 파라미터를 통해 전달해 줄 수 있다.

우리는 하루에 한 번 공공 API를 호출하여 데이터를 가져오도록 하기 위해서 1일로 설정해 주었다.

add_job의 첫 번째 인자인 batch_task인 함수는 실제로 공공 API를 호출하여 데이터를 받아온 뒤, DB에 적재하도록 하는 함수이다.

함수의 내용은 아래와 같다.

# 공공 API url
OPEN_API_URL = 'https://api.odcloud.kr/api/3074271/v1/uddi:cfc19dda-6f75-4c57-86a8-bb9c8b103887'
# 공공 API를 호출하기 위한 비밀 키
OPEN_API_SECRET_KEY = OPEN_API_SECRET_KEY

def get_count():
    """
        전체 데이터의 개수를 리턴
    """
    params = {
        'serviceKey' : OPEN_API_SECRET_KEY
    }
    response = requests.get(OPEN_API_URL, params=params).json()

    return response['totalCount']

def batch_task():
    """
        get_count()에서 가져온 전체 데이터의 개수만큼 DB에 적재하는 batch task
    """
    params = {
        'page' : 1,
        'perPage' : get_count(),
        'serviceKey' : OPEN_API_SECRET_KEY
    }

    success_count = 0 # DB에 적재된 데이터의 총 갯수
    failure_count = 0 # 이미 존재하는 데이터의 총 갯수
    pass_count = 0    # 수정된 데이터의 총 갯수

    response = requests.get(OPEN_API_URL, params=params).json()

    for study in response['data']:
        number = study['과제번호']
        
        if not Research.objects.filter(number=number).exists():
            # 과제 번호가 등록되어 있지 않은 경우
            Research.objects.create(
                number      = number,
                title       = study['과제명'],
                department  = study['진료과'] if study['진료과'] else '',
                # 생략
            )
            success_count += 1
        else:
            # 이미 과제 번호가 등록되어 있는 경우
            # 해당 연구의 수정날짜와 등록날짜 비교
            research = Research.objects.get(number=number)
            if (research.updated_at).strftime('%Y-%m-%d %H:%M:%S')\
                != (research.created_at).strftime('%Y-%m-%d %H:%M:%S'):
                pass_count += 1
            else:
                failure_count += 1

    # API 1회 요청에 대한 성공, 실패, 통과된 row개수를 로깅
    batch_task_logger(success_count, failure_count, pass_count)

 

데이터를 가져올 때 로직을 간략하게 설명하자면,

DB에 100개의 데이터가 있고 데이터들은 각각 고유 번호를 가지고 있으며 그 값이 PK이다.

한번 생성되어지면 연구 데이터에 대한 등록일자와 수정 일자의 값이 같다고 생각하였고,

생성된 후 유저에 의해서 해당 연구의 기록이 수정되면 등록일자와 수정 일자의 값이 달라지기 때문에 이를 가지고 기존 데이터와 수정된 데이터를 구별 지어 주었다.

하지만 실제로 DB에 적재된 연구 데이터를 살펴보니 밀리초 값이 달라서 두 값이 정확하게 일치하지 않는다는 것을 알게 되었다.

이는 모델 생성 시 save 함수의 로직과 관련이 있었다.

즉, core app에서 정의한 TimeStampModel의 내용을 밀리초를 제거하여 저장하도록 아래와 같이 수정해야 했다.

from django.utils import timezone
from django.db import models

class NoMillisecDateTimeField(models.DateTimeField):
    def pre_save(self, model_instance, add):
        if self.auto_now or (self.auto_now_add and add):
            # Before
            # value = datetime.date.today()

            # After
            value = timezone.now().replace(microsecond=0)
            setattr(model_instance, self.attname, value)
            return value
        else:
            return super().pre_save(model_instance, add)

class TimeStampModel(models.Model):
    created_at = NoMillisecDateTimeField(auto_now_add=True)
    updated_at = NoMillisecDateTimeField(auto_now=True)

    class Meta:
        abstract = True

    @property
    def is_updated(self):
        """Tips: 추후 obj에서 update 여부 체크를 위함"""
        return self.created_at != self.updated_at

새로운 필드에 대한 클래스를 하나 정의해 주었고, 밀리초를 제거한 시간 값을 리턴하도록 정의해 주었다.

그러니 초기 데이터가 삽입될 때 결과가 아래와 같이 생성일자와 수정 날짜가 같아지는 모습이다.

 

 

한편, 이렇게 해서 구분을 할 수도 있겠지만, 좀 더 일반적인 방법이 없을까 고민을 해보았고 멘토님의 조언을 토대로 아래와 같은 방법이 있음을 알 수 있었다.

class TimeStampModel(models.Model):
    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)
    is_updated = models.BooleanField(default=False)

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        if self.pk: # create 에선 self.pk = None / 이후 업데이트때는 pk가 값을 가짐.
            self.is_updated = True
        super(TimeStampModel, self).save(*args, **kwargs)

# Then
AModel.objects.filter(is_update=True or False)

위처럼 따로 새로운 필드 클래스를 정의해주는 것 말고 새로운 필드인 is_updated라는 녀석을 정의해 주는 방법이었다.

그리고 create 시에는 default로 설정해준 false가 들어갈 것이고, 유저가 특정 연구 데이터를 수정하면 모델의 save라는 트리거가 동작하게 되므로 is_updated의 필드 값을 True로 바꾸어 주어 업데이트된 내용인지 확인을 할 수 있도록 한다.

그러면 아래와 같은 결과가 나온다.

수정된 데이터와 수정되지 않은 데이터 구분

is_updated라는 필드가 1(True)이면 수정된 데이터이고 0(False)이면 수정되지 않은 데이터임을 알 수 있다.

 

이후 batch_task()라는 함수가 종료되면 가장 아래의 Logger 함수를 실행한다.

함수의 내용은 아래와 같다.

import logging
import logging.handlers

def batch_task_logger(success_count, failure_count, pass_count):
    logger = logging.getLogger(__name__)
    formatter = logging.Formatter('[%(asctime)s][%(levelname)s] >> %(message)s')
    streamHandler = logging.StreamHandler()
    fileHandler = logging.FileHandler('./batch_task.log')
    streamHandler.setFormatter(formatter)
    fileHandler.setFormatter(formatter)

    logger.addHandler(streamHandler)
    logger.addHandler(fileHandler)
    logger.setLevel(level=logging.DEBUG)
    logger.info(f'{success_count} rows created, {failure_count} rows failed and {pass_count} rows passed')

Logging이라는 Python 내장 모듈을 사용하였고,

Formatter를 통해 출력될 메시지의 형식을 지정해주었다.

서버 프로젝트 root 경로에. log 파일이 생성되고 로깅 작업을 한다.

terminal log

터미널에서 위와 같이 로깅이 되고, 파일도 위와 마찬가지로 작성되어진다.

batch_task.log

 

발생한 이슈 정리

이번에는 프로젝트를 진행하면서 발생한 이슈에 대해서 정리해 보려고 한다.

우선, 기본적으로 Django는 아래와 같은 명령어로 서버를 실행한다.

>> python manage.py runserver

이렇게 서버를 실행하고 작성한 스케쥴러에 대한 테스트를 진행하는데, 계속해서 두 번씩 실행되는 이슈가 있었다.

분명 주기를 10초로 설정해주었는데 위와 같이 서버를 시작하니 발생한 이슈였다.

구글링을 해보니 우리와 같은 이슈를 발견했다.

아래의 링크이다.

 

Apscheduler Job in Django executes twice

The scheduled job executes twice with difference in nanoseconds. I have a task as follows in task.py def print_hello(): print("time-->",datetime.datetime.now()) print("hello...

stackoverflow.com

해결 방법은 단순했다. 위의 명령어로 서버를 실행하면 프로젝트의 파일이 변경될 때마다 reload를 실행하여 서버를 재시작하는데,

아래와 같은 명령어로 서버를 실행하면 된다.

>> python manage.py runserver --noreload

이렇게 하면 정상적으로 스케쥴러가 두 번씩 실행되어지지 않았다.

 

그리고 또 발생한 이슈로는 우리 프로젝트의 임상정보 데이터들에 대한 API를 Django View로 작성하였다.

따라서 응답을 JsonResponse로 주었는데, 배포 환경에서 응답에 대한 결괏값이 이상하게 나왔었다. 바로 아래의 사진처럼 이다.

배포 주소를 통하여 응답 확인 결과

인코딩 방식에 관련된 문제였던 것 같았다. 이 부분에 있어서는 초반에 왜 그런지에 대해서 생각해보았는데 배포 환경에 대한 인코딩? 문제인가 싶어서 배포 환경 쪽을 건드려보기도 하였고 구글링을 통해 해결할 수 있었다.

 

 

python django jsonresponse에서 한글 인코딩 문제 질문입니다

def vanalysis(request): device = Device.objects.all() locations_list = list() for d in device: try: location = d.dev_name.split(';')[2].split(' ')[0] except Exception as e: location = '데이터오류' locations_list.append(location) data =

hashcode.co.kr

위의 현상과 동일했다. 따라서 댓글에 해결방법이 나와있었는데 그 내용은 아래와 같다.

return JsonResponse({'result' : result}, json_dumps_params={'ensure_ascii': False}, status=200)

위처럼 응답을 주니 해결되었다!

 

 

느낀 점과 아쉬운 점

로깅 같은 부분은 지난 인턴 생활을 하면서 수도 없이 봐왔기 때문에 형태를 간략하게 만들어 보았다.

python의 내장 logging을 사용하였지만, 찾아보니 좀 더 커스터마이징과 사용법이 간단한? pylogging이라는 모듈도 있다고 한다.

 

GitHub - Clivern/PyLogging: Python Logging Library

:rugby_football: Python Logging Library. Contribute to Clivern/PyLogging development by creating an account on GitHub.

github.com

나중에 한번 사용해 보도록 해야겠다.

 

이번 프로젝트를 하면서 아쉬운 점은 주어진 API 외에도 다른 임상실험에 대한 API도 관리할 수 있게 하지 못한 점이다.

이 부분에 대해서는 조금 늦게 이해를 한 것 같았다.

또한 모델을 하나로 정의하고, 해당 모델의 맞도록 값을 넣어야 하는데 해당 Merge 규칙에 대한 생각을 심도있게 해보지 못해서 아쉬웠다.

진행하면서 처음 접하는 batch task였기 때문에 시간이 조금은 부족했던 부분이었던 것 같았다.

그렇지만 새로운 branch 협업 전략을 수정하고, 일정 관리 및 작업 공유와 관련된 문제는 없었고 수정된 데이터를 구별할 수 있는 방법에 대해서 구체적으로 알 수 있게 된 점이 흥미로웠다.

 

 

반응형