HashtagExtractor.java

package tweetoscope;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import com.twitter.clientlib.model.Tweet;
import com.twitter.twittertext.Extractor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Classe HashtagExtractor - Service 2 du pipeline tweetoscope
 * 
 * RÔLE PRINCIPAL:
 * Ce service extrait les hashtags des tweets filtrés et les publie dans Kafka.
 * Il agit comme un maillon intermédiaire dans le pipeline de traitement des tweets.
 * 
 * FLUX DE TRAITEMENT:
 * 1. Consomme les tweets filtrés depuis le topic "filtered-tweets"
 * 2. Parse chaque tweet JSON et extrait les hashtags
 * 3. Utilise la bibliothèque Twitter Text pour identifier les hashtags valides
 * 4. Crée un message structuré pour chaque hashtag
 * 5. Publie les hashtags dans le topic "hashtags" avec garantie de fiabilité
 * 
 * FIABILITÉ (TASK8):
 * - Garantit la sémantique "exactly-once" avec acks=all
 * - Active l'idempotence du producteur (pas de doublons)
 * - Maintient l'ordre des messages
 * - Retry automatique en cas d'échec
 * 
 */
public class HashtagExtractor {
	// Logger statique pour tracer les opérations du service
	private static final Logger logger = Logger.getLogger(HashtagExtractor.class.getName());
	
	// === CONSTANTES KAFKA ===
	// Topic d'entrée: tweets filtrés issus du service de filtrage
	private static final String INPUT_TOPIC = "filtered-tweets";
	
	// Topic de sortie: hashtags extraits à destination du compteur
	private static final String OUTPUT_TOPIC = "hashtags";
	
	// === ATTRIBUTS D'INSTANCE ===
	// Consommateur Kafka pour lire les tweets filtrés depuis le topic d'entrée
	private final KafkaConsumer<String, String> consumer;
	
	// Producteur Kafka pour publier les hashtags dans le topic de sortie
	// Configuré avec des paramètres de fiabilité (acks=all, idempotence activée)
	private final KafkaProducer<String, String> producer;
	
	// Parseur JSON GSON configuré pour gérer les dates OffsetDateTime
	private final Gson gson;
	
	/**
	 * Utilitaire Twitter Text pour l'extraction de hashtags
	 * 
	 * Cette classe provient de la bibliothèque twitter-text qui fournit
	 * une implémentation robuste et officielle de Twitter pour identifier
	 * les hashtags, mentions, URLs, etc. selon les spécifications Twitter.
	 * 
	 * Elle est préférable à une regex simple car elle gère les cas limites.
	 */
	final Extractor twitterTextExtractor = new Extractor();

	/**
	 * Classe interne OffsetDateTimeAdapter - Adaptateur personnalisé GSON
	 * 
	 * OBJECTIF:
	 * GSON par défaut ne sait pas convertir les dates OffsetDateTime.
	 * Cet adaptateur indique à GSON comment sérialiser/désérialiser ce type.
	 * 
	 * SÉRIALISATION (Java → JSON):
	 * Convertit un OffsetDateTime en String au format ISO-8601
	 * Exemple: 2023-11-25T10:30:00+01:00
	 * 
	 * DÉSÉRIALISATION (JSON → Java):
	 * Convertit une String JSON en objet OffsetDateTime
	 * Gère les valeurs null correctement
	 */
	private static class OffsetDateTimeAdapter extends TypeAdapter<OffsetDateTime> {
		/**
		 * Méthode write() - Sérialise un OffsetDateTime en JSON
		 * 
		 * @param out Écrivain JSON pour écrire la valeur
		 * @param value Objet OffsetDateTime à sérialiser (peut être null)
		 */
		@Override
		public void write(JsonWriter out, OffsetDateTime value) throws IOException {
			// Teste si la valeur est null
			if (value == null) {
				// Écrit littéralement null en JSON
				out.nullValue();
				return;
			}
			// Convertit la date en String ISO-8601 et l'écrit
			// toString() retourne le format ISO officiel: "2023-11-25T10:30:00+01:00"
			out.value(value.toString());
		}

		/**
		 * Méthode read() - Désérialise un JSON en OffsetDateTime
		 * 
		 * @param in Lecteur JSON pour lire la valeur
		 * @return Objet OffsetDateTime parsé (ou null si JSON contient null)
		 */
		@Override
		public OffsetDateTime read(JsonReader in) throws IOException {
			// Vérifie si le token JSON courant est null
			if (in.peek() == JsonToken.NULL) {
				// Lit le null et le retourne
				in.nextNull();
				return null;
			}
			// Lit la String depuis le JSON et la parse en OffsetDateTime
			// OffsetDateTime.parse() utilise le format ISO-8601 par défaut
			return OffsetDateTime.parse(in.nextString());
		}
	}

	/**
	 * Constructeur du service HashtagExtractor
	 * 
	 * @param bootstrapServers Adresses des brokers Kafka (ex: "localhost:9092")
	 * @param groupId Identifiant du groupe de consommateurs (ex: "extractor-group")
	 */
	public HashtagExtractor(String bootstrapServers, String groupId) {
		// === CONFIGURATION DU CONSOMMATEUR ===
		// Le consommateur lit les tweets filtrés depuis le topic d'entrée
		Properties consumerProps = new Properties();
		
		// Adresses des brokers Kafka pour se connecter au cluster
		consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		
		// Identifiant du groupe de consommateurs
		// Si plusieurs instances du service sont lancées avec le même groupId,
		// Kafka distribuera automatiquement les partitions entre elles
		consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		
		// Désérialiseurs: convertissent les données Kafka en objets Java
		// KEY_DESERIALIZER: convertit les clés en String
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		
		// VALUE_DESERIALIZER: convertit les valeurs en String (tweets JSON)
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		
		// Stratégie si aucun offset n'est trouvé pour ce groupe:
		// "earliest" = commence depuis le début du topic (tous les anciens messages)
		// Alternative: "latest" = commence depuis les nouveaux messages uniquement
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		
		// Crée le consommateur Kafka avec la configuration
		this.consumer = new KafkaConsumer<>(consumerProps);

		// === CONFIGURATION DU PRODUCTEUR AVEC GARANTIES DE FIABILITÉ ===
		// Le producteur publie les hashtags extraits avec fiabilité maximale (TASK8)
		Properties producerProps = new Properties();
		
		// Adresses des brokers Kafka
		producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		
		// Sérialiseurs: convertissent les objets Java en données Kafka
		// KEY_SERIALIZER: convertit les clés (hashtag) en bytes
		producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		
		// VALUE_SERIALIZER: convertit les valeurs (message JSON) en bytes
		producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		
		// === GARANTIES EXACTLY-ONCE (TASK8) ===
		// Ces paramètres garantissent qu'aucun hashtag ne sera publié deux fois
		
		// ACKS_CONFIG = "all": le producteur attend la confirmation de TOUS les replicas
		// Cela signifie que Kafka a copié le message sur tous les serveurs (leader + followers)
		// Très fiable mais un peu plus lent (tradeoff entre latence et fiabilité)
		producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
		
		// ENABLE_IDEMPOTENCE_CONFIG = "true": active l'idempotence du producteur
		// Si le producteur envoie deux fois le même message (ex: après timeout),
		// Kafka en reconnaîtra qu'un seul et rejettera le doublon
		producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
		
		// MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5: limite les requêtes simultanées
		// Garantit que les messages sont publiés dans l'ordre même en cas de retry
		// (Si trop de requêtes en vol, l'ordre peut se brouiller)
		producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
		
		// RETRIES_CONFIG = MAX_VALUE: réessaye indéfiniment en cas d'échec
		// Combine avec timeouts appropriés, cela garantit que les messages seront publiés
		producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
		
		// Crée le producteur Kafka avec les configurations de fiabilité
		this.producer = new KafkaProducer<>(producerProps);

		// === CONFIGURATION DU PARSEUR JSON ===
		// GSON est utilisé pour convertir les Strings JSON en objets Java et vice-versa
		this.gson = new GsonBuilder()
				// Enregistre un adaptateur personnalisé pour les dates OffsetDateTime
				// Permet à GSON de parser les dates au format ISO-8601
				.registerTypeAdapter(OffsetDateTime.class, new OffsetDateTimeAdapter())
				.create();
		
		// Enregistre l'initialisation réussie dans les logs
		logger.info("HashtagExtractor initialized with reliability configurations (acks=all, idempotence=true)");
	}

	/**
	 * Méthode run() - Boucle principale du service d'extraction
	 * 
	 * PROCESSUS:
	 * 1. S'abonne au topic d'entrée "filtered-tweets"
	 * 2. Attend continuellement l'arrivée de tweets
	 * 3. Pour chaque batch de tweets, extrait les hashtags
	 * 4. Publie les hashtags dans le topic de sortie
	 * 
	 * NOTE: Cette méthode est bloquante (infinie) - elle ne retourne jamais
	 * sauf en cas d'erreur ou d'interruption du thread
	 */
	@SuppressWarnings("java:S2189")  // Infinite loop is intentional for continuous stream processing
	public void run() {
		// === S'ABONNE AU TOPIC D'ENTRÉE ===
		// Collections.singletonList() crée une liste contenant un seul élément: le topic "filtered-tweets"
		// Le consommateur reçoit alors tous les messages publiés dans ce topic
		consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
		
		// Enregistre le démarrage du service dans les logs
		logger.info(() -> "HashtagExtractor started. Consuming from '" + INPUT_TOPIC + "', producing to '" + OUTPUT_TOPIC + "'.");
		logger.info("Publishing structured messages for hashtags");

		try {
			// Drapeau pour la boucle de traitement
			// (En pratique, il reste toujours true car aucun code ne le change)
			boolean keepProcessing = true;
			
			// === BOUCLE INFINIE DE TRAITEMENT ===
			// Traite continuellement les tweets qui arrivent du topic
			while (keepProcessing) {
				// === POLL: Attend les tweets avec timeout de 500ms ===
				// consumer.poll(timeout) récupère les tweets du broker Kafka
				// Si aucun tweet n'arrive pendant 500ms, retourne un batch vide
				// Cela évite que le thread ne consomme 100% du CPU
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
				
				// Traite le batch reçu (peut être vide)
				processRecords(records);
			}
		} finally {
			// === NETTOYAGE DES RESSOURCES ===
			// Le bloc finally s'exécute TOUJOURS, même en cas d'exception
			// Ferme proprement le consommateur (libère les ressources)
			consumer.close();
			
			// Ferme proprement le producteur (libère les ressources)
			producer.close();
		}
	}

	/**
	 * Méthode processRecords() - Traite un batch de tweets
	 * 
	 * PROCESSUS:
	 * 1. Parcourt chaque tweet du batch reçu
	 * 2. Désérialise le JSON en objet Tweet
	 * 3. Extrait tous les hashtags du texte du tweet
	 * 4. Publie un message structuré pour chaque hashtag
	 * 5. Gère les erreurs de parsing JSON
	 * 
	 * PARAMÈTRE:
	 * - records: ConsumerRecords<String, String> - batch de messages Kafka
	 *   Chaque record contient un tweet au format JSON dans sa valeur
	 * 
	 * GESTION D'ERREURS:
	 * - Si le JSON n'est pas valide: capture et log JsonSyntaxException, continue
	 * - Si une erreur générique survient: capture et log Exception, continue
	 * Cela permet au service de rester robuste face à des données défectueuses
	 */
	private void processRecords(ConsumerRecords<String, String> records) {
		// === ITÉRATION SUR LE BATCH ===
		// Pour chaque record (tweet) reçu du topic "filtered-tweets"
		for (ConsumerRecord<String, String> record : records) {
			try {
				// === DÉSÉRIALISATION JSON ===
				// record.value() contient la chaîne JSON du tweet
				// gson.fromJson() la convertit en objet Tweet avec tous les champs
				// (id, createdAt, text, author, lang, conversation_id, etc.)
				Tweet tweet = gson.fromJson(record.value(), Tweet.class);
				
				// === VÉRIFICATION DE NULL ===
				// Vérifie que le parsing a réussi et que l'objet Tweet existe
				if (tweet != null) {
					// Traite les hashtags extraits du tweet
					processHashtagsInTweet(tweet);
				}
				
			} catch (JsonSyntaxException e) {
				// === GESTION D'ERREUR: JSON INVALIDE ===
				// Le JSON ne peut pas être parsé (format incorrect, champs manquants, etc.)
				// Log l'erreur avec le contenu JSON brut pour analyser le problème
				// La boucle continue pour traiter les prochains tweets
				logger.log(Level.WARNING, "Failed to parse tweet JSON: " + record.value(), e);
				
			} catch (Exception e) {
				// === GESTION D'ERREUR: AUTRES EXCEPTIONS ===
				// Capture toute exception non prévue (accès null, erreurs de calcul, etc.)
				// Log l'erreur complète avec la pile d'appels pour déboguer
				// La boucle continue pour traiter les prochains tweets
				logger.log(Level.SEVERE, "Unexpected error while processing a tweet", e);
			}
		}
	}

	/**
	 * Méthode processHashtagsInTweet() - Extrait et publie les hashtags d'un tweet
	 * 
	 * PROCESSUS:
	 * 1. Utilise la bibliothèque Twitter Text pour extraire les hashtags
	 * 2. Respecte les règles Twitter officielles pour les hashtags
	 * 3. Pour chaque hashtag trouvé, crée un message structuré
	 * 4. Publie le message dans le topic "hashtags"
	 * 
	 * PARAMÈTRE:
	 * - tweet: L'objet Tweet à traiter
	 *   Doit contenir le texte (tweet.getText()) pour l'extraction
	 * 
	 * EXEMPLE:
	 * Si le tweet contient: "Je suis heureux! #travail #success"
	 * Cette méthode extraira: ["travail", "success"]
	 * Et publiera 2 messages HashtagMessage séparés
	 * 
	 * NOTES SUR LA BIBLIOTHÈQUE TWITTER TEXT:
	 * - Respecte les règles Twitter: caractères Unicode, longueur minimale, etc.
	 * - Extrait UNIQUEMENT les hashtags (pas les mentions @ ou URLs)
	 * - Gère correctement la ponctuation avant/après les hashtags
	 */
	private void processHashtagsInTweet(Tweet tweet) {
		// === EXTRACTION DES HASHTAGS ===
		// twitterTextExtractor.extractHashtags() retourne une List<String> avec tous les hashtags
		// Utilise les règles officielles Twitter pour identifier les hashtags valides
		List<String> hashtags = twitterTextExtractor.extractHashtags(tweet.getText());
		
		// === PUBLICATION DE CHAQUE HASHTAG ===
		// Pour chaque hashtag trouvé, crée et publie un message structuré
		for (String hashtag : hashtags) {
			publishHashtag(hashtag, tweet);
		}
	}

	/**
	 * Méthode publishHashtag() - Crée et publie un message de hashtag dans Kafka
	 * 
	 * PROCESSUS:
	 * 1. Crée un objet HashtagMessage structuré avec le hashtag et les métadonnées du tweet
	 * 2. Sérialise l'objet en JSON avec GSON
	 * 3. Crée un ProducerRecord avec topic, clé (hashtag), et valeur (JSON)
	 * 4. Envoie le message au broker Kafka de façon ASYNCHRONE
	 * 5. Enregistre le succès ou l'erreur dans les logs
	 * 
	 * PARAMÈTRES:
	 * - hashtag: String - le hashtag à publier (ex: "travail")
	 * - tweet: Tweet - le tweet source contenant le hashtag
	 * 
	 * STRUCTURE KAFKA:
	 * Topic: "hashtags"
	 * Clé: nom du hashtag (permet de grouper les messages par hashtag)
	 * Valeur: JSON du HashtagMessage contenant hashtag, id du tweet, timestamp, etc.
	 * 
	 * ENVOI ASYNCHRONE:
	 * - producer.send() retourne IMMÉDIATEMENT (non-bloquant)
	 * - Le callback de Kafka notifie du succès ou de l'erreur PLUS TARD
	 * - Cela permet au service de continuer à traiter sans attendre la confirmation
	 * 
	 * NOTE: Grâce aux configurations (acks=all, idempotence), le message est garanti
	 * d'être publié exactement une fois, même en cas de retry
	 */
	private void publishHashtag(String hashtag, Tweet tweet) {
		// === CRÉATION DU MESSAGE ===
		// createHashtagMessage() construit un objet HashtagMessage contenant:
		// - le hashtag lui-même
		// - l'ID du tweet source
		// - le timestamp du tweet (ou l'heure actuelle si absent)
		HashtagMessage message = createHashtagMessage(hashtag, tweet);
		
		// === SÉRIALISATION EN JSON ===
		// gson.toJson() convertit l'objet HashtagMessage en String JSON
		// Format: {"hashtag":"travail","tweetId":"123456","timestamp":1234567890}
		String jsonValue = gson.toJson(message);
		
		// === CRÉATION DU RECORD KAFKA ===
		// ProducerRecord<Key, Value> = ProducerRecord<String, String>
		// - Topic: OUTPUT_TOPIC = "hashtags"
		// - Clé: hashtag (ex: "travail") - permet de grouper par hashtag
		// - Valeur: JSON du message
		ProducerRecord<String, String> hashtagRecord = new ProducerRecord<>(OUTPUT_TOPIC, hashtag, jsonValue);
		
		// === ENVOI ASYNCHRONE ===
		// producer.send(record, callback) envoie le message de façon non-bloquante
		// Le callback s'exécutera quand Kafka aura confirmé (ou rejeté) le message
		// metadata = informations de confirmation (partition, offset, timestamp Kafka)
		// exception = exception si l'envoi a échoué
		producer.send(hashtagRecord, (metadata, exception) -> {
			// === GESTION DU CALLBACK ===
			if (exception != null) {
				// === ERREUR: L'ENVOI A ÉCHOUÉ ===
				// Même avec les retries et acks=all, l'envoi peut échouer
				// (broker down, timeout réseau, erreur du broker, etc.)
				logger.log(Level.SEVERE, "Failed to publish hashtag", exception);
			}
		});
		
		// === LOG D'INFORMATION ===
		// Enregistre que le message a été ENVOYÉ (pas encore confirmé)
		// L'enregistrement "Published" dans les logs signifie que send() a été appelé,
		// pas que le broker a reçu le message
		logger.info(() -> "Published hashtag: #" + hashtag + " (key=" + hashtag + ")");
	}

	/**
	 * Méthode createHashtagMessage() - Crée un message structuré de hashtag
	 * 
	 * PROCESSUS:
	 * 1. Extrait le timestamp du tweet (date de publication)
	 * 2. Si le tweet n'a pas de date, utilise l'heure actuelle du système
	 * 3. Crée et retourne un objet HashtagMessage avec tous les champs
	 * 
	 * PARAMÈTRES:
	 * - hashtag: String - le hashtag à inclure (ex: "travail")
	 * - tweet: Tweet - le tweet source contenant la date et l'ID
	 * 
	 * RETOUR:
	 * - HashtagMessage - objet structuré contenant:
	 *   * hashtag: le hashtag lui-même
	 *   * timestamp: milliseconde depuis l'époque Unix (timestamp Kafka compatible)
	 *   * tweetId: l'ID du tweet source
	 * 
	 * EXTRACTION DU TIMESTAMP:
	 * - tweet.getCreatedAt() retourne un OffsetDateTime (date + heure + fuseau horaire)
	 * - toInstant() convertit en UTC (Instant)
	 * - toEpochMilli() convertit en millisecondes depuis 1970-01-01 UTC
	 * - Si getCreatedAt() est null (date manquante), utilise System.currentTimeMillis()
	 * 
	 * EXEMPLE:
	 * Si tweet.getCreatedAt() = "2025-11-25T14:30:00+01:00"
	 * Alors timestamp = 1732531800000 (milliseconds)
	 */
	private HashtagMessage createHashtagMessage(String hashtag, Tweet tweet) {
		// === EXTRACTION OU GÉNÉRATION DU TIMESTAMP ===
		// Utilise l'opérateur ternaire pour gérer deux cas:
		long timestamp = tweet.getCreatedAt() != null 
			// CAS 1: Tweet a une date - utilise celle-ci convertie en millisecondes
			// getCreatedAt() retourne un OffsetDateTime de la Twitter API
			// toInstant() le convertit en UTC
			// toEpochMilli() le convertit en long (millisecondes depuis 1970)
			? tweet.getCreatedAt().toInstant().toEpochMilli() 
			// CAS 2: Tweet sans date - utilise l'heure actuelle du système
			// System.currentTimeMillis() retourne les ms depuis 1970 UTC
			: System.currentTimeMillis();
		
		// === CRÉATION DE L'OBJET MESSAGE ===
		// Construit un HashtagMessage avec les trois informations essentielles:
		// - hashtag: le mot-clé à compter
		// - timestamp: quand le hashtag a été publié
		// - tweet.getId(): traçabilité vers le tweet source
		return new HashtagMessage(hashtag, timestamp, tweet.getId());
	}

	/**
	 * Méthode main() - Point d'entrée du service HashtagExtractor
	 * 
	 * PROCESSUS:
	 * 1. Valide les arguments de ligne de commande
	 * 2. Initialise le service avec les paramètres Kafka
	 * 3. Lance la boucle de traitement infinie
	 * 4. Gère les erreurs critiques et l'arrêt du service
	 * 
	 * ARGUMENTS REQUIS:
	 * args[0] = bootstrap-servers: Adresses des brokers Kafka séparées par des virgules
	 *           Exemple: "localhost:9092" ou "kafka-broker-1:9092,kafka-broker-2:9092"
	 * args[1] = consumer-group: Identifiant unique du groupe de consommateurs
	 *           Exemple: "extractor-group"
	 * 
	 * CODES DE SORTIE:
	 * - 0: Arrêt normal (ou interruption)
	 * - 1: Erreur (arguments manquants, exception initiale)
	 * 
	 * EXEMPLE D'UTILISATION:
	 * java HashtagExtractor localhost:9092 extractor-group
	 * java HashtagExtractor kafka-1:9092,kafka-2:9092 prod-extractor
	 * 
	 * COMPORTEMENT:
	 * - Une fois lancé, le service tourne indéfiniment
	 * - Le service est sans interface graphique ("headless mode")
	 * - Logs en temps réel vers stdout/fichier log
	 */
	public static void main(String[] args) {
		// === VALIDATION DES ARGUMENTS ===
		// Vérifie que l'utilisateur a fourni exactement 2 arguments
		if (args.length < 2) {
			// === ERREUR: ARGUMENTS INSUFFISANTS ===
			// Affiche le mode d'emploi (usage)
			logger.severe("Usage: HashtagExtractor <bootstrap-servers> <consumer-group>");
			// Quitte le programme avec code d'erreur 1
			System.exit(1);
		}

		// === EXTRACTION DES ARGUMENTS ===
		// args[0] = adresses des brokers Kafka (ex: "localhost:9092")
		String bootstrapServers = args[0];
		
		// args[1] = identifiant du groupe de consommateurs (ex: "extractor-group")
		// Permet à Kafka de distribuer les partitions entre plusieurs instances
		String consumerGroup = args[1];

		// === LOGS D'INFORMATION ===
		// Enregistre le démarrage et les paramètres de configuration
		logger.info("Starting HashtagExtractor service");
		logger.info(() -> "Bootstrap servers: " + bootstrapServers);
		logger.info(() -> "Consumer group: " + consumerGroup);

		try {
			// === INITIALISATION DU SERVICE ===
			// Crée une nouvelle instance de HashtagExtractor avec les paramètres
			// Le constructeur configure le consommateur, le producteur, et GSON
			HashtagExtractor extractor = new HashtagExtractor(bootstrapServers, consumerGroup);

			// === LOGS DE CONFIGURATION ===
			// Affiche les configurations du service
			logger.info("HashtagExtractor service initialized");
			logger.info("Consuming from: filtered-tweets");
			logger.info("Publishing to: hashtags");
			logger.info("Running in headless mode");

			// === LANCEMENT DU SERVICE ===
			// Appelle extractor.run() qui lance la boucle infinie
			// Cette méthode ne retourne JAMAIS (sauf exception)
			// Le service reste bloqué dans la boucle de poll() attendant les tweets
			extractor.run();

		} catch (Exception e) {
			// === GESTION D'ERREUR CRITIQUE ===
			// Si une exception non gérée survient (erreur réseau, configuration invalide, etc.)
			// Log l'erreur complète avec la pile d'appels
			logger.log(Level.SEVERE, "HashtagExtractor service failed", e);
			// Quitte le programme avec code d'erreur 1 pour signaler une failure
			System.exit(1);
		}
	}
}