Skip to content

Commit

Permalink
Merge pull request #19 from tgrandje/dev
Browse files Browse the repository at this point in the history
hydrometry v2 & watercourses' flow update
  • Loading branch information
tgrandje authored Jan 15, 2025
2 parents 012f996 + 6fe0fbb commit ec53889
Show file tree
Hide file tree
Showing 12 changed files with 2,473 additions and 2,085 deletions.
61 changes: 57 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ At this stage, the following APIs are covered by cl-hubeau:
* [hydrometry/hydrométrie](https://hubeau.eaufrance.fr/page/api-hydrometrie)
* [drinking water quality/qualité de l'eau potable](https://hubeau.eaufrance.fr/page/api-qualite-eau-potable)
* [superficial waterbodies quality/qualité physico-chimique des cours d'eau](https://hubeau.eaufrance.fr/page/api-qualite-cours-deau)
* [watercourses flow/écoulement des cours d'eau](https://hubeau.eaufrance.fr/page/api-ecoulement)


For any help on available kwargs for each endpoint, please refer
directly to the documentation on hubeau (this will not be covered
Expand All @@ -24,14 +26,14 @@ cl-hubeau to crawl along the results).
## Parallelization

`cl-hubeau` already uses simple multithreading pools to perform requests.
In order not to endanger the webservers and share ressources amont users, a
In order not to endanger the webservers and share resources among users, a
rate limiter is set to 10 queries per second. This limiter should work fine on
any given machine, whatever the context (even with a new parallelization
overlay).

However `cl-hubeau` should **NOT** be used in containers or pods with
parallelization. There is currently no way of tracking the rate of querying
amont multiple machines and greedy queries may end up blacklisted by the
However `cl-hubeau` should **NOT** be used in containers (or pods) with
parallelization. There is currently no way of tracking the queries' rate
among multiple machines: greedy queries may end up blacklisted by the
team managing Hub'eau.


Expand Down Expand Up @@ -267,3 +269,54 @@ with superficial_waterbodies_quality.SuperficialWaterbodiesQualitySession() as s
df = session.get_analysis(code_commune='59183', code_parametre="1340")

```

### Watercourses flow

Three high-level functions are available (and one class for low-level operations).



get_all_campaigns

Get all stations (uses a 30-day cache):

```python
from cl_hubeau import watercourses_flow
df = watercourses_flow.get_all_stations()
```

Get all observations (uses a 30-day cache):

```python
from cl_hubeau import watercourses_flow
df = watercourses_flow.get_all_observations()
```

Note that this query is heavy; users should restrict it to a given territory when possible.
For instance, you could use:
```python
df = watercourses_flow.get_all_observations(code_region="11")
```

Get all campaigns:

```python
from cl_hubeau import watercourses_flow
df = watercourses_flow.get_all_campaigns()
```

Low-level class to perform the same tasks:


Note that:

* the API forbids results of more than 20k rows, so you may need inner loops
* the cache handling will be your responsibility

```python
with watercourses_flow.WatercoursesFlowSession() as session:
    df = session.get_stations(code_departement="59")
    df = session.get_campaigns(code_campagne=[12])
    df = session.get_observations(code_station="F6640008")

```
131 changes: 81 additions & 50 deletions cl_hubeau/hydrometry/hydrometry_scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ class HydrometrySession(BaseHubeauSession):

def __init__(self, *args, **kwargs):

super().__init__(version="1.0.1", *args, **kwargs)
super().__init__(version="2.0.1", *args, **kwargs)

# Set default size for API queries, based on hub'eau hydrometry's doc
self.size = 1000

def get_stations(self, **kwargs):
"""
Lister les stations hydrométriques
Endpoint /v1/hydrometrie/referentiel/stations
Endpoint /api/v2/hydrometrie/referentiel/stations
Ce service permet d'interroger les stations du référentiel
hydrométrique. Une station peut porter des observations de hauteur
et/ou de débit (directement mesurés ou calculés à partir d'une courbe
de tarage).
hydrométrique.
Une station peut porter des observations de hauteur et/ou de débit
(directement mesurés ou calculés à partir d'une courbe de tarage).
Si la valeur du paramètre size n'est pas renseignée, la taille de page
par défaut : 1000, taille max de la page : 10000.
La profondeur d'accès aux résultats est : 20000, calcul de la
Expand All @@ -42,7 +42,7 @@ def get_stations(self, **kwargs):
"""

params = {}
for arg in "date_fermeture_station", "date_ouverture_station":
for arg in ("date_fermeture_station", "date_ouverture_station"):
try:
variable = kwargs.pop(arg)
self.ensure_date_format_is_ok(variable)
Expand All @@ -62,7 +62,14 @@ def get_stations(self, **kwargs):
pass

try:
params["en_service"] = int(kwargs.pop("en_service"))

variable = int(kwargs.pop("en_service"))
if variable not in {0, 1}:
raise ValueError(
"en_service must be among (0, 1), "
f"found en_service='{variable}' instead"
)
params["en_service"] = variable
except KeyError:
pass

Expand Down Expand Up @@ -106,19 +113,32 @@ def get_stations(self, **kwargs):
)

method = "GET"
url = self.BASE_URL + "/v1/hydrometrie/referentiel/stations"
url = self.BASE_URL + "/v2/hydrometrie/referentiel/stations"
df = self.get_result(method, url, params=params)

for f in (
"date_maj_ref_alti_station",
"date_fermeture_station",
"date_activation_ref_alti_station",
"date_debut_ref_alti_station",
"date_ouverture_station",
"date_maj_station",
):
try:
df[f] = pd.to_datetime(df[f])
except KeyError:
continue

return df

def get_sites(self, **kwargs):
"""
Lister les sites hydrométriques
Endpoint /v1/hydrometrie/referentiel/sites
Endpoint /api/v2/hydrometrie/referentiel/sites
Ce service permet d'interroger les sites du référentiel hydrométrique
(tronçon de cours d'eau sur lequel les mesures de débit sont réputées
homogènes et comparables entre elles). Un site peut posséder une ou
homogènes et comparables entre elles). Un site peut posséder une ou
plusieurs stations ; il est support de données de débit (Q)
Si la valeur du paramètre size n'est pas renseignée, la taille de page
par défaut : 1000, taille max de la page : 10000.
Expand Down Expand Up @@ -182,18 +202,32 @@ def get_sites(self, **kwargs):
)

method = "GET"
url = self.BASE_URL + "/v1/hydrometrie/referentiel/sites"
url = self.BASE_URL + "/v2/hydrometrie/referentiel/sites"
df = self.get_result(method, url, params=params)

for f in (
"date_premiere_donnee_dispo_site",
"date_maj_site",
):
try:
df[f] = pd.to_datetime(df[f])
except KeyError:
continue

return df

def get_observations(self, **kwargs):
"""
Lister les observations hydrométriques élaborées
Endpoint /v1/hydrometrie/obs_elab
Endpoint /api/v2/hydrometrie/obs_elab
Grandeurs hydrométriques élaborées disponibles : débits moyens
journaliers (QmJ), débits moyens mensuels (QmM)
journaliers (QmnJ), débits moyens mensuels (QmM)
Si la valeur du paramètre size n'est pas renseignée, la taille de page
par défaut : 1000, taille max de la page : 20000.
La profondeur d'accès aux résultats est : 20000, calcul de la
profondeur = numéro de la page * nombre maximum de résultats dans une
page.
Trie par défaut : code_station,date_obs_elab asc
Doc: https://hubeau.eaufrance.fr/page/api-hydrometrie
Expand All @@ -219,14 +253,19 @@ def get_observations(self, **kwargs):
except KeyError:
pass

for arg in ("code_entite", "fields"):
try:
variable = kwargs.pop(arg)
params[arg] = self.list_to_str_param(variable)
except KeyError:
continue
try:
variable = kwargs.pop("code_entite")
params["code_entite"] = self.list_to_str_param(variable, 100)
except KeyError:
pass

try:
variable = kwargs.pop("fields")
params["fields"] = self.list_to_str_param(variable)
except KeyError:
pass

for arg in "date_debut_obs_elab", "date_fin_obs_elab":
for arg in ("date_debut_obs_elab", "date_fin_obs_elab"):
try:
variable = kwargs.pop(arg)
self.ensure_date_format_is_ok(variable)
Expand Down Expand Up @@ -254,23 +293,22 @@ def get_observations(self, **kwargs):
)

method = "GET"
url = self.BASE_URL + "/v1/hydrometrie/obs_elab"
url = self.BASE_URL + "/v2/hydrometrie/obs_elab"

df = self.get_result(method, url, params=params)

try:
df["date_obs_elab"] = pd.to_datetime(
df["date_obs_elab"], format="%Y-%m-%d"
)
except KeyError:
pass
for f in "date_obs_elab", "date_prod":
try:
df[f] = pd.to_datetime(df[f])
except KeyError:
continue

return df

def get_realtime_observations(self, **kwargs):
"""
Lister les observations hydrométriques
Endpoint /v1/hydrometrie/observations_tr
Endpoint /api/v2/hydrometrie/observations_tr
Ce service permet de lister les observations dites "temps réel" portées
par le référentiel (sites et stations hydrométriques), à savoir les
Expand All @@ -292,17 +330,6 @@ def get_realtime_observations(self, **kwargs):
except KeyError:
pass

try:
variable = kwargs.pop("grandeur_hydro_elab")
if variable not in ("QmJ", "QmM"):
raise ValueError(
"grandeur_hydro_elab must be among ('QmJ', 'QmM'), "
f"found grandeur_hydro_elab='{variable}' instead"
)
params["grandeur_hydro_elab "] = variable
except KeyError:
pass

try:
variable = kwargs.pop("sort")
if variable not in ("asc", "desc"):
Expand All @@ -325,7 +352,7 @@ def get_realtime_observations(self, **kwargs):
except KeyError:
pass

for arg in ("code_entite", "fields"):
for arg in ("code_entite", "fields", "code_statut"):
try:
variable = kwargs.pop(arg)
params[arg] = self.list_to_str_param(variable)
Expand All @@ -349,7 +376,7 @@ def get_realtime_observations(self, **kwargs):
except KeyError:
pass

for arg in "date_debut_obs", "date_fin_obs":
for arg in ("date_debut_obs", "date_fin_obs"):
try:
variable = kwargs.pop(arg)
self.ensure_date_format_is_ok(variable)
Expand All @@ -375,14 +402,15 @@ def get_realtime_observations(self, **kwargs):
)

method = "GET"
url = self.BASE_URL + "/v1/hydrometrie/observations_tr"
url = self.BASE_URL + "/v2/hydrometrie/observations_tr"

df = self.get_result(method, url, params=params)

try:
df["date_obs"] = pd.to_datetime(df["date_obs"])
except KeyError:
pass
for f in "date_debut_serie", "date_fin_serie", "date_obs":
try:
df[f] = pd.to_datetime(df[f])
except KeyError:
continue

return df

Expand All @@ -392,14 +420,17 @@ def get_realtime_observations(self, **kwargs):

# # logging.basicConfig(level=logging.WARNING)
# with HydrometrySession() as session:
# gdf = session.get_sites(code_departement="02", format="geojson")
# # gdf = session.get_stations(code_departement="02", format="geojson")
# # gdf = session.get_sites(
# # code_departement=["02", "59"], format="geojson"
# # )
# # df = session.get_observations(code_entite="K437311001")

# df = session.get_realtime_observations(
# code_entite="K437311001",
# grandeur_hydro="Q",
# # date_debut_obs="2010-01-01",
# )
# df.pivot_table(
# index="date_obs", columns="grandeur_hydro", values="resultat_obs"
# ).plot()
# # df.pivot_table(
# # index="date_obs", columns="grandeur_hydro", values="resultat_obs"
# # ).plot()
4 changes: 4 additions & 0 deletions cl_hubeau/hydrometry/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ def get_realtime_observations(codes_entites: list, **kwargs) -> pd.DataFrame:
results = [x.dropna(axis=1, how="all") for x in results if not x.empty]
results = pd.concat(results, ignore_index=True)
return results


# if __name__ == "__main__":
# gdf = get_all_sites()
2 changes: 1 addition & 1 deletion cl_hubeau/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def ensure_date_format_is_ok(date_str: str) -> None:
datetime.strptime(date_str, "%Y-%m-%d").date()
except ValueError as exc:
raise ValueError(
"hubeau date should respect yyyy-MM-dd format"
"cl-hubeau date should respect yyyy-MM-dd format"
) from exc

def request(
Expand Down
3 changes: 2 additions & 1 deletion cl_hubeau/watercourses_flow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-

from .watercourses_flow_scraper import WatercoursesFlowSession
from .utils import get_all_stations, get_all_observations
from .utils import get_all_stations, get_all_observations, get_all_campaigns


__all__ = [
"get_all_stations",
"get_all_observations",
"get_all_campaigns",
"WatercoursesFlowSession",
]
Loading

0 comments on commit ec53889

Please sign in to comment.