# Initial import commit contents (from git status): MOT20-02.mp4, README.md,
# autoinstall.sh, autostart.sh, checking-camera.py, main.py, models/yolo11n.pt,
# models/yolo12n.pt, requirements.txt, track/botsort.yaml, track/bytetrack.yaml
# Standard library
import argparse
import asyncio
import json
import ssl
import sys
import time
from collections import defaultdict
from datetime import date

# Third-party
import aiohttp
import cv2
import numpy as np
import requests
from ultralytics import YOLO
# Command-line interface: video source, optional recording/API targets and flags.
cli_parser = argparse.ArgumentParser()
cli_parser.add_argument(
    "-i", "--input",
    dest="input",
    default=0,
    help="Chemin vers la vidéo à lire ('0' est à utiliser pour la webcam par défaut)",
)
cli_parser.add_argument(
    "-o", "--output",
    dest="output",
    default="",
    help="Emplacement pour enregistrer le processus dans une vidéo (Attention au RGPD, désactivé si aucun)",
)
cli_parser.add_argument(
    "-f", "--force",
    action="store_true",
    help="Forcer le redimensionnement du flux d'entrée",
)
cli_parser.add_argument(
    "-u", "--api-url",
    dest="API_URL",
    default="",
    help="Chemin réseau vers l'API (désactivé si aucun)",
)
cli_parser.add_argument(
    "-m", "--model",
    dest="modelname",
    default="yolo12n.pt",
    help="Nom complet du modèle de détection à utiliser (répertoire 'models/')",
)
cli_parser.add_argument(
    "-v", "--verbose",
    action="store_true",
    help="Activer la sortie verbeuse du programme.",
)
args = cli_parser.parse_args()
# Verbose mode: announce it; timing accumulators are filled only when verbose.
if args.verbose:
    print("Mode verbeux activé")
time_mid_list = []  # per-frame detection/tracking latency (ms)
time_end_list = []  # per-frame drawing/reporting latency (ms)
fps_list = []       # per-frame instantaneous FPS

# REST endpoints derived from the base API URL (empty URL disables the API).
API_URL_EVENT = f"{args.API_URL}/api/c_auto"
API_URL_POS = f"{args.API_URL}/api/p_auto"
API_URL_STATUS = f"{args.API_URL}/api/status"

# Seconds between two consecutive status polls against the API.
STATUS_CHECK_INTERVAL = 10
# Define function to get API event status
async def get_status(session):
    """Fetch the current pipeline status from the API status endpoint.

    Args:
        session: An open aiohttp.ClientSession used for the request.

    Returns:
        The status as an int, or None when the request fails or the
        response body is not an integer. Callers treat a falsy status
        as "stand by", so None safely idles the main loop.
    """
    try:
        async with session.get(API_URL_STATUS) as response:
            # Turn HTTP 4xx/5xx into ClientError so they hit the handler below.
            response.raise_for_status()
            data = await response.text()
            return int(data)
    except (aiohttp.ClientError, ValueError) as e:
        # ValueError covers a non-numeric response body, which previously
        # escaped this handler and crashed the main loop; treat it like a
        # network failure and report no status.
        print(f"Impossible d'obtenir l'état: {e}")
        return None
# Report a single enter/exit event to the API event endpoint.
async def send_event(session, action, people_count, person_id, verbose=False):
    """POST one visitor event ("enter"/"exit") for *person_id*.

    Network/HTTP failures are logged and swallowed so a lost event
    never interrupts the processing loop.
    """
    body = [{"action": action, "people": people_count}, {"id": person_id}]
    try:
        async with session.post(API_URL_EVENT, json=body) as reply:
            reply.raise_for_status()
            if not verbose:
                return
            print(f"Requête action {action} pour l'id:{person_id} avec {people_count} visitors")
    except aiohttp.ClientError as err:
        print(f"Impossible d'envoyer l'action {action} de l'id:{person_id}: {err}")
# Push a batch of normalized bounding-box positions to the API.
async def send_position(session, positions, verbose=False):
    """POST the batched per-id positions to the position endpoint.

    Failures are logged and swallowed so a lost batch never interrupts
    the processing loop.
    """
    try:
        async with session.post(API_URL_POS, json=positions) as reply:
            reply.raise_for_status()
    except aiohttp.ClientError as err:
        print(f"Impossible d'envoyer les positions: {err}")
        return
    if verbose:
        print(f"Positions envoyées : {len(positions)}")
# Per-track centre-point history, keyed by YOLO track id; a missing id
# starts with a fresh empty list. `defaultdict(list)` is the idiomatic
# (and slightly faster) equivalent of `defaultdict(lambda: [])`.
track_history = defaultdict(list)
# Main asynchronous processing loop, driven by asyncio.run() at module bottom.
async def main_loop():
    """Run the capture -> YOLO tracking -> API reporting loop until 'q' is pressed.

    Reads module-level configuration (args, API_URL_*, STATUS_CHECK_INTERVAL,
    track_history, verbose timing lists) and drives a small state machine on
    `status`:
      0 / falsy -> release camera/writer resources and idle,
      1         -> capture frames, track people, report events/positions,
      2         -> flush an "exit" event for every active id, then idle.
    """
    # Seed the FPS timer (used only for verbose diagnostics).
    if args.verbose:
        prev_time_fps = time.time()

    # NOTE(review): `last_position_sent` is a function local, so this
    # try/except always raises NameError on the first (and only) call and
    # just initializes the variable — the pattern looks inherited from a
    # former module-level version of this loop. Confirm before simplifying.
    try:
        last_position_sent
    except NameError:
        last_position_sent = int(time.time())

    # Ids of the people currently visible in the frame.
    current_people = set()

    # Same NameError-initialization pattern as above; subtracting the
    # interval makes the first loop iteration poll the API immediately.
    try:
        last_status_check
    except NameError:
        last_status_check = int(time.time()) - STATUS_CHECK_INTERVAL

    # Without an API the pipeline starts right away (status 1); with an API
    # it waits (status 0) for the first successful status poll.
    if args.API_URL:
        status = 0
    else:
        status = 1
    model = None
    cap = None
    video_writer = None

    async with aiohttp.ClientSession() as session:
        while True:
            # Poll the API status at most once per STATUS_CHECK_INTERVAL seconds.
            if args.API_URL:
                if time.time() - last_status_check >= STATUS_CHECK_INTERVAL:
                    status = await get_status(session)
                    last_status_check = time.time()
                    if args.verbose:
                        print(f"État mis à jour: {status}")

            # Status 0 (or a failed poll yielding None/falsy): release all
            # video resources and idle until the API asks us to run again.
            if status == 0 or not status:
                if cap:
                    cap.release()
                    cap = None
                if video_writer:
                    video_writer.release()
                    video_writer = None
                cv2.destroyAllWindows()
                if args.verbose:
                    print("Ressources libérées/détruites. Dans l'attente de l'état 1 pour redémarrer.")
                # Prevent CPU overload while idling.
                await asyncio.sleep(1)
                continue

            # Status 2: keep resources open but flush an "exit" event for
            # every id still marked present, then idle.
            if status == 2:
                if current_people and args.API_URL:
                    for person_id in current_people:
                        await send_event(session, "exit", 0, person_id, args.verbose)
                    current_people.clear()
                    if args.verbose:
                        print("État 2 reçu: Envoi de l'action de sortie pour tous les ID encore actifs.")
                await asyncio.sleep(1)
                continue

            # Status 1 with no capture open yet: load the model and open the stream.
            if status == 1 and not cap:
                model = YOLO(f"models/{args.modelname}")
                if args.verbose:
                    print(f"Modèle YOLO chargé: models/{args.modelname}")

                # A numeric --input selects a local camera index; any other
                # value is treated as a path/URL decoded through FFmpeg.
                try:
                    input_value = int(args.input)
                    cap = cv2.VideoCapture(input_value)
                except ValueError:
                    cap = cv2.VideoCapture(args.input, cv2.CAP_FFMPEG)

                if not cap.isOpened():
                    print("Erreur: Impossible d'ouvrir le flux vidéo.")
                    # NOTE(review): exits the whole process from inside the
                    # async loop rather than raising/cleaning up.
                    sys.exit(1)

                # Optionally force the capture resolution to 1280x720
                # (effective on webcams; file streams typically ignore it).
                if args.force:
                    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
                    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

                # Optional recording of the annotated stream (see the RGPD
                # warning in --help for --output).
                if args.output:
                    w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))
                    video_writer = cv2.VideoWriter(args.output, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

                if args.verbose:
                    print("Capture vidéo initialisée.")

            # Defensive guard: no capture available, idle and retry.
            if not cap:
                await asyncio.sleep(1)
                continue

            # Grab the next frame; on failure force a full reset via status 0.
            success, frame = cap.read()
            if not success:
                print("Erreur: Impossible de lire le flux vidéo. Libération des ressources.")
                status = 0 # Force reset
                continue

            # Start of per-frame timing (verbose diagnostics only).
            if args.verbose:
                time_start = time.time()

            # Run YOLO tracking on the frame (class 0 = person, BoT-SORT config).
            results = model.track(frame, persist=True, classes=[0], tracker="track/botsort.yaml", verbose=False)
            annotated_frame = results[0].plot()

            # Split point between tracking time and drawing/reporting time.
            if args.verbose:
                time_mid = time.time()

            new_people = set()
            tasks = []      # pending enter/exit event coroutines
            pos_tasks = []  # pending position-batch coroutines

            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xywh.cpu()
                track_ids = results[0].boxes.id.int().cpu().tolist()

                # Append each box centre to the per-id history, capped at the
                # last 30 points.
                for box, track_id in zip(boxes, track_ids):
                    x, y, w, h = box
                    track = track_history[track_id]
                    track.append((float(x), float(y)))
                    if len(track) > 30:
                        track.pop(0)

                    new_people.add(track_id)

                # Entries are ids seen now but not before; exits the reverse.
                entered = new_people - current_people
                exited = current_people - new_people

                # Queue one API event per entering/exiting id.
                if args.API_URL and status == 1:
                    for person_id in entered:
                        tasks.append(send_event(session, "enter", len(new_people), person_id, args.verbose))
                    for person_id in exited:
                        tasks.append(send_event(session, "exit", len(new_people), person_id, args.verbose))

                # Queue the position batch at most once per second. Boxes are
                # normalized to [0, 1] relative to the frame size, with x/y
                # converted from centre coordinates to the top-left corner.
                if args.API_URL and status == 1:
                    if int(time.time()) >= (last_position_sent+1):
                        payload_positions = []
                        height_img, width_img = annotated_frame.shape[:2]
                        boxes_n = results[0].boxes.xywh.cpu()
                        for box, track_id in zip(boxes_n, track_ids):
                            x, y, w, h = [float(coord.item()) for coord in box]
                            x = (x-(w/2)) / width_img
                            y = (y-(h/2)) / height_img
                            w = w / width_img
                            h = h / height_img
                            payload_positions.append([{"pos_x": round(x, 4), "pos_y": round(y, 4), "w": round(w, 4), "h": round(h, 4)}, {"id": track_id}])
                        last_position_sent = int(time.time())
                        pos_tasks.append(send_position(session, payload_positions, args.verbose))

                current_people = new_people
            else:
                # No detections at all: everyone previously present has left.
                if current_people and status == 1:
                    if args.API_URL:
                        for person_id in current_people:
                            tasks.append(send_event(session, "exit", 0, person_id, args.verbose))
                    current_people = set()

            # Await all queued API calls concurrently.
            if tasks:
                await asyncio.gather(*tasks)
            if pos_tasks:
                await asyncio.gather(*pos_tasks)

            # Verbose diagnostics: instantaneous FPS and per-phase latency (ms).
            if args.verbose:
                time_end = time.time()
                fps_display = int(1 / (time_end - prev_time_fps))
                prev_time_fps = time_end

                # Per-phase latencies in milliseconds.
                time_mid_temp = round((time_mid - time_start)*1000 , 1)
                time_end_temp = round((time_end - time_mid)*1000 , 1)
                time_mid_list.append(time_mid_temp)
                time_end_list.append(time_end_temp)
                fps_list.append(fps_display)

                # Print this iteration's measurements.
                print(f"Temps de réponse détection/suivi: {time_mid_temp}\nTemps de réponse dessin: {time_end_temp}\nTemps de travail total: {round(time_mid_temp + time_end_temp, 4)}\nImages par seconde: {fps_display}\n-------------------------")

            # On-screen display (and optional recording) of the annotated frame.
            cv2.putText(annotated_frame, f"Nombre visiteurs: {len(current_people)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow("Vision par ordinateur - Python", annotated_frame)
            if args.output:
                video_writer.write(annotated_frame)

            # 'q' pressed in the display window: release everything and stop.
            if cv2.waitKey(1) & 0xFF == ord("q"):
                if cap:
                    cap.release()
                if video_writer:
                    video_writer.release()
                cv2.destroyAllWindows()
                break
# Kick off the asynchronous processing loop (blocks until 'q' quits it).
asyncio.run(main_loop())

# Performance summary: only in verbose mode and only if at least one frame ran.
if args.verbose and time_mid_list:
    avg_track = sum(time_mid_list) / len(time_mid_list)
    avg_draw = sum(time_end_list) / len(time_end_list)
    avg_fps = sum(fps_list) / len(fps_list)
    print(
        f"Rapport de performances:\n"
        f"Temps de réponse moyen détection/suivi: {round(avg_track, 1)}\n"
        f"Temps de réponse moyen dessin: {round(avg_draw, 1)}\n"
        f"Temps de réponse moyen: {round(avg_track + avg_draw, 1)}\n"
        f"Moyenne images par seconde: {round(avg_fps, 2)}"
    )