import logging
import numpy as np
import xarray as xr
from scipy import interpolate
logger = logging.getLogger(__name__)
[docs]
class Path:
"""Generate camera path for the animation.
You gives the initial pos (x0, y0) and the dx,dy.
This gives the camera coordinate : [x0-dx, x0+dx, y0-dy, y0+dy]
t0 can be either:
* a date (a :class:`numpy.datetime64`)
*
Permet de générer le trajet de caméra pour les vidéos.
On donne la position initiale (x0, y0), ainsi que dx, dy, ce qui donne
les coordonnées de la caméra [x0-dx, x0+dx, y0-dy, y0+dy]
t0 peut être une date (:class:`numpy.datetime64`), dans ce cas, chaque appel à :meth:`Path.add_deplacement`
doit utiliser un np.datetime64 ou np.timedelta64 pour l'indice,
Sinon on peut lui spécifier t0 = int, et dans ce cas l'indice utilisé dans :meth:`Path.add_deplacement` doit correspondre
au nombre d'images à attendre
Parameters
----------
coords : tuple, optional
position (x0, y0) au début du chemin, by default (180, 0)
dx : int, optional
demi taille horizontal de la caméra, by default 180
dy : int, optional
demi taille verticale de la caméra, by default 90
t0 : numpy.datetime64, or int, optional
Date au début du chemin. Peut être une date, ou un entier qui représente les images, by default 0
Raises
------
TypeError
si t0 n'est pas int ou np.datetime64
"""
[docs]
def __init__(self, coords=(180, 0), dx=180, dy=90, t0=0):
self._sanitize_coords(coords)
self._times = [t0]
self._coords = [coords]
self._dxs = [dx]
self._dys = [dy]
@classmethod
def _sanitize_coords(cls, coords):
if coords is not None and len(coords) != 2:
logger.error(f"coords shoud be None or a tuple(x,y), not {type(coords)}")
raise TypeError("coords bad type")
def move(self, time, coords=None):
self.move_and_focus(time, None, None, coords)
def move_and_zoom(self, time, zoom, coords=None):
old_dx, old_dy = self._dxs[-1], self._dys[-1]
self.move_and_focus(time, old_dx / zoom, old_dy / zoom, coords)
def move_and_focus(self, time, dx=None, dy=None, coords=None):
self._sanitize_time(time)
self._sanitize_coords(coords)
if coords is None:
coords = self._coords[-1]
if dx is None:
dx = self._dxs[-1]
if dy is None:
dy = self._dys[-1]
self._coords.append(coords)
self._dxs.append(dx)
self._dys.append(dy)
self._add_time(time)
def _merge_moves(self, dt):
x, y = np.array(self._coords).T
dates = np.array(self._times)
dxs = np.array(self._dxs)
dys = np.array(self._dys)
time_coords = np.arange(dates[0], dates[-1], dt)
return x, y, dxs, dys, dates, time_coords
def _interp_moves(self, x, y, dxs, dys, dates, new_dates):
# convert to float for interpolation
int_new_dates = new_dates.astype(float)
# force both dates to have the same datetime type (s, ms, ns, ...)
int_dates = dates.astype(new_dates.dtype).astype(float)
def build_dxdy(x, y):
dxdy = np.zeros(y.size)
# slope before point
dy_b = np.diff(y[:-1]) / np.diff(x[:-1])
# slope after point
dy_a = np.diff(y[1:]) / np.diff(x[1:])
# slope should be 0 everythere except :
# - we are strictly increasing (or decreasing) on the 3 consecutives points (before, within and after)
dxdy[1:-1] = np.where((dy_b * dy_a > 0), np.mean([dy_b, dy_a], axis=0), 0)
return dxdy
# cubic hermite splice is used because it gives a result which is :
# continuous
# first derivative is continuous too
# => it gives you a nice path without spike deplacements, smooth acceleration and decelerations
f = interpolate.CubicHermiteSpline(int_dates, x, dydx=build_dxdy(int_dates, x))
X = f(int_new_dates)
f = interpolate.CubicHermiteSpline(int_dates, y, dydx=build_dxdy(int_dates, y))
Y = f(int_new_dates)
f = interpolate.CubicHermiteSpline(int_dates, dxs, dydx=build_dxdy(int_dates, dxs))
new_dx = f(int_new_dates)
f = interpolate.CubicHermiteSpline(int_dates, dys, dydx=build_dxdy(int_dates, dys))
new_dy = f(int_new_dates)
return new_dates, X, Y, new_dx, new_dy
def _compute_path(self, dt):
"""compute path for the required points of interests
Parameters
----------
dt : int or numpy.timedelta64
the unit for image computation.
An image will be created for eath `dt` time passed
Returns
-------
tuple
return new_dates, cartopy_extent, speed
where :
new_dates is the date (np.datetime64) of the image, or the indice of the image
cartopy_extent is a list of [x0, x1, y0, y1] coordinates used by ax.set_extent(...)
speed is the camera speed between this image and the last one.
if dates is an np.datetime64, the speed will be `degrees / s`
if dates is an int, the speed will be `degrees / images`
"""
x, y, dxs, dys, dates, new_dates = self._merge_moves(dt)
new_dates, X, Y, new_dx, new_dy = self._interp_moves(x, y, dxs, dys, dates, new_dates)
length = np.zeros(X.shape, dtype=np.float64)
dist = np.sqrt((X[1:] - X[:-1]) ** 2 + (Y[1:] - Y[:-1]) ** 2)
length[1:] = dist
cartopy_extent = np.array([X - new_dx, X + new_dx, Y - new_dy, Y + new_dy]).T
return new_dates, cartopy_extent, length
### VISUALISATION ###
#####################
def _build_xarray(self, dt, variables, derivative=False):
x, y, dxs, dys, dates, new_dates = self._merge_moves(dt)
new_dates, X, Y, new_dx, new_dy = self._interp_moves(x, y, dxs, dys, dates, new_dates)
dates = dates.astype("datetime64[ns]")
new_dates = new_dates.astype("datetime64[ns]")
ds = xr.Dataset()
kw = {"units": "degrees"}
if derivative:
dtime = dt / np.timedelta64(1, "D")
ds["x"] = (["time"], (X[1:] - X[:-1]) / dtime, kw)
ds["y"] = (["time"], (Y[1:] - Y[:-1]) / dtime, kw)
ds["dx"] = (["time"], (new_dx[1:] - new_dx[:-1]) / dtime, kw)
ds["dy"] = (["time"], (new_dy[1:] - new_dy[:-1]) / dtime, kw)
ds["time"] = (["time"], new_dates[1:])
ds = ds.set_coords(["time"]) # .rename({"time": "time"})
ds2 = ds.interp(time=dates).rename({"time": "old_time"})
for var in ds2.variables:
ds[var + "_old"] = ds2[var]
else:
ds["x_old"] = (["old_time"], x, kw)
ds["y_old"] = (["old_time"], y, kw)
ds["dx_old"] = (["old_time"], dxs, kw)
ds["dy_old"] = (["old_time"], dys, kw)
ds["old_time"] = (["old_time"], dates)
ds["x"] = (["time"], X, kw)
ds["y"] = (["time"], Y, kw)
ds["dx"] = (["time"], new_dx, kw)
ds["dy"] = (["time"], new_dy, kw)
ds["time"] = (["time"], new_dates)
ds = ds.set_coords(["time", "old_time"])
return ds
[docs]
def plot_moves(self, dt, variables=["x", "y", "dx", "dy"], derivated=False):
"""Build a matplotlib plot with different variables, to visualize the path
Parameters
----------
dt : numpy.timedelta64
dt used to compute path. Same used in :meth:`Path._compute_path`
variables : list, optional
variables to show, by default ["x", "y", "dx", "dy"]
Returns
-------
tuple(matplotlib.figure.Figure, matplotlib.axes.Axes)
figure and axes used
Raises
------
ImportError
If matplotlib not installed
"""
try:
import matplotlib.pyplot as plt
except ImportError as err:
logger.error("please install matplotlib to use this function")
raise err
ds = self._build_xarray(dt, variables, derivated)
fig, axs = plt.subplots(len(variables), 1, figsize=(10, 5), dpi=120)
for i, (ax, var) in enumerate(zip(axs, variables)):
ax.grid()
ds[var + "_old"].plot(
ax=ax, label=f"real ({ds.sizes['old_time']} pts)", marker="o", x="old_time", linestyle=""
)
ds[var].plot(ax=ax, label=f"interpolated ({ds.sizes['time']} pts)", x="time")
if i < len(variables) - 1:
ax.set_xticklabels([])
ax.legend()
logger.error(str(ds))
return fig, ax
[docs]
class TimePath(Path):
[docs]
def __init__(self, coords=(180, 0), dx=180, dy=90, t0=0):
datetype = type(np.datetime64("now"))
# we should have only datetime64
if not isinstance(t0, datetype):
raise TypeError(f"t0 should be a `numpy.datetime64`, not '{type(t0)}'")
super().__init__(coords, dx, dy, t0)
@classmethod
def _sanitize_time(cls, t):
datetype = type(np.datetime64("now"))
timetype = type(np.timedelta64(1, "D"))
if not isinstance(t, (datetype, timetype)):
logger.error(f"time shoud be type `np.datetime64` or `np.timedelta64`, not {type(t)}")
raise TypeError("time bad type")
def _sanitize_dt(cls, t):
timetype = type(np.timedelta64(1, "D"))
if not isinstance(t, timetype):
logger.error(f"dt shoud be type `np.timedelta64`, not {type(t)}")
raise TypeError("dt bad type")
def _add_time(self, t):
last_date = self._times[-1]
if np.issubdtype(t.dtype, np.datetime64):
date = t
else:
# _dt is not np.datetime64, so its a np.timedelta64 (cf _sanitize_time)
date = last_date + t
if date < last_date:
raise Exception(f"time specified '{t}' make the date '{date}' before last date specified '{last_date}'")
self._times.append(date)
def compute_path(self, dt):
self._sanitize_dt(dt)
# length is in degrees / dt (could be hours, seconds or days)
new_dates, cartopy_extent, length = self._compute_path(dt)
# gives speed in degrees / day
speed = length * (np.timedelta64(1, "D") / dt)
return new_dates, cartopy_extent, speed
[docs]
class FramePath(Path):
[docs]
def __init__(self, coords=(180, 0), dx=180, dy=90):
super().__init__(coords, dx, dy, 0)
@classmethod
def _sanitize_time(cls, t):
if not isinstance(t, int):
logger.error(f"time shoud be type `int`, not {type(t)}")
raise TypeError("time bad type")
if t < 1:
logger.error("time should be a int >= 1")
raise TypeError("time bad value")
def _add_time(self, t):
old_frame = self._times[-1]
self._times.append(t + old_frame)
def compute_path(self):
return self._compute_path(1)