openlamb/openlamb.py

248 lines
8.6 KiB
Python
Executable File

#!/usr/bin/python3
import argparse
import traceback
import sys
import pandas as pd
import numpy as np
from sodapy import Socrata
import matplotlib.pyplot as plt
import re
import glob
import os
from os import getcwd, chdir
# Directory, relative to the working directory, where the zip archives are
# looked up and downloaded to.
path_to_csv_files = "csv/"

# Year -> dataset identifier passed to the Socrata client of
# www.dati.lombardia.it (see read_data_online).
datasets_ambiente = {
    "2020": "nicp-bhqi",
    "2019": "kujm-kavy",
    "2018": "bgqm-yq56",
    "2017": "j8j8-qsb2",
}

# Archive file name -> identifier used to build the download URL
# (see download_csv). The years covered are encoded in the file name.
csv_ambiente = {
    "sensori_aria_1968-1995.zip": "puwt-3xxh",
    "sensori_aria_1996-2000.zip": "wabv-jucw",
    "sensori_aria_2001-2004.zip": "5jdj-7x8y",
    "sensori_aria_2005-2007.zip": "h3i4-wm93",
    "sensori_aria_2008-2010.zip": "wp2f-5nw6",
    "sensori_aria_2011.zip": "5mut-i45n",
    "sensori_aria_2012.zip": "wr4y-c9ti",
    "sensori_aria_2013.zip": "hsdm-3yhd",
    "sensori_aria_2014.zip": "69yc-isbh",
    "sensori_aria_2015.zip": "bpin-c7k8",
    "sensori_aria_2016.zip": "7v3n-37f3",
    "sensori_aria_2017.zip": "fdv6-2rbs",
    "sensori_aria_2018.zip": "4t9j-fd8z",
    "sensori_aria_2019.zip": "j2mz-aium",
}
def _connect():
    """Create and return a Socrata client for www.dati.lombardia.it.

    None is passed as the second positional argument of the client
    constructor (presumably the application token, i.e. anonymous access --
    confirm against the sodapy documentation).
    """
    return Socrata("www.dati.lombardia.it", None)
def read_data_online(dataset, sensore):
    """Query an online dataset for the records of a single sensor.

    Args:
        dataset: dataset identifier handed to the client's ``get`` call.
        sensore: sensor id, sent as the ``IdSensore`` filter.

    Returns:
        Whatever the client's ``get`` call returns for that query.
    """
    return _connect().get(dataset, IdSensore=sensore)
def read_data_from_csv(datafile):
    """Load the columns of interest from a file stored under ``csv/``.

    Args:
        datafile: file name, appended verbatim to the ``csv/`` prefix.

    Returns:
        A DataFrame restricted to the IdSensore, Data, Valore, Stato and
        idOperatore columns.
    """
    wanted_columns = ['IdSensore', 'Data', 'Valore', 'Stato', 'idOperatore']
    location = "csv/" + datafile
    return pd.read_csv(location, usecols=wanted_columns)
def process(dati, sensore, csv):
    """Build the measurement series of one sensor from a dataset or csv file.

    Args:
        dati: online dataset identifier, or the csv file name when ``csv``
            is true.
        sensore: sensor id; it must be convertible with ``int()`` and is
            also used as the name of the value column in the result.
        csv: when true the data is read with ``read_data_from_csv``,
            otherwise with ``read_data_online``.

    Returns:
        A DataFrame sorted by ``data``, holding only the rows of the
        requested sensor, with ``valore`` renamed to ``sensore`` and the
        ``idoperatore``, ``idsensore`` and ``stato`` columns removed.

    Exits the program with ``sys.exit(-1)`` after printing the traceback
    when the expected columns are missing or cannot be converted.
    """
    print('Sto processando i dati del sensore %s per l\'origine dati %s...' % (sensore, dati))
    if csv:
        # The csv reader already returns a DataFrame; routing a DataFrame
        # through DataFrame.from_records is deprecated in recent pandas.
        results_df = read_data_from_csv(dati)
    else:
        results_df = pd.DataFrame.from_records(read_data_online(dati, sensore))
    # Column names are lower-cased so both sources share the same names.
    results_df.columns = [x.lower() for x in results_df.columns]
    try:
        results_df = results_df.astype({'idsensore': 'int64'})
        results_df = results_df[results_df['idsensore'] == int(sensore)]
        results_df = results_df.astype({'valore': 'float64'})
        results_df["data"] = pd.to_datetime(results_df["data"])
        # -9999 is replaced by NaN so it does not take part in statistics.
        results_df = results_df.replace(-9999, np.nan)
    except Exception:
        # The format operator must apply to the string, not to the value
        # returned by print().
        print('\nERRORE: dati non disponibili per il sensore %s\n' % sensore)
        traceback.print_exc()
        sys.exit(-1)
    results_df.sort_values(by=['data'], inplace=True)
    results_df.rename(columns={'valore': sensore}, inplace=True)
    results_df.drop(columns=['idoperatore', 'idsensore', 'stato'],
                    inplace=True)
    return results_df
def merge_df(dataframes, sensori):
    """Merge several dataframes into a single one, one sensor per column.

    Args:
        dataframes: mapping from key to DataFrame.
        sensori: sequence of keys of ``dataframes``, merged in this order.

    Returns:
        The result of successively applying ``pd.merge`` with its default
        settings (inner join on the shared column names).

    Exits the program with ``sys.exit(-1)`` when the merged result has no
    rows.
    """
    first_key, remaining_keys = sensori[0], sensori[1:]
    merged = dataframes[first_key]
    for key in remaining_keys:
        merged = pd.merge(merged, dataframes[key])
    if not len(merged):
        print('\nERRORE: dati non disponibili per il sensore nel periodo considerato\n')
        sys.exit(-1)
    return merged
def get_dataframes(dati_csv, dati, sensori):
    """Collect the dataframes of the requested sensors into a dict.

    Args:
        dati_csv: csv file names to process for every sensor (may be empty).
        dati: online dataset identifiers to process for every sensor (may
            be empty).
        sensori: sensor ids.

    Returns:
        A dict with, for each sensor, a ``"<sensor>-csv"`` entry when csv
        files were given and a ``"<sensor>"`` entry when datasets were given.
    """
    def _stack(sources, sensore, from_csv):
        # Process each source in order and append its rows; with a single
        # source the processed frame is returned untouched (index included).
        stacked = process(sources[0], sensore, from_csv)
        for source in sources[1:]:
            stacked = pd.concat([stacked, process(source, sensore, from_csv)], axis=0, ignore_index=True)
        return stacked

    collected = {}
    for sensore in sensori:
        if dati_csv:
            csv_key = sensore + "-csv"
            csv_frame = _stack(dati_csv, sensore, True)
            # The value column is renamed so it does not clash with the
            # online series of the same sensor.
            csv_frame.rename(columns={sensore: csv_key}, inplace=True)
            collected[csv_key] = csv_frame
        if dati:
            collected[sensore] = _stack(dati, sensore, False)
    return collected
def plot_dataframe(dataframe):
    """Plot the dataframe against its ``data`` column and show the figure.

    A horizontal black line labelled 'EU limit' is drawn at y=50 before the
    figure is displayed.
    """
    limit_line = {'y': 50, 'color': 'black', 'linestyle': '-', 'label': 'EU limit'}
    dataframe.plot(x='data')
    plt.axhline(**limit_line)
    plt.show()
def list_of_csv_files(dir_name):
    """Return the names of the ``*.zip`` files found in ``dir_name``.

    The process working directory is left untouched, so an error can never
    leave the program inside ``dir_name``. A directory that does not exist
    yields an empty list instead of raising.

    Args:
        dir_name: directory to scan (a trailing separator is accepted).

    Returns:
        A list of bare file names (no directory part), in the order
        produced by ``glob``.
    """
    # Escape the directory so glob metacharacters in it are taken literally.
    pattern = os.path.join(glob.escape(dir_name), '*.zip')
    return [os.path.basename(found) for found in glob.glob(pattern)]
def parse_range(x):
    """Expand a single number or an inclusive ``start-end`` range.

    Args:
        x: text such as ``"2018"`` or ``"2017-2019"``; surrounding
            whitespace is ignored.

    Yields:
        For a plain number, the stripped input string; for a range, every
        int from start to end inclusive. The mixed str/int output is kept
        for backward compatibility -- callers normalise with ``str()``.

    Raises:
        ValueError: when iterated, if the text is not a number or a
            two-part range, a bound is not an integer, or start > end.
    """
    x = x.strip()
    if x.isdigit():
        yield x
        return
    bounds = [part.strip() for part in x.split('-')]
    if len(bounds) != 2:
        raise ValueError(f"Unknown range specified: {x}")
    # int() raises ValueError for an empty or non-numeric bound.
    start, end = int(bounds[0]), int(bounds[1])
    if start > end:
        # A descending range would otherwise expand to nothing, silently.
        raise ValueError(f"Descending range specified: {x}")
    yield from range(start, end + 1)
def get_csv_dict(dict):
    """Index archive files by the year(s) that appear in their file name.

    Args:
        dict: mapping of file name -> identifier. The parameter name
            shadows the builtin and is kept only for backward compatibility.

    Returns:
        A mapping of year (as a string) -> ``[file name, identifier]``.
        A name containing ``NNNN-NNNN`` contributes every year of that
        inclusive span; a name containing a single ``NNNN`` contributes that
        year. Names without a four-digit number are reported with
        "no match" and skipped.
    """
    by_year = {}
    for filename, identifier in dict.items():
        span = re.search(r"(\d{4})-(\d{4})", filename)
        if span:
            first, last = int(span.group(1)), int(span.group(2))
            years = [str(year) for year in range(first, last + 1)]
        else:
            single = re.search(r"\d{4}", filename)
            if not single:
                print("no match")
                # Skip the entry: falling through would reuse the years of
                # the previous file, or fail when there is no previous one.
                continue
            years = [single.group()]
        for year in years:
            by_year[year] = [filename, identifier]
    return by_year
def check_csv(args, filelist, csv_dict):
    """Resolve the archive files needed for a year or year range.

    Archives that are not listed in ``filelist`` are downloaded into
    ``path_to_csv_files``. Each archive is handled once, even when it covers
    several of the requested years.

    Args:
        args: year or year range text, expanded with ``parse_range``.
        filelist: names of the archives already available locally.
        csv_dict: mapping of year -> ``[file name, identifier]``.

    Returns:
        The distinct archive file names, in order of first use.

    Exits the program with ``sys.exit(-1)`` when a requested year has no
    entry in ``csv_dict``.
    """
    years = [str(x) for x in parse_range(args)]
    needed = []
    for year in years:
        if year not in csv_dict:
            print("Errore: i dati per l'anno %s non sono disponibili come csv" % year)
            sys.exit(-1)
        filename, identifier = csv_dict[year][0], csv_dict[year][1]
        if filename in needed:
            # Already handled for an earlier year; filelist is not refreshed
            # after a download, so re-checking it would fetch the file again.
            continue
        if filename not in filelist:
            print("file %s for year %s is not available in folder %s" % (filename, year, path_to_csv_files))
            download_csv(filename, identifier, path_to_csv_files)
        needed.append(filename)
    return needed
def download_csv(filename, id, path):
    """Download an archive from www.dati.lombardia.it and store it on disk.

    Args:
        filename: name the downloaded file is saved under.
        id: identifier inserted in the download URL. The parameter name
            shadows the builtin and is kept for backward compatibility.
        path: destination directory, with or without a trailing separator;
            it is created when missing.

    Exits the program with ``sys.exit(-1)`` when the request fails or the
    server answers with an error status.
    """
    print("downloading %s....... please wait" % filename)
    # Imported here, as in the original, so the module can be loaded without
    # requests as long as no download is attempted.
    import requests
    url = "https://www.dati.lombardia.it/download/" + id + "/application%2Fzip"
    try:
        # The request itself is inside the try block: connection, timeout
        # and redirect errors are raised by get(), not by raise_for_status().
        req = requests.get(url, allow_redirects=True)
        req.raise_for_status()
    except requests.RequestException as e:
        # RequestException is the base class of the requests errors.
        print("Download error: \n\t %s" % str(e))
        sys.exit(-1)
    if path:
        os.makedirs(path, exist_ok=True)
    # os.path.join gives the same target for "csv/" and "csv"; dirname("csv")
    # is empty and would have sent the file to the filesystem root.
    with open(os.path.join(path, filename), "wb") as f:
        f.write(req.content)
def check_year_range(arg):
    """Check that ``arg`` is a year (NNNN) or a year range (NNNN-NNNN).

    The whole argument must match; surrounding whitespace and whitespace
    around the dash are tolerated.

    Args:
        arg: text received from the command line.

    Returns:
        True when the argument is well formed.

    Exits the program with ``sys.exit(-1)`` after printing the expected
    syntax when it is not.
    """
    # fullmatch rejects inputs that merely contain four digits somewhere,
    # such as "2017-19", "abc2017" or "20175".
    if not re.fullmatch(r"\d{4}(\s*-\s*\d{4})?", arg.strip()):
        print("\nError: syntax for --csv and --dataset parameter: "
              "NNNN single year or NNNN-NNNN for years range\n")
        sys.exit(-1)
    return True
def main():
    """Command-line entry point.

    Parses ``--dataset``, ``--csv`` and ``--sensori``, gathers the data of
    the requested sensors from csv archives and/or online datasets, merges
    it, writes ``export.csv``, prints the mean value of each sensor and
    plots the result.

    Exit status: a ``sys.exit`` raised while working keeps its status code;
    a ``KeyError`` or an unexpected exception ends the program with status 1.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset", nargs='+', required=False,
                        help="ricerca dei datasets")
    parser.add_argument("--csv", nargs='+', required=False,
                        help="ricerca nei files csv")
    parser.add_argument('--sensori', nargs='+', required=True,
                        help="cerca i dati di questi sensori")
    args = parser.parse_args()
    try:
        csv_dict = get_csv_dict(csv_ambiente)
        csv_files = list_of_csv_files(path_to_csv_files)
        dati_csv = []
        if args.csv:
            # Only the first --csv value is examined.
            check_year_range(args.csv[0])
            dati_csv = check_csv(args.csv[0], csv_files, csv_dict)
        dati = []
        if args.dataset:
            if "all" in args.dataset:
                dati.extend(datasets_ambiente.values())
            else:
                # Only the first --dataset value is examined.
                check_year_range(args.dataset[0])
                for d in parse_range(args.dataset[0]):
                    dataset_id = datasets_ambiente[str(d)]
                    if dataset_id not in dati:
                        dati.append(dataset_id)
        dataframes = get_dataframes(dati_csv, dati, args.sensori)
        datamerged = merge_df(dataframes, list(dataframes.keys()))
        datamerged.to_csv("export.csv")
        # Project-local module; it is expected to provide a table with
        # 'idsensore' and 'nomestazione' columns, as used below.
        import stazioni
        s = stazioni.get_stazioni()
        # The first column is skipped (presumably 'data' -- confirm for
        # online datasets that carry extra columns).
        for sensore in datamerged.columns[1:]:
            # split("-")[0] removes the "-csv" suffix of csv-based columns.
            location = s.loc[s['idsensore'] == sensore.split("-")[0], 'nomestazione'].iloc[0]
            print('Valore medio per il sensore %s %s: %s' % (sensore, location, datamerged[sensore].mean().round(1)))
        plot_dataframe(datamerged)
    except KeyError:
        print("\nKeyError: forse hai specificato un dataset che non esiste ?\n"
              "i dataset sono disponibili per gli anni %s\n " % list(datasets_ambiente.keys()))
        traceback.print_exc()
        sys.exit(1)
    except KeyboardInterrupt:
        print("program terminated by user")
    except SystemExit:
        print("program terminated, bye")
        # Re-raise so the status passed to sys.exit() reaches the shell
        # instead of being replaced by a successful exit.
        raise
    except Exception:
        print("\nAn unhandled exception occured, here's the traceback!\n")
        traceback.print_exc()
        print("\nReport this to putro@autistici.org")
        sys.exit(1)
if __name__ == '__main__':
main()