Source code for anim.anim

import logging
import multiprocessing
import os
import warnings

import matplotlib
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster, as_completed

from anim.data import AnimationInfo, Stats, StatStorage, dump_data, load_data
from anim.tools import Timing, _sanitize_inputs, image_patern, images2video

logger = logging.getLogger(__name__)


def _release_figure(fig):
    """Remove the artists still attached to `fig`, then close it (best effort)."""
    containers = (
        "artists",
        "lines",
        "texts",
        "_colorbars",
        "_path_effects",
        "collections",
    )
    try:
        # sometimes not all artists are deleted when the figure is closed
        for ax in fig.axes:
            for container_name in containers:
                # a container missing on this axes is skipped, not fatal
                for obj in list(getattr(ax, container_name, ())):
                    try:
                        obj.remove()
                    except Exception:
                        # best effort: one object that cannot be removed must
                        # not prevent the cleanup of the remaining ones
                        logger.debug("could not remove %r from '%s'", obj, container_name)
    except Exception:
        logger.debug("cleanup of the figure artists failed", exc_info=True)
    finally:
        # delete matplotlib figure
        plt.close(fig)


def process(i, data, f_plot, animationInfo: AnimationInfo):
    """Build and save image number `i`; started on every worker, for each image.

    Parameters
    ----------
    i :
        index of the image; used to format `animationInfo.imagePatern`
        and forwarded to `f_plot`
    data :
        payload handed to `load_data`, which gives back the dataset and its stats
    f_plot : callable
        called as `f_plot(i, ds)`; must return a `matplotlib.figure.Figure`
    animationInfo : AnimationInfo
        settings of the run (`imagePatern`, `checkIfImageExist`, `onlyCompute`,
        `savefig_kwargs`)

    Returns
    -------
    tuple
        `(fig, stats)` when `animationInfo.onlyCompute` is set (the figure is
        left open), `(None, stats)` in every other case

    Raises
    ------
    TypeError
        if `f_plot` returns None or anything that is not a matplotlib Figure
    FileNotFoundError
        if the image cannot be written at its destination
    """
    img_name = animationInfo.imagePatern % i
    stats = Stats(img_name=img_name)

    if animationInfo.checkIfImageExist and os.path.exists(img_name):
        # same (figure, stats) shape as the other exits, so that callers can
        # always unpack the result
        return None, stats

    ds, stat = load_data(data)
    stats |= stat

    with warnings.catch_warnings():
        # workers do not run in the main thread; this warning is expected there
        warnings.filterwarnings(
            action="ignore", message="Starting a Matplotlib GUI outside of the main thread will likely fail"
        )
        with Timing() as timer:
            fig = f_plot(i, ds)
    stats.img_building = timer.dt

    if fig is None:
        raise TypeError("plot function did not return anything. It should return a matplotlib Figure")
    if not isinstance(fig, matplotlib.figure.Figure):
        raise TypeError(f"plot function did not return a matplotlib.figure.Figure, but '{type(fig)}'")

    if animationInfo.onlyCompute:
        return fig, stats

    try:
        with Timing() as timer:
            fig.savefig(img_name, **animationInfo.savefig_kwargs)  # save the image
        stats.img_saving = timer.dt
    except FileNotFoundError:
        logger.error(f"problem when saving {img_name}")
        raise
    finally:
        # release the figure even when saving failed, otherwise it stays
        # registered in pyplot on the worker
        _release_figure(fig)

    return None, stats
def get_imagePatern(imageFolder, max_frames):
    """Return the path pattern of the images stored in `imageFolder`.

    The file-name part is produced by `image_patern(max_frames)` and is
    joined to `imageFolder`.
    """
    file_pattern = image_patern(max_frames)
    return os.path.join(imageFolder, file_pattern)
def simple_building(
    f_plot,
    compute=None,
    max_frames=None,
    indices=None,
    savefig_kwargs=None,
    show=None,
):
    """Build only a few selected images, one after the other in the current process.

    Parameters
    ----------
    f_plot : callable
        called as `f_plot(i, ds)` for every selected index
    compute : optional
        source of the data, forwarded to `_sanitize_inputs` which returns the
        iterator consumed here. By default None
    max_frames : optional
        forwarded to `_sanitize_inputs`, by default None
    indices : iterable of int, optional
        indices of the images to build. Empty or None means `[0]`.
        The argument itself is never modified
    savefig_kwargs : dict, optional
        options passed to `fig.savefig(...)` when `show` is a file-name template
    show : None, False or str, optional
        None  : display the figure with `plt.show()` (default)
        False : only call `f_plot`, neither display nor save
        str   : template formatted with `show.format(i=...)`, the figure is
        saved under the resulting name

    Returns
    -------
    None
    """
    _, iter_compute = _sanitize_inputs(max_frames, compute)

    # work on a private set: the caller's list must not be emptied, and a
    # duplicated index must not keep the loop running until the data runs out
    if indices is None or len(indices) == 0:
        pending = {0}
    else:
        pending = set(indices)
    if savefig_kwargs is None:
        savefig_kwargs = {}

    logger.info(f"we will build only images : {sorted(pending)}")

    for _i, ds in enumerate(iter_compute):
        if _i not in pending:
            continue
        pending.discard(_i)
        logger.info(f"processing image i={_i}")

        fig = f_plot(_i, ds)
        if show is None:
            plt.show()
        elif show is not False:
            name = show.format(i=_i)
            fig.savefig(name, **savefig_kwargs)
            logger.info(f"figure i={_i} saved in '{name}'")

        if not pending:
            # stop before pulling one more (possibly expensive) item
            break

    if pending:
        logger.warning(f"no data available for images : {sorted(pending)}")
    return
def build_images(
    f_plot,
    imageFolder,
    compute=None,
    max_frames=None,
    force=False,
    nprocess=0,
    savefig_kwargs=None,
    client=None,
    max_memory_ds=1e6,
):
    """Build all the images in parallel with dask and save them in `imageFolder`.

    Parameters
    ----------
    f_plot : callable
        called on the workers as `f_plot(i, ds)`; must return a matplotlib Figure
    imageFolder : str
        folder where the images are written (created if needed)
    compute : optional
        source of the data, forwarded to `_sanitize_inputs` which returns the
        iterator consumed here. By default None
    max_frames : optional
        forwarded to `_sanitize_inputs`, by default None
    force : bool, optional
        if True, the image folder is emptied and every image is rebuilt.
        if False, images already on disk are skipped. By default False
    nprocess : int, optional
        number of workers of the local cluster created when `client` is None.
        If not positive, use all cpus but one (at least one worker)
    savefig_kwargs : dict, optional
        options passed to `fig.savefig(...)`
    client : dask client or callable, optional
        client used to submit the work. A callable is executed and must return
        the client. Clients created here (local cluster or callable) are
        closed before returning; a client given as an object is left open
    max_memory_ds : optional
        currently not used by this function

    Returns
    -------
    tuple
        the image path pattern and the result of `StatStorage.build_dataframe()`
    """
    import shutil
    import time
    import traceback

    if savefig_kwargs is None:
        savefig_kwargs = {}

    max_frames, iter_compute = _sanitize_inputs(max_frames, compute)
    os.makedirs(imageFolder, exist_ok=True)
    imageNames = get_imagePatern(imageFolder, max_frames)
    logger.info(f"image will be saved under : {imageNames}")

    animationInfo = AnimationInfo(
        imagePatern=imageNames,
        checkIfImageExist=not force,
        savefig_kwargs=savefig_kwargs,
    )

    # this wrap function is needed to pass the f_plot function
    def _process(i, data):
        return process(i, data, f_plot, animationInfo)

    statStorage = StatStorage()

    if force:
        # start from an empty folder; done without a shell so that paths
        # containing spaces or special characters are handled correctly
        target = os.path.dirname(imageNames)
        shutil.rmtree(target, ignore_errors=True)
        os.makedirs(target, exist_ok=True)

    need_delete_client = False
    if client is None:
        logger.info("building dask local client..")
        # keep one cpu free, but never ask for a cluster without any worker
        n_workers = nprocess if nprocess > 0 else max(1, multiprocessing.cpu_count() - 1)
        cluster = LocalCluster(processes=True, n_workers=n_workers, threads_per_worker=1)
        client = Client(cluster)
        need_delete_client = True
    elif callable(client):
        # if its a function, execute it
        logger.info("building dask custom client..")
        client = client()
        need_delete_client = True
    else:
        logger.info("no need to build dask client (given by parameter)")

    try:
        dask_info = client.scheduler_info()
        nb_workers = len(dask_info["workers"])
        dashboard = dask_info.get("services", {}).get("dashboard")
        logger.info(f"dask client built. nbr workers : {nb_workers}, dashboard : {dashboard}")

        futures = []
        dict_futures = {}
        i_image = -1
        str_max_frames = max_frames if max_frames > 0 else "???"

        # NOTE(review): starting at -1, the body runs for i_image = 0..max_frames
        # inclusive unless `iter_compute` is exhausted first — confirm that
        # `_sanitize_inputs` bounds the iterator, otherwise one extra image is built
        while i_image != max_frames:
            i_image += 1
            image_name = imageNames % i_image
            try:
                with Timing() as timer:
                    ds = next(iter_compute)
            except StopIteration:
                break

            stat = Stats(img_name=image_name, time_data_computation=timer.dt, size_data_uncompressed=ds.nbytes)
            statStorage(stat)

            if animationInfo.checkIfImageExist and os.path.exists(image_name):
                logger.debug(f"img {i_image+1:03d}/{str_max_frames} already exist")
                continue

            data, stat2 = dump_data(ds)
            statStorage(stat | stat2)

            r = client.submit(_process, i_image, data)
            futures.append(r)
            dict_futures[r] = i_image
            logger.debug(f"img {i_image+1:03d}/{str_max_frames} : {statStorage[stat]}")

        nbImagesAlreadyDone = statStorage.size - len(futures)
        if nbImagesAlreadyDone > 0:
            logger.info(f"{nbImagesAlreadyDone} images already computed")
        logger.info("%d images to be computed..", len(futures))

        nb_failed = 0
        with Timing() as dt_computation:
            _tot = len(futures)
            for i_future, future in enumerate(as_completed(futures)):
                if future.status == "error":
                    nb_failed += 1
                    # format_tb returns the text; print_tb would write to
                    # stderr and put "None" in the message
                    tb_text = "".join(traceback.format_tb(future.traceback()))
                    msg = f"Future [{i_future+1:03d}/{_tot}]: NOK\n"
                    msg += f" args = {dict_futures[future]}\n"
                    msg += f" exception = {future.exception()}\n"
                    msg += f" traceback = {tb_text}"
                    logger.error(msg)
                    continue

                try:
                    _, stat = future.result()
                except Exception:
                    # nothing usable was received: do not store a stale `stat`
                    nb_failed += 1
                    logger.exception("problems with future reception")
                    continue

                statStorage(stat)
                logger.debug(f"fig {i_future+1:03d}/{_tot} done : {statStorage[stat]}")

        if nb_failed > 0:
            logger.warning(f"{nb_failed} images could not be computed")
        logger.info(
            f"{len(futures)} images computed ({dt_computation})",
        )
    finally:
        # clients created here are released even if something went wrong above
        if need_delete_client:
            time.sleep(0.5)
            client.close()
            owned_cluster = getattr(client, "cluster", None)
            if owned_cluster is not None:
                owned_cluster.close()

    return imageNames, statStorage.build_dataframe()
def animate(
    f_plot,
    workFolder,
    fps,
    compute=None,
    max_frames=None,
    savefig_kwargs=None,
    client=None,
    force=False,
    nprocess=0,
    only_convert=False,
    no_convert=False,
    max_memory_ds=1e6,
    ffmpeg_log=False,
):
    """create images in parallel and then combine them in a video

    The images are written in `<workFolder>/imgs` and the video in
    `<workFolder>/video.mp4`.

    Parameters
    ----------
    f_plot : callable
        function used to build the image. It is called as `f_plot(i, ds)` and
        must return a matplotlib Figure
    workFolder : str
        path where all the images and video will be created
    fps : int
        frame per seconds for the video
    compute : callable, optional
        function which yield data to be used by `f_plot`. By default None
    max_frames : int, optional
        maximum of images to build, by default None
    savefig_kwargs : dict, optional
        options to pass to `fig.savefig(...)`, by default no option
    client : dask.Client or callable, optional
        dask client used for building images. You can pass a function which
        return a dask.Client. by default None
    force : bool, optional
        if True, force regeneration of all images.
        if False, image already computed and saved on disk won't be computed.
        By default False
    nprocess : int, optional
        number of cpu to use when not providing a dask.Client.
        If not provided, use all cpus but one
    only_convert : bool, optional
        if True, don't generate images at all. We only build the video,
        by default False
    no_convert : bool, optional
        if True, don't generate a video at all. We only build the images
    max_memory_ds : int, optional
        forwarded to `build_images`. Intended as the size above which data
        provided by `compute` is compressed to minimise ram usage
        (NOTE(review): confirm, `build_images` does not read it). By default 1e6
    ffmpeg_log : bool, optional
        print all ffmpeg logs.
        If not specified, run `ffmpeg` with `-loglevel quiet`

    Returns
    -------
    str
        path of the video (returned even when `no_convert` is True)
    """
    if savefig_kwargs is None:
        savefig_kwargs = {}

    os.makedirs(workFolder, exist_ok=True)
    workFolder = os.path.normpath(workFolder)
    imageFolder = os.path.join(workFolder, "imgs")

    if only_convert:
        # the images were written in the "imgs" sub-folder by a previous run,
        # so the pattern must point there and not at workFolder itself
        # NOTE(review): max_frames is not sanitized on this path, unlike in
        # build_images — confirm image_patern gives the same pattern
        imageNames = get_imagePatern(imageFolder, max_frames)
    else:
        imageNames, df = build_images(
            f_plot,
            imageFolder,
            compute=compute,
            max_frames=max_frames,
            force=force,
            nprocess=nprocess,
            savefig_kwargs=savefig_kwargs,
            client=client,
            max_memory_ds=max_memory_ds,
        )
        logger.info("\n" + str(df.describe()))

    pathVideo = os.path.join(workFolder, "video.mp4")

    if not no_convert:
        images2video(imageNames, fps, pathVideo, ffmpeg_log=ffmpeg_log)

    return pathVideo