Dapatkan data cuaca dari cuaca bawah tanah

import json
import threading
from datetime import timedelta

import pandas as pd
import pendulum
import requests


class WeatherUnderground:
    """ WeatherUnderground class """

    def __init__(self, city, api_key):
        self.city = city
        self.api_key = api_key

    def get_weather_by_date_range_threaded(self, _start, _end):
        """ Get weather data by date range
        :param _start: start date
        :param _end: end date
        """
        _end_time, _valid_time = self.get_end_and_start_time(_end, _start)
        data, worker = self.create_worker()
        _df = self.get_data(_end_time, _valid_time, data, worker)
        _df = self.clean_data(_df)
        _df = self.prepare_data(_df)
        return _df

    @staticmethod
    def prepare_data(_df):
        _df = _df.sort_values(by="valid_time_gmt")  # Sort dataframe by valid time
        _df["datetime"] = pd.to_datetime(_df["valid_time_gmt"] * 1e9) + timedelta(hours=7)
        _df = _df.drop(columns=["valid_time_gmt"])  # Drop valid time column
        _df = _df.set_index("datetime")             # Set index to datetime
        _df = _df.rename(columns={'temp': 'temp_wu', 'dewPt': 'dew_point'})
        _df['temp_wu'] = (_df['temp_wu'] - 32) * 5 / 9      # Convert temperature to Celsius
        _df['dew_point'] = (_df['dew_point'] - 32) * 5 / 9  # Convert dew point to Celsius
        return _df

    def get_data(self, _end_time, _valid_time, data, worker):
        """ This method used to get data from API """
        num_th = 20  # Number of threads to run
        while _end_time > _valid_time:

            threads = []  # Initialize threads list
            for i in range(num_th):
                if _end_time <= _valid_time:  # If end time is less than valid time, break
                    break

                dt = _valid_time.strftime("%Y%m%d")  # Convert valid time to string
                t = threading.Thread(target=worker, args=(dt, self.city, self.api_key))  # Create thread
                threads.append(t)  # Add thread to thread list
                _valid_time = _valid_time.add(days=1)

            for t in threads:
                t.start()  # Start threads
            for t in threads:
                t.join()  # Join threads
        _df = pd.concat(data.values())  # Concatenate dataframes
        return _df

    @staticmethod
    def create_worker():
        """ Create worker function. """
        data = {}  # Initialize data dictionary

        def worker(_dt, _city, _api_key):
            a = requests.get(
                f'https://api.weather.com/v1/location/{_city}/observations/historical.json?apiKey={_api_key}&units=e'
                f'&startDate={_dt}')

            b = json.loads(a.text)  # Convert json to dictionary
            c = pd.DataFrame(b['observations'])  # Convert dictionary to dataframe
            data.update({_dt: c})  # Update data dictionary

        return data, worker

    def get_end_and_start_time(self, _end, _start):
        _valid_time = self.convert_date_string_to_datetime(_start)
        _end_time = self.convert_date_string_to_datetime(_end)
        return _end_time, _valid_time

    @staticmethod
    def clean_data(_df):
        """ Clean data. """
        columns = ['key', 'class', 'expire_time_gmt', 'obs_id', 'obs_name']
        _df = _df.drop(columns=columns)  # Drop irrelevant columns
        _df = _df.dropna(axis=1, thresh=int(len(_df) * 0.9))  # Drop columns with more than 90% of data as null.
        return _df

    @staticmethod
    def convert_date_string_to_datetime(date_string):
        """ Convert date string to datetime object.
        :param date_string: Date string in format YYYY-MM-DD
        :return: datetime object
        """
        return pendulum.parse(date_string, tz='Asia/Bangkok')


if __name__ == "__main__":
    start = "2019-01-01"
    end = "2019-01-31"
    wu = WeatherUnderground("YSCB:9:AU", "YOUR_API_KEY")
    df = wu.get_weather_by_date_range_threaded(start, end)
    print(df)
Ugly Unicorn