3. Airflow — 배치처리

Airflow는 두 개의 DAG로 배치 데이터를 수집하고 관심종목을 관리합니다.


DAG 목록

DAG스케줄역할
enhanced_nasdaq_bulk_collection_postgres매일 07:005년치 히스토리컬 데이터 수집
daily_watchlist_scanner_postgres30분마다 (*/30 * * * *)기술적 조건 기반 관심종목 스캔

DAG 1: 5년치 히스토리컬 데이터 수집

graph TD
    A[NASDAQ API - 심볼 목록] --> B[collect_nasdaq_symbols_task]
    C[FinanceDataReader - 5년 데이터] --> D[bulk_collect_stock_data_task]
    B --> E[(PostgreSQL - nasdaq_symbols)]
    D --> F[(PostgreSQL - stock_data)]
    E --> G{오늘 수집 완료?}
    G -->|No| H[BulkDataCollector - batch=200, workers=4]
    G -->|Yes| I[Skip - 중복 방지]
    H --> F
    F --> J[calculate_technical_indicators_task]
    J --> K[(PostgreSQL - stock_data_technical_indicators)]
    K --> L[generate_daily_watchlist_task]
    L --> M[(PostgreSQL - daily_watchlist)]

핵심 설계 전략

  • 배치 크기: 200개씩 처리 — API 제한과 메모리 효율 균형
  • 병렬 처리: max_workers=4 동시 API 호출
  • 중복 방지: 당일 이미 수집된 경우 Skip
  • 에러 복구: retries=2, retry_delay=10분

성능 지표

  • 실행 시간: 90~140분
  • 처리량: 65K 레코드/배치

DAG 2: 30분 관심종목 스캐너

graph TD
    A[⏰ 30분 주기 실행] --> B{시장 시간대 체크}
    B -->|09:30-16:00 EST| C[HIGH 강도 - 4개 조건]
    B -->|Pre/Post Market| D[MEDIUM 강도 - 2개 조건]
    B -->|장외시간| E[LOW 강도 - 1개 조건]
    C --> F[볼린저밴드 상단 터치]
    C --> G[RSI ≤ 30 과매도]
    C --> H[MACD 강세 신호]
    C --> I[거래량 2배 이상]
    F & G & H & I --> J[(PostgreSQL - daily_watchlist)]
    J --> K[Redis 실시간 동기화]
    K --> L[Kafka Producer 연동]

시장 시간대별 강도 조절

시간대강도활성 조건
09:30~16:00 EST (장중)HIGH볼린저, RSI, MACD, 거래량 4개 활성
04:0009:30 / 16:0020:00 EST (Pre/Post)MEDIUM볼린저, 거래량 2개 활성
20:00~04:00 EST (야간)LOW볼린저 1개만 활성

성능 지표

  • 실행 시간: 3.5~5.5분
  • 일일 실행 횟수: 48회

데이터 흐름 전체 요약

FinanceDataReader (과거 데이터)
    → stock_data (PostgreSQL)
    → stock_data_technical_indicators
    → daily_watchlist
    → Redis Cache
    → Kafka Producer (실시간 수집 대상 목록)