|

태양광 데이터, 아직도 노가다? 기상청 무료 API로 1년치 일사량 10분 만에 자동 수집!


혹시 아직도 태양광 발전량 예측을 위해 작년 날씨 데이터를 엑셀로 한땀한땀 복붙하고 계신가요? 진정한 에너지 데이터 고수라면 API 하나로 전국 50개 지역, 1년치 일사량·일조 데이터를 10분 만에 자동 수집합니다.

2025년, “데이터가 석유”라는 말은 이제 식상합니다. 데이터는 공기와도 같습니다. 특히 태양광 에너지 분야에서 일사량일조시간 데이터는 사업의 성패를 좌우하는 핵심 연료입니다. 문제는 이 연료를 얼마나 빠르고 정확하게 조달하느냐입니다.

다행히 기상청이 어마어마한 데이터를 무료로 공개했습니다. 바로 관측-통계(일사, 일조) 묶음형 조회서비스 API입니다. 이 API의 진정한 가치는 분(Minute) 단위 관측 데이터시·일·월·년 단위 통계 데이터를 한 번에, 그것도 가벼운 텍스트 형태로 제공한다는 점입니다. 데이터 수집과 전처리의 허들을 단번에 낮춰주죠.

안녕하세요! Do You Know? 입니다.

지난 포스팅들을 통해 우리는 기상청 API의 세계를 차근차근 정복해 왔습니다. 내일의 발전량을 예측해보기도 하고, LSTM으로 날씨 예측 모델을 만들어보기도 했으며, 평년값으로 과거 패턴을 돌아보기도 했습니다. 하지만 “진짜 고수”는 디테일에 강한 법. 1시간 단위의 뭉툭한 데이터로는 만족할 수 없는 여러분을 위해 오늘의 주제를 준비했습니다.

오늘은 기상청 API의 숨겨진 보물, 관측-통계(일사, 일조) 묶음형 조회서비스를 탈탈 털어봅니다. 무려 분 단위 일사량 관측 데이터까지 한 번에 가져올 수 있는, 태양광 데이터 수집의 게임체인저입니다.

이 포스팅 하나면, 파이썬 코드 몇 줄로 전국의 태양 에너지 데이터를 내 손안에서 쥐락펴락할 수 있습니다. 1년치 데이터를 자동 수집하고, 결측치를 처리한 뒤, 간단한 AI 예측 모델까지 만들어보는 여정—지금 바로 시작합니다.



왜 이 API를 써야 하는가? (필요성)

기존에도 기상청 API는 많았습니다. 하지만 이 ‘묶음형’ API는 차원이 다릅니다.

  1. 원샷 원킬: 단 한 번의 요청으로 특정 기간의 분 단위 상세 데이터와 깔끔하게 집계된 시, 일, 월, 년 통계 데이터를 모두 얻을 수 있습니다. 직접 데이터를 받아 가공할 필요가 없습니다.
  2. 데이터의 신뢰성: 국가 기관인 기상청이 직접 관측하고 검증한 데이터입니다. 데이터의 품질과 신뢰도는 두말할 필요가 없죠.

이 데이터만 있으면 태양광 발전량 예측, 농작물 생육 분석, 건축 설계 시 채광 계산 등 활용 분야는 무궁무진합니다.

API 주요 입출력 변수 완벽 해부

API를 사용하기 전, 설계도를 먼저 파악해야 합니다.

입력 변수 (우리가 요청하는 것)

파라미터설명필수예시 값
authKey기상청 API 허브에서 발급받은 인증키Y(개인별 고유 키)
stn관측 지점 번호 (전국 주요 지점 코드 참조)Y108 (서울)
tm1조회 시작 시간 (YYYYMMDDHHMI 형식)Y202301010000
tm2조회 종료 시간 (YYYYMMDDHHMI 형식)Y202301012359
mode조회 자료 종류: si = 일사량, ss = 일조시간Ysi
help도움말(헤더 설명) 표시 여부 (1: 표시, 0: 표시 안 함)N1
disp출력 포맷: 1 csv 또는 0 txtN1
  • modesi로 두면 일사량 패키지(분/시/일/월/년)가, ss로 두면 일조시간 패키지가 내려옵니다.

주요 관측 지점 코드: 서울(108), 인천(112), 수원(119), 강릉(105), 대전(133), 대구(143), 부산(159), 광주(156), 제주(184)

출력 변수 (API가 우리에게 주는 것)

API는 #START7777#7777END 사이에 다음과 같은 형식의 텍스트를 돌려줍니다.

실제 응답 포맷:

#START7777
#--------------------------------------------------------------------------------------------------
##관측 데이터
# SI_MI: 일 누적 일사량 매분자료(단위:MJ/m^2)
##통계 데이터
# SI_HR: 일사량(단위:MJ/m^2)
# SI_DAY: 일 합계일사량(단위:MJ/m^2)
# SI_MON: 월 합계일사량(단위:MJ/m^2)
# SI_YEAR: 연 합계일사량(단위:MJ/m^2)
#--------------------------------------------------------------------------------------------------
#=================| 관측데이터 |------------------------통계 데이터-------------------|
# YYMMDDHHMI   STN           SI           SI           SI           SI           SI
#        KST    ID           MI           HR          DAY          MON         YEAR 
202301011200,108,4.2,1.7,10.8,290.8,5186.0,=
202301011201,108,4.2,1.7,10.8,290.8,5186.0,=
202301011202,108,4.3,1.7,10.8,290.8,5186.0,=
...
#7777END

주의사항:

  • 각 데이터 라인은 콤마로 구분되어 있습니다.(disp가 1인 경우)
  • 각 라인 끝에 = 기호가 붙어 있으므로 파싱할 때 제거해야 합니다.
  • #==== 등의 구분선은 데이터가 아니므로 건너뛰어야 합니다.
컬럼명설명단위데이터 종류
YYMMDDHHMI관측 시간 (년월일시분, KST)
STN관측 지점 번호
SI_MI1분 누적 일사량MJ/m²관측
SI_HR1시간 누적 일사량MJ/m²통계
SI_DAY일 합계 일사량MJ/m²통계
SI_MON월 합계 일사량MJ/m²통계
SI_YEAR연 합계 일사량MJ/m²통계

자, 이제 설계도 파악은 끝났습니다. 바로 코딩에 착수합시다.

1단계: 초간단 API 호출 및 파싱

가장 먼저 API를 호출해서 원본 데이터를 가져와 보겠습니다. requests 라이브러리를 사용하고, 복잡한 텍스트를 pandas로 쉽게 파싱하기 위해 io.StringIO를 활용하는 것이 포인트입니다.

import requests
import pandas as pd
import io

BASE_URL = "https://apihub.kma.go.kr/api/typ01/cgi-bin/url/nph-sun_sfc_sts_pkg"

def fetch_kma_solar_data(api_key, stn, tm1, tm2, mode="si", disp="1"):
    """기상청 관측-통계 묶음형 API 데이터 요청 및 파싱"""
    params = {
        'authKey': api_key,
        'stn': stn,
        'tm1': tm1,
        'tm2': tm2,
        'mode': mode,  # si: 일사량, ss: 일조시간
        'help': '1',   # 헤더/단위 설명 포함
        'disp': disp
    }

    try:
        response = requests.get(BASE_URL, params=params, timeout=30)
        response.raise_for_status()
        content = response.text

        # '#START7777'~'#7777END' 사이의 데이터만 추출
        start_idx = content.find('#START7777')
        end_idx = content.find('#7777END')

        if start_idx == -1 or end_idx == -1:
            print("데이터 범위를 찾을 수 없습니다.")
            return pd.DataFrame()

        data_section = content[start_idx:end_idx]
        lines = data_section.split('\n')

        # 파싱할 데이터 라인 추출
        data_lines = []
        header_found = False

        for line in lines:
            # 헤더 라인 건너뛰기 (# 또는 ## 또는 #= 등으로 시작)
            if line.startswith('#') or line.strip() == '':
                continue

            # 첫 자리가 숫자로 시작하는 데이터 라인
            if line and line[0].isdigit():
                # 라인 끝의 '=' 제거
                line_cleaned = line.rstrip('=').rstrip(',').strip()
                if line_cleaned:
                    data_lines.append(line_cleaned)

        if not data_lines:
            print("데이터 라인을 찾을 수 없습니다.")
            return pd.DataFrame()

        # CSV 포맷으로 변환 및 파싱
        csv_text = '\n'.join(data_lines)
        df = pd.read_csv(io.StringIO(csv_text), header=None, sep=',')

        df = df.dropna(axis=1, how='all')  # 빈 컬럼 제거

        # 컬럼 이름 재정의 (일사 모드 기준)
        if mode == "si":
            df.columns = ['YYMMDDHHMI', 'STN', 'SI_MI', 'SI_HR', 'SI_DAY', 'SI_MON', 'SI_YEAR']
        elif mode == "ss":
            # 일조 모드: 컬럼명 조정 가능
            df.columns = ['YYMMDDHHMI', 'STN', 'SUM_SS_MI', 'SS_HR', 'SUM_SS_DAY', 'SUM_SS_HR_DAY', 'SUM_SS_HR_YEAR', 'SSRATE_MON', 'SSRATE_YEAR']

        return df

    except requests.exceptions.RequestException as e:
        print(f"API 요청 실패: {e}")
        return pd.DataFrame()

실행 예시

API_KEY = API_KEY # 발급받은 키로 교체

df_seoul_raw = fetch_kma_solar_data(
    api_key=API_KEY,
    stn='108',
    tm1='202301010000',
    tm2='202301012359',
    mode='si',   # 일사량
    disp='1'
)

print("=== API 호출 및 파싱 결과 (서울, 2023-01-01) ===")
print(df_seoul_raw.head())
print(f"\n데이터 크기: {df_seoul_raw.shape}")
print(f"컬럼: {df_seoul_raw.columns.tolist()}")

결과

=== API 호출 및 파싱 결과 (서울, 2023-01-01) ===
     YYMMDDHHMI  STN  SI_MI  SI_HR  SI_DAY  SI_MON  SI_YEAR
0  202301011159  108    4.2    1.4    10.8   290.8   5186.0
1  202301011200  108    4.2    1.7    10.8   290.8   5186.0
2  202301011201  108    4.2    1.7    10.8   290.8   5186.0
3  202301011202  108    4.3    1.7    10.8   290.8   5186.0
4  202301011203  108    4.3    1.7    10.8   290.8   5186.0

데이터 크기: (721, 7)
컬럼: ['YYMMDDHHMI', 'STN', 'SI_MI', 'SI_HR', 'SI_DAY', 'SI_MON', 'SI_YEAR']

파싱 핵심 로직:

  1. #START7777#7777END 사이만 추출
  2. #로 시작하는 헤더/주석 라인 건너뛰기
  3. 숫자로 시작하는 데이터 라인만 수집
  4. 라인 끝의 = 제거 (rstrip('='))
  5. 콤마로 구분된 CSV로 파싱

2단계: 클래스화 및 데이터 정제

앞으로 여러 지역, 여러 날짜의 데이터를 반복적으로 가져오려면 코드를 재사용하기 쉽게 클래스(Class)로 만드는 것이 현명합니다. 이 클래스 안에 데이터 타입을 변환하고, 기본적인 이상치를 제거하는 로직까지 넣어보겠습니다.

import pandas as pd
import requests
import io
from datetime import datetime, timedelta
import time
from tqdm import tqdm

class KMASolarCollector:
    """기상청 일사량 데이터 수집 및 정제 클래스"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://apihub.kma.go.kr/api/typ01/cgi-bin/url/nph-sun_sfc_sts_pkg"

    def fetch_kma_solar_data(self, stn, tm1, tm2, mode="si", disp="1"):
        """기상청 관측-통계 묶음형 API 데이터 요청 및 파싱"""
        params = {
            'authKey': self.api_key,
            'stn': stn,
            'tm1': tm1,
            'tm2': tm2,
            'mode': mode,  # si: 일사량, ss: 일조시간
            'help': '1',   # 헤더/단위 설명 포함
            'disp': disp   # 현재 미작동 (항상 고정폭 텍스트)
        }

        try:
            response = requests.get(self.base_url, params=params, timeout=30)
            response.raise_for_status()
            content = response.text

            # '#START7777'~'#7777END' 사이의 데이터만 추출
            start_idx = content.find('#START7777')
            end_idx = content.find('#7777END')

            if start_idx == -1 or end_idx == -1:
                print("데이터 범위를 찾을 수 없습니다.")
                return pd.DataFrame()

            data_section = content[start_idx:end_idx]
            lines = data_section.split('\n')

            # 파싱할 데이터 라인 추출
            data_lines = []
            header_found = False

            for line in lines:
                # 헤더 라인 건너뛰기 (# 또는 ## 또는 #= 등으로 시작)
                if line.startswith('#') or line.strip() == '':
                    continue

                # 첫 자리가 숫자로 시작하는 데이터 라인
                if line and line[0].isdigit():
                    # 라인 끝의 '=' 제거
                    line_cleaned = line.rstrip('=').rstrip(',').strip()
                    if line_cleaned:
                        data_lines.append(line_cleaned)

            if not data_lines:
                print("데이터 라인을 찾을 수 없습니다.")
                return pd.DataFrame()

            # CSV 포맷으로 변환 및 파싱
            csv_text = '\n'.join(data_lines)
            df = pd.read_csv(io.StringIO(csv_text), header=None, sep=',')

            df = df.dropna(axis=1, how='all')  # 빈 컬럼 제거

            # 컬럼 이름 재정의 (일사 모드 기준)
            if mode == "si":
                df.columns = ['YYMMDDHHMI', 'STN', 'SI_MI', 'SI_HR', 'SI_DAY', 'SI_MON', 'SI_YEAR']
            elif mode == "ss":
                # 일조 모드: 컬럼명 조정 가능
                df.columns = ['YYMMDDHHMI', 'STN', 'SUM_SS_MI', 'SS_HR', 'SUM_SS_DAY', 'SUM_SS_HR_DAY', 'SUM_SS_HR_YEAR', 'SSRATE_MON', 'SSRATE_YEAR']

            return df

        except requests.exceptions.RequestException as e:
            print(f"API 요청 실패: {e}")
            return pd.DataFrame()

    def _clean_data(self, df):
        """데이터 타입 변환 및 기본 정제"""
        if df.empty:
            return df

        # 날짜/시간 타입 변환
        df['YYMMDDHHMI'] = pd.to_datetime(df['YYMMDDHHMI'], format='%Y%m%d%H%M')

        # 숫자 타입 변환 (오류 발생 시 NaN으로 처리)
        numeric_cols = ['STN', 'SI_MI', 'SI_HR', 'SI_DAY', 'SI_MON', 'SI_YEAR']
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce')

        # 이상치 처리 (-9와 같은 결측치 코드를 NaN으로)
        # 기상청 데이터는 종종 -9, -99 등을 결측치 코드로 사용합니다.
        df.replace(-9.0, pd.NA, inplace=True)
        df.replace(-99.0, pd.NA, inplace=True)

        # 물리적으로 불가능한 음수 일사량 제거
        df.loc[df['SI_MI'] 



실행

collector = KMASolarCollector(API_KEY)
df_seoul_clean = collector.get_clean_data('108', '202301010000', '202301012359')

print("\n=== 정제된 데이터 ===")
print(df_seoul_clean.head())
print("\n정제 후 데이터 타입:")
df_seoul_clean.info()

결과

결측치 확인:
YYMMDDHHMI    0
STN           0
SI_MI         0
SI_HR         0
SI_DAY        0
SI_MON        0
SI_YEAR       0
dtype: int64

=== 정제된 데이터 ===
           YYMMDDHHMI  STN  SI_MI  SI_HR  SI_DAY  SI_MON  SI_YEAR
0 2023-01-01 11:59:00  108    4.2    1.4    10.8   290.8   5186.0
1 2023-01-01 12:00:00  108    4.2    1.7    10.8   290.8   5186.0
2 2023-01-01 12:01:00  108    4.2    1.7    10.8   290.8   5186.0
3 2023-01-01 12:02:00  108    4.3    1.7    10.8   290.8   5186.0
4 2023-01-01 12:03:00  108    4.3    1.7    10.8   290.8   5186.0

정제 후 데이터 타입:

RangeIndex: 721 entries, 0 to 720
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   YYMMDDHHMI  721 non-null    datetime64[ns]
 1   STN         721 non-null    int64         
 2   SI_MI       721 non-null    float64       
 3   SI_HR       721 non-null    float64       
 4   SI_DAY      721 non-null    float64       
 5   SI_MON      721 non-null    float64       
 6   SI_YEAR     721 non-null    float64       
dtypes: datetime64[ns](1), float64(5), int64(1)
memory usage: 39.6 KB

클래스로 코드를 캡슐화하고 데이터 정제 파이프라인을 구축했습니다. 이제 대량의 데이터를 안정적으로 수집할 준비가 끝났습니다.

3단계: 1년치 데이터 자동 수집하기

이제 이 클래스를 활용해 1년치 데이터를 쓸어 담아 보겠습니다. 기상청 API는 과도한 요청을 막기 위해 시간당 요청 횟수 제한(Rate Limiting)이 있습니다. 따라서 각 요청 사이에 time.sleep()으로 짧은 지연시간을 주는 것이 매우 중요합니다.

# KMASolarCollector 클래스에 아래 메소드 추가

def collect_yearly_data(self, stn, year):
    """특정 연도의 1년치 데이터를 월별로 수집"""
    all_data = []

    for month in tqdm(range(1, 13), desc=f"지점 {stn} ({year}년) 수집 중"):
        start_date = datetime(year, month, 1)
        # 월의 마지막 날 계산
        if month == 12:
            end_date = datetime(year + 1, 1, 1) - timedelta(days=1)
        else:
            end_date = datetime(year, month + 1, 1) - timedelta(days=1)

        tm1 = start_date.strftime('%Y%m%d%H%M')
        tm2 = end_date.strftime('%Y%m%d%H%M')

        monthly_df = self.get_clean_data(stn, tm1, tm2)
        if not monthly_df.empty:
            all_data.append(monthly_df)

        # API 서버 부하를 줄이기 위해 요청 사이에 0.5초 대기
        time.sleep(0.5)

    if not all_data:
        return pd.DataFrame()

    yearly_df = pd.concat(all_data, ignore_index=True)
    print(f"\n{year}년 지점 {stn} 데이터 총 {len(yearly_df)}개 수집 완료!")
    return yearly_df

실행

collector = KMASolarCollector(API_KEY)
df_2023_seoul = collector.collect_yearly_data('108', 2023)

print("\n=== 2023년 서울 연간 데이터 샘플 ===")
print(df_2023_seoul.tail())

# 수집한 데이터를 파일로 저장
df_2023_seoul.to_csv('kma_solar_2023_seoul.csv', index=False)
print("\n데이터를 'kma_solar_2023_seoul.csv' 파일로 저장했습니다.")

tqdm 라이브러리가 제공하는 진행률 표시줄을 보며 커피 한 잔 하고 오면, 1년치 데이터가 csv 파일로 깔끔하게 저장되어 있을 겁니다.

결과

결측치 확인:
YYMMDDHHMI    0
STN           0
SI_MI         0
SI_HR         0
SI_DAY        0
SI_MON        0
SI_YEAR       0
dtype: int64
지점 108 (2023년) 수집 중:  25%|██▌       | 3/12 [00:47

4단계: 데이터 가공 및 AI 모델링

데이터 가공

데이터를 모았으니 이제 분석하고 예측해볼 시간입니다. 여기서 중요한 포인트가 있습니다.
우리가 수집한 SI_MI‘누적’ 데이터입니다. 이를 그대로 학습하면 안 되고, 변화량(미분)을 구해서 ‘순간’ 데이터로 만들어야 합니다.

import pandas as pd
import matplotlib.pyplot as plt

# 1) 데이터 불러오기
df = pd.read_csv('kma_solar_2023_seoul.csv')

# 2) YYMMDDHHMI를 datetime으로 변환
df['YYMMDDHHMI'] = pd.to_datetime(df['YYMMDDHHMI'], errors='coerce')
df = df.set_index('YYMMDDHHMI')

# --- [핵심 수정 파트 시작] ---
# 3) '누적' 데이터를 '순간' 데이터로 변환
# SI_MI는 0시부터 쌓이는 누적값이므로, 분 단위 변화량을 구해야 실제 1분간 내리쬔 햇빛 양이 됨
df['Solar_Min_Flux'] = df['SI_MI'].diff()

# 4) 자정(00:00) 초기화로 인한 음수 값 처리
# 날짜가 바뀌면 누적값이 0으로 리셋되므로 diff가 큰 음수가 됨 -> 0으로 처리
df.loc[df['Solar_Min_Flux'] 



결과

=== 시간 단위 리샘플링 후 데이터 (MJ/m²) ===
                     SI_Hourly_Sum
YYMMDDHHMI                        
2023-01-30 12:00:00            2.2
2023-01-30 13:00:00            2.3
2023-01-30 14:00:00            1.9
2023-01-30 15:00:00            1.5
2023-01-30 16:00:00            0.8

AI 모델링

데이터 불러오기 및 전처리
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler

# -----------------------------------------------------------------------------
# 1. 데이터 불러오기 및 전처리 (Pandas 부분)
# -----------------------------------------------------------------------------
# 1) 데이터 불러오기
df = pd.read_csv('kma_solar_2023_seoul.csv')

# 2) 시간 인덱스 설정
df['YYMMDDHHMI'] = pd.to_datetime(df['YYMMDDHHMI'], errors='coerce')
df = df.set_index('YYMMDDHHMI')

# --- [수정된 핵심 로직] ---
# 3) SI_MI 변환: '누적' 데이터를 '순간' 데이터로 변환
# SI_MI는 0시부터 계속 쌓이는 값이므로, 1분 간의 변화량(diff)을 구해야 실제 에너지량이 됨
df['SI_Step'] = df['SI_MI'].diff()

# 4) 자정(00:00) 초기화로 인한 음수 값 처리
# 날짜가 바뀌면 누적값이 0으로 리셋되면서 diff가 큰 음수가 나옴 -> 0으로 처리
df.loc[df['SI_Step'] 



결과

=== 시간 단위 리샘플링(Sum) 및 전처리 후 데이터 ===
                     SI_MI
YYMMDDHHMI                
2023-01-30 12:00:00    2.2
2023-01-30 13:00:00    2.3
2023-01-30 14:00:00    1.9
2023-01-30 15:00:00    1.5
2023-01-30 16:00:00    0.8

단위: MJ/m² (시간당 누적 일사량)
데이터셋 준비
# -----------------------------------------------------------------------------
# 2. 데이터셋 준비 (Numpy & Scaling)
# -----------------------------------------------------------------------------
data = df_hourly['SI_MI'].values.reshape(-1, 1)

# MinMax 스케일링 (0~1 사이로 정규화)
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)

# 학습/테스트 분할 (80:20)
train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]

# 시퀀스 데이터 생성 함수 (Sliding Window)
def create_sequences(data, seq_length):
    xs = []
    ys = []
    for i in range(len(data) - seq_length):
        x = data[i:(i + seq_length)]
        y = data[i + seq_length]
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

SEQ_LENGTH = 24  # 24시간 데이터를 보고 다음 1시간 예측

# Numpy 배열 생성
X_train, y_train = create_sequences(train_data, SEQ_LENGTH)
X_test, y_test = create_sequences(test_data, SEQ_LENGTH)

# Tensor로 변환
X_train = torch.from_numpy(X_train).float()
y_train = torch.from_numpy(y_train).float()
X_test = torch.from_numpy(X_test).float()
y_test = torch.from_numpy(y_test).float()
데이터로더
# -----------------------------------------------------------------------------
# 3. PyTorch Dataset & DataLoader
# -----------------------------------------------------------------------------
class SolarDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = SolarDataset(X_train, y_train)
test_dataset = SolarDataset(X_test, y_test)

BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
LSTM 모델 정의
# -----------------------------------------------------------------------------
# 4. LSTM 모델 정의 (PyTorch)
# -----------------------------------------------------------------------------
class SolarLSTM(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, num_layers=2, output_size=1):
        super(SolarLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM Layer
        # batch_first=True -> (batch, seq, feature)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Fully Connected Layer
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # x shape: (batch_size, seq_length, input_size)
        # 초기 hidden state와 cell state는 0으로 자동 초기화됨 (명시하지 않아도 됨)

        # LSTM 출력
        # out shape: (batch_size, seq_length, hidden_size)
        out, _ = self.lstm(x)

        # 마지막 타임스텝의 hidden state만 사용
        # out[:, -1, :] shape: (batch_size, hidden_size)
        out = out[:, -1, :]

        # 최종 예측
        out = self.fc(out)
        return out

# 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SolarLSTM().to(device)

# 손실함수와 옵티마이저
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
모델 학습
# -----------------------------------------------------------------------------
# 5. 모델 학습 (Training Loop)
# -----------------------------------------------------------------------------
NUM_EPOCHS = 10
train_losses = []

print("=== 학습 시작 ===")
model.train()
for epoch in range(NUM_EPOCHS):
    epoch_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        # Forward
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)

        # Backward & Optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / len(train_loader)
    train_losses.append(avg_loss)
    print(f'Epoch [{epoch+1}/{NUM_EPOCHS}], Loss: {avg_loss:.6f}')

결과

=== 학습 시작 ===
Epoch [1/10], Loss: 0.000298
Epoch [2/10], Loss: 0.000290
Epoch [3/10], Loss: 0.000283
Epoch [4/10], Loss: 0.000278
Epoch [5/10], Loss: 0.000276
Epoch [6/10], Loss: 0.000274
Epoch [7/10], Loss: 0.000272
Epoch [8/10], Loss: 0.000271
Epoch [9/10], Loss: 0.000270
Epoch [10/10], Loss: 0.000269
예측 및 평가
# -----------------------------------------------------------------------------
# 6. 예측 및 평가
# -----------------------------------------------------------------------------
model.eval()
with torch.no_grad():
    # 학습 데이터 구간 예측
    train_predictions = model(X_train.to(device)).cpu().numpy()
    # 테스트 데이터 구간 예측
    test_predictions = model(X_test.to(device)).cpu().numpy()

# 스케일링 역변환 (원래 단위인 MJ/m²로 복구)
train_predict = scaler.inverse_transform(train_predictions)
test_predict = scaler.inverse_transform(test_predictions)
y_train_actual = scaler.inverse_transform(y_train.numpy())
y_test_actual = scaler.inverse_transform(y_test.numpy())

결과

==================================================
모델 평가 결과
==================================================
Train MSE: 0.0387 | RMSE: 0.1968 | MAE: 0.0501
Test  MSE: 0.0074 | RMSE: 0.0860 | MAE: 0.0459
==================================================

5. 예측 및 평가: 숫자의 함정과 진실

모델 학습이 완료되었습니다. 성적표를 확인해볼까요?
수치만 보면 매우 훌륭해 보이지만, 여기에는 태양광 데이터 특유의 통계적 함정이 숨어 있습니다. 데이터 과학자의 눈으로 냉정하게 분석해 보겠습니다.

구분MSERMSEMAE
Train0.03870.19680.0501
Test0.00740.08600.0459
1) 표면적 해석: “완벽에 가까운 예측?”

일단 숫자 자체는 매우 고무적입니다.

  • 압도적인 정확도: 테스트 구간의 RMSE가 0.0860에 불과합니다. 시간당 최대 일사량이 보통 2.0~3.0 MJ/m² 내외임을 감안하면, 오차율이 매우 미미합니다.
  • 안정적인 추세: MAE(0.0459) 역시 매우 낮아, 모델이 급격한 튀는 값(Outlier) 없이 실제 패턴을 부드럽게 따라가고 있음을 보여줍니다.
  • 과적합(Overfitting) 없음: 오히려 학습 데이터(Train)보다 테스트 데이터(Test)에서 성능이 더 좋게 나왔으므로, 과적합 걱정은 없습니다.

이 정도라면 당장이라도 ESS 충전 계획 수립이나 전력 거래 입찰에 투입해도 될 것만 같습니다.

2) 심층 분석: “왜 Test 오차가 더 낮을까?” (Nighttime Effect)

하지만 여기서 의문이 들어야 합니다. “보통은 안 본 문제(Test)를 더 많이 틀려야 정상 아닌가?”
왜 Test 셋의 오차가 절반 이하로 뚝 떨어졌을까요? 모델이 갑자기 똑똑해진 걸까요?

아닙니다. 여기엔 데이터의 편향(Bias)이 숨어 있습니다.

  1. 밤의 비중 (Zero-Inflation): 태양광 데이터의 절반은 ‘밤(0)’입니다. 모델 입장에서 밤 시간대의 0을 맞추는 건 누워서 떡 먹기입니다.
  2. 계절의 비밀:
  • Train (1~9월): 여름과 장마철이 포함되어 있습니다. 구름, 비, 태풍 등 날씨 변동이 심해 예측이 어렵습니다.
  • Test (10~12월): 가을~겨울입니다. 해가 짧아 ‘밤(0)’인 시간이 길고, 날씨가 상대적으로 맑고 건조하여 일사량 패턴이 단순합니다.
  1. 결론: 즉, “쉬운 문제(0 맞추기)”의 비중이 늘어나서 평균 점수가 올라간 착시 현상입니다.

데이터 편향에 대한 더 자세한 내용은 AI가 차별하는 충격적 이유 – 데이터 편향과 공정성의 모든 것 울 참고하세요!

3) 최종 진단 및 개선 방향 (Next Level)

낮은 RMSE(0.0860)는 밤과 낮을 모두 합친 평균입니다. 실제 발전이 이루어지는 낮 시간대(Daytime)만의 오차는 이보다 분명 클 것입니다.

하지만 실망할 필요는 없습니다. 모델이 기본적인 일사량의 주기와 추세는 확실하게 학습했다는 뜻이니까요. 이 모델을 ‘진짜 고성능 AI’로 업그레이드하려면 다음 스텝이 필요합니다.

🚀 더 정확한 모델을 위한 팁

  1. Daytime Only 검증: 해가 떠 있는 시간(일사량 > 0)의 데이터만 골라서 RMSE를 다시 계산해 보세요. 그게 진짜 실력입니다.
  2. 다변량(Multivariate) 모델링: 지금은 ‘과거의 태양’만 보고 미래를 맞췄습니다. 여기에 기상청의 전운량(구름), 습도, 기온 데이터를 피처(Feature)로 추가해 보세요. 구름에 가려지는 순간까지 잡아내는 무서운 녀석이 될 겁니다.

결과 시각화

# -----------------------------------------------------------------------------
# 7. 결과 시각화
# -----------------------------------------------------------------------------
# 인덱스 매칭
# Train 구간: SEQ_LENGTH 시점부터 예측 시작
train_target_idx = df_hourly.index[SEQ_LENGTH : SEQ_LENGTH + len(train_predict)]

# Test 구간: train_size + SEQ_LENGTH 시점부터 예측 시작
test_target_start = train_size + SEQ_LENGTH
test_target_idx = df_hourly.index[test_target_start : test_target_start + len(test_predict)]

# Series 생성
train_pred_series = pd.Series(train_predict.flatten(), index=train_target_idx)
test_pred_series = pd.Series(test_predict.flatten(), index=test_target_idx)
actual_series = df_hourly['SI_MI']

plt.style.use('seaborn-v0_8-whitegrid')
# 폰트 설정 (앞 단계에서 설치한 나눔바른고딕 적용)
plt.rc('font', family='NanumBarunGothic')
plt.rcParams['axes.unicode_minus'] = False

plt.figure(figsize=(16, 6))

# 전체 실제값
plt.plot(actual_series.index, actual_series.values, label='실제 일사량', color='steelblue', alpha=0.4)

# 학습 구간 예측
plt.plot(train_pred_series.index, train_pred_series.values,
         label='LSTM 예측 (Train)', color='orange', linewidth=1.5, alpha=0.8)

# 테스트 구간 예측 (이게 진짜 성능)
plt.plot(test_pred_series.index, test_pred_series.values,
         label='LSTM 예측 (Test)', color='red', linewidth=1.5)

# 구분선
split_time = df_hourly.index[train_size]
plt.axvline(split_time, color='gray', linestyle='--', linewidth=1, label='Train/Test 분할')

plt.title('서울 2023년 시간별 일사량 예측 (PyTorch LSTM)', fontsize=14)
plt.xlabel('날짜')
plt.ylabel('일사량 (MJ/m²)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

Note: AI 모델 코드는 개념 증명을 위한 간단한 예시이며, 실제 프로덕션 환경에서는 더 정교한 모델 구조와 하이퍼파라미터 튜닝이 필요합니다.

결론: 데이터, 줍는 사람이 임자다

이제 여러분은 기상청의 강력한 ‘관측-통계 묶음형’ API를 자유자재로 다루는 능력을 얻었습니다. 오늘 배운 내용을 응용하면 전국 태양광 발전소의 시간별 발전량을 예측하는 시스템을 만들 수도, 지역별 농작물 재배 최적기를 분석할 수도 있습니다.

오늘의 핵심 요약:

  • requestspandas로 텍스트 API를 손쉽게 파싱한다.
  • Class로 코드를 구조화하여 재사용성을 높인다.
  • time.sleep()을 활용해 서버에 부담을 주지 않고 대용량 데이터를 안정적으로 수집한다.
  • diff(미분)sum(합계)으로 누적 데이터를 순간 데이터로 정확히 변환한다.
  • LSTM으로 예측 모델을 만들고, 결과의 통계적 함정까지 간파한다.

데이터는 더 이상 어려운 학문이 아니라, 문제를 해결하는 ‘도구’입니다. 기상청이 제공하는 이 강력한 도구를 활용해 여러분만의 인사이트를 발굴해보세요. 지금 바로 시작하지 않을 이유가 없습니다.


같이 보기

외부 참고 자료

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다