WebVisualizer.java
package tweetoscope;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Classe WebVisualizer - Service 5 du pipeline tweetoscope (Visualization)
*
* RÔLE ET RESPONSABILITÉS:
* Ce service est le dernier composant du pipeline tweetoscope. Il consomme les leaderboards
* de hashtags publiés par HashtagCounter et les affiche dans une interface web interactive
* en temps réel. Les utilisateurs peuvent visualiser les hashtags les plus tendance via
* un navigateur web sur http://localhost:8080
*
* ARCHITECTURE:
* 1. Thread Kafka Consumer: Écoute continuellement le topic "leaderboard"
* 2. Thread HTTP Server: Accepte les requêtes HTTP des navigateurs web
* 3. État partagé: latestLeaderboard stocke le dernier leaderboard reçu
*
* PIPELINE TWEETOSCOPE COMPLET:
* MockTwitterStream → TweetFilter → HashtagExtractor → HashtagCounter → WebVisualizer
* (tweets) (filtrage) (extraction) (comptage) (affichage)
*
* TOPICS KAFKA:
* - INPUT: "leaderboard" - Reçoit les rankings de hashtags toutes les ~1000ms
* - FORMAT: JSON array avec objets {hashtag, count, timestamp}
*
* INTERFACE WEB:
* - Page HTML: Affiche les hashtags avec leur rang et leur compte
* - Auto-refresh: Met à jour l'affichage toutes les 2 secondes
* - API REST: Endpoint /api/leaderboard retourne le JSON brut
*
* SÉCURITÉ ET FIABILITÉ:
* - Pas de CORS restrictions (Access-Control-Allow-Origin: *)
* - Pas d'authentification requise (interface locale)
* - Utilise volatile pour la synchronisation thread-safe du latestLeaderboard
*/
public class WebVisualizer {
private static final Logger logger = Logger.getLogger(WebVisualizer.class.getName());
// === CONFIGURATION KAFKA ===
// Topic d'entrée: "leaderboard"
// Le WebVisualizer consomme les leaderboards publiés par HashtagCounter
private static final String LEADERBOARD_TOPIC = "leaderboard";
// === CONFIGURATION HTTP ===
// Port HTTP pour l'interface web
// L'utilisateur accède au service via http://localhost:8080
private static final int HTTP_PORT = 8080;
// === ATTRIBUTS D'INSTANCE ===
// Consumer Kafka qui écoute le topic "leaderboard"
// Configuré pour recevoir les messages au format String
private final KafkaConsumer<String, String> consumer;
// Parseur JSON pour convertir les données JSON en objets Java
// Utilisé pour parser et générer du JSON
private final Gson gson;
// === ÉTAT PARTAGÉ ENTRE THREADS ===
// Dernière chaîne JSON du leaderboard reçue du topic
// Initialisée à "[]" (tableau JSON vide) si aucun leaderboard n'a été reçu
// volatile: Permet aux threads de voir les mises à jour immédiatement
// (sans cache local du CPU)
// Ce champ est partagé entre:
// - Thread consommateur Kafka (écrit)
// - Threads du serveur HTTP (lecture)
private volatile String latestLeaderboard = "[]";
// Drapeau de contrôle pour arrêter proprement le service
// Quand false, la boucle de consommation Kafka arrête son exécution
private volatile boolean running = true;
/**
* Constructeur du service WebVisualizer
*
* RESPONSABILITÉS:
* 1. Crée un consommateur Kafka configuré pour recevoir les leaderboards
* 2. Initialise le parseur JSON (GSON)
* 3. Affiche les paramètres de démarrage dans les logs
*
* PARAMÈTRES:
* - bootstrapServers: Adresses des brokers Kafka (ex: "localhost:9092")
* - groupId: Identifiant du groupe de consommateurs (ex: "visualizer-group")
* Si plusieurs instances sont lancées avec le même groupId,
* Kafka les considère comme un même groupe et partage les messages
*
* CONFIGURATION KAFKA:
* - AUTO_OFFSET_RESET_CONFIG = "latest":
* Si le consommateur rejoint le groupe pour la première fois (ou après un reset),
* il commence par les NOUVEAUX messages (pas les anciens)
* Cela signifie: pas de "rattrappage" du passé, juste les leaderboards futurs
*
* - ENABLE_AUTO_COMMIT_CONFIG = "true":
* Kafka enregistre automatiquement la position du consommateur
* En cas de redémarrage, on reprend depuis la position sauvegardée
*
* GSON:
* - Initialisé sans adaptateurs spécialisés
* - Utilise les deserializers par défaut pour String et des types simples
*/
public WebVisualizer(String bootstrapServers, String groupId) {
// === INITIALISATION DU PARSEUR JSON ===
// GSON sera utilisé pour traiter les leaderboards si nécessaire
// (Optionnel: le code actual traite les leaderboards comme des Strings brutes)
this.gson = new Gson();
// === CONFIGURATION DU CONSOMMATEUR KAFKA ===
// Crée les propriétés de configuration
Properties consumerProps = new Properties();
// Adresses des brokers Kafka auxquels se connecter
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Identifiant du groupe de consommateurs
// Permet à Kafka de tracker la position de ce consommateur
// Les instances du même groupe partagent automatiquement les partitions
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// === DÉSERIALISEURS ===
// Convertissent les données du format Kafka (bytes) au format Java (String)
// KEY_DESERIALIZER: convertit les clés en String
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// VALUE_DESERIALIZER: convertit les valeurs en String
// Les leaderboards sont des JSON sérialisés en String
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// === POSITION DE DÉMARRAGE ===
// AUTO_OFFSET_RESET_CONFIG = "latest":
// Si c'est le premier accès au topic (pas de committed offset):
// - "latest": commence par les nouveaux messages (saute le passé)
// - "earliest": commence par les premiers messages jamais publiés
// Choix "latest" = pas de flood du passé, juste les leaderboards futurs
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// === COMMIT AUTOMATIQUE ===
// ENABLE_AUTO_COMMIT_CONFIG = "true":
// Kafka enregistre automatiquement la position du consommateur (offset)
// Cela permet au service de reprendre depuis la bonne position après un redémarrage
// Avantage: pas de risque de perdre des messages
// Inconvénient: pas de garantie exacte-une-fois (mais acceptable pour une UI)
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// === CRÉATION DU CONSOMMATEUR ===
// KafkaConsumer<String, String>: clés et valeurs sont des Strings
this.consumer = new KafkaConsumer<>(consumerProps);
// === LOGS D'INITIALISATION ===
logger.info("Web Visualizer initialized");
logger.info(() -> "Consuming from: " + LEADERBOARD_TOPIC);
logger.info(() -> "Web interface: http://localhost:" + HTTP_PORT);
}
/**
* Méthode start() - Lance le service WebVisualizer
*
* PROCESSUS:
* 1. Crée et démarre un thread de consommation Kafka
* 2. Crée et démarre un serveur HTTP
* 3. Enregistre les adresses web dans les logs
*
* ARCHITECTURE MULTI-THREAD:
* Le service utilise TWO threads parallèles:
*
* THREAD 1 - Consommateur Kafka:
* - Nom: "Web-Visualizer-Consumer"
* - Fonction: consumeLeaderboard()
* - Tâche: Écoute le topic "leaderboard" et met à jour latestLeaderboard
* - Durée de vie: Jusqu'à ce que running = false
*
* THREAD PRINCIPAL (HTTP Server):
* - Accepte les connexions HTTP entrantes
* - Exécute les handlers pour "/" (HTML) et "/api/leaderboard" (JSON)
* - Crée un thread par requête HTTP (ThreadPool géré par HttpServer)
*
* SYNCHRONISATION:
* - Les deux threads lisent/écrivent latestLeaderboard
* - Utilise 'volatile' pour la synchronisation sans locks
*
* ERREURS POTENTIELLES:
* - Si le port 8080 est déjà utilisé, IOException est levée
* - Les erreurs sont loggées mais le service continue
*/
public void start() {
// === DÉMARRAGE DU THREAD CONSOMMATEUR KAFKA ===
// Crée un thread dont la tâche est d'écouter le topic "leaderboard"
// Thread(Runnable, String) = crée un thread exécutant "this::consumeLeaderboard"
// Le nom "Web-Visualizer-Consumer" aide à identifier le thread dans les logs
Thread consumerThread = new Thread(this::consumeLeaderboard, "Web-Visualizer-Consumer");
// Démarre le thread de consommation
// À partir de ce moment, le thread exécute consumeLeaderboard()
consumerThread.start();
// === DÉMARRAGE DU SERVEUR HTTP ===
try {
// === CRÉATION DU SERVEUR HTTP ===
// HttpServer.create(InetSocketAddress, backlog):
// - InetSocketAddress(port) = écoute sur localhost:port (127.0.0.1:port)
// - backlog = 0 = accepte un nombre illimité de connexions en attente
HttpServer server = HttpServer.create(new InetSocketAddress(HTTP_PORT), 0);
// === ENREGISTREMENT DES ROUTES ===
// Enregistre le HomeHandler pour "/" = page HTML principale
// Chaque requête GET http://localhost:8080/ exécute HomeHandler.handle()
server.createContext("/", new HomeHandler());
// Enregistre le LeaderboardApiHandler pour "/api/leaderboard" = API JSON
// Chaque requête GET http://localhost:8080/api/leaderboard retourne le JSON
server.createContext("/api/leaderboard", new LeaderboardApiHandler());
// === CONFIGURATION DU POOL DE THREADS ===
// setExecutor(null) = utilise le ThreadPool par défaut
// Le serveur crée un nouveau thread par requête HTTP reçue
server.setExecutor(null);
// === DÉMARRAGE DU SERVEUR ===
// Le serveur est maintenant actif et écoute sur le port 8080
server.start();
// === LOGS ===
logger.info("Web Visualizer started");
logger.info(() -> "Open your browser: http://localhost:" + HTTP_PORT);
} catch (IOException e) {
// === GESTION D'ERREUR ===
// Le port 8080 est peut-être déjà occupé
// Ou permission insuffisante pour écouter un port < 1024
logger.log(Level.SEVERE, "Failed to start HTTP server", e);
}
}
/**
* Méthode consumeLeaderboard() - Écoute continuellement les leaderboards
*
* PROCESSUS:
* 1. S'abonne au topic "leaderboard"
* 2. Boucle infinie: poll() toutes les 500ms
* 3. Pour chaque leaderboard reçu, met à jour latestLeaderboard
* 4. Enregistre les mises à jour dans les logs
* 5. Gère les erreurs et arrête quand running = false
*
* EXÉCUTION:
* - Cette méthode s'exécute dans un thread séparé
* - Elle est lancée par start() dans "Web-Visualizer-Consumer"
* - Elle bloque continuellement dans la boucle while
*
* ARCHITECTURE:
* - S'abonne une fois (pas de ré-abonnement)
* - Poll toutes les 500ms: équilibre entre réactivité et efficacité
* - Boucle sur chaque record du batch reçu
* - Mise à jour atomique: une seule assignation (latestLeaderboard = ...)
*
* SYNCHRONISATION:
* - latestLeaderboard est volatile pour la visibilité inter-thread
* - Les lecteurs HTTP voient immédiatement les mises à jour
*
* ARRÊT GRACIEUX:
* - Quand running = false, la boucle while arrête
* - Mais la méthode ne ferme PAS le consumer
* - stop() doit être appelée pour fermer proprement
*/
private void consumeLeaderboard() {
// === S'ABONNE AU TOPIC ===
// Collections.singletonList() crée une liste avec un seul élément: "leaderboard"
// Le consumer reçoit maintenant TOUS les messages du topic
consumer.subscribe(Collections.singletonList(LEADERBOARD_TOPIC));
logger.info("Waiting for leaderboard updates");
try {
// === BOUCLE INFINIE DE CONSOMMATION ===
// Tant que running = true, continue à recevoir les leaderboards
while (running) {
// === POLL: ATTEND LES LEADERBOARDS ===
// consumer.poll(Duration) récupère les messages du broker Kafka
// Timeout de 500ms: attend jusqu'à 500ms pour voir s'il y a des messages
// Si aucun message: retourne un batch vide après 500ms
// Si messages arrivent: retourne immédiatement avec les messages
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
// === ITÉRATION SUR LE BATCH ===
// Pour chaque record (leaderboard) reçu du topic
for (ConsumerRecord<String, String> record : records) {
// === MISE À JOUR DU LEADERBOARD ===
// record.value() = la String JSON contenant le leaderboard
// Exemple: [{"hashtag":"travail","count":42},{"hashtag":"work","count":38}]
latestLeaderboard = record.value();
// === LOG ===
// Enregistre chaque mise à jour reçue
logger.info(() -> "Leaderboard updated: " + latestLeaderboard);
}
}
} catch (Exception e) {
// === GESTION D'ERREUR ===
// Erreur réseau, broker down, ou problème de desérialisation
// Log l'erreur mais le service arrête (la boucle se termine)
logger.log(Level.SEVERE, "Error in Web Visualizer consumer", e);
}
}
/**
* Classe interne HomeHandler - Gère les requêtes HTTP GET /
*
* RÔLE:
* Implémente HttpHandler pour traiter les requêtes HTTP vers la racine "/"
* Retourne une page HTML complète avec interface visuelle pour le leaderboard
*
* PROCESSUS:
* 1. Génère le HTML (inline dans la chaîne)
* 2. Définit le header Content-Type: text/html
* 3. Envoie le HTML au navigateur client
* 4. Ferme la connexion
*
* INTERFACE WEB:
* - Titre: "Tweetoscope - Real-time Hashtag Trends"
* - Gradient de couleur: purple à violet
* - Liste affichant les hashtags avec rang et count
* - Auto-refresh toutes les 2 secondes via JavaScript
*
* JAVASCRIPT:
* - updateLeaderboard(): Fait un fetch() vers /api/leaderboard
* - Affiche les résultats au format: "1. #travail 42"
* - Affiche la dernière mise à jour (timestamp local)
* - Erreur?: Affiche "No data yet..."
*/
private class HomeHandler implements HttpHandler {
/**
* Méthode handle() - Traite une requête HTTP GET /
*
* PARAMÈTRE:
* - exchange: HttpExchange - représente la requête HTTP et la réponse
* Contient: headers, méthode (GET/POST), URL, client, etc.
*
* ÉTAPES:
* 1. Génère le HTML avec CSS et JavaScript
* 2. Configure les headers HTTP (Content-Type)
* 3. Écrit le HTML dans la réponse
* 4. Ferme la connexion
*/
@Override
public void handle(HttpExchange exchange) throws IOException {
// === HTML COMPLET ===
// String multi-lignes contenant le HTML5 complet
// Inclus: DOCTYPE, meta tags, styling, JavaScript
String html = "<!DOCTYPE html>\n" +
"<html lang='en'>\n" +
"<head>\n" +
// === META TAGS ===
// Encoding UTF-8 pour les caractères spéciaux (emoji, accents)
" <meta charset='UTF-8'>\n" +
// Viewport: responsive design pour mobile
" <meta name='viewport' content='width=device-width, initial-scale=1.0'>\n" +
// Titre visible dans l'onglet du navigateur
" <title>Tweetoscope - Hashtag Leaderboard</title>\n" +
" <style>\n" +
// === STYLES CSS ===
// body: fond dégradé (gradient violet), texte blanc
" body { font-family: Arial, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; margin: 0; padding: 20px; }\n" +
// container: contenu centré, max 800px de largeur
" .container { max-width: 800px; margin: 0 auto; }\n" +
// h1: titre principal, grand et centré
" h1 { text-align: center; font-size: 2.5em; margin-bottom: 10px; }\n" +
// subtitle: sous-titre, légèrement transparent
" .subtitle { text-align: center; opacity: 0.9; margin-bottom: 30px; }\n" +
// leaderboard: conteneur avec fond semi-transparent, flou glassmorphism
" .leaderboard { background: rgba(255,255,255,0.1); border-radius: 10px; padding: 20px; backdrop-filter: blur(10px); }\n" +
// hashtag-item: chaque ligne du classement (rang, hashtag, count)
" .hashtag-item { display: flex; justify-content: space-between; align-items: center; padding: 15px; margin: 10px 0; background: rgba(255,255,255,0.2); border-radius: 8px; transition: transform 0.2s; }\n" +
// hover effect: animation au survol de la souris (zoom 1.02x)
" .hashtag-item:hover { transform: scale(1.02); }\n" +
// rank: numéro de position (1, 2, 3...), grand et gras
" .rank { font-size: 1.5em; font-weight: bold; width: 40px; }\n" +
// hashtag: nom du hashtag, texte plus grand
" .hashtag { flex: 1; font-size: 1.2em; }\n" +
// count: nombre de fois où le hashtag a été compté
" .count { font-size: 1.5em; font-weight: bold; }\n" +
// loading: message d'attente avant que les données arrivent
" .loading { text-align: center; padding: 50px; font-size: 1.2em; }\n" +
// last-update: timestamp de la dernière mise à jour
" .last-update { text-align: center; opacity: 0.7; margin-top: 20px; font-size: 0.9em; }\n" +
" </style>\n" +
"</head>\n" +
"<body>\n" +
// === CONTENU HTML ===
" <div class='container'>\n" +
" <h1>🐦 Tweetoscope</h1>\n" +
" <div class='subtitle'>Real-time Hashtag Trends</div>\n" +
// Zone du leaderboard: remplie par JavaScript
" <div class='leaderboard' id='leaderboard'>\n" +
" <div class='loading'>⏳ Loading leaderboard...</div>\n" +
" </div>\n" +
// Timestamp de la dernière mise à jour
" <div class='last-update' id='lastUpdate'></div>\n" +
" </div>\n" +
" <script>\n" +
// === FONCTION JAVASCRIPT ===
// updateLeaderboard(): récupère le JSON et l'affiche
" async function updateLeaderboard() {\n" +
" try {\n" +
// FETCH: fait une requête HTTP GET vers /api/leaderboard
" const response = await fetch('/api/leaderboard');\n" +
// Convertit la réponse en objet JavaScript (JSON.parse)
" const data = await response.json();\n" +
// Extrait le tableau 'leaderboard' (ou [] si absent)
" const leaderboard = data.leaderboard || [];\n" +
// Génère le HTML pour chaque hashtag
// map(): transforme chaque item en div HTML
// index: position dans l'array (0, 1, 2...)
// index + 1: affiche le rang (1, 2, 3...)
" const html = leaderboard.map((item, index) => `\n" +
" <div class='hashtag-item'>\n" +
" <div class='rank'>${index + 1}</div>\n" +
" <div class='hashtag'>#${item.hashtag}</div>\n" +
" <div class='count'>${item.count}</div>\n" +
" </div>\n" +
// join(''): combine tous les divs en une seule string
" `).join('');\n" +
// Remplace le contenu du div 'leaderboard'
// Si html est vide: affiche "No data yet..."
" document.getElementById('leaderboard').innerHTML = html || '<div class=\"loading\">No data yet...</div>';\n" +
// Affiche la dernière mise à jour (heure locale du client)
" document.getElementById('lastUpdate').textContent = 'Last update: ' + new Date().toLocaleTimeString();\n" +
" } catch (error) {\n" +
// Erreur réseau ou JSON invalide
" console.error('Error fetching leaderboard:', error);\n" +
" }\n" +
" }\n" +
// === AUTO-REFRESH ===
// Appelle updateLeaderboard() immédiatement au chargement
" updateLeaderboard();\n" +
// Puis chaque 2 secondes (2000ms)
// setInterval() répète la fonction indéfiniment
" setInterval(updateLeaderboard, 2000);\n" +
" </script>\n" +
"</body>\n" +
"</html>";
// === HEADERS HTTP ===
// Content-Type: text/html signifie que le contenu est du HTML
// charset=UTF-8 signifie que le texte est encodé en UTF-8
exchange.getResponseHeaders().set("Content-Type", "text/html; charset=UTF-8");
// === ENVOI DE LA RÉPONSE ===
// sendResponseHeaders(statusCode, contentLength):
// - 200 = OK (succès)
// - contentLength = nombre de bytes du HTML
exchange.sendResponseHeaders(200, html.getBytes().length);
// === ÉCRITURE DU BODY ===
// getResponseBody() retourne un OutputStream
OutputStream os = exchange.getResponseBody();
// write(): écrit le HTML dans la réponse
os.write(html.getBytes());
// close(): termine la réponse et ferme la connexion
os.close();
}
}
/**
* Classe interne LeaderboardApiHandler - API REST JSON pour les leaderboards
*
* RÔLE:
* Implémente HttpHandler pour traiter les requêtes HTTP GET /api/leaderboard
* Retourne le leaderboard actuel au format JSON
*
* PROCESSUS:
* 1. Configure les headers HTTP (Content-Type: application/json)
* 2. Ajoute les headers CORS (Access-Control-Allow-Origin: *)
* 3. Envoie le latestLeaderboard (String JSON)
* 4. Ferme la connexion
*
* CORS:
* - Permet aux navigateurs des domaines externes d'accéder à cette API
* - Access-Control-Allow-Origin: * = accepte les requêtes de N'IMPORTE QUEL domaine
* - Utile si un autre service web veut afficher ces données
*
* FORMAT DE RÉPONSE:
* JSON array contenant les hashtags avec leur count
* Exemple:
* [
* {"hashtag":"travail","count":42,"timestamp":1732531800000},
* {"hashtag":"work","count":38,"timestamp":1732531800000}
* ]
*
* LATENCE:
* - Retourne toujours le dernier leaderboard reçu (peut être légèrement ancien)
* - Pas de garantie que c'est le "plus récent" sur le broker Kafka
* - Acceptable pour une UI (affichage visuel, pas critique)
*/
private class LeaderboardApiHandler implements HttpHandler {
/**
* Méthode handle() - Traite une requête HTTP GET /api/leaderboard
*
* PARAMÈTRE:
* - exchange: HttpExchange - la requête et réponse HTTP
*/
@Override
public void handle(HttpExchange exchange) throws IOException {
// === HEADERS HTTP ===
// Content-Type: application/json indique que la réponse est du JSON
exchange.getResponseHeaders().set("Content-Type", "application/json");
// === HEADERS CORS ===
// Access-Control-Allow-Origin: * permet à N'IMPORTE QUEL site d'accéder à cette API
// Sinon, les navigateurs bloqueraient les requêtes cross-domain (CORS policy)
exchange.getResponseHeaders().set("Access-Control-Allow-Origin", "*");
// === ENVOI DE LA RÉPONSE ===
// latestLeaderboard est déjà une String JSON (mis à jour par le thread consumer)
// Exemple: "[{\"hashtag\":\"travail\",\"count\":42}]"
exchange.sendResponseHeaders(200, latestLeaderboard.getBytes().length);
// === ÉCRITURE DU BODY ===
OutputStream os = exchange.getResponseBody();
// Écrit directement le JSON String reçu du topic
os.write(latestLeaderboard.getBytes());
// Ferme et envoie la réponse
os.close();
}
}
/**
* Méthode stop() - Arrête le service proprement
*
* PROCESSUS:
* 1. Définit running = false (arrête la boucle de consommation Kafka)
* 2. Ferme le consumer Kafka (libère les ressources)
*
* NETTOYAGE:
* - running = false: arrête le thread consommateur
* - consumer.close(): enregistre la position (offset) auprès de Kafka
* libère les connexions réseaux
* signale au groupe que ce consommateur s'arrête
*
* NOTE:
* - Ne ferme PAS le serveur HTTP
* - Si appelée, le serveur HTTP continue à recevoir les requêtes
* - Le dernier leaderboard reste disponible via /api/leaderboard
*/
public void stop() {
// === ARRÊT DE LA CONSOMMATION ===
// Définit le drapeau à false, ce qui arrête la boucle while dans consumeLeaderboard()
running = false;
// === FERMETURE DU CONSUMER KAFKA ===
// consumer.close() effectue:
// 1. Enregistre la position (offset) pour cette partition
// 2. Se retire du groupe de consommateurs
// 3. Ferme les connexions réseau vers les brokers
// 4. Libère les ressources (memory, threads, etc.)
consumer.close();
}
/**
* Méthode main() - Point d'entrée du service WebVisualizer
*
* PROCESSUS:
* 1. Valide les arguments de ligne de commande
* 2. Initialise le service WebVisualizer
* 3. Lance le service (démarre les threads Kafka et HTTP)
* 4. Gère les erreurs critiques
*
* ARGUMENTS REQUIS:
* args[0] = bootstrap-servers: Adresses Kafka (ex: "localhost:9092")
* args[1] = consumer-group: ID du groupe (ex: "visualizer-group")
*
* EXEMPLE:
* java WebVisualizer localhost:9092 visualizer-group
*
* CODES DE SORTIE:
* - 0: Arrêt normal (impossible à atteindre car start() bloque)
* - 1: Erreur au démarrage (arguments invalides ou exception)
*
* COMPORTEMENT:
* - Le service démarre deux threads et bloque dans le serveur HTTP
* - Arrêt: Ctrl+C dans le terminal (envoie SIGINT)
*/
public static void main(String[] args) {
// === VALIDATION DES ARGUMENTS ===
// Vérifie que l'utilisateur a fourni au moins 2 arguments
if (args.length < 2) {
// === ERREUR: ARGUMENTS INSUFFISANTS ===
// Affiche le mode d'emploi (usage)
logger.severe("Usage: WebVisualizer <bootstrap-servers> <consumer-group>");
// Quitte le programme avec code d'erreur 1
System.exit(1);
}
// === EXTRACTION DES ARGUMENTS ===
// args[0] = adresses des brokers Kafka (ex: "localhost:9092,kafka2:9092")
String bootstrapServers = args[0];
// args[1] = identifiant du groupe de consommateurs
// Permet à Kafka de tracker la position de ce service
String consumerGroup = args[1];
// === LOGS D'INFORMATION ===
logger.info("Starting WebVisualizer service");
logger.info(() -> "Bootstrap servers: " + bootstrapServers);
logger.info(() -> "Consumer group: " + consumerGroup);
try {
// === INITIALISATION ===
// Crée une instance de WebVisualizer avec les paramètres Kafka
// Le constructeur configure le consumer, les handlers HTTP, etc.
WebVisualizer visualizer = new WebVisualizer(bootstrapServers, consumerGroup);
// === LOGS ===
logger.info("Web Visualizer service initialized");
logger.info("Consuming from: leaderboard");
logger.info("Web interface: http://localhost:8080");
logger.info("Running in headless mode");
// === LANCEMENT DU SERVICE ===
// start() crée les threads et lance le serveur HTTP
// Cette méthode bloque pour toujours (le serveur HTTP reste actif)
// Le service s'arrête seulement via Ctrl+C
visualizer.start();
} catch (Exception e) {
// === GESTION D'ERREUR CRITIQUE ===
// Exceptions: port 8080 utilisé, broker Kafka inaccessible, etc.
logger.log(Level.SEVERE, "Web Visualizer service failed", e);
// Quitte avec code d'erreur 1
System.exit(1);
}
}
}