HashtagCounter.java
package tweetoscope;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.*;
import java.time.Duration;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
* Classe HashtagCounter - Service de comptage de hashtags avec Kafka Streams
*
* ROLE PRINCIPAL:
* Cette classe implémente un système de comptage en temps réel de hashtags extraits des tweets.
* Elle utilise Kafka Streams pour traiter un flux continu de hashtags et produire un classement
* (leaderboard) des hashtags les plus populaires, mis à jour périodiquement.
*
* FLUX DE TRAITEMENT:
* 1. Consomme les hashtags depuis le topic "hashtags"
* 2. Parse les messages JSON pour extraire les hashtags individuels
* 3. Compte les occurrences de chaque hashtag en utilisant un état distribuée (RocksDB)
* 4. Calcule le top N des hashtags les plus populaires toutes les secondes
* 5. Publie le leaderboard mis à jour dans le topic "leaderboard"
*
* CARACTÉRISTIQUES IMPORTANTES:
* - Garantit la sémantique "Exactly Once" (chaque hashtag compté exactement une fois)
* - Utilise un state store RocksDB pour la persistance
* - Publication périodique du leaderboard par la tâche 0 uniquement (évite les doublons)
*
*/
public class HashtagCounter {
// Logger statique pour tracer les opérations, événements et erreurs du service
private static final Logger logger = Logger.getLogger(HashtagCounter.class.getName());
// === CONSTANTES KAFKA ===
// Topic Kafka d'entrée contenant les hashtags à compter (au format JSON)
private static final String HASHTAGS_TOPIC = "hashtags";
// Topic Kafka de sortie pour publier le classement des hashtags les plus populaires
private static final String LEADERBOARD_TOPIC = "leaderboard";
// Nom du state store RocksDB qui persiste les comptages de hashtags
// Ce store est distribué sur tous les nœuds du cluster Kafka Streams
private static final String COUNTS_STORE = "hashtag-counts-store";
// === CONSTANTES JSON ===
// Clés utilisées dans la structure JSON du leaderboard publié
private static final String LEADERBOARD_JSON_KEY = "leaderboard"; // Tableau des top N hashtags
private static final String TIMESTAMP_JSON_KEY = "timestamp"; // Horodatage du leaderboard
private static final String HASHTAG_JSON_KEY = "hashtag"; // Nom du hashtag
private static final String COUNT_JSON_KEY = "count"; // Nombre d'occurrences
// === ATTRIBUTS D'INSTANCE ===
// Instance KafkaStreams qui gère le traitement du flux
private final KafkaStreams streams;
// Parseur JSON utilisé pour désérialiser les messages HashtagMessage
private final Gson gson;
// Nombre de hashtags à inclure dans le leaderboard (ex: Top 10)
protected int nbLeaders;
// Drapeau volatile indiquant si le service est en cours d'exécution
// volatile garantit que les changements sont visibles entre les threads
private volatile boolean running = true;
/**
* Constructeur du compteur de hashtags
*
* @param bootstrapServers Adresses des brokers Kafka (ex: "localhost:9092" ou "kafka-1:9092,kafka-2:9092")
* @param applicationId ID unique de l'application Kafka Streams (utilisé pour le state management)
* @param nbLeaders Nombre de hashtags à inclure dans le leaderboard (ex: 10 pour Top 10)
*/
public HashtagCounter(String bootstrapServers, String applicationId, int nbLeaders) {
this.nbLeaders = nbLeaders;
this.gson = new Gson(); // Initialise le parseur JSON
// === CONFIGURATION DE KAFKA STREAMS ===
// Cette section configure tous les paramètres du traitement du flux
Properties props = new Properties();
// Identifiant unique de l'application - utilise ce même ID pour reprendre un state précédent
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
// Adresses des brokers Kafka pour se connecter au cluster
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Sérialisation par défaut pour les clés: String (texte)
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Sérialisation par défaut pour les valeurs: String (texte)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// === SÉMANTIQUE DE TRAITEMENT GARANTIE ===
// EXACTLY_ONCE_V2: garantit que chaque hashtag est compté exactement une fois
// Même en cas de crash ou de rechute du service, pas de comptage dupliqué
// Alternative moins fiable: AT_LEAST_ONCE (peut compter plusieurs fois)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// === STOCKAGE DE L'ÉTAT DISTRIBUÉ ===
// Répertoire où RocksDB stocke localement l'état du comptage
// /tmp/kafka-streams peut être remplacé par un chemin persistent sur le serveur
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
// Intervalle entre deux commits du state (en millisecondes)
// Commit = sauvegarde du comptage dans le topic interne de changelog
// 1000ms = commit chaque seconde (équilibre entre performance et sécurité)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// === ENREGISTREMENT DÉTAILLÉ DU DÉMARRAGE ===
logger.info("Hashtag Counter with Kafka Streams initialized");
logger.info(() -> "Bootstrap servers: " + bootstrapServers);
logger.info(() -> "Application ID: " + applicationId);
logger.info(() -> "Top leaders: " + nbLeaders);
logger.info(() -> "Processing guarantee: EXACTLY_ONCE_V2");
// === CONSTRUCTION DE LA TOPOLOGIE ===
// La topologie définit comment les données doivent être transformées
StreamsBuilder builder = new StreamsBuilder();
buildTopology(builder); // Construit le pipeline de traitement
// Compile la description de la topologie en un graphe exécutable
Topology topology = builder.build();
logger.info("Kafka Streams topology:\n" + topology.describe()); // Affiche le plan d'exécution
// Crée l'instance KafkaStreams qui va exécuter le traitement
this.streams = new KafkaStreams(topology, props);
// === GESTION DE L'ARRÊT GRACIEUX ===
// Ajoute un hook d'arrêt JVM pour fermer proprement Kafka Streams
// Cela garantit que le state est sauvegardé avant l'arrêt
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutdown signal received");
shutdown();
}));
}
/**
* Méthode buildTopology() - Construit le pipeline de traitement Kafka Streams
*
* FLUX DE TRANSFORMATION:
* hashtags (topic) → parseAndFilterHashtags → countHashtags → createLeaderboardStream → leaderboard (topic)
*
* Chaque étape transforme les données et les rend progressivement plus utiles
*/
private void buildTopology(StreamsBuilder builder) {
// === ÉTAPE 1: LECTURE DU TOPIC D'ENTRÉE ===
// Crée un flux KStream qui lit continuellement les messages du topic "hashtags"
// Chaque message contient un ou plusieurs hashtags au format JSON
KStream<String, String> hashtagsStream = builder.stream(
HASHTAGS_TOPIC, // Topic source: "hashtags"
Consumed.with(Serdes.String(), Serdes.String()) // Sérialisation: String/String
);
// === ÉTAPE 2: PARSING ET FILTRAGE ===
// Extrait le hashtag de chaque message JSON et filtre les messages malformés
KStream<String, String> parsedHashtags = parseAndFilterHashtags(hashtagsStream);
// === ÉTAPE 3: AGRÉGATION AVEC ÉTAT ===
// Compte les occurrences de chaque hashtag unique dans un KTable
// KTable = Vue matérialisée de la table de comptage (persiste dans RocksDB)
KTable<String, Long> hashtagCounts = countHashtags(parsedHashtags);
// === ÉTAPE 4: CONVERSION ET CRÉATION DU LEADERBOARD ===
// Convertit la KTable en KStream pour pouvoir traiter les changements
KStream<String, Long> countsStream = hashtagCounts.toStream();
// Utilise un processeur personnalisé pour créer le leaderboard
// (calcule le top N et formate en JSON)
KStream<String, String> leaderboardStream = createLeaderboardStream(countsStream);
// === ÉTAPE 5: ÉCRITURE DU RÉSULTAT ===
// Publie le leaderboard formaté dans le topic de sortie "leaderboard"
leaderboardStream.to(LEADERBOARD_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
}
/**
* Méthode parseAndFilterHashtags() - Extrait les hashtags des messages JSON
*
* OBJECTIF:
* Chaque message entrant contient un objet HashtagMessage JSON.
* Nous devons extraire la valeur du champ "hashtag" et filtrer les messages invalides.
*
* @param hashtagsStream Flux de messages JSON bruts du topic
* @return Flux contenant uniquement les hashtags valides (exclut les null et les erreurs)
*/
private KStream<String, String> parseAndFilterHashtags(KStream<String, String> hashtagsStream) {
return hashtagsStream
// mapValues: transforme chaque valeur du flux
// Reçoit la clé et la valeur, retourne la nouvelle valeur
.mapValues((key, value) -> {
try {
// Désérialise le JSON en objet HashtagMessage
HashtagMessage msg = gson.fromJson(value, HashtagMessage.class);
// Extrait le hashtag de l'objet (ex: "#Python" de {"hashtag": "#Python", ...})
return msg.getHashtag();
} catch (Exception e) {
// Si le message est malformé, enregistre l'erreur mais continue
logger.log(Level.WARNING, "Failed to parse HashtagMessage", e);
// Retourne null pour les messages invalides
return null;
}
})
// filter: garde seulement les messages où la condition est vraie
// Ici: garde seulement les hashtags non-null (exclut les erreurs)
.filter((key, value) -> value != null);
}
/**
* Méthode countHashtags() - Compte les occurrences de chaque hashtag
*
* OPÉRATION STATEFUL (avec état):
* Cette opération maintient un compteur pour chaque hashtag unique.
* L'état est persiste dans un state store (RocksDB) et synchronisé avec Kafka.
*
* PROCESSUS:
* 1. Groupe tous les messages par hashtag (groupByKey)
* 2. Pour chaque groupe, compte le nombre de messages (count)
* 3. Stocke les comptages dans le state store "hashtag-counts-store"
*
* EXEMPLE:
* Entrée: #Python, #Java, #Python, #Kotlin, #Python
* Sortie: {#Python: 3, #Java: 1, #Kotlin: 1}
*
* @param parsedHashtags Flux des hashtags validés
* @return Table (KTable) avec le comptage persistant de chaque hashtag
*/
private KTable<String, Long> countHashtags(KStream<String, String> parsedHashtags) {
return parsedHashtags
// groupByKey: groupe tous les messages ayant la même clé (le hashtag lui-même)
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
// count: pour chaque groupe, compte le nombre d'éléments
// Retourne une KTable où la valeur est le nombre d'occurrences
.count(
// Configuration du state store:
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(COUNTS_STORE)
// Sérialisation des clés: String (les hashtags)
.withKeySerde(Serdes.String())
// Sérialisation des valeurs: Long (les comptages)
.withValueSerde(Serdes.Long())
// Active le cache local pour optimiser les opérations répétées
// Le cache réduit les lectures en état store pendant un court laps de temps
.withCachingEnabled()
// Active le changelog topic pour la durabilité et la récupération
// En cas de crash, ce topic permet de reconstruire l'état
.withLoggingEnabled(new HashMap<>())
);
}
/**
* Méthode createLeaderboardStream() - Crée un flux de leaderboard formaté
*
* OBJECTIF:
* Transforme le flux de comptages en flux de classements (leaderboards) formatés en JSON.
* Utilise un processeur personnalisé (LeaderboardProcessor) qui:
* - Accède en lecture au state store complète
* - Calcule périodiquement (toutes les secondes) le top N des hashtags
* - Formate les résultats en JSON
*
* @param countsStream Flux des comptages (mises à jour du state)
* @return Flux contenant les leaderboards JSON formatés
*/
private KStream<String, String> createLeaderboardStream(KStream<String, Long> countsStream) {
return countsStream.process(
// Constructeur du processeur - crée une nouvelle instance pour chaque tâche
() -> new LeaderboardProcessor(streams, gson, nbLeaders, logger),
// Nom du state store à accéder en lecture
COUNTS_STORE
);
}
/**
* Méthode start() - Démarre le traitement du flux Kafka
*
* PROCESSUS:
* 1. Démarre la topologie KafkaStreams (consommation et traitement)
* 2. Lance une boucle infinie pour garder le service actif
* 3. Affiche périodiquement les statistiques du state store
*
* La boucle continue tant que le flag 'running' est vrai
*/
public void start() {
logger.info("Starting Kafka Streams application");
// Démarre le moteur de traitement Kafka Streams
// À partir de ce moment, le service consomme les hashtags et produit les leaderboards
streams.start();
logger.info("Kafka Streams started - processing hashtags");
// Boucle de maintien du service en vie
try {
while (running) {
// Dort 1 seconde avant chaque vérification
Thread.sleep(1000);
// Affiche les statistiques du state store périodiquement
// La condition (% 10000 < 1000) signifie: tous les 10 secondes environ
if (System.currentTimeMillis() % 10000 < 1000) {
printStateStoreStats();
}
}
} catch (InterruptedException e) {
// Si le thread est interrompu, restaure le flag d'interruption
Thread.currentThread().interrupt();
}
}
/**
* Méthode printStateStoreStats() - Affiche les statistiques du state store
*
* OBJECTIF:
* Obtient des informations de diagnostic sur le nombre de hashtags uniques
* stockés dans le state store RocksDB
*
* NOTE:
* Le nombre retourné est "approximé" car compter tous les éléments est coûteux.
* RocksDB fournit une estimation basée sur ses métadonnées internes.
*/
private void printStateStoreStats() {
try {
// Accède au state store en lecture seule
// Utilise les QueryableStoreTypes pour spécifier qu'on veut un KeyValueStore
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(COUNTS_STORE, QueryableStoreTypes.keyValueStore())
);
// Obtient une estimation du nombre d'entrées (hashtags uniques) dans le store
long count = store.approximateNumEntries();
// Enregistre l'estimation dans les logs
logger.info(() -> "State store contains approximately " + count + " hashtags");
} catch (Exception e) {
// Le state store peut ne pas être disponible immédiatement au démarrage
// ou pendant une rééquilibrage du cluster
// On ignore silencieusement ces erreurs temporaires
}
}
/**
* Méthode shutdown() - Arrête proprement le service Kafka Streams
*
* PROCESSUS:
* 1. Arrête la boucle de maintien en vie (flag running = false)
* 2. Ferme la topologie KafkaStreams proprement
* 3. Permet le commit final du state avant la fermeture
*
* IMPORTANT:
* L'arrêt gracieux garantit que:
* - Tous les comptages en attente sont sauvegardés
* - Les connexions Kafka sont fermées proprement
* - Pas de perte de données
*/
public void shutdown() {
logger.info("Shutting down Kafka Streams");
// Arrête la boucle de maintien en vie (signal au thread start())
running = false;
// Ferme les ressources Kafka Streams
if (streams != null) {
// close(Duration): attend jusqu'à 10 secondes la fin du traitement
// Si le traitement prend plus long, forcément ferme de toute façon
streams.close(Duration.ofSeconds(10));
logger.info("Kafka Streams shutdown complete");
}
}
/**
* Méthode main() - Point d'entrée du service
*
* USAGE:
* java HashtagCounter <bootstrap-servers> <application-id> <top-n>
*
* EXEMPLE:
* java HashtagCounter localhost:9092 hashtag-counter 10
*
* PARAMÈTRES:
* @param bootstrapServers Adresses Kafka (ex: "kafka1:9092,kafka2:9092")
* @param applicationId ID unique du service (ex: "hashtag-counter-prod")
* @param topN Nombre de hashtags dans le leaderboard (ex: 10 pour Top 10)
*/
public static void main(String[] args) {
// === VALIDATION DES PARAMÈTRES ===
// Vérifie que les 3 paramètres obligatoires sont fournis
if (args.length < 3) {
logger.severe("Usage: HashtagCounterWithKafkaStreams <bootstrap-servers> <application-id> <top-n>");
System.exit(1); // Quitte avec code d'erreur 1
}
// === PARSING DES PARAMÈTRES ===
// Extrait et parse les arguments en ligne de commande
String bootstrapServers = args[0]; // Adresses des brokers Kafka
String applicationId = args[1]; // Identifiant unique du service
int topN = Integer.parseInt(args[2]); // Nombre de tops (converti de String en int)
// === ENREGISTREMENT DU DÉMARRAGE ===
logger.info("Starting Kafka Streams Hashtag Counter service");
logger.info(() -> "Bootstrap servers: " + bootstrapServers);
logger.info(() -> "Application ID: " + applicationId);
logger.info(() -> "Top N: " + topN);
try {
// === CRÉATION ET LANCEMENT DU SERVICE ===
// Crée une instance du compteur avec les paramètres spécifiés
HashtagCounter counter = new HashtagCounter(
bootstrapServers, applicationId, topN);
// === ENREGISTREMENT DE LA CONFIGURATION ===
logger.info("Kafka Streams Counter service initialized");
logger.info("Consuming from: hashtags"); // Topic d'entrée
logger.info("Publishing to: leaderboard"); // Topic de sortie
logger.info("Running with global state aggregation");
// === DÉMARRAGE DU TRAITEMENT ===
// Démarre le traitement (bloquant - continue jusqu'à arrêt)
counter.start();
} catch (Exception e) {
// Capture toute exception pendant l'initialisation ou l'exécution
logger.log(Level.SEVERE, "Kafka Streams Counter service failed", e);
System.exit(1); // Quitte avec code d'erreur 1
}
}
/**
* Classe interne LeaderboardProcessor - Processeur personnalisé de Kafka Streams
*
* RÔLE:
* Gère le calcul et la publication périodique du leaderboard des hashtags les plus populaires.
* Implémente l'interface Processor pour intégrer un traitement personnalisé dans la topologie.
*
* RESPONSABILITÉS:
* 1. Reçoit les mises à jour du comptage (bien qu'elles ne soient pas utilisées directement)
* 2. Accède en lecture au state store complet pour obtenir tous les comptages
* 3. Calcule le top N des hashtags périodiquement (toutes les secondes)
* 4. Formate les résultats en JSON
* 5. Publie les leaderboards dans le topic de sortie
*
* CONCEPT CLÉS:
* - Processor: processeur personnalisé qui a accès au contexte Kafka Streams
* - Punctuation (ponctuateur): callback périodique déclenché par le temps horloge
* - Task 0 uniquement: seul la tâche 0 publie pour éviter les doublons
*/
private class LeaderboardProcessor implements Processor<String, Long, String, String> {
// === PARAMÈTRES DE CONFIGURATION ===
// Contexte Kafka Streams donnant accès aux state stores et à la capacité de forwarder des records
private ProcessorContext<String, String> context;
// Intervalle entre deux publications du leaderboard (1000ms = 1 seconde)
private final long publishIntervalMs = 1000;
// Références aux dépendances partagées avec la classe externe
private final KafkaStreams streams; // Instance KafkaStreams
private final Gson gson; // Parseur JSON
private final int nbLeaders; // Nombre de tops à calculer
private final Logger logger; // Logger pour tracer l'exécution
/**
* Constructeur du LeaderboardProcessor
*
* @param streams Instance KafkaStreams pour accéder aux state stores
* @param gson Parseur JSON pour sérialiser le leaderboard
* @param nbLeaders Nombre de hashtags dans le top (ex: 10)
* @param logger Logger pour les messages
*/
public LeaderboardProcessor(KafkaStreams streams, Gson gson, int nbLeaders, Logger logger) {
this.streams = streams;
this.gson = gson;
this.nbLeaders = nbLeaders;
this.logger = logger;
}
/**
* Méthode init() - Initialisation du processeur
*
* Appelée une seule fois au démarrage du processeur.
* C'est ici qu'on configure les ponctuateurs (callbacks périodiques).
*
* @param context Contexte fourni par Kafka Streams
*/
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context; // Sauvegarde le contexte pour utilisation ultérieure
scheduleLeaderboardPublication(); // Configure la publication périodique
}
/**
* Méthode process() - Traite chaque record reçu
*
* Appelée à chaque mise à jour du comptage de hashtag.
* Cependant, nous ne tratons pas directement ici car nous utilisons un ponctuateur.
*
* @param record Clé et valeur du record (hashtag et son comptage)
*/
@Override
public void process(Record<String, Long> record) {
// === REMARQUE: NE RIEN FAIRE ICI ===
// Les mises à jour sont traitées via le ponctuateur (scheduleLeaderboardPublication)
// qui calcule le leaderboard périodiquement plutôt que à chaque mise à jour
// Cela évite de publier des leaderboards trop fréquent et réduit la charge réseau
}
/**
* Méthode close() - Fermeture du processeur
*
* Appelée lors de l'arrêt du service.
* Permet de libérer les ressources (bien que nous n'en ayons pas ici).
*/
@Override
public void close() {
// === AUCUNE RESSOURCE À NETTOYER ===
// Le contexte et les ponctuateurs sont gérés par Kafka Streams
}
/**
* Méthode scheduleLeaderboardPublication() - Configure la publication périodique
*
* STRATÉGIE:
* Seule la tâche 0 publie le leaderboard pour éviter les doublons.
* Si multiple tâches publiaient, on aurait plusieurs copies du même leaderboard.
*
* PONCTUATEUR:
* Configure un callback qui se déclenche toutes les 1000ms (1 seconde) basé sur l'horloge système.
*/
private void scheduleLeaderboardPublication() {
// Vérifie si cette instance est la tâche 0 (partition 0)
// La partition est déterminée par Kafka Streams lors du déploiement
if (context.taskId().partition() == 0) {
// Enregistre un ponctuateur: callback périodique
context.schedule(
Duration.ofMillis(publishIntervalMs), // Intervalle: 1000ms
PunctuationType.WALL_CLOCK_TIME, // Basé sur l'horloge système (pas sur les événements)
timestamp -> { // Lambda appelée à chaque déclenchement
publishLeaderboard(); // Publie le leaderboard à jour
}
);
logger.info(() -> "Task 0 will publish leaderboard every " + publishIntervalMs + "ms");
} else {
// Les autres tâches ignorent la publication pour éviter les doublons
logger.info(() -> "Task " + context.taskId().partition() + " skipped (only task 0 publishes)");
}
}
/**
* Méthode publishLeaderboard() - Publie le leaderboard mis à jour
*
* PROCESSUS:
* 1. Récupère tous les comptages du state store
* 2. Sélectionne les top N hashtags
* 3. Formate en JSON
* 4. Envoie vers le topic de sortie
*/
private void publishLeaderboard() {
try {
// Étape 1: Récupère tous les comptages de hashtags depuis le state store
Map<String, Long> allCounts = fetchAllHashtagCounts();
// Étape 2: Sélectionne les N hashtags les plus populaires et les formate
List<Map<String, Object>> topHashtags = buildTopHashtags(allCounts);
// Étape 3: Crée le JSON du leaderboard avec timestamp et top N
String jsonLeaderboard = buildLeaderboardJson(topHashtags);
// Étape 4: Envoie (forward) le leaderboard au topic de sortie via le contexte
context.forward(new Record<>(
LEADERBOARD_JSON_KEY, // Clé: "leaderboard"
jsonLeaderboard, // Valeur: JSON du leaderboard
System.currentTimeMillis() // Timestamp actuel
));
logger.info(() -> "Published leaderboard: " + jsonLeaderboard);
} catch (Exception e) {
// En cas d'erreur, enregistre l'erreur mais continue
logger.log(Level.SEVERE, "Error publishing leaderboard", e);
}
}
/**
* Méthode fetchAllHashtagCounts() - Récupère tous les comptages du state store
*
* PROCESSUS:
* 1. Accède au state store RocksDB en lecture seule
* 2. Itère sur toutes les entrées (tous les hashtags)
* 3. Filtre les valeurs null
* 4. Retourne une map complète
*
* @return Map des hashtags et leurs comptages
*/
private Map<String, Long> fetchAllHashtagCounts() {
Map<String, Long> allCounts = new HashMap<>();
// Accède au state store en mode lecture seule (ReadOnly)
ReadOnlyKeyValueStore<String, Long> globalStore = streams.store(
StoreQueryParameters.fromNameAndType(COUNTS_STORE, QueryableStoreTypes.keyValueStore())
);
// Itère sur toutes les entrées du store avec try-with-resources pour fermer l'iterator
try (KeyValueIterator<String, Long> iterator = globalStore.all()) {
while (iterator.hasNext()) {
// Récupère l'entrée suivante (clé = hashtag, valeur = comptage)
KeyValue<String, Long> entry = iterator.next();
// Filtre les valeurs null (par sécurité)
if (entry.value != null) {
// Ajoute à la map : hashtag -> nombre d'occurrences
allCounts.put(entry.key, entry.value);
}
}
}
return allCounts;
}
/**
* Méthode buildTopHashtags() - Sélectionne et formate les top N hashtags
*
* PROCESSUS (utilisant les streams Java):
* 1. Convertit la map en stream d'entrées
* 2. Trie par comptage décroissant (du plus grand au plus petit)
* 3. Limite à N éléments
* 4. Formate chacun en Map {hashtag, count}
* 5. Collecte en liste
*
* EXEMPLE:
* Entrée: {#Java:100, #Python:150, #Kotlin:50, #Rust:75}
* Sortie (N=2): [{hashtag:#Python, count:150}, {hashtag:#Java, count:100}]
*
* @param allCounts Tous les comptages
* @return Liste des top N hashtags formatés
*/
private List<Map<String, Object>> buildTopHashtags(Map<String, Long> allCounts) {
return allCounts.entrySet().stream()
// sorted: trie les entrées
// comparingByValue().reversed(): du plus grand au plus petit comptage
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
// limit: garde seulement les N premiers (ex: top 10)
.limit(nbLeaders)
// map: transforme chaque entrée en Map {hashtag, count}
.map(entry -> {
Map<String, Object> item = new HashMap<>();
item.put(HASHTAG_JSON_KEY, entry.getKey()); // Clé: "hashtag"
item.put(COUNT_JSON_KEY, entry.getValue()); // Clé: "count"
return item;
})
// collect: convertit le stream en liste
.collect(Collectors.toList());
}
/**
* Méthode buildLeaderboardJson() - Formate le leaderboard en JSON
*
* STRUCTURE DU JSON:
* {
* "timestamp": 1700000000000,
* "leaderboard": [
* {"hashtag": "#Python", "count": 150},
* {"hashtag": "#Java", "count": 100}
* ]
* }
*
* @param topHashtags Liste des top N hashtags
* @return String JSON sérialisé
*/
private String buildLeaderboardJson(List<Map<String, Object>> topHashtags) {
// Crée la structure racine du leaderboard
Map<String, Object> leaderboard = new HashMap<>();
// Ajoute le timestamp actuel (millisecondes depuis Unix epoch)
leaderboard.put(TIMESTAMP_JSON_KEY, System.currentTimeMillis());
// Ajoute le tableau des top N hashtags
leaderboard.put(LEADERBOARD_JSON_KEY, topHashtags);
// Sérialise la map en JSON string et retourne
return gson.toJson(leaderboard);
}
}
}