#!/usr/bin/python3 import argparse import traceback import sys import pandas as pd import requests import numpy as np import matplotlib.pyplot as plt import re import json import glob import os from os import getcwd, chdir path_to_csv_files = "csv/" datasets_ambiente = {"3000": "nicp-bhqi", "2018": "g2hp-ar79", "2017": "j8j8-qsb2", "2010": "nr8w-tj77", "2000": "cthp-zqrr", "1999": "evzn-32bs", } # 1999 fino al 1999 # 2000 dal 2000 al 2009 # 2010 dal 2010 al 2017 # 2018 sono i dati dal 2018 ad oggi # 3000 sono i dati dell'anno corrente csv_ambiente = {"sensori_aria_1968-1995.zip": "puwt-3xxh", "sensori_aria_1996-2000.zip": "wabv-jucw", "sensori_aria_2001-2004.zip": "5jdj-7x8y", "sensori_aria_2005-2007.zip": "h3i4-wm93", "sensori_aria_2008-2010.zip": "wp2f-5nw6", "sensori_aria_2011.zip": "5mut-i45n", "sensori_aria_2012.zip": "wr4y-c9ti", "sensori_aria_2013.zip": "hsdm-3yhd", "sensori_aria_2014.zip": "69yc-isbh", "sensori_aria_2015.zip": "bpin-c7k8", "sensori_aria_2016.zip": "7v3n-37f3", "sensori_aria_2017.zip": "fdv6-2rbs", "sensori_aria_2018.zip": "4t9j-fd8z", "sensori_aria_2019.zip": "j2mz-aium"} class SocrataClient: def __init__(self, domain, app_token=None): self.base_url = f"https://{domain}/resource" self.app_token = app_token def get(self, resource_id, IdSensore, limit, filters=None): """ Retrieve data from a Socrata dataset. :param resource_id: The ID of the Socrata dataset. :param filters: Optional filters to apply to the query. :return: A list of records from the dataset. """ url = f"{self.base_url}/{resource_id}.json" params = {"$limit": limit, "idsensore": IdSensore, } if filters: params.update(filters) headers = {} if self.app_token: headers["X-App-Token"] = self.app_token response = requests.get(url, params=params, headers=headers) if response.status_code == 200: data = response.json() return data raise Exception(f"Failed to retrieve data. Status code: {response.status_code}") def read_data_online(dataset, sensore): client = SocrataClient('www.dati.lombardia.it', None) limit = 1000000 try: data = client.get(dataset, sensore, limit) if data: #print(json.dumps(data, indent=2)) return data else: print("No data found for the specified sensor.") sys.exit(-1) except Exception as e: print(f"Error: {e}") sys.exit(-1) def read_data_from_csv(datafile): return pd.read_csv("csv/" + datafile, usecols=['IdSensore', 'Data', 'Valore', 'Stato', 'idOperatore']) def process(dati, sensore, csv): """ processa i dati per un sensore da un dataset o un file csv e restituisce un dataframe """ print('Sto processando i dati del sensore %s per l\'origine dati %s...' % (sensore, dati)) if csv: results = read_data_from_csv(dati) else: results = read_data_online(dati, sensore) results_df = pd.DataFrame.from_records(results) results_df.columns = [x.lower() for x in results_df.columns] try: results_df = results_df.astype({'idsensore': 'int64'}) results_df = results_df[results_df['idsensore'] == int(sensore)] results_df = results_df.astype({'valore': 'float64'}) results_df["data"] = pd.to_datetime(results_df["data"]) results_df = results_df.replace(-9999, np.nan) except: print('\nERRORE: dati non disponibili per il sensore %s\n' % sensore) traceback.print_exc() sys.exit(-1) results_df.sort_values(by=['data'], inplace=True) results_df.rename(columns={'valore': sensore}, inplace=True) results_df.drop(columns=['idoperatore', 'idsensore', 'stato'], inplace=True) return results_df def merge_df(dataframes, sensori): """ fonde diversi dataframes in un dataframe unico con un sensore per colonna """ df = dataframes[sensori[0]] for sensore in sensori[1:]: df = pd.merge(df, dataframes[sensore]) if len(df) == 0: print('\nERRORE: dati non disponibili per il sensore nel periodo considerato\n') sys.exit(-1) return df def get_dataframes(dati_csv, dati, sensori): """ salva in un dict i dataframes dei vari sensori richiesti """ dataframes = {} for sensore in sensori: if dati_csv: df = process(dati_csv[0], sensore, True) for d in dati_csv[1:]: df = pd.concat([df, process(d, sensore, True)], axis=0, ignore_index=True) df.rename(columns={sensore: sensore + "-csv"}, inplace=True) dataframes[sensore + "-csv"] = df if dati: df = process(dati[0], sensore, False) for d in dati[1:]: df = pd.concat([df, process(d, sensore, False)], axis=0, ignore_index=True) dataframes[sensore] = df return dataframes def plot_dataframe(dataframe): dataframe.plot(x='data') plt.axhline(y=50, color='black', linestyle='-', label='EU limit') plt.show() def list_of_csv_files(dir_name): """ restituisce la lista dei files .zip presenti in una directory """ saved = getcwd() os.chdir(dir_name) filelist = glob.glob('*.zip') chdir(saved) return filelist def parse_range(x): """ espande un range di anni nel formato NNNN-NNNN restituendo una lista dei singoli anni """ x = x.strip() if x.isdigit(): yield str(x) elif '-' in x: xr = x.split('-') yield from range(int(xr[0].strip()), int(xr[1].strip()) + 1) else: raise ValueError(f"Unknown range specified: {x}") def get_csv_dict(dict): """ prende un dizionario dei files csv e assegna ad ogni anno un filename e una sigla id, restituendo un dict """ d = {} for (k, v) in dict.items(): filename, id = k, v match_multi = re.search("\\d{4}-\\d{4}", filename) match_single = re.search("\\d{4}", filename) if match_multi: years = [str(x) for x in parse_range(str(match_multi.group()))] elif match_single: years = [match_single.group()] else: print("\nError: no match, the filename does not contain any year") for year in years: d.update({year: [filename, id]}) return d def check_csv(args, filelist, csv_dict): years = [str(x) for x in parse_range(args)] f = [] for y in years: if y not in csv_dict.keys(): print("Errore: i dati per l'anno %s non sono disponibili come csv" % y) sys.exit(-1) if csv_dict[y][0] not in filelist: print("file %s for year %s is not available in folder %s" % (csv_dict[y][0], y, path_to_csv_files)) download_csv(csv_dict[y][0], csv_dict[y][1], path_to_csv_files) if csv_dict[y][0] not in f: f.append(csv_dict[y][0]) return f def download_csv(filename, id, path): print("downloading %s....... please wait" % filename) import requests url = "https://www.dati.lombardia.it/download/" + id + "/application%2Fzip" req = requests.get(url, allow_redirects=True) try: req.raise_for_status() except (requests.ConnectionError, requests.RequestException, requests.HTTPError, requests.Timeout, requests.TooManyRedirects) as e: print("Download error: \n\t %s" % str(e)) sys.exit(-1) else: f = open(os.path.dirname(path) + "/" + filename, "wb") f.write(req.content) f.close() pass def check_year_range(arg): """check if arg is a year or a year range""" if not re.search("\\d{4}-\\d{4}", arg): if not re.search("\\d{4}", arg): print("\nError: syntax for --csv and --dataset parameter: " "NNNN single year or NNNN-NNNN for years range\n") sys.exit(-1) return True def create_folder_if_not_exists(folder_path): if not os.path.exists(folder_path): try: os.makedirs(folder_path) print(f"Folder '{folder_path}' created successfully.") except OSError as e: print(f"Error creating folder '{folder_path}': {e}") else: print(f"Folder '{folder_path}' already exists.") def is_graphical_environment_active(): value = os.environ.get("DISPLAY") if value is not None and value != "": return True else: return False def is_remote_tty(): """ Check if the script is executed on a remote TTY. Returns: bool: True if running on a remote TTY, False if running locally. """ term = os.environ.get('TERM') if term is not None and (term.startswith('xterm') or term == 'ssh'): return True else: return False def main(): parser = argparse.ArgumentParser() parser.add_argument("--dataset", nargs='+', required=False, help="ricerca dei datasets") parser.add_argument("--csv", nargs='+', required=False, help="ricerca nei files csv") parser.add_argument('--sensori', nargs='+', required=True, help="cerca i dati di questi sensori") args = parser.parse_args() try: create_folder_if_not_exists(path_to_csv_files) csv_dict = get_csv_dict(csv_ambiente) csv_files = list_of_csv_files(path_to_csv_files) dati_csv = [] if args.csv: check_year_range(args.csv[0]) dati_csv = check_csv(args.csv[0], csv_files, csv_dict) dati = [] if args.dataset: if "all" in args.dataset: for k in datasets_ambiente.keys(): dati.append(datasets_ambiente[k]) else: check_year_range(args.dataset[0]) for d in parse_range(args.dataset[0]): # args.dataset: if datasets_ambiente[str(d)] not in dati: dati.append(datasets_ambiente[str(d)]) dataframes = get_dataframes(dati_csv, dati, args.sensori) datamerged = merge_df(dataframes, list(dataframes.keys())) datamerged.to_csv("export.csv") import stazioni s = stazioni.get_stazioni() for sensore in datamerged.columns[1:]: location = s.loc[s['idsensore'] == sensore.split("-")[0], 'nomestazione'].iloc[0] sensor_type = s.loc[s['idsensore'] == sensore.split("-")[0], 'nometiposensore'].iloc[0] print('Valore medio per il sensore %s %s %s: %s' % (sensore, sensor_type, location, datamerged[sensore].mean().round(1))) if is_graphical_environment_active() and not is_remote_tty(): plot_dataframe(datamerged) except KeyError: print("\nKeyError: forse hai specificato un dataset che non esiste ?\n" "i dataset sono disponibili per gli anni %s\n " % list(datasets_ambiente.keys())) #traceback.print_exc() sys.exit(-1) except KeyboardInterrupt: print("program terminated by user") sys.exit(-1) except SystemExit: print("program terminated, bye") sys.exit(-1) except: print("\nAn unhandled exception occured, here's the traceback!\n") traceback.print_exc() print("\nReport this to putro@autistici.org") sys.exit(-1) if __name__ == '__main__': main()