WebVisualizer.java

package tweetoscope;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Classe WebVisualizer - Service 5 du pipeline tweetoscope (Visualization)
 * 
 * RÔLE ET RESPONSABILITÉS:
 * Ce service est le dernier composant du pipeline tweetoscope. Il consomme les leaderboards
 * de hashtags publiés par HashtagCounter et les affiche dans une interface web interactive
 * en temps réel. Les utilisateurs peuvent visualiser les hashtags les plus tendance via
 * un navigateur web sur http://localhost:8080
 * 
 * ARCHITECTURE:
 * 1. Thread Kafka Consumer: Écoute continuellement le topic "leaderboard"
 * 2. Thread HTTP Server: Accepte les requêtes HTTP des navigateurs web
 * 3. État partagé: latestLeaderboard stocke le dernier leaderboard reçu
 * 
 * PIPELINE TWEETOSCOPE COMPLET:
 * MockTwitterStream → TweetFilter → HashtagExtractor → HashtagCounter → WebVisualizer
 *    (tweets)       (filtrage)    (extraction)      (comptage)       (affichage)
 * 
 * TOPICS KAFKA:
 * - INPUT: "leaderboard" - Reçoit les rankings de hashtags toutes les ~1000ms
 * - FORMAT: JSON array avec objets {hashtag, count, timestamp}
 * 
 * INTERFACE WEB:
 * - Page HTML: Affiche les hashtags avec leur rang et leur compte
 * - Auto-refresh: Met à jour l'affichage toutes les 2 secondes
 * - API REST: Endpoint /api/leaderboard retourne le JSON brut
 * 
 * SÉCURITÉ ET FIABILITÉ:
 * - Pas de CORS restrictions (Access-Control-Allow-Origin: *)
 * - Pas d'authentification requise (interface locale)
 * - Utilise volatile pour la synchronisation thread-safe du latestLeaderboard
 */
public class WebVisualizer {
    private static final Logger logger = Logger.getLogger(WebVisualizer.class.getName());
    
    // === CONFIGURATION KAFKA ===
    // Topic d'entrée: "leaderboard"
    // Le WebVisualizer consomme les leaderboards publiés par HashtagCounter
    private static final String LEADERBOARD_TOPIC = "leaderboard";
    
    // === CONFIGURATION HTTP ===
    // Port HTTP pour l'interface web
    // L'utilisateur accède au service via http://localhost:8080
    private static final int HTTP_PORT = 8080;
    
    // === ATTRIBUTS D'INSTANCE ===
    // Consumer Kafka qui écoute le topic "leaderboard"
    // Configuré pour recevoir les messages au format String
    private final KafkaConsumer<String, String> consumer;
    
    // Parseur JSON pour convertir les données JSON en objets Java
    // Utilisé pour parser et générer du JSON
    private final Gson gson;
    
    // === ÉTAT PARTAGÉ ENTRE THREADS ===
    // Dernière chaîne JSON du leaderboard reçue du topic
    // Initialisée à "[]" (tableau JSON vide) si aucun leaderboard n'a été reçu
    // volatile: Permet aux threads de voir les mises à jour immédiatement
    //           (sans cache local du CPU)
    // Ce champ est partagé entre:
    //   - Thread consommateur Kafka (écrit)
    //   - Threads du serveur HTTP (lecture)
    private volatile String latestLeaderboard = "[]";
    
    // Drapeau de contrôle pour arrêter proprement le service
    // Quand false, la boucle de consommation Kafka arrête son exécution
    private volatile boolean running = true;

    /**
     * Constructeur du service WebVisualizer
     * 
     * RESPONSABILITÉS:
     * 1. Crée un consommateur Kafka configuré pour recevoir les leaderboards
     * 2. Initialise le parseur JSON (GSON)
     * 3. Affiche les paramètres de démarrage dans les logs
     * 
     * PARAMÈTRES:
     * - bootstrapServers: Adresses des brokers Kafka (ex: "localhost:9092")
     * - groupId: Identifiant du groupe de consommateurs (ex: "visualizer-group")
     *            Si plusieurs instances sont lancées avec le même groupId,
     *            Kafka les considère comme un même groupe et partage les messages
     * 
     * CONFIGURATION KAFKA:
     * - AUTO_OFFSET_RESET_CONFIG = "latest":
     *   Si le consommateur rejoint le groupe pour la première fois (ou après un reset),
     *   il commence par les NOUVEAUX messages (pas les anciens)
     *   Cela signifie: pas de "rattrappage" du passé, juste les leaderboards futurs
     * 
     * - ENABLE_AUTO_COMMIT_CONFIG = "true":
     *   Kafka enregistre automatiquement la position du consommateur
     *   En cas de redémarrage, on reprend depuis la position sauvegardée
     * 
     * GSON:
     * - Initialisé sans adaptateurs spécialisés
     * - Utilise les deserializers par défaut pour String et des types simples
     */
    public WebVisualizer(String bootstrapServers, String groupId) {
        // === INITIALISATION DU PARSEUR JSON ===
        // GSON sera utilisé pour traiter les leaderboards si nécessaire
        // (Optionnel: le code actual traite les leaderboards comme des Strings brutes)
        this.gson = new Gson();

        // === CONFIGURATION DU CONSOMMATEUR KAFKA ===
        // Crée les propriétés de configuration
        Properties consumerProps = new Properties();
        
        // Adresses des brokers Kafka auxquels se connecter
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        
        // Identifiant du groupe de consommateurs
        // Permet à Kafka de tracker la position de ce consommateur
        // Les instances du même groupe partagent automatiquement les partitions
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        
        // === DÉSERIALISEURS ===
        // Convertissent les données du format Kafka (bytes) au format Java (String)
        // KEY_DESERIALIZER: convertit les clés en String
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // VALUE_DESERIALIZER: convertit les valeurs en String
        // Les leaderboards sont des JSON sérialisés en String
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // === POSITION DE DÉMARRAGE ===
        // AUTO_OFFSET_RESET_CONFIG = "latest":
        // Si c'est le premier accès au topic (pas de committed offset):
        //   - "latest": commence par les nouveaux messages (saute le passé)
        //   - "earliest": commence par les premiers messages jamais publiés
        // Choix "latest" = pas de flood du passé, juste les leaderboards futurs
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        
        // === COMMIT AUTOMATIQUE ===
        // ENABLE_AUTO_COMMIT_CONFIG = "true":
        // Kafka enregistre automatiquement la position du consommateur (offset)
        // Cela permet au service de reprendre depuis la bonne position après un redémarrage
        // Avantage: pas de risque de perdre des messages
        // Inconvénient: pas de garantie exacte-une-fois (mais acceptable pour une UI)
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        
        // === CRÉATION DU CONSOMMATEUR ===
        // KafkaConsumer<String, String>: clés et valeurs sont des Strings
        this.consumer = new KafkaConsumer<>(consumerProps);

        // === LOGS D'INITIALISATION ===
        logger.info("Web Visualizer initialized");
        logger.info(() -> "Consuming from: " + LEADERBOARD_TOPIC);
        logger.info(() -> "Web interface: http://localhost:" + HTTP_PORT);
    }

    /**
     * Méthode start() - Lance le service WebVisualizer
     * 
     * PROCESSUS:
     * 1. Crée et démarre un thread de consommation Kafka
     * 2. Crée et démarre un serveur HTTP
     * 3. Enregistre les adresses web dans les logs
     * 
     * ARCHITECTURE MULTI-THREAD:
     * Le service utilise TWO threads parallèles:
     * 
     * THREAD 1 - Consommateur Kafka:
     *   - Nom: "Web-Visualizer-Consumer"
     *   - Fonction: consumeLeaderboard()
     *   - Tâche: Écoute le topic "leaderboard" et met à jour latestLeaderboard
     *   - Durée de vie: Jusqu'à ce que running = false
     * 
     * THREAD PRINCIPAL (HTTP Server):
     *   - Accepte les connexions HTTP entrantes
     *   - Exécute les handlers pour "/" (HTML) et "/api/leaderboard" (JSON)
     *   - Crée un thread par requête HTTP (ThreadPool géré par HttpServer)
     * 
     * SYNCHRONISATION:
     * - Les deux threads lisent/écrivent latestLeaderboard
     * - Utilise 'volatile' pour la synchronisation sans locks
     * 
     * ERREURS POTENTIELLES:
     * - Si le port 8080 est déjà utilisé, IOException est levée
     * - Les erreurs sont loggées mais le service continue
     */
    public void start() {
        // === DÉMARRAGE DU THREAD CONSOMMATEUR KAFKA ===
        // Crée un thread dont la tâche est d'écouter le topic "leaderboard"
        // Thread(Runnable, String) = crée un thread exécutant "this::consumeLeaderboard"
        // Le nom "Web-Visualizer-Consumer" aide à identifier le thread dans les logs
        Thread consumerThread = new Thread(this::consumeLeaderboard, "Web-Visualizer-Consumer");
        
        // Démarre le thread de consommation
        // À partir de ce moment, le thread exécute consumeLeaderboard()
        consumerThread.start();

        // === DÉMARRAGE DU SERVEUR HTTP ===
        try {
            // === CRÉATION DU SERVEUR HTTP ===
            // HttpServer.create(InetSocketAddress, backlog):
            // - InetSocketAddress(port) = écoute sur localhost:port (127.0.0.1:port)
            // - backlog = 0 = accepte un nombre illimité de connexions en attente
            HttpServer server = HttpServer.create(new InetSocketAddress(HTTP_PORT), 0);
            
            // === ENREGISTREMENT DES ROUTES ===
            // Enregistre le HomeHandler pour "/" = page HTML principale
            // Chaque requête GET http://localhost:8080/ exécute HomeHandler.handle()
            server.createContext("/", new HomeHandler());
            
            // Enregistre le LeaderboardApiHandler pour "/api/leaderboard" = API JSON
            // Chaque requête GET http://localhost:8080/api/leaderboard retourne le JSON
            server.createContext("/api/leaderboard", new LeaderboardApiHandler());
            
            // === CONFIGURATION DU POOL DE THREADS ===
            // setExecutor(null) = utilise le ThreadPool par défaut
            // Le serveur crée un nouveau thread par requête HTTP reçue
            server.setExecutor(null);
            
            // === DÉMARRAGE DU SERVEUR ===
            // Le serveur est maintenant actif et écoute sur le port 8080
            server.start();
            
            // === LOGS ===
            logger.info("Web Visualizer started");
            logger.info(() -> "Open your browser: http://localhost:" + HTTP_PORT);
            
        } catch (IOException e) {
            // === GESTION D'ERREUR ===
            // Le port 8080 est peut-être déjà occupé
            // Ou permission insuffisante pour écouter un port < 1024
            logger.log(Level.SEVERE, "Failed to start HTTP server", e);
        }
    }

    /**
     * Méthode consumeLeaderboard() - Écoute continuellement les leaderboards
     * 
     * PROCESSUS:
     * 1. S'abonne au topic "leaderboard"
     * 2. Boucle infinie: poll() toutes les 500ms
     * 3. Pour chaque leaderboard reçu, met à jour latestLeaderboard
     * 4. Enregistre les mises à jour dans les logs
     * 5. Gère les erreurs et arrête quand running = false
     * 
     * EXÉCUTION:
     * - Cette méthode s'exécute dans un thread séparé
     * - Elle est lancée par start() dans "Web-Visualizer-Consumer"
     * - Elle bloque continuellement dans la boucle while
     * 
     * ARCHITECTURE:
     * - S'abonne une fois (pas de ré-abonnement)
     * - Poll toutes les 500ms: équilibre entre réactivité et efficacité
     * - Boucle sur chaque record du batch reçu
     * - Mise à jour atomique: une seule assignation (latestLeaderboard = ...)
     * 
     * SYNCHRONISATION:
     * - latestLeaderboard est volatile pour la visibilité inter-thread
     * - Les lecteurs HTTP voient immédiatement les mises à jour
     * 
     * ARRÊT GRACIEUX:
     * - Quand running = false, la boucle while arrête
     * - Mais la méthode ne ferme PAS le consumer
     * - stop() doit être appelée pour fermer proprement
     */
    private void consumeLeaderboard() {
        // === S'ABONNE AU TOPIC ===
        // Collections.singletonList() crée une liste avec un seul élément: "leaderboard"
        // Le consumer reçoit maintenant TOUS les messages du topic
        consumer.subscribe(Collections.singletonList(LEADERBOARD_TOPIC));
        
        logger.info("Waiting for leaderboard updates");

        try {
            // === BOUCLE INFINIE DE CONSOMMATION ===
            // Tant que running = true, continue à recevoir les leaderboards
            while (running) {
                // === POLL: ATTEND LES LEADERBOARDS ===
                // consumer.poll(Duration) récupère les messages du broker Kafka
                // Timeout de 500ms: attend jusqu'à 500ms pour voir s'il y a des messages
                // Si aucun message: retourne un batch vide après 500ms
                // Si messages arrivent: retourne immédiatement avec les messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                
                // === ITÉRATION SUR LE BATCH ===
                // Pour chaque record (leaderboard) reçu du topic
                for (ConsumerRecord<String, String> record : records) {
                    // === MISE À JOUR DU LEADERBOARD ===
                    // record.value() = la String JSON contenant le leaderboard
                    // Exemple: [{"hashtag":"travail","count":42},{"hashtag":"work","count":38}]
                    latestLeaderboard = record.value();
                    
                    // === LOG ===
                    // Enregistre chaque mise à jour reçue
                    logger.info(() -> "Leaderboard updated: " + latestLeaderboard);
                }
            }
            
        } catch (Exception e) {
            // === GESTION D'ERREUR ===
            // Erreur réseau, broker down, ou problème de desérialisation
            // Log l'erreur mais le service arrête (la boucle se termine)
            logger.log(Level.SEVERE, "Error in Web Visualizer consumer", e);
        }
    }

    /**
     * Classe interne HomeHandler - Gère les requêtes HTTP GET /
     * 
     * RÔLE:
     * Implémente HttpHandler pour traiter les requêtes HTTP vers la racine "/"
     * Retourne une page HTML complète avec interface visuelle pour le leaderboard
     * 
     * PROCESSUS:
     * 1. Génère le HTML (inline dans la chaîne)
     * 2. Définit le header Content-Type: text/html
     * 3. Envoie le HTML au navigateur client
     * 4. Ferme la connexion
     * 
     * INTERFACE WEB:
     * - Titre: "Tweetoscope - Real-time Hashtag Trends"
     * - Gradient de couleur: purple à violet
     * - Liste affichant les hashtags avec rang et count
     * - Auto-refresh toutes les 2 secondes via JavaScript
     * 
     * JAVASCRIPT:
     * - updateLeaderboard(): Fait un fetch() vers /api/leaderboard
     * - Affiche les résultats au format: "1. #travail 42"
     * - Affiche la dernière mise à jour (timestamp local)
     * - Erreur?: Affiche "No data yet..."
     */
    private class HomeHandler implements HttpHandler {
        /**
         * Méthode handle() - Traite une requête HTTP GET /
         * 
         * PARAMÈTRE:
         * - exchange: HttpExchange - représente la requête HTTP et la réponse
         *   Contient: headers, méthode (GET/POST), URL, client, etc.
         * 
         * ÉTAPES:
         * 1. Génère le HTML avec CSS et JavaScript
         * 2. Configure les headers HTTP (Content-Type)
         * 3. Écrit le HTML dans la réponse
         * 4. Ferme la connexion
         */
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            // === HTML COMPLET ===
            // String multi-lignes contenant le HTML5 complet
            // Inclus: DOCTYPE, meta tags, styling, JavaScript
            String html = "<!DOCTYPE html>\n" +
                    "<html lang='en'>\n" +
                    "<head>\n" +
                    // === META TAGS ===
                    // Encoding UTF-8 pour les caractères spéciaux (emoji, accents)
                    "    <meta charset='UTF-8'>\n" +
                    // Viewport: responsive design pour mobile
                    "    <meta name='viewport' content='width=device-width, initial-scale=1.0'>\n" +
                    // Titre visible dans l'onglet du navigateur
                    "    <title>Tweetoscope - Hashtag Leaderboard</title>\n" +
                    
                    "    <style>\n" +
                    // === STYLES CSS ===
                    // body: fond dégradé (gradient violet), texte blanc
                    "        body { font-family: Arial, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; margin: 0; padding: 20px; }\n" +
                    // container: contenu centré, max 800px de largeur
                    "        .container { max-width: 800px; margin: 0 auto; }\n" +
                    // h1: titre principal, grand et centré
                    "        h1 { text-align: center; font-size: 2.5em; margin-bottom: 10px; }\n" +
                    // subtitle: sous-titre, légèrement transparent
                    "        .subtitle { text-align: center; opacity: 0.9; margin-bottom: 30px; }\n" +
                    // leaderboard: conteneur avec fond semi-transparent, flou glassmorphism
                    "        .leaderboard { background: rgba(255,255,255,0.1); border-radius: 10px; padding: 20px; backdrop-filter: blur(10px); }\n" +
                    // hashtag-item: chaque ligne du classement (rang, hashtag, count)
                    "        .hashtag-item { display: flex; justify-content: space-between; align-items: center; padding: 15px; margin: 10px 0; background: rgba(255,255,255,0.2); border-radius: 8px; transition: transform 0.2s; }\n" +
                    // hover effect: animation au survol de la souris (zoom 1.02x)
                    "        .hashtag-item:hover { transform: scale(1.02); }\n" +
                    // rank: numéro de position (1, 2, 3...), grand et gras
                    "        .rank { font-size: 1.5em; font-weight: bold; width: 40px; }\n" +
                    // hashtag: nom du hashtag, texte plus grand
                    "        .hashtag { flex: 1; font-size: 1.2em; }\n" +
                    // count: nombre de fois où le hashtag a été compté
                    "        .count { font-size: 1.5em; font-weight: bold; }\n" +
                    // loading: message d'attente avant que les données arrivent
                    "        .loading { text-align: center; padding: 50px; font-size: 1.2em; }\n" +
                    // last-update: timestamp de la dernière mise à jour
                    "        .last-update { text-align: center; opacity: 0.7; margin-top: 20px; font-size: 0.9em; }\n" +
                    "    </style>\n" +
                    "</head>\n" +
                    
                    "<body>\n" +
                    // === CONTENU HTML ===
                    "    <div class='container'>\n" +
                    "        <h1>🐦 Tweetoscope</h1>\n" +
                    "        <div class='subtitle'>Real-time Hashtag Trends</div>\n" +
                    // Zone du leaderboard: remplie par JavaScript
                    "        <div class='leaderboard' id='leaderboard'>\n" +
                    "            <div class='loading'>⏳ Loading leaderboard...</div>\n" +
                    "        </div>\n" +
                    // Timestamp de la dernière mise à jour
                    "        <div class='last-update' id='lastUpdate'></div>\n" +
                    "    </div>\n" +
                    
                    "    <script>\n" +
                    // === FONCTION JAVASCRIPT ===
                    // updateLeaderboard(): récupère le JSON et l'affiche
                    "        async function updateLeaderboard() {\n" +
                    "            try {\n" +
                    // FETCH: fait une requête HTTP GET vers /api/leaderboard
                    "                const response = await fetch('/api/leaderboard');\n" +
                    // Convertit la réponse en objet JavaScript (JSON.parse)
                    "                const data = await response.json();\n" +
                    // Extrait le tableau 'leaderboard' (ou [] si absent)
                    "                const leaderboard = data.leaderboard || [];\n" +
                    
                    // Génère le HTML pour chaque hashtag
                    // map(): transforme chaque item en div HTML
                    // index: position dans l'array (0, 1, 2...)
                    // index + 1: affiche le rang (1, 2, 3...)
                    "                const html = leaderboard.map((item, index) => `\n" +
                    "                    <div class='hashtag-item'>\n" +
                    "                        <div class='rank'>${index + 1}</div>\n" +
                    "                        <div class='hashtag'>#${item.hashtag}</div>\n" +
                    "                        <div class='count'>${item.count}</div>\n" +
                    "                    </div>\n" +
                    // join(''): combine tous les divs en une seule string
                    "                `).join('');\n" +
                    
                    // Remplace le contenu du div 'leaderboard'
                    // Si html est vide: affiche "No data yet..."
                    "                document.getElementById('leaderboard').innerHTML = html || '<div class=\"loading\">No data yet...</div>';\n" +
                    // Affiche la dernière mise à jour (heure locale du client)
                    "                document.getElementById('lastUpdate').textContent = 'Last update: ' + new Date().toLocaleTimeString();\n" +
                    "            } catch (error) {\n" +
                    // Erreur réseau ou JSON invalide
                    "                console.error('Error fetching leaderboard:', error);\n" +
                    "            }\n" +
                    "        }\n" +
                    
                    // === AUTO-REFRESH ===
                    // Appelle updateLeaderboard() immédiatement au chargement
                    "        updateLeaderboard();\n" +
                    // Puis chaque 2 secondes (2000ms)
                    // setInterval() répète la fonction indéfiniment
                    "        setInterval(updateLeaderboard, 2000);\n" +
                    "    </script>\n" +
                    "</body>\n" +
                    "</html>";

            // === HEADERS HTTP ===
            // Content-Type: text/html signifie que le contenu est du HTML
            // charset=UTF-8 signifie que le texte est encodé en UTF-8
            exchange.getResponseHeaders().set("Content-Type", "text/html; charset=UTF-8");
            
            // === ENVOI DE LA RÉPONSE ===
            // sendResponseHeaders(statusCode, contentLength):
            // - 200 = OK (succès)
            // - contentLength = nombre de bytes du HTML
            exchange.sendResponseHeaders(200, html.getBytes().length);
            
            // === ÉCRITURE DU BODY ===
            // getResponseBody() retourne un OutputStream
            OutputStream os = exchange.getResponseBody();
            // write(): écrit le HTML dans la réponse
            os.write(html.getBytes());
            // close(): termine la réponse et ferme la connexion
            os.close();
        }
    }

    /**
     * Classe interne LeaderboardApiHandler - API REST JSON pour les leaderboards
     * 
     * RÔLE:
     * Implémente HttpHandler pour traiter les requêtes HTTP GET /api/leaderboard
     * Retourne le leaderboard actuel au format JSON
     * 
     * PROCESSUS:
     * 1. Configure les headers HTTP (Content-Type: application/json)
     * 2. Ajoute les headers CORS (Access-Control-Allow-Origin: *)
     * 3. Envoie le latestLeaderboard (String JSON)
     * 4. Ferme la connexion
     * 
     * CORS:
     * - Permet aux navigateurs des domaines externes d'accéder à cette API
     * - Access-Control-Allow-Origin: * = accepte les requêtes de N'IMPORTE QUEL domaine
     * - Utile si un autre service web veut afficher ces données
     * 
     * FORMAT DE RÉPONSE:
     * JSON array contenant les hashtags avec leur count
     * Exemple:
     * [
     *   {"hashtag":"travail","count":42,"timestamp":1732531800000},
     *   {"hashtag":"work","count":38,"timestamp":1732531800000}
     * ]
     * 
     * LATENCE:
     * - Retourne toujours le dernier leaderboard reçu (peut être légèrement ancien)
     * - Pas de garantie que c'est le "plus récent" sur le broker Kafka
     * - Acceptable pour une UI (affichage visuel, pas critique)
     */
    private class LeaderboardApiHandler implements HttpHandler {
        /**
         * Méthode handle() - Traite une requête HTTP GET /api/leaderboard
         * 
         * PARAMÈTRE:
         * - exchange: HttpExchange - la requête et réponse HTTP
         */
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            // === HEADERS HTTP ===
            // Content-Type: application/json indique que la réponse est du JSON
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            
            // === HEADERS CORS ===
            // Access-Control-Allow-Origin: * permet à N'IMPORTE QUEL site d'accéder à cette API
            // Sinon, les navigateurs bloqueraient les requêtes cross-domain (CORS policy)
            exchange.getResponseHeaders().set("Access-Control-Allow-Origin", "*");
            
            // === ENVOI DE LA RÉPONSE ===
            // latestLeaderboard est déjà une String JSON (mis à jour par le thread consumer)
            // Exemple: "[{\"hashtag\":\"travail\",\"count\":42}]"
            exchange.sendResponseHeaders(200, latestLeaderboard.getBytes().length);
            
            // === ÉCRITURE DU BODY ===
            OutputStream os = exchange.getResponseBody();
            // Écrit directement le JSON String reçu du topic
            os.write(latestLeaderboard.getBytes());
            // Ferme et envoie la réponse
            os.close();
        }
    }

    /**
     * Méthode stop() - Arrête le service proprement
     * 
     * PROCESSUS:
     * 1. Définit running = false (arrête la boucle de consommation Kafka)
     * 2. Ferme le consumer Kafka (libère les ressources)
     * 
     * NETTOYAGE:
     * - running = false: arrête le thread consommateur
     * - consumer.close(): enregistre la position (offset) auprès de Kafka
     *                    libère les connexions réseaux
     *                    signale au groupe que ce consommateur s'arrête
     * 
     * NOTE:
     * - Ne ferme PAS le serveur HTTP
     * - Si appelée, le serveur HTTP continue à recevoir les requêtes
     * - Le dernier leaderboard reste disponible via /api/leaderboard
     */
    public void stop() {
        // === ARRÊT DE LA CONSOMMATION ===
        // Définit le drapeau à false, ce qui arrête la boucle while dans consumeLeaderboard()
        running = false;
        
        // === FERMETURE DU CONSUMER KAFKA ===
        // consumer.close() effectue:
        // 1. Enregistre la position (offset) pour cette partition
        // 2. Se retire du groupe de consommateurs
        // 3. Ferme les connexions réseau vers les brokers
        // 4. Libère les ressources (memory, threads, etc.)
        consumer.close();
    }

    /**
     * Méthode main() - Point d'entrée du service WebVisualizer
     * 
     * PROCESSUS:
     * 1. Valide les arguments de ligne de commande
     * 2. Initialise le service WebVisualizer
     * 3. Lance le service (démarre les threads Kafka et HTTP)
     * 4. Gère les erreurs critiques
     * 
     * ARGUMENTS REQUIS:
     * args[0] = bootstrap-servers: Adresses Kafka (ex: "localhost:9092")
     * args[1] = consumer-group: ID du groupe (ex: "visualizer-group")
     * 
     * EXEMPLE:
     * java WebVisualizer localhost:9092 visualizer-group
     * 
     * CODES DE SORTIE:
     * - 0: Arrêt normal (impossible à atteindre car start() bloque)
     * - 1: Erreur au démarrage (arguments invalides ou exception)
     * 
     * COMPORTEMENT:
     * - Le service démarre deux threads et bloque dans le serveur HTTP
     * - Arrêt: Ctrl+C dans le terminal (envoie SIGINT)
     */
    public static void main(String[] args) {
        // === VALIDATION DES ARGUMENTS ===
        // Vérifie que l'utilisateur a fourni au moins 2 arguments
        if (args.length < 2) {
            // === ERREUR: ARGUMENTS INSUFFISANTS ===
            // Affiche le mode d'emploi (usage)
            logger.severe("Usage: WebVisualizer <bootstrap-servers> <consumer-group>");
            // Quitte le programme avec code d'erreur 1
            System.exit(1);
        }

        // === EXTRACTION DES ARGUMENTS ===
        // args[0] = adresses des brokers Kafka (ex: "localhost:9092,kafka2:9092")
        String bootstrapServers = args[0];
        
        // args[1] = identifiant du groupe de consommateurs
        // Permet à Kafka de tracker la position de ce service
        String consumerGroup = args[1];

        // === LOGS D'INFORMATION ===
        logger.info("Starting WebVisualizer service");
        logger.info(() -> "Bootstrap servers: " + bootstrapServers);
        logger.info(() -> "Consumer group: " + consumerGroup);

        try {
            // === INITIALISATION ===
            // Crée une instance de WebVisualizer avec les paramètres Kafka
            // Le constructeur configure le consumer, les handlers HTTP, etc.
            WebVisualizer visualizer = new WebVisualizer(bootstrapServers, consumerGroup);

            // === LOGS ===
            logger.info("Web Visualizer service initialized");
            logger.info("Consuming from: leaderboard");
            logger.info("Web interface: http://localhost:8080");
            logger.info("Running in headless mode");

            // === LANCEMENT DU SERVICE ===
            // start() crée les threads et lance le serveur HTTP
            // Cette méthode bloque pour toujours (le serveur HTTP reste actif)
            // Le service s'arrête seulement via Ctrl+C
            visualizer.start();

        } catch (Exception e) {
            // === GESTION D'ERREUR CRITIQUE ===
            // Exceptions: port 8080 utilisé, broker Kafka inaccessible, etc.
            logger.log(Level.SEVERE, "Web Visualizer service failed", e);
            // Quitte avec code d'erreur 1
            System.exit(1);
        }
    }
}