Files
AliceT 18e8d6eef1 Premier commit du programme sur la nouvelle instance Gitea
Import des fichiers :

Modifications qui seront validées :
	nouveau fichier : MOT20-02.mp4
	nouveau fichier : README.md
	nouveau fichier : autoinstall.sh
	nouveau fichier : autostart.sh
	nouveau fichier : checking-camera.py
	nouveau fichier : main.py
	nouveau fichier : models/yolo11n.pt
	nouveau fichier : models/yolo12n.pt
	nouveau fichier : requirements.txt
	nouveau fichier : track/botsort.yaml
	nouveau fichier : track/bytetrack.yaml
2025-10-19 11:53:24 +02:00

285 lines
12 KiB
Python

import time
import json
import asyncio
import aiohttp
import ssl
import requests
from datetime import date
from collections import defaultdict
import cv2
import numpy as np
import argparse
import sys
from ultralytics import YOLO
# Command-line interface: every option is described once in a table and then
# registered in declaration order, so the generated --help keeps the same layout.
_CLI_OPTIONS = (
    (("-i", "--input"), {
        "dest": "input",
        "default": 0,
        "help": "Chemin vers la vidéo à lire ('0' est à utiliser pour la webcam par défaut)",
    }),
    (("-o", "--output"), {
        "dest": "output",
        "default": "",
        "help": "Emplacement pour enregistrer le processus dans une vidéo (Attention au RGPD, désactivé si aucun)",
    }),
    (("-f", "--force"), {
        "action": "store_true",
        "help": "Forcer le redimensionnement du flux d'entrée",
    }),
    (("-u", "--api-url"), {
        "dest": "API_URL",
        "default": "",
        "help": "Chemin réseau vers l'API (désactivé si aucun)",
    }),
    (("-m", "--model"), {
        "dest": "modelname",
        "default": "yolo12n.pt",
        "help": "Nom complet du modèle de détection à utiliser (répertoire 'models/')",
    }),
    (("-v", "--verbose"), {
        "action": "store_true",
        "help": "Activer la sortie verbeuse du programme.",
    }),
)
parser = argparse.ArgumentParser()
for _flags, _settings in _CLI_OPTIONS:
    parser.add_argument(*_flags, **_settings)
# Parsed once at import time; the rest of the script reads its settings from `args`.
args = parser.parse_args()
# Per-frame measurements (milliseconds and FPS); they are only filled in verbose
# mode and are summarised once the main loop has finished.
time_mid_list, time_end_list, fps_list = [], [], []
# Announce verbose mode
if args.verbose:
    print("Mode verbeux activé")
# API endpoints, all derived from the --api-url base (an empty base disables the API)
API_URL_EVENT = f"{args.API_URL}/api/c_auto"
API_URL_POS = f"{args.API_URL}/api/p_auto"
API_URL_STATUS = f"{args.API_URL}/api/status"
# Minimum number of seconds between two status polls
STATUS_CHECK_INTERVAL = 10
# Define function to get API event status
async def get_status(session):
    """Fetch the current status value from the status endpoint.

    Args:
        session: The open ``aiohttp.ClientSession`` used to issue the GET.

    Returns:
        The response body parsed as an ``int``, or ``None`` when the request
        fails, times out, or the body is not an integer. The failure is
        printed rather than raised so the caller's loop keeps running.
    """
    try:
        async with session.get(API_URL_STATUS) as response:
            response.raise_for_status()
            # The body must be read while the response is still open.
            body = await response.text()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # A total-timeout expiry is raised as asyncio.TimeoutError, which is
        # not an aiohttp.ClientError, so it has to be listed explicitly.
        print(f"Impossible d'obtenir l'état: {e}")
        return None
    try:
        return int(body)
    except ValueError as e:
        # An unexpected body (HTML error page, empty string, ...) is treated
        # like any other failed poll instead of crashing the program.
        print(f"Impossible d'obtenir l'état: {e}")
        return None
# Define event API requests function
async def send_event(session, action, people_count, person_id, verbose=False):
    """POST one enter/exit event to the event endpoint.

    The JSON body is ``[{"action": action, "people": people_count},
    {"id": person_id}]``.

    Args:
        session: The open ``aiohttp.ClientSession`` used to issue the POST.
        action: Event name placed in the payload (callers in this file use
            ``"enter"`` and ``"exit"``).
        people_count: Number of people reported together with the event.
        person_id: Tracker ID the event refers to.
        verbose: When true, print a confirmation after a successful request.

    Returns:
        None. Request failures and timeouts are printed, never raised, so one
        failed event cannot abort the other requests awaited alongside it.
    """
    payload = [{"action": action, "people": people_count}, {"id": person_id}]
    try:
        async with session.post(API_URL_EVENT, json=payload) as response:
            response.raise_for_status()
            if verbose:
                print(f"Requête action {action} pour l'id:{person_id} avec {people_count} visitors")
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # asyncio.TimeoutError is not an aiohttp.ClientError; catching it keeps
        # a slow API from killing the caller.
        print(f"Impossible d'envoyer l'action {action} de l'id:{person_id}: {e}")
# Define position API requests function
async def send_position(session, positions, verbose=False):
    """POST a batch of tracked positions to the position endpoint.

    Args:
        session: The open ``aiohttp.ClientSession`` used to issue the POST.
        positions: JSON-serialisable sequence sent as the request body; its
            length is what the verbose message reports.
        verbose: When true, print how many positions were sent.

    Returns:
        None. Request failures and timeouts are printed, never raised.
    """
    try:
        async with session.post(API_URL_POS, json=positions) as response:
            response.raise_for_status()
            if verbose:
                print(f"Positions envoyées : {len(positions)}")
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # asyncio.TimeoutError is not an aiohttp.ClientError; catching it keeps
        # a slow API from killing the caller.
        print(f"Impossible d'envoyer les positions: {e}")
# Recent positions per tracker ID; a missing ID starts with an empty list
track_history = defaultdict(list)
# Helpers used by the main loop, followed by the main loop itself
def _release_resources(cap, video_writer):
    """Release the capture and the writer (when present) and close all OpenCV windows."""
    if cap is not None:
        cap.release()
    if video_writer is not None:
        video_writer.release()
    cv2.destroyAllWindows()


def _open_capture():
    """Load the YOLO model and open the input stream, plus the output writer if requested.

    Returns:
        A ``(model, cap, video_writer)`` tuple; ``video_writer`` is ``None``
        when no output path was given on the command line.

    Raises:
        SystemExit: With status 1 when the input stream cannot be opened.
    """
    model = YOLO(f"models/{args.modelname}")
    if args.verbose:
        print(f"Modèle YOLO chargé: models/{args.modelname}")
    # A numeric input is opened as a device index; anything else is handed to
    # the FFmpeg backend as a path.
    try:
        source = int(args.input)
    except ValueError:
        cap = cv2.VideoCapture(args.input, cv2.CAP_FFMPEG)
    else:
        cap = cv2.VideoCapture(source)
    if not cap.isOpened():
        print("Erreur: Impossible d'ouvrir le flux vidéo.")
        sys.exit(1)
    if args.force:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    video_writer = None
    if args.output:
        w, h, fps = (
            int(cap.get(prop))
            for prop in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS)
        )
        video_writer = cv2.VideoWriter(args.output, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
    if args.verbose:
        print("Capture vidéo initialisée.")
    return model, cap, video_writer


def _normalized_positions(boxes, track_ids, frame_width, frame_height):
    """Build the position payload: one ``[{pos_x, pos_y, w, h}, {id}]`` entry per box.

    Each (x, y, w, h) box is shifted by half its size (so x/y are presumably
    box centres on input and top-left corners on output), then divided by the
    frame size and rounded to 4 decimals.
    """
    payload = []
    for box, track_id in zip(boxes, track_ids):
        x, y, w, h = (float(coord) for coord in box)
        payload.append([
            {
                "pos_x": round((x - (w / 2)) / frame_width, 4),
                "pos_y": round((y - (h / 2)) / frame_height, 4),
                "w": round(w / frame_width, 4),
                "h": round(h / frame_height, 4),
            },
            {"id": track_id},
        ])
    return payload


async def main_loop():
    """Run detection/tracking on the input stream and report to the API until stopped.

    When an API URL is configured, the status endpoint is polled every
    ``STATUS_CHECK_INTERVAL`` seconds and drives the loop:

    * falsy status (``0`` or a failed poll): release capture/writer/windows and idle;
    * status ``2``: send an "exit" event for every ID still tracked, then idle;
    * status ``1``: open the stream if needed and process one frame per iteration.

    Without an API URL the status is fixed to ``1``.

    Per frame, tracker IDs are compared with the previous frame to emit
    "enter"/"exit" events, and normalised positions are sent at most once per
    second. The annotated frame is shown in a window (and written to the
    output video when one was requested). Pressing ``q`` in the window stops
    the loop, as does a failed frame read when no API is configured.
    Capture, writer and windows are always released on the way out.
    """
    # Reference time for the FPS measurement (only reported in verbose mode)
    prev_time_fps = time.time()
    last_position_sent = int(time.time())
    current_people = set()
    # Backdated so that the very first iteration already polls the status
    last_status_check = int(time.time()) - STATUS_CHECK_INTERVAL
    # With an API, wait for it to enable processing; without one, start enabled
    status = 0 if args.API_URL else 1
    model = None
    cap = None
    video_writer = None
    try:
        async with aiohttp.ClientSession() as session:
            while True:
                # Check status at the defined interval
                if args.API_URL and time.time() - last_status_check >= STATUS_CHECK_INTERVAL:
                    status = await get_status(session)
                    last_status_check = time.time()
                    if args.verbose:
                        print(f"État mis à jour: {status}")

                # Status 0 (or no status available): free everything and wait
                if not status:
                    _release_resources(cap, video_writer)
                    cap = None
                    video_writer = None
                    if args.verbose:
                        print("Ressources libérées/détruites. Dans l'attente de l'état 1 pour redémarrer.")
                    # Prevent CPU overload
                    await asyncio.sleep(1)
                    continue

                # Status 2: flush an exit event for everyone still tracked, then wait
                if status == 2:
                    if current_people and args.API_URL:
                        for person_id in current_people:
                            await send_event(session, "exit", 0, person_id, args.verbose)
                        current_people.clear()
                        if args.verbose:
                            print("État 2 reçu: Envoi de l'action de sortie pour tous les ID encore actifs.")
                    await asyncio.sleep(1)
                    continue

                # Status 1: (re)initialise model and capture when nothing is open
                if status == 1 and cap is None:
                    model, cap, video_writer = _open_capture()

                # Any other status with nothing open: just wait
                if cap is None:
                    await asyncio.sleep(1)
                    continue

                # Read frame and process
                success, frame = cap.read()
                if not success:
                    print("Erreur: Impossible de lire le flux vidéo. Libération des ressources.")
                    if not args.API_URL:
                        # Without an API nothing can ever switch the status back
                        # to 1, so idling would hang forever (e.g. at the end of
                        # a video file). Leave the loop instead.
                        break
                    status = 0  # Force reset until the next status poll
                    continue

                time_start = time.time()
                # Run YOLO tracking on the frame (restricted to class index 0)
                results = model.track(frame, persist=True, classes=[0], tracker="track/botsort.yaml", verbose=False)
                detections = results[0]
                annotated_frame = detections.plot()
                time_mid = time.time()

                new_people = set()
                tasks = []
                pos_tasks = []
                api_active = bool(args.API_URL) and status == 1
                if detections.boxes.id is not None:
                    boxes = detections.boxes.xywh.cpu()
                    track_ids = detections.boxes.id.int().cpu().tolist()
                    for box, track_id in zip(boxes, track_ids):
                        x, y, _, _ = box
                        track = track_history[track_id]
                        track.append((float(x), float(y)))
                        # Keep only the 30 most recent points per ID
                        if len(track) > 30:
                            track.pop(0)
                        new_people.add(track_id)
                    # Detect entries and exits
                    entered = new_people - current_people
                    exited = current_people - new_people
                    # Forget the history of IDs that left so the dict does not
                    # grow without bound on long runs
                    for person_id in exited:
                        track_history.pop(person_id, None)
                    if api_active:
                        # Send event to API for each ID
                        people_count = len(new_people)
                        for person_id in entered:
                            tasks.append(send_event(session, "enter", people_count, person_id, args.verbose))
                        for person_id in exited:
                            tasks.append(send_event(session, "exit", people_count, person_id, args.verbose))
                        # Send positions at most once per second
                        if int(time.time()) >= (last_position_sent + 1):
                            height_img, width_img = annotated_frame.shape[:2]
                            payload_positions = _normalized_positions(boxes, track_ids, width_img, height_img)
                            last_position_sent = int(time.time())
                            pos_tasks.append(send_position(session, payload_positions, args.verbose))
                    current_people = new_people
                else:
                    # No people detected, send exit event for all remaining
                    if current_people and status == 1:
                        if args.API_URL:
                            for person_id in current_people:
                                tasks.append(send_event(session, "exit", 0, person_id, args.verbose))
                        for person_id in current_people:
                            track_history.pop(person_id, None)
                        current_people = set()

                # Await all API calls concurrently
                if tasks:
                    await asyncio.gather(*tasks)
                if pos_tasks:
                    await asyncio.gather(*pos_tasks)

                if args.verbose:
                    time_end = time.time()
                    # Guard against a zero interval when the clock did not advance
                    elapsed = time_end - prev_time_fps
                    fps_display = int(1 / elapsed) if elapsed > 0 else 0
                    prev_time_fps = time_end
                    # Durations in milliseconds
                    time_mid_temp = round((time_mid - time_start) * 1000, 1)
                    time_end_temp = round((time_end - time_mid) * 1000, 1)
                    time_mid_list.append(time_mid_temp)
                    time_end_list.append(time_end_temp)
                    fps_list.append(fps_display)
                    # Print iteration measurements
                    print(f"Temps de réponse détection/suivi: {time_mid_temp}\nTemps de réponse dessin: {time_end_temp}\nTemps de travail total: {round(time_mid_temp + time_end_temp, 4)}\nImages par seconde: {fps_display}\n-------------------------")

                # Overlay the visitor count and display the frame
                cv2.putText(annotated_frame, f"Nombre visiteurs: {len(current_people)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.imshow("Vision par ordinateur - Python", annotated_frame)
                if video_writer is not None:
                    video_writer.write(annotated_frame)
                # 'q' in the display window stops the program
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
    finally:
        # Runs on normal exit, on 'q', and on any exception
        _release_resources(cap, video_writer)
# Drive the asynchronous main loop until it returns
asyncio.run(main_loop())
# Performance report: averages over every frame measured in verbose mode
if args.verbose and time_mid_list:
    _avg_detect = sum(time_mid_list) / len(time_mid_list)
    _avg_draw = sum(time_end_list) / len(time_end_list)
    _avg_fps = sum(fps_list) / len(fps_list)
    _report_lines = [
        "Rapport de performances:",
        f"Temps de réponse moyen détection/suivi: {round(_avg_detect, 1)}",
        f"Temps de réponse moyen dessin: {round(_avg_draw, 1)}",
        f"Temps de réponse moyen: {round(_avg_detect + _avg_draw, 1)}",
        f"Moyenne images par seconde: {round(_avg_fps, 2)}",
    ]
    print("\n".join(_report_lines))