HashtagExtractor.java
package tweetoscope;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import com.twitter.clientlib.model.Tweet;
import com.twitter.twittertext.Extractor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Classe HashtagExtractor - Service 2 du pipeline tweetoscope
*
* RÔLE PRINCIPAL:
* Ce service extrait les hashtags des tweets filtrés et les publie dans Kafka.
* Il agit comme un maillon intermédiaire dans le pipeline de traitement des tweets.
*
* FLUX DE TRAITEMENT:
* 1. Consomme les tweets filtrés depuis le topic "filtered-tweets"
* 2. Parse chaque tweet JSON et extrait les hashtags
* 3. Utilise la bibliothèque Twitter Text pour identifier les hashtags valides
* 4. Crée un message structuré pour chaque hashtag
* 5. Publie les hashtags dans le topic "hashtags" avec garantie de fiabilité
*
* FIABILITÉ (TASK8):
* - Garantit la sémantique "exactly-once" avec acks=all
* - Active l'idempotence du producteur (pas de doublons)
* - Maintient l'ordre des messages
* - Retry automatique en cas d'échec
*
*/
public class HashtagExtractor {
// Logger statique pour tracer les opérations du service
private static final Logger logger = Logger.getLogger(HashtagExtractor.class.getName());
// === CONSTANTES KAFKA ===
// Topic d'entrée: tweets filtrés issus du service de filtrage
private static final String INPUT_TOPIC = "filtered-tweets";
// Topic de sortie: hashtags extraits à destination du compteur
private static final String OUTPUT_TOPIC = "hashtags";
// === ATTRIBUTS D'INSTANCE ===
// Consommateur Kafka pour lire les tweets filtrés depuis le topic d'entrée
private final KafkaConsumer<String, String> consumer;
// Producteur Kafka pour publier les hashtags dans le topic de sortie
// Configuré avec des paramètres de fiabilité (acks=all, idempotence activée)
private final KafkaProducer<String, String> producer;
// Parseur JSON GSON configuré pour gérer les dates OffsetDateTime
private final Gson gson;
/**
* Utilitaire Twitter Text pour l'extraction de hashtags
*
* Cette classe provient de la bibliothèque twitter-text qui fournit
* une implémentation robuste et officielle de Twitter pour identifier
* les hashtags, mentions, URLs, etc. selon les spécifications Twitter.
*
* Elle est préférable à une regex simple car elle gère les cas limites.
*/
final Extractor twitterTextExtractor = new Extractor();
/**
* Classe interne OffsetDateTimeAdapter - Adaptateur personnalisé GSON
*
* OBJECTIF:
* GSON par défaut ne sait pas convertir les dates OffsetDateTime.
* Cet adaptateur indique à GSON comment sérialiser/désérialiser ce type.
*
* SÉRIALISATION (Java → JSON):
* Convertit un OffsetDateTime en String au format ISO-8601
* Exemple: 2023-11-25T10:30:00+01:00
*
* DÉSÉRIALISATION (JSON → Java):
* Convertit une String JSON en objet OffsetDateTime
* Gère les valeurs null correctement
*/
private static class OffsetDateTimeAdapter extends TypeAdapter<OffsetDateTime> {
/**
* Méthode write() - Sérialise un OffsetDateTime en JSON
*
* @param out Écrivain JSON pour écrire la valeur
* @param value Objet OffsetDateTime à sérialiser (peut être null)
*/
@Override
public void write(JsonWriter out, OffsetDateTime value) throws IOException {
// Teste si la valeur est null
if (value == null) {
// Écrit littéralement null en JSON
out.nullValue();
return;
}
// Convertit la date en String ISO-8601 et l'écrit
// toString() retourne le format ISO officiel: "2023-11-25T10:30:00+01:00"
out.value(value.toString());
}
/**
* Méthode read() - Désérialise un JSON en OffsetDateTime
*
* @param in Lecteur JSON pour lire la valeur
* @return Objet OffsetDateTime parsé (ou null si JSON contient null)
*/
@Override
public OffsetDateTime read(JsonReader in) throws IOException {
// Vérifie si le token JSON courant est null
if (in.peek() == JsonToken.NULL) {
// Lit le null et le retourne
in.nextNull();
return null;
}
// Lit la String depuis le JSON et la parse en OffsetDateTime
// OffsetDateTime.parse() utilise le format ISO-8601 par défaut
return OffsetDateTime.parse(in.nextString());
}
}
/**
* Constructeur du service HashtagExtractor
*
* @param bootstrapServers Adresses des brokers Kafka (ex: "localhost:9092")
* @param groupId Identifiant du groupe de consommateurs (ex: "extractor-group")
*/
public HashtagExtractor(String bootstrapServers, String groupId) {
// === CONFIGURATION DU CONSOMMATEUR ===
// Le consommateur lit les tweets filtrés depuis le topic d'entrée
Properties consumerProps = new Properties();
// Adresses des brokers Kafka pour se connecter au cluster
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Identifiant du groupe de consommateurs
// Si plusieurs instances du service sont lancées avec le même groupId,
// Kafka distribuera automatiquement les partitions entre elles
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Désérialiseurs: convertissent les données Kafka en objets Java
// KEY_DESERIALIZER: convertit les clés en String
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// VALUE_DESERIALIZER: convertit les valeurs en String (tweets JSON)
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Stratégie si aucun offset n'est trouvé pour ce groupe:
// "earliest" = commence depuis le début du topic (tous les anciens messages)
// Alternative: "latest" = commence depuis les nouveaux messages uniquement
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Crée le consommateur Kafka avec la configuration
this.consumer = new KafkaConsumer<>(consumerProps);
// === CONFIGURATION DU PRODUCTEUR AVEC GARANTIES DE FIABILITÉ ===
// Le producteur publie les hashtags extraits avec fiabilité maximale (TASK8)
Properties producerProps = new Properties();
// Adresses des brokers Kafka
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Sérialiseurs: convertissent les objets Java en données Kafka
// KEY_SERIALIZER: convertit les clés (hashtag) en bytes
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// VALUE_SERIALIZER: convertit les valeurs (message JSON) en bytes
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// === GARANTIES EXACTLY-ONCE (TASK8) ===
// Ces paramètres garantissent qu'aucun hashtag ne sera publié deux fois
// ACKS_CONFIG = "all": le producteur attend la confirmation de TOUS les replicas
// Cela signifie que Kafka a copié le message sur tous les serveurs (leader + followers)
// Très fiable mais un peu plus lent (tradeoff entre latence et fiabilité)
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
// ENABLE_IDEMPOTENCE_CONFIG = "true": active l'idempotence du producteur
// Si le producteur envoie deux fois le même message (ex: après timeout),
// Kafka en reconnaîtra qu'un seul et rejettera le doublon
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5: limite les requêtes simultanées
// Garantit que les messages sont publiés dans l'ordre même en cas de retry
// (Si trop de requêtes en vol, l'ordre peut se brouiller)
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
// RETRIES_CONFIG = MAX_VALUE: réessaye indéfiniment en cas d'échec
// Combine avec timeouts appropriés, cela garantit que les messages seront publiés
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// Crée le producteur Kafka avec les configurations de fiabilité
this.producer = new KafkaProducer<>(producerProps);
// === CONFIGURATION DU PARSEUR JSON ===
// GSON est utilisé pour convertir les Strings JSON en objets Java et vice-versa
this.gson = new GsonBuilder()
// Enregistre un adaptateur personnalisé pour les dates OffsetDateTime
// Permet à GSON de parser les dates au format ISO-8601
.registerTypeAdapter(OffsetDateTime.class, new OffsetDateTimeAdapter())
.create();
// Enregistre l'initialisation réussie dans les logs
logger.info("HashtagExtractor initialized with reliability configurations (acks=all, idempotence=true)");
}
/**
* Méthode run() - Boucle principale du service d'extraction
*
* PROCESSUS:
* 1. S'abonne au topic d'entrée "filtered-tweets"
* 2. Attend continuellement l'arrivée de tweets
* 3. Pour chaque batch de tweets, extrait les hashtags
* 4. Publie les hashtags dans le topic de sortie
*
* NOTE: Cette méthode est bloquante (infinie) - elle ne retourne jamais
* sauf en cas d'erreur ou d'interruption du thread
*/
@SuppressWarnings("java:S2189") // Infinite loop is intentional for continuous stream processing
public void run() {
// === S'ABONNE AU TOPIC D'ENTRÉE ===
// Collections.singletonList() crée une liste contenant un seul élément: le topic "filtered-tweets"
// Le consommateur reçoit alors tous les messages publiés dans ce topic
consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
// Enregistre le démarrage du service dans les logs
logger.info(() -> "HashtagExtractor started. Consuming from '" + INPUT_TOPIC + "', producing to '" + OUTPUT_TOPIC + "'.");
logger.info("Publishing structured messages for hashtags");
try {
// Drapeau pour la boucle de traitement
// (En pratique, il reste toujours true car aucun code ne le change)
boolean keepProcessing = true;
// === BOUCLE INFINIE DE TRAITEMENT ===
// Traite continuellement les tweets qui arrivent du topic
while (keepProcessing) {
// === POLL: Attend les tweets avec timeout de 500ms ===
// consumer.poll(timeout) récupère les tweets du broker Kafka
// Si aucun tweet n'arrive pendant 500ms, retourne un batch vide
// Cela évite que le thread ne consomme 100% du CPU
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
// Traite le batch reçu (peut être vide)
processRecords(records);
}
} finally {
// === NETTOYAGE DES RESSOURCES ===
// Le bloc finally s'exécute TOUJOURS, même en cas d'exception
// Ferme proprement le consommateur (libère les ressources)
consumer.close();
// Ferme proprement le producteur (libère les ressources)
producer.close();
}
}
/**
* Méthode processRecords() - Traite un batch de tweets
*
* PROCESSUS:
* 1. Parcourt chaque tweet du batch reçu
* 2. Désérialise le JSON en objet Tweet
* 3. Extrait tous les hashtags du texte du tweet
* 4. Publie un message structuré pour chaque hashtag
* 5. Gère les erreurs de parsing JSON
*
* PARAMÈTRE:
* - records: ConsumerRecords<String, String> - batch de messages Kafka
* Chaque record contient un tweet au format JSON dans sa valeur
*
* GESTION D'ERREURS:
* - Si le JSON n'est pas valide: capture et log JsonSyntaxException, continue
* - Si une erreur générique survient: capture et log Exception, continue
* Cela permet au service de rester robuste face à des données défectueuses
*/
private void processRecords(ConsumerRecords<String, String> records) {
// === ITÉRATION SUR LE BATCH ===
// Pour chaque record (tweet) reçu du topic "filtered-tweets"
for (ConsumerRecord<String, String> record : records) {
try {
// === DÉSÉRIALISATION JSON ===
// record.value() contient la chaîne JSON du tweet
// gson.fromJson() la convertit en objet Tweet avec tous les champs
// (id, createdAt, text, author, lang, conversation_id, etc.)
Tweet tweet = gson.fromJson(record.value(), Tweet.class);
// === VÉRIFICATION DE NULL ===
// Vérifie que le parsing a réussi et que l'objet Tweet existe
if (tweet != null) {
// Traite les hashtags extraits du tweet
processHashtagsInTweet(tweet);
}
} catch (JsonSyntaxException e) {
// === GESTION D'ERREUR: JSON INVALIDE ===
// Le JSON ne peut pas être parsé (format incorrect, champs manquants, etc.)
// Log l'erreur avec le contenu JSON brut pour analyser le problème
// La boucle continue pour traiter les prochains tweets
logger.log(Level.WARNING, "Failed to parse tweet JSON: " + record.value(), e);
} catch (Exception e) {
// === GESTION D'ERREUR: AUTRES EXCEPTIONS ===
// Capture toute exception non prévue (accès null, erreurs de calcul, etc.)
// Log l'erreur complète avec la pile d'appels pour déboguer
// La boucle continue pour traiter les prochains tweets
logger.log(Level.SEVERE, "Unexpected error while processing a tweet", e);
}
}
}
/**
* Méthode processHashtagsInTweet() - Extrait et publie les hashtags d'un tweet
*
* PROCESSUS:
* 1. Utilise la bibliothèque Twitter Text pour extraire les hashtags
* 2. Respecte les règles Twitter officielles pour les hashtags
* 3. Pour chaque hashtag trouvé, crée un message structuré
* 4. Publie le message dans le topic "hashtags"
*
* PARAMÈTRE:
* - tweet: L'objet Tweet à traiter
* Doit contenir le texte (tweet.getText()) pour l'extraction
*
* EXEMPLE:
* Si le tweet contient: "Je suis heureux! #travail #success"
* Cette méthode extraira: ["travail", "success"]
* Et publiera 2 messages HashtagMessage séparés
*
* NOTES SUR LA BIBLIOTHÈQUE TWITTER TEXT:
* - Respecte les règles Twitter: caractères Unicode, longueur minimale, etc.
* - Extrait UNIQUEMENT les hashtags (pas les mentions @ ou URLs)
* - Gère correctement la ponctuation avant/après les hashtags
*/
private void processHashtagsInTweet(Tweet tweet) {
// === EXTRACTION DES HASHTAGS ===
// twitterTextExtractor.extractHashtags() retourne une List<String> avec tous les hashtags
// Utilise les règles officielles Twitter pour identifier les hashtags valides
List<String> hashtags = twitterTextExtractor.extractHashtags(tweet.getText());
// === PUBLICATION DE CHAQUE HASHTAG ===
// Pour chaque hashtag trouvé, crée et publie un message structuré
for (String hashtag : hashtags) {
publishHashtag(hashtag, tweet);
}
}
/**
* Méthode publishHashtag() - Crée et publie un message de hashtag dans Kafka
*
* PROCESSUS:
* 1. Crée un objet HashtagMessage structuré avec le hashtag et les métadonnées du tweet
* 2. Sérialise l'objet en JSON avec GSON
* 3. Crée un ProducerRecord avec topic, clé (hashtag), et valeur (JSON)
* 4. Envoie le message au broker Kafka de façon ASYNCHRONE
* 5. Enregistre le succès ou l'erreur dans les logs
*
* PARAMÈTRES:
* - hashtag: String - le hashtag à publier (ex: "travail")
* - tweet: Tweet - le tweet source contenant le hashtag
*
* STRUCTURE KAFKA:
* Topic: "hashtags"
* Clé: nom du hashtag (permet de grouper les messages par hashtag)
* Valeur: JSON du HashtagMessage contenant hashtag, id du tweet, timestamp, etc.
*
* ENVOI ASYNCHRONE:
* - producer.send() retourne IMMÉDIATEMENT (non-bloquant)
* - Le callback de Kafka notifie du succès ou de l'erreur PLUS TARD
* - Cela permet au service de continuer à traiter sans attendre la confirmation
*
* NOTE: Grâce aux configurations (acks=all, idempotence), le message est garanti
* d'être publié exactement une fois, même en cas de retry
*/
private void publishHashtag(String hashtag, Tweet tweet) {
// === CRÉATION DU MESSAGE ===
// createHashtagMessage() construit un objet HashtagMessage contenant:
// - le hashtag lui-même
// - l'ID du tweet source
// - le timestamp du tweet (ou l'heure actuelle si absent)
HashtagMessage message = createHashtagMessage(hashtag, tweet);
// === SÉRIALISATION EN JSON ===
// gson.toJson() convertit l'objet HashtagMessage en String JSON
// Format: {"hashtag":"travail","tweetId":"123456","timestamp":1234567890}
String jsonValue = gson.toJson(message);
// === CRÉATION DU RECORD KAFKA ===
// ProducerRecord<Key, Value> = ProducerRecord<String, String>
// - Topic: OUTPUT_TOPIC = "hashtags"
// - Clé: hashtag (ex: "travail") - permet de grouper par hashtag
// - Valeur: JSON du message
ProducerRecord<String, String> hashtagRecord = new ProducerRecord<>(OUTPUT_TOPIC, hashtag, jsonValue);
// === ENVOI ASYNCHRONE ===
// producer.send(record, callback) envoie le message de façon non-bloquante
// Le callback s'exécutera quand Kafka aura confirmé (ou rejeté) le message
// metadata = informations de confirmation (partition, offset, timestamp Kafka)
// exception = exception si l'envoi a échoué
producer.send(hashtagRecord, (metadata, exception) -> {
// === GESTION DU CALLBACK ===
if (exception != null) {
// === ERREUR: L'ENVOI A ÉCHOUÉ ===
// Même avec les retries et acks=all, l'envoi peut échouer
// (broker down, timeout réseau, erreur du broker, etc.)
logger.log(Level.SEVERE, "Failed to publish hashtag", exception);
}
});
// === LOG D'INFORMATION ===
// Enregistre que le message a été ENVOYÉ (pas encore confirmé)
// L'enregistrement "Published" dans les logs signifie que send() a été appelé,
// pas que le broker a reçu le message
logger.info(() -> "Published hashtag: #" + hashtag + " (key=" + hashtag + ")");
}
/**
* Méthode createHashtagMessage() - Crée un message structuré de hashtag
*
* PROCESSUS:
* 1. Extrait le timestamp du tweet (date de publication)
* 2. Si le tweet n'a pas de date, utilise l'heure actuelle du système
* 3. Crée et retourne un objet HashtagMessage avec tous les champs
*
* PARAMÈTRES:
* - hashtag: String - le hashtag à inclure (ex: "travail")
* - tweet: Tweet - le tweet source contenant la date et l'ID
*
* RETOUR:
* - HashtagMessage - objet structuré contenant:
* * hashtag: le hashtag lui-même
* * timestamp: milliseconde depuis l'époque Unix (timestamp Kafka compatible)
* * tweetId: l'ID du tweet source
*
* EXTRACTION DU TIMESTAMP:
* - tweet.getCreatedAt() retourne un OffsetDateTime (date + heure + fuseau horaire)
* - toInstant() convertit en UTC (Instant)
* - toEpochMilli() convertit en millisecondes depuis 1970-01-01 UTC
* - Si getCreatedAt() est null (date manquante), utilise System.currentTimeMillis()
*
* EXEMPLE:
* Si tweet.getCreatedAt() = "2025-11-25T14:30:00+01:00"
* Alors timestamp = 1732531800000 (milliseconds)
*/
private HashtagMessage createHashtagMessage(String hashtag, Tweet tweet) {
// === EXTRACTION OU GÉNÉRATION DU TIMESTAMP ===
// Utilise l'opérateur ternaire pour gérer deux cas:
long timestamp = tweet.getCreatedAt() != null
// CAS 1: Tweet a une date - utilise celle-ci convertie en millisecondes
// getCreatedAt() retourne un OffsetDateTime de la Twitter API
// toInstant() le convertit en UTC
// toEpochMilli() le convertit en long (millisecondes depuis 1970)
? tweet.getCreatedAt().toInstant().toEpochMilli()
// CAS 2: Tweet sans date - utilise l'heure actuelle du système
// System.currentTimeMillis() retourne les ms depuis 1970 UTC
: System.currentTimeMillis();
// === CRÉATION DE L'OBJET MESSAGE ===
// Construit un HashtagMessage avec les trois informations essentielles:
// - hashtag: le mot-clé à compter
// - timestamp: quand le hashtag a été publié
// - tweet.getId(): traçabilité vers le tweet source
return new HashtagMessage(hashtag, timestamp, tweet.getId());
}
/**
* Méthode main() - Point d'entrée du service HashtagExtractor
*
* PROCESSUS:
* 1. Valide les arguments de ligne de commande
* 2. Initialise le service avec les paramètres Kafka
* 3. Lance la boucle de traitement infinie
* 4. Gère les erreurs critiques et l'arrêt du service
*
* ARGUMENTS REQUIS:
* args[0] = bootstrap-servers: Adresses des brokers Kafka séparées par des virgules
* Exemple: "localhost:9092" ou "kafka-broker-1:9092,kafka-broker-2:9092"
* args[1] = consumer-group: Identifiant unique du groupe de consommateurs
* Exemple: "extractor-group"
*
* CODES DE SORTIE:
* - 0: Arrêt normal (ou interruption)
* - 1: Erreur (arguments manquants, exception initiale)
*
* EXEMPLE D'UTILISATION:
* java HashtagExtractor localhost:9092 extractor-group
* java HashtagExtractor kafka-1:9092,kafka-2:9092 prod-extractor
*
* COMPORTEMENT:
* - Une fois lancé, le service tourne indéfiniment
* - Le service est sans interface graphique ("headless mode")
* - Logs en temps réel vers stdout/fichier log
*/
public static void main(String[] args) {
// === VALIDATION DES ARGUMENTS ===
// Vérifie que l'utilisateur a fourni exactement 2 arguments
if (args.length < 2) {
// === ERREUR: ARGUMENTS INSUFFISANTS ===
// Affiche le mode d'emploi (usage)
logger.severe("Usage: HashtagExtractor <bootstrap-servers> <consumer-group>");
// Quitte le programme avec code d'erreur 1
System.exit(1);
}
// === EXTRACTION DES ARGUMENTS ===
// args[0] = adresses des brokers Kafka (ex: "localhost:9092")
String bootstrapServers = args[0];
// args[1] = identifiant du groupe de consommateurs (ex: "extractor-group")
// Permet à Kafka de distribuer les partitions entre plusieurs instances
String consumerGroup = args[1];
// === LOGS D'INFORMATION ===
// Enregistre le démarrage et les paramètres de configuration
logger.info("Starting HashtagExtractor service");
logger.info(() -> "Bootstrap servers: " + bootstrapServers);
logger.info(() -> "Consumer group: " + consumerGroup);
try {
// === INITIALISATION DU SERVICE ===
// Crée une nouvelle instance de HashtagExtractor avec les paramètres
// Le constructeur configure le consommateur, le producteur, et GSON
HashtagExtractor extractor = new HashtagExtractor(bootstrapServers, consumerGroup);
// === LOGS DE CONFIGURATION ===
// Affiche les configurations du service
logger.info("HashtagExtractor service initialized");
logger.info("Consuming from: filtered-tweets");
logger.info("Publishing to: hashtags");
logger.info("Running in headless mode");
// === LANCEMENT DU SERVICE ===
// Appelle extractor.run() qui lance la boucle infinie
// Cette méthode ne retourne JAMAIS (sauf exception)
// Le service reste bloqué dans la boucle de poll() attendant les tweets
extractor.run();
} catch (Exception e) {
// === GESTION D'ERREUR CRITIQUE ===
// Si une exception non gérée survient (erreur réseau, configuration invalide, etc.)
// Log l'erreur complète avec la pile d'appels
logger.log(Level.SEVERE, "HashtagExtractor service failed", e);
// Quitte le programme avec code d'erreur 1 pour signaler une failure
System.exit(1);
}
}
}