Source code for anim.tools

import logging
import os
import shlex
import subprocess
import time

import numpy as np
import xarray as xr

logger = logging.getLogger(__name__)


class Timing:
    """Context manager measuring the wall-clock duration of a ``with`` block.

    Example
    -------
    ::

        with Timing() as dt:
            myfunction()
        print(f"time for myfunction : {dt}")

    Converting the timer to ``str`` gives the duration formatted with two
    decimals followed by the unit. While the block is still running, the
    time elapsed so far is reported; before the timer is ever entered, the
    duration is ``0``.

    Parameters
    ----------
    unit : str
        display unit, must be a key of ``Timing.scale`` (``"s"`` or ``"ms"``).
        An unknown unit raises ``KeyError`` when the timer is formatted.

    Attributes
    ----------
    t0 : float or None
        ``time.perf_counter()`` value taken when entering the block
    tN : float or None
        ``time.perf_counter()`` value taken when leaving the block,
        ``None`` while the block is running
    dt : float
        duration of the last completed block, in seconds
    """

    # multiplier converting seconds into each supported display unit
    scale = {"s": 1, "ms": 1e3}

    def __init__(self, unit: str = "ms"):
        self.unit = unit
        # defined up front so that str() never fails on a missing attribute
        self.t0 = None
        self.tN = None
        self.dt = 0.0

    def __enter__(self):
        # tN is reset so that a re-used timer is seen as "running" again
        self.tN = None
        self.t0 = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.tN = time.perf_counter()
        self.dt = self.tN - self.t0

    def __str__(self):
        running = self.t0 is not None and self.tN is None
        if running:
            # formatted from inside the ``with`` block: report the elapsed time so far
            elapsed = time.perf_counter() - self.t0
        else:
            elapsed = self.dt
        return f"{elapsed * self.scale[self.unit]:.2f}{self.unit}"
def image_patern(n_frames=None):
    """Build the printf-style file name pattern used for the image sequence.

    The number of zero-padded digits is ``ceil(log10(n_frames))``, which is
    enough to write every index from ``0`` to ``n_frames - 1``, and is never
    smaller than one digit.

    Parameters
    ----------
    n_frames : int
        total number of images. When it is ``None`` or not strictly
        positive, 7 digits are used.

    Returns
    -------
    str
        pattern such as ``"img_%03d.png"``
    """
    if n_frames is None or n_frames <= 0:
        n_zeros = 7
    else:
        # log10(1) == 0 would give the degenerate "%00d": keep at least one digit
        n_zeros = max(1, int(np.ceil(np.log10(n_frames))))
    return f"img_%0{n_zeros}d.png"
def images2video(imagePatern, fps, videoName, crf=24, vcodec="libx264", pix_fmt="yuv420p", ffmpeg_log=False):
    """Convert a numbered image sequence into an mp4 video by calling ``ffmpeg``.

    ``ffmpeg`` is started directly with an argument list (no shell), so
    paths containing spaces or shell metacharacters are passed unchanged.
    Each parameter is therefore one single ffmpeg argument.

    Parameters
    ----------
    imagePatern : str
        pattern where to find the images. For example '/tmp/img_%03d.png'
    fps : int
        frames per second for the video
    videoName : str
        video name, must end with '.mp4'. If its folder doesn't exist, it is created
    crf : int
        value given to the ffmpeg ``-crf`` option, 24 by default.
        Original author's note: 0 is high quality, 51 is bad quality, and
        no visible difference is expected below 18.
    vcodec : str
        video codec given to ``-c:v``. Original author's note: x265 is
        better when installed, but it is not always installed.
    pix_fmt : str
        pixel format given to ``-pix_fmt``. Original author's note: without
        yuv420p the video cannot be read in a web browser.
    ffmpeg_log : bool
        print all ffmpeg logs. If not specified, run `ffmpeg` with `-loglevel error`

    Returns
    -------
    str
        ``videoName``, returned whether or not ffmpeg succeeded (a failure
        is only reported through the logger)

    Raises
    ------
    ValueError
        if ``videoName`` does not end with '.mp4'
    """
    with Timing() as dt:
        _check_video_name(videoName)
        try:
            os.makedirs(os.path.dirname(videoName), exist_ok=True)
        except FileNotFoundError:
            # a bare file name has an empty dirname: there is nothing to create
            pass

        args = ["ffmpeg"]
        if not ffmpeg_log:
            args += ["-loglevel", "error"]
        args += [
            "-framerate", str(fps),
            "-i", str(imagePatern),
            "-c:v", str(vcodec),
            "-crf", str(crf),
            "-pix_fmt", str(pix_fmt),
            str(videoName),
            "-y",
        ]
        # quoted only for display, so the logged line can be pasted in a shell
        logger.info("ffmpeg command : \n%s", " ".join(shlex.quote(arg) for arg in args))
        try:
            res = subprocess.run(args, check=False).returncode
        except OSError as err:
            # ffmpeg missing or not executable: report it like any other failure
            logger.error("unable to run ffmpeg : %s", err)
            res = -1

    # the timer is read once the block is closed, so dt holds the full duration
    if res != 0:
        logger.error("video not created, ffmpeg error. Please use -v DEBUG to have full ffmpeg debug output")
    else:
        logger.info("video %s done! (ffmpeg time : %s)", videoName, dt)
    return videoName
def _check_video_name(videoName): name, ext = os.path.splitext(videoName) if ext != ".mp4": msg = "Please give a valid videoName which ends by '.mp4'" logger.error(msg) raise ValueError(msg)
def video2gif(videoName, gif_fps=10, scale=350, ffmpeg_log=False):
    """Convert an mp4 video into a gif by calling ``ffmpeg``.

    The gif is written next to the video, with the '.mp4' extension
    replaced by '.gif'. ``ffmpeg`` is started directly with an argument
    list (no shell), so paths containing spaces are passed unchanged.

    Parameters
    ----------
    videoName : str
        video name, must end with '.mp4'
    gif_fps : int, optional
        frames per second for the gif, by default 10
    scale : int, optional
        height of the gif in pixels, by default 350. The width is given as
        -1 to the ffmpeg ``scale`` filter (original author's note: the
        video aspect ratio is kept).
    ffmpeg_log : bool
        print all ffmpeg logs. If not specified, run `ffmpeg` with `-loglevel error`

    Returns
    -------
    str
        name of the gif, returned whether or not ffmpeg succeeded (a
        failure is only reported through the logger)

    Raises
    ------
    ValueError
        if ``videoName`` does not end with '.mp4'
    """
    with Timing() as dt:
        _check_video_name(videoName)
        # swap the extension only: str.replace would also rewrite any ".mp4"
        # found earlier in the path (for example in a directory name)
        gifName = os.path.splitext(videoName)[0] + ".gif"

        # the palette is generated and applied within one filter graph
        filters = (
            f"fps={gif_fps},scale=-1:{scale}:-1:flags=lanczos,"
            "split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse"
        )
        args = ["ffmpeg"]
        if not ffmpeg_log:
            args += ["-loglevel", "error"]
        args += ["-i", str(videoName), "-vf", filters, "-loop", "0", "-y", gifName]
        # quoted only for display, so the logged line can be pasted in a shell
        logger.info(" ".join(shlex.quote(arg) for arg in args))
        try:
            res = subprocess.run(args, check=False).returncode
        except OSError as err:
            # ffmpeg missing or not executable: report it like any other failure
            logger.error("unable to run ffmpeg : %s", err)
            res = -1

    # the timer is read once the block is closed, so dt holds the full duration
    if res != 0:
        logger.error("gif not created, ffmpeg error. Please use -v DEBUG to have full ffmpeg debug output")
    else:
        logger.info("gif %s fait! (temps ffmpeg : %s)", gifName, dt)
    return gifName
def parallelize_computation(func, data_iterator, client=None):
    """Placeholder for a parallel computation helper (not implemented yet).

    The arguments are accepted but ignored, and ``None`` is returned.

    Parameters
    ----------
    func : object
        unused for now; presumably the function to apply (to be confirmed)
    data_iterator : object
        unused for now; presumably the data to iterate over (to be confirmed)
    client : object, optional
        unused for now
    """
    return None
def _sanitize_inputs(max_frames, compute): # at least one of max_frames and compute have to be defined if max_frames is None and compute is None: raise ValueError("`max_frames` or `compute` have to be defined") # both are defined : OK elif (max_frames is not None) and (compute is not None): # compute = compute pass # only max_frames is defined => loop until the end using compute elif max_frames is None: max_frames = -10 if not hasattr(compute, "__call__"): raise ValueError("`compute` need to be a function. Please check docstring to provide correct function") elif compute is None: def custom_compute(): for _ in range(max_frames): yield xr.Dataset() compute = custom_compute return max_frames, iter(compute())