"""People detection/tracking script driven by an optional remote API.

Frames are read with OpenCV, persons are tracked with an Ultralytics YOLO
model, and enter/exit events plus normalized bounding boxes are posted to the
API when an API URL is configured. The API status endpoint drives the loop:
a falsy status releases resources and idles, 1 runs detection, 2 sends an
exit event for every tracked person and idles.
"""
import time
import json
import asyncio
import aiohttp
import ssl
import requests
from datetime import date
from collections import defaultdict, deque
import cv2
import numpy as np
import argparse
import sys
from ultralytics import YOLO

# Declare video file to read with parameter
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", dest="input", default=0, help="Chemin vers la vidéo à lire ('0' est à utiliser pour la webcam par défaut)")
parser.add_argument("-o", "--output", dest="output", default="", help="Emplacement pour enregistrer le processus dans une vidéo (Attention au RGPD, désactivé si aucun)")
parser.add_argument("-f", "--force", action='store_true', help="Forcer le redimensionnement du flux d'entrée")
parser.add_argument("-u", "--api-url", dest="API_URL", default="", help="Chemin réseau vers l'API (désactivé si aucun)")
parser.add_argument("-m", "--model", dest="modelname", default="yolo12n.pt", help="Nom complet du modèle de détection à utiliser (répertoire 'models/')")
parser.add_argument('-v', '--verbose', action='store_true', help='Activer la sortie verbeuse du programme.')
args = parser.parse_args()

# Initialize verbose mode
if args.verbose:
    print("Mode verbeux activé")

# Per-frame measurements, only filled in verbose mode and summarized at exit
time_mid_list = []
time_end_list = []
fps_list = []

# Initialize API variables
API_URL_EVENT = args.API_URL + "/api/c_auto"
API_URL_POS = args.API_URL + "/api/p_auto"
API_URL_STATUS = args.API_URL + "/api/status"

# Interval (seconds) between two polls of the status endpoint
STATUS_CHECK_INTERVAL = 10

# Failures treated as "request did not go through". Timeouts are listed
# separately because aiohttp's total timeout surfaces as asyncio.TimeoutError,
# which is not an aiohttp.ClientError.
_REQUEST_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError)


async def get_status(session):
    """Fetch the current status from the API.

    Args:
        session: The aiohttp client session used to issue the GET request.

    Returns:
        The response body converted to an int, or None when the request
        fails or the body is not an integer. The main loop treats None like
        status 0.
    """
    try:
        async with session.get(API_URL_STATUS) as response:
            response.raise_for_status()
            data = await response.text()
            return int(data)
    except (*_REQUEST_ERRORS, ValueError) as e:
        # ValueError: the body was not an integer; report it like any other
        # failed status request instead of crashing the main loop.
        print(f"Impossible d'obtenir l'état: {e}")
        return None


async def send_event(session, action, people_count, person_id, verbose=False):
    """Post an event for one tracked person to the event endpoint.

    Args:
        session: The aiohttp client session used to issue the POST request.
        action: Event name placed in the payload ("enter" or "exit" in this
            script).
        people_count: Number of people reported alongside the event.
        person_id: Tracker ID of the person concerned.
        verbose: When true, print a confirmation after a successful request.

    Request failures are printed and swallowed so one failed call does not
    abort the other calls gathered with it.
    """
    payload = [{"action": action, "people": people_count}, {"id": person_id}]
    try:
        async with session.post(API_URL_EVENT, json=payload) as response:
            response.raise_for_status()
            if verbose:
                print(f"Requête action {action} pour l'id:{person_id} avec {people_count} visitors")
    except _REQUEST_ERRORS as e:
        print(f"Impossible d'envoyer l'action {action} de l'id:{person_id}: {e}")


async def send_position(session, positions, verbose=False):
    """Post the list of bounding-box positions to the position endpoint.

    Args:
        session: The aiohttp client session used to issue the POST request.
        positions: JSON-serializable list of position entries.
        verbose: When true, print how many positions were sent.

    Request failures are printed and swallowed.
    """
    try:
        async with session.post(API_URL_POS, json=positions) as response:
            response.raise_for_status()
            if verbose:
                print(f"Positions envoyées : {len(positions)}")
    except _REQUEST_ERRORS as e:
        print(f"Impossible d'envoyer les positions: {e}")


# Store the track history: the last 30 box centers per track ID. The bounded
# deque drops the oldest point on append once it is full.
track_history = defaultdict(lambda: deque(maxlen=30))


def _release_resources(cap, video_writer):
    """Release the capture and the writer (either may be None) and close windows."""
    if cap is not None:
        cap.release()
    if video_writer is not None:
        video_writer.release()
    cv2.destroyAllWindows()


async def main_loop():
    """Run the capture / tracking / reporting loop until it is stopped.

    The loop ends when the user presses "q" in the preview window, or when
    the video stream can no longer be read and no API is configured (in that
    case nothing could ever switch the status back to 1). The capture, the
    video writer and the OpenCV windows are released on every exit path,
    including exceptions.
    """
    # Initialize FPS measurement
    if args.verbose:
        prev_time_fps = time.time()

    last_position_sent = int(time.time())
    current_people = set()
    # Backdated by one interval so the very first iteration polls the API
    last_status_check = int(time.time()) - STATUS_CHECK_INTERVAL

    # Without an API nothing can change the status, so start in run mode
    status = 0 if args.API_URL else 1

    model = None
    cap = None
    video_writer = None

    async with aiohttp.ClientSession() as session:
        try:
            while True:
                # Check status at the defined interval
                if args.API_URL and time.time() - last_status_check >= STATUS_CHECK_INTERVAL:
                    status = await get_status(session)
                    last_status_check = time.time()
                    if args.verbose:
                        print(f"État mis à jour: {status}")

                # Status 0 (or unknown/None): clear everything and wait
                if not status:
                    _release_resources(cap, video_writer)
                    cap = None
                    video_writer = None
                    if args.verbose:
                        print("Ressources libérées/détruites. Dans l'attente de l'état 1 pour redémarrer.")
                    # Prevent CPU overload
                    await asyncio.sleep(1)
                    continue

                # Status 2: send an exit event for everyone still tracked, then idle
                if status == 2:
                    if current_people and args.API_URL:
                        for person_id in current_people:
                            await send_event(session, "exit", 0, person_id, args.verbose)
                        current_people.clear()
                        if args.verbose:
                            print("État 2 reçu: Envoi de l'action de sortie pour tous les ID encore actifs.")
                    await asyncio.sleep(1)
                    continue

                # Initialize model and video capture if status is 1
                if status == 1 and cap is None:
                    model = YOLO(f"models/{args.modelname}")
                    if args.verbose:
                        print(f"Modèle YOLO chargé: models/{args.modelname}")

                    # A numeric input is a camera index; anything else is a
                    # path/URL opened through the FFmpeg backend.
                    try:
                        input_value = int(args.input)
                    except ValueError:
                        cap = cv2.VideoCapture(args.input, cv2.CAP_FFMPEG)
                    else:
                        cap = cv2.VideoCapture(input_value)

                    if not cap.isOpened():
                        print("Erreur: Impossible d'ouvrir le flux vidéo.")
                        sys.exit(1)

                    if args.force:
                        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
                        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

                    if args.output:
                        w, h, fps = (
                            int(cap.get(prop))
                            for prop in (
                                cv2.CAP_PROP_FRAME_WIDTH,
                                cv2.CAP_PROP_FRAME_HEIGHT,
                                cv2.CAP_PROP_FPS,
                            )
                        )
                        video_writer = cv2.VideoWriter(args.output, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

                    if args.verbose:
                        print("Capture vidéo initialisée.")

                # Skip if no video capture is open
                if cap is None:
                    await asyncio.sleep(1)
                    continue

                # Read frame and process
                success, frame = cap.read()
                if not success:
                    print("Erreur: Impossible de lire le flux vidéo. Libération des ressources.")
                    if not args.API_URL:
                        # No API means the status can never return to 1:
                        # stop here instead of idling forever.
                        break
                    status = 0  # Force reset until the next status poll
                    continue

                # Declare start timing variable
                if args.verbose:
                    time_start = time.time()

                # Run YOLO tracking on the frame
                results = model.track(frame, persist=True, classes=[0], tracker="track/botsort.yaml", verbose=False)
                annotated_frame = results[0].plot()

                # Declare mid timing variable
                if args.verbose:
                    time_mid = time.time()

                new_people = set()
                tasks = []
                pos_tasks = []
                api_active = bool(args.API_URL) and status == 1

                if results[0].boxes.id is not None:
                    boxes = results[0].boxes.xywh.cpu()
                    track_ids = results[0].boxes.id.int().cpu().tolist()

                    for box, track_id in zip(boxes, track_ids):
                        x, y, _, _ = box
                        track_history[track_id].append((float(x), float(y)))
                        new_people.add(track_id)

                    # Detect entries and exits
                    entered = new_people - current_people
                    exited = current_people - new_people

                    # Send event to API for each ID
                    if api_active:
                        for person_id in entered:
                            tasks.append(send_event(session, "enter", len(new_people), person_id, args.verbose))
                        for person_id in exited:
                            tasks.append(send_event(session, "exit", len(new_people), person_id, args.verbose))

                    # Send positions to API, at most once per wall-clock second
                    if api_active and int(time.time()) >= (last_position_sent + 1):
                        payload_positions = []
                        height_img, width_img = annotated_frame.shape[:2]
                        for box, track_id in zip(boxes, track_ids):
                            box_x, box_y, box_w, box_h = [float(coord.item()) for coord in box]
                            # Shift the x/y values by half the box size, then
                            # scale everything by the frame dimensions.
                            pos_x = (box_x - (box_w / 2)) / width_img
                            pos_y = (box_y - (box_h / 2)) / height_img
                            payload_positions.append([
                                {
                                    "pos_x": round(pos_x, 4),
                                    "pos_y": round(pos_y, 4),
                                    "w": round(box_w / width_img, 4),
                                    "h": round(box_h / height_img, 4),
                                },
                                {"id": track_id},
                            ])
                        last_position_sent = int(time.time())
                        pos_tasks.append(send_position(session, payload_positions, args.verbose))

                    current_people = new_people
                else:
                    # No people detected, send exit event for all remaining
                    if current_people and status == 1:
                        if args.API_URL:
                            for person_id in current_people:
                                tasks.append(send_event(session, "exit", 0, person_id, args.verbose))
                        current_people = set()

                # Await all API calls concurrently
                if tasks:
                    await asyncio.gather(*tasks)
                if pos_tasks:
                    await asyncio.gather(*pos_tasks)

                # Math FPS
                if args.verbose:
                    time_end = time.time()
                    elapsed = time_end - prev_time_fps
                    # Guard against a zero interval on coarse clocks
                    fps_display = int(1 / elapsed) if elapsed > 0 else 0
                    prev_time_fps = time_end
                    # Math MS
                    time_mid_temp = round((time_mid - time_start) * 1000, 1)
                    time_end_temp = round((time_end - time_mid) * 1000, 1)
                    time_mid_list.append(time_mid_temp)
                    time_end_list.append(time_end_temp)
                    fps_list.append(fps_display)
                    # Print iteration measurements
                    print(f"Temps de réponse détection/suivi: {time_mid_temp}\nTemps de réponse dessin: {time_end_temp}\nTemps de travail total: {round(time_mid_temp + time_end_temp, 4)}\nImages par seconde: {fps_display}\n-------------------------")

                # Prepare OpenCV to print
                cv2.putText(annotated_frame, f"Nombre visiteurs: {len(current_people)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.imshow("Vision par ordinateur - Python", annotated_frame)

                if args.output and video_writer is not None:
                    video_writer.write(annotated_frame)

                # "q" in the preview window stops the loop; cleanup happens below
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Runs on normal exit, sys.exit and exceptions alike so the
            # capture device is freed and the output file is finalized.
            _release_resources(cap, video_writer)


# Run the async main loop
asyncio.run(main_loop())

# Print Summary
if args.verbose and time_mid_list:
    avg_mid = sum(time_mid_list) / len(time_mid_list)
    avg_end = sum(time_end_list) / len(time_end_list)
    avg_fps = sum(fps_list) / len(fps_list)
    print(f"Rapport de performances:\nTemps de réponse moyen détection/suivi: {round(avg_mid, 1)}\nTemps de réponse moyen dessin: {round(avg_end, 1)}\nTemps de réponse moyen: {round(avg_mid + avg_end, 1)}\nMoyenne images par seconde: {round(avg_fps, 2)}")