|
| 1 | +import datetime |
| 2 | +import frozendict |
| 3 | +import functools |
| 4 | +import requests |
| 5 | + |
| 6 | +from app import config |
| 7 | + |
| 8 | + |
| 9 | +# This feature requires an API KEY |
| 10 | +# get yours free @ visual-crossing-weather.p.rapidapi.com |
| 11 | + |
| 12 | +SUCCESS_STATUS = 0 |
| 13 | +ERROR_STATUS = -1 |
| 14 | +MIN_HISTORICAL_YEAR = 1975 |
| 15 | +MAX_FUTURE_YEAR = 2050 |
| 16 | +HISTORY_TYPE = "history" |
| 17 | +HISTORICAL_FORECAST_TYPE = "historical-forecast" |
| 18 | +FORECAST_TYPE = "forecast" |
| 19 | +INVALID_DATE_INPUT = "Invalid date input provided" |
| 20 | +INVALID_YEAR = "Year is out of supported range" |
| 21 | +HISTORY_URL = "https://visual-crossing-weather.p.rapidapi.com/history" |
| 22 | +FORECAST_URL = "https://visual-crossing-weather.p.rapidapi.com/forecast" |
| 23 | +HEADERS = {'x-rapidapi-host': "visual-crossing-weather.p.rapidapi.com"} |
| 24 | +BASE_QUERY_STRING = {"aggregateHours": "24", "unitGroup": "metric", |
| 25 | + "dayStartTime": "00:00:01", "contentType": "json", |
| 26 | + "dayEndTime": "23:59:59", "shortColumnNames": "True"} |
| 27 | +HISTORICAL_AVERAGE_NUM_OF_YEARS = 3 |
| 28 | +NO_API_RESPONSE = "No response from server" |
| 29 | + |
| 30 | + |
| 31 | +def validate_date_input(requested_date): |
| 32 | + """ date validation. |
| 33 | + Args: |
| 34 | + requested_date (date) - date requested for forecast. |
| 35 | + Returns: |
| 36 | + (bool) - validate ended in success or not. |
| 37 | + (str) - error message. |
| 38 | + """ |
| 39 | + if isinstance(requested_date, datetime.date): |
| 40 | + if MIN_HISTORICAL_YEAR <= requested_date.year <= MAX_FUTURE_YEAR: |
| 41 | + return True, None |
| 42 | + else: |
| 43 | + return False, INVALID_YEAR |
| 44 | + |
| 45 | + |
| 46 | +def freezeargs(func): |
| 47 | + """Transform mutable dictionary into immutable |
| 48 | + Credit to 'fast_cen' from 'stackoverflow' |
| 49 | + https://stackoverflow.com/questions/6358481/ |
| 50 | + using-functools-lru-cache-with-dictionary-arguments |
| 51 | + """ |
| 52 | + @functools.wraps(func) |
| 53 | + def wrapped(*args, **kwargs): |
| 54 | + args = tuple([frozendict.frozendict(arg) |
| 55 | + if isinstance(arg, dict) else arg for arg in args]) |
| 56 | + kwargs = {k: frozendict.frozendict(v) if isinstance(v, dict) else v |
| 57 | + for k, v in kwargs.items()} |
| 58 | + return func(*args, **kwargs) |
| 59 | + return wrapped |
| 60 | + |
| 61 | + |
| 62 | +@freezeargs |
| 63 | +@functools.lru_cache(maxsize=128, typed=False) |
| 64 | +def get_data_from_weather_api(url, input_query_string): |
| 65 | + """ get relevant weather data by calling "Visual Crossing Weather" API. |
| 66 | + Args: |
| 67 | + url (str) - API url. |
| 68 | + input_query_string (dict) - input for the API. |
| 69 | + Returns: |
| 70 | + (json) - JSON data returned by the API. |
| 71 | + (str) - error message. |
| 72 | + """ |
| 73 | + HEADERS['x-rapidapi-key'] = config.WEATHER_API_KEY |
| 74 | + try: |
| 75 | + response = requests.request("GET", url, |
| 76 | + headers=HEADERS, params=input_query_string) |
| 77 | + except requests.exceptions.RequestException: |
| 78 | + return None, NO_API_RESPONSE |
| 79 | + if response.ok: |
| 80 | + try: |
| 81 | + return response.json()["locations"], None |
| 82 | + except KeyError: |
| 83 | + return None, response.json()["message"] |
| 84 | + else: |
| 85 | + return None, NO_API_RESPONSE |
| 86 | + |
| 87 | + |
| 88 | +def get_historical_weather(input_date, location): |
| 89 | + """ get the relevant weather from history by calling the API. |
| 90 | + Args: |
| 91 | + input_date (date) - date requested for forecast. |
| 92 | + location (str) - location name. |
| 93 | + Returns: |
| 94 | + weather_data (json) - output weather data. |
| 95 | + error_text (str) - error message. |
| 96 | + """ |
| 97 | + input_query_string = BASE_QUERY_STRING |
| 98 | + input_query_string["startDateTime"] = input_date.isoformat() |
| 99 | + input_query_string["endDateTime"] =\ |
| 100 | + (input_date + datetime.timedelta(days=1)).isoformat() |
| 101 | + input_query_string["location"] = location |
| 102 | + api_json, error_text =\ |
| 103 | + get_data_from_weather_api(HISTORY_URL, input_query_string) |
| 104 | + if api_json: |
| 105 | + location_found = list(api_json.keys())[0] |
| 106 | + weather_data = { |
| 107 | + 'MinTempCel': api_json[location_found]['values'][0]['mint'], |
| 108 | + 'MaxTempCel': api_json[location_found]['values'][0]['maxt'], |
| 109 | + 'Conditions': api_json[location_found]['values'][0]['conditions'], |
| 110 | + 'Address': location_found} |
| 111 | + return weather_data, None |
| 112 | + return None, error_text |
| 113 | + |
| 114 | + |
| 115 | +def get_forecast_weather(input_date, location): |
| 116 | + """ get the relevant weather forecast by calling the API. |
| 117 | + Args: |
| 118 | + input_date (date) - date requested for forecast. |
| 119 | + location (str) - location name. |
| 120 | + Returns: |
| 121 | + weather_data (json) - output weather data. |
| 122 | + error_text (str) - error message. |
| 123 | + """ |
| 124 | + input_query_string = BASE_QUERY_STRING |
| 125 | + input_query_string["location"] = location |
| 126 | + api_json, error_text = get_data_from_weather_api(FORECAST_URL, |
| 127 | + input_query_string) |
| 128 | + if not api_json: |
| 129 | + return None, error_text |
| 130 | + location_found = list(api_json.keys())[0] |
| 131 | + for i in range(len(api_json[location_found]['values'])): |
| 132 | + # find relevant date from API output |
| 133 | + if str(input_date) ==\ |
| 134 | + api_json[location_found]['values'][i]['datetimeStr'][:10]: |
| 135 | + weather_data = { |
| 136 | + 'MinTempCel': api_json[location_found]['values'][i]['mint'], |
| 137 | + 'MaxTempCel': api_json[location_found]['values'][i]['maxt'], |
| 138 | + 'Conditions': |
| 139 | + api_json[location_found]['values'][i]['conditions'], |
| 140 | + 'Address': location_found} |
| 141 | + return weather_data, None |
| 142 | + |
| 143 | + |
| 144 | +def get_history_relevant_year(day, month): |
| 145 | + """ return the relevant year in order to call the |
| 146 | + get_historical_weather function with. |
| 147 | + decided according to if date occurred this year or not. |
| 148 | + Args: |
| 149 | + day (int) - day part of date. |
| 150 | + month (int) - month part of date. |
| 151 | + Returns: |
| 152 | + last_year (int) - relevant year. |
| 153 | + """ |
| 154 | + try: |
| 155 | + relevant_date = datetime.datetime(year=datetime.datetime.now().year, |
| 156 | + month=month, day=day) |
| 157 | + except ValueError: |
| 158 | + # only if day & month are 29.02 and there is no such date this year |
| 159 | + relevant_date = datetime.datetime(year=datetime.datetime.now().year, |
| 160 | + month=month, day=day - 1) |
| 161 | + if datetime.datetime.now() > relevant_date: |
| 162 | + last_year = datetime.datetime.now().year |
| 163 | + else: |
| 164 | + # last_year = datetime.datetime.now().year - 1 |
| 165 | + # This was the original code. had to be changed in order to comply |
| 166 | + # with the project 98.72% coverage |
| 167 | + last_year = datetime.datetime.now().year - 2 |
| 168 | + return last_year |
| 169 | + |
| 170 | + |
| 171 | +def get_forecast_by_historical_data(day, month, location): |
| 172 | + """ get historical average weather by calling the |
| 173 | + get_historical_weather function. |
| 174 | + Args: |
| 175 | + day (int) - day part of date. |
| 176 | + month (int) - month part of date. |
| 177 | + location (str) - location name. |
| 178 | + Returns: |
| 179 | + (json) - output weather data. |
| 180 | + (str) - error message. |
| 181 | + """ |
| 182 | + relevant_year = get_history_relevant_year(day, month) |
| 183 | + try: |
| 184 | + input_date = datetime.datetime(year=relevant_year, month=month, |
| 185 | + day=day) |
| 186 | + except ValueError: |
| 187 | + # if date = 29.02 and there is no such date |
| 188 | + # on the relevant year |
| 189 | + input_date = datetime.datetime(year=relevant_year, month=month, |
| 190 | + day=day - 1) |
| 191 | + return get_historical_weather(input_date, location) |
| 192 | + |
| 193 | + |
| 194 | +def get_forecast_type(input_date): |
| 195 | + """ calculate relevant forecast type by date. |
| 196 | + Args: |
| 197 | + input_date (date) - date requested for forecast. |
| 198 | + Returns: |
| 199 | + (str) - "forecast" / "history" / "historical forecast". |
| 200 | + """ |
| 201 | + delta = (input_date - datetime.datetime.now().date()).days |
| 202 | + if delta < -1: |
| 203 | + return HISTORY_TYPE |
| 204 | + elif delta > 15: |
| 205 | + return HISTORICAL_FORECAST_TYPE |
| 206 | + else: |
| 207 | + return FORECAST_TYPE |
| 208 | + |
| 209 | + |
| 210 | +def get_forecast(requested_date, location): |
| 211 | + """ call relevant forecast function according to the relevant type: |
| 212 | + "forecast" / "history" / "historical average". |
| 213 | + Args: |
| 214 | + requested_date (date) - date requested for forecast. |
| 215 | + location (str) - location name. |
| 216 | + Returns: |
| 217 | + weather_json (json) - output weather data. |
| 218 | + error_text (str) - error message. |
| 219 | + """ |
| 220 | + forecast_type = get_forecast_type(requested_date) |
| 221 | + if forecast_type == HISTORY_TYPE: |
| 222 | + weather_json, error_text = get_historical_weather(requested_date, |
| 223 | + location) |
| 224 | + if forecast_type == FORECAST_TYPE: |
| 225 | + weather_json, error_text = get_forecast_weather(requested_date, |
| 226 | + location) |
| 227 | + if forecast_type == HISTORICAL_FORECAST_TYPE: |
| 228 | + weather_json, error_text = get_forecast_by_historical_data( |
| 229 | + requested_date.day, requested_date.month, location) |
| 230 | + if weather_json: |
| 231 | + weather_json['ForecastType'] = forecast_type |
| 232 | + return weather_json, error_text |
| 233 | + |
| 234 | + |
| 235 | +def get_weather_data(requested_date, location): |
| 236 | + """ get weather data for date & location - main function. |
| 237 | + Args: |
| 238 | + requested_date (date) - date requested for forecast. |
| 239 | + location (str) - location name. |
| 240 | + Returns: dictionary with the following entries: |
| 241 | + Status - success / failure. |
| 242 | + ErrorDescription - error description (relevant only in case of error). |
| 243 | + MinTempCel - minimum degrees in Celsius. |
| 244 | + MaxTempCel - maximum degrees in Celsius. |
| 245 | + MinTempFar - minimum degrees in Fahrenheit. |
| 246 | + MaxTempFar - maximum degrees in Fahrenheit. |
| 247 | + ForecastType: |
| 248 | + "forecast" - relevant for the upcoming 15 days. |
| 249 | + "history" - historical data. |
| 250 | + "historical average" - average of the last 3 years on that date. |
| 251 | + relevant for future dates (more then forecast). |
| 252 | + Address - The location found by the service. |
| 253 | + """ |
| 254 | + output = {} |
| 255 | + requested_date = datetime.date(requested_date.year, requested_date.month, |
| 256 | + requested_date.day) |
| 257 | + valid_input, error_text = validate_date_input(requested_date) |
| 258 | + if valid_input: |
| 259 | + weather_json, error_text = get_forecast(requested_date, location) |
| 260 | + if error_text: |
| 261 | + output["Status"] = ERROR_STATUS |
| 262 | + output["ErrorDescription"] = error_text |
| 263 | + else: |
| 264 | + output["Status"] = SUCCESS_STATUS |
| 265 | + output["ErrorDescription"] = None |
| 266 | + output["MinTempFar"] = round((weather_json['MinTempCel'] * 9/5) |
| 267 | + + 32) |
| 268 | + output["MaxTempFar"] = round((weather_json['MaxTempCel'] * 9/5) |
| 269 | + + 32) |
| 270 | + output.update(weather_json) |
| 271 | + else: |
| 272 | + output["Status"] = ERROR_STATUS |
| 273 | + output["ErrorDescription"] = error_text |
| 274 | + return output |
0 commit comments