HashtagCounter.java

package tweetoscope;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.*;

import java.time.Duration;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Classe HashtagCounter - Service de comptage de hashtags avec Kafka Streams
 * 
 * ROLE PRINCIPAL:
 * Cette classe implémente un système de comptage en temps réel de hashtags extraits des tweets.
 * Elle utilise Kafka Streams pour traiter un flux continu de hashtags et produire un classement
 * (leaderboard) des hashtags les plus populaires, mis à jour périodiquement.
 * 
 * FLUX DE TRAITEMENT:
 * 1. Consomme les hashtags depuis le topic "hashtags"
 * 2. Parse les messages JSON pour extraire les hashtags individuels
 * 3. Compte les occurrences de chaque hashtag en utilisant un état distribuée (RocksDB)
 * 4. Calcule le top N des hashtags les plus populaires toutes les secondes
 * 5. Publie le leaderboard mis à jour dans le topic "leaderboard"
 * 
 * CARACTÉRISTIQUES IMPORTANTES:
 * - Garantit la sémantique "Exactly Once" (chaque hashtag compté exactement une fois)
 * - Utilise un state store RocksDB pour la persistance
 * - Publication périodique du leaderboard par la tâche 0 uniquement (évite les doublons)
 * 
 */
public class HashtagCounter {
    
	// Logger statique pour tracer les opérations, événements et erreurs du service
	private static final Logger logger = Logger.getLogger(HashtagCounter.class.getName());
	
	// === CONSTANTES KAFKA ===
	// Topic Kafka d'entrée contenant les hashtags à compter (au format JSON)
	private static final String HASHTAGS_TOPIC = "hashtags";
	
	// Topic Kafka de sortie pour publier le classement des hashtags les plus populaires
	private static final String LEADERBOARD_TOPIC = "leaderboard";
	
	// Nom du state store RocksDB qui persiste les comptages de hashtags
	// Ce store est distribué sur tous les nœuds du cluster Kafka Streams
	private static final String COUNTS_STORE = "hashtag-counts-store";
	
	// === CONSTANTES JSON ===
	// Clés utilisées dans la structure JSON du leaderboard publié
	private static final String LEADERBOARD_JSON_KEY = "leaderboard";      // Tableau des top N hashtags
	private static final String TIMESTAMP_JSON_KEY = "timestamp";          // Horodatage du leaderboard
	private static final String HASHTAG_JSON_KEY = "hashtag";              // Nom du hashtag
	private static final String COUNT_JSON_KEY = "count";                  // Nombre d'occurrences
	
	// === ATTRIBUTS D'INSTANCE ===
	// Instance KafkaStreams qui gère le traitement du flux
	private final KafkaStreams streams;
	
	// Parseur JSON utilisé pour désérialiser les messages HashtagMessage
	private final Gson gson;
	
	// Nombre de hashtags à inclure dans le leaderboard (ex: Top 10)
	protected int nbLeaders;
	
	// Drapeau volatile indiquant si le service est en cours d'exécution
	// volatile garantit que les changements sont visibles entre les threads
	private volatile boolean running = true;

	/**
	 * Constructeur du compteur de hashtags
	 * 
	 * @param bootstrapServers Adresses des brokers Kafka (ex: "localhost:9092" ou "kafka-1:9092,kafka-2:9092")
	 * @param applicationId ID unique de l'application Kafka Streams (utilisé pour le state management)
	 * @param nbLeaders Nombre de hashtags à inclure dans le leaderboard (ex: 10 pour Top 10)
	 */
	public HashtagCounter(String bootstrapServers, String applicationId, int nbLeaders) {
		this.nbLeaders = nbLeaders;
		this.gson = new Gson();  // Initialise le parseur JSON

		// === CONFIGURATION DE KAFKA STREAMS ===
		// Cette section configure tous les paramètres du traitement du flux
		Properties props = new Properties();
		
		// Identifiant unique de l'application - utilise ce même ID pour reprendre un state précédent
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
		
		// Adresses des brokers Kafka pour se connecter au cluster
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		
		// Sérialisation par défaut pour les clés: String (texte)
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		
		// Sérialisation par défaut pour les valeurs: String (texte)
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		
		// === SÉMANTIQUE DE TRAITEMENT GARANTIE ===
		// EXACTLY_ONCE_V2: garantit que chaque hashtag est compté exactement une fois
		// Même en cas de crash ou de rechute du service, pas de comptage dupliqué
		// Alternative moins fiable: AT_LEAST_ONCE (peut compter plusieurs fois)
		props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
		
		// === STOCKAGE DE L'ÉTAT DISTRIBUÉ ===
		// Répertoire où RocksDB stocke localement l'état du comptage
		// /tmp/kafka-streams peut être remplacé par un chemin persistent sur le serveur
		props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
		
		// Intervalle entre deux commits du state (en millisecondes)
		// Commit = sauvegarde du comptage dans le topic interne de changelog
		// 1000ms = commit chaque seconde (équilibre entre performance et sécurité)
		props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

		// === ENREGISTREMENT DÉTAILLÉ DU DÉMARRAGE ===
		logger.info("Hashtag Counter with Kafka Streams initialized");
		logger.info(() -> "Bootstrap servers: " + bootstrapServers);
		logger.info(() -> "Application ID: " + applicationId);
		logger.info(() -> "Top leaders: " + nbLeaders);
		logger.info(() -> "Processing guarantee: EXACTLY_ONCE_V2");

		// === CONSTRUCTION DE LA TOPOLOGIE ===
		// La topologie définit comment les données doivent être transformées
		StreamsBuilder builder = new StreamsBuilder();
		buildTopology(builder);  // Construit le pipeline de traitement
		
		// Compile la description de la topologie en un graphe exécutable
		Topology topology = builder.build();
		logger.info("Kafka Streams topology:\n" + topology.describe());  // Affiche le plan d'exécution
		
		// Crée l'instance KafkaStreams qui va exécuter le traitement
		this.streams = new KafkaStreams(topology, props);
		
		// === GESTION DE L'ARRÊT GRACIEUX ===
		// Ajoute un hook d'arrêt JVM pour fermer proprement Kafka Streams
		// Cela garantit que le state est sauvegardé avant l'arrêt
		Runtime.getRuntime().addShutdownHook(new Thread(() -> {
			logger.info("Shutdown signal received");
			shutdown();
		}));
	}

	/**
	 * Méthode buildTopology() - Construit le pipeline de traitement Kafka Streams
	 * 
	 * FLUX DE TRANSFORMATION:
	 * hashtags (topic) → parseAndFilterHashtags → countHashtags → createLeaderboardStream → leaderboard (topic)
	 * 
	 * Chaque étape transforme les données et les rend progressivement plus utiles
	 */
	private void buildTopology(StreamsBuilder builder) {
		// === ÉTAPE 1: LECTURE DU TOPIC D'ENTRÉE ===
		// Crée un flux KStream qui lit continuellement les messages du topic "hashtags"
		// Chaque message contient un ou plusieurs hashtags au format JSON
		KStream<String, String> hashtagsStream = builder.stream(
			HASHTAGS_TOPIC,  // Topic source: "hashtags"
			Consumed.with(Serdes.String(), Serdes.String())  // Sérialisation: String/String
		);

		// === ÉTAPE 2: PARSING ET FILTRAGE ===
		// Extrait le hashtag de chaque message JSON et filtre les messages malformés
		KStream<String, String> parsedHashtags = parseAndFilterHashtags(hashtagsStream);

		// === ÉTAPE 3: AGRÉGATION AVEC ÉTAT ===
		// Compte les occurrences de chaque hashtag unique dans un KTable
		// KTable = Vue matérialisée de la table de comptage (persiste dans RocksDB)
		KTable<String, Long> hashtagCounts = countHashtags(parsedHashtags);

		// === ÉTAPE 4: CONVERSION ET CRÉATION DU LEADERBOARD ===
		// Convertit la KTable en KStream pour pouvoir traiter les changements
		KStream<String, Long> countsStream = hashtagCounts.toStream();
		
		// Utilise un processeur personnalisé pour créer le leaderboard
		// (calcule le top N et formate en JSON)
		KStream<String, String> leaderboardStream = createLeaderboardStream(countsStream);

		// === ÉTAPE 5: ÉCRITURE DU RÉSULTAT ===
		// Publie le leaderboard formaté dans le topic de sortie "leaderboard"
		leaderboardStream.to(LEADERBOARD_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
	}

	/**
	 * Méthode parseAndFilterHashtags() - Extrait les hashtags des messages JSON
	 * 
	 * OBJECTIF:
	 * Chaque message entrant contient un objet HashtagMessage JSON.
	 * Nous devons extraire la valeur du champ "hashtag" et filtrer les messages invalides.
	 * 
	 * @param hashtagsStream Flux de messages JSON bruts du topic
	 * @return Flux contenant uniquement les hashtags valides (exclut les null et les erreurs)
	 */
	private KStream<String, String> parseAndFilterHashtags(KStream<String, String> hashtagsStream) {
		return hashtagsStream
			// mapValues: transforme chaque valeur du flux
			// Reçoit la clé et la valeur, retourne la nouvelle valeur
			.mapValues((key, value) -> {
				try {
					// Désérialise le JSON en objet HashtagMessage
					HashtagMessage msg = gson.fromJson(value, HashtagMessage.class);
					
					// Extrait le hashtag de l'objet (ex: "#Python" de {"hashtag": "#Python", ...})
					return msg.getHashtag();
				} catch (Exception e) {
					// Si le message est malformé, enregistre l'erreur mais continue
					logger.log(Level.WARNING, "Failed to parse HashtagMessage", e);
					
					// Retourne null pour les messages invalides
					return null;
				}
			})
			// filter: garde seulement les messages où la condition est vraie
			// Ici: garde seulement les hashtags non-null (exclut les erreurs)
			.filter((key, value) -> value != null);
	}

	/**
	 * Méthode countHashtags() - Compte les occurrences de chaque hashtag
	 * 
	 * OPÉRATION STATEFUL (avec état):
	 * Cette opération maintient un compteur pour chaque hashtag unique.
	 * L'état est persiste dans un state store (RocksDB) et synchronisé avec Kafka.
	 * 
	 * PROCESSUS:
	 * 1. Groupe tous les messages par hashtag (groupByKey)
	 * 2. Pour chaque groupe, compte le nombre de messages (count)
	 * 3. Stocke les comptages dans le state store "hashtag-counts-store"
	 * 
	 * EXEMPLE:
	 * Entrée:  #Python, #Java, #Python, #Kotlin, #Python
	 * Sortie:  {#Python: 3, #Java: 1, #Kotlin: 1}
	 * 
	 * @param parsedHashtags Flux des hashtags validés
	 * @return Table (KTable) avec le comptage persistant de chaque hashtag
	 */
	private KTable<String, Long> countHashtags(KStream<String, String> parsedHashtags) {
		return parsedHashtags
			// groupByKey: groupe tous les messages ayant la même clé (le hashtag lui-même)
			.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
			
			// count: pour chaque groupe, compte le nombre d'éléments
			// Retourne une KTable où la valeur est le nombre d'occurrences
			.count(
				// Configuration du state store:
				Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(COUNTS_STORE)
					// Sérialisation des clés: String (les hashtags)
					.withKeySerde(Serdes.String())
					
					// Sérialisation des valeurs: Long (les comptages)
					.withValueSerde(Serdes.Long())
					
					// Active le cache local pour optimiser les opérations répétées
					// Le cache réduit les lectures en état store pendant un court laps de temps
					.withCachingEnabled()
					
					// Active le changelog topic pour la durabilité et la récupération
					// En cas de crash, ce topic permet de reconstruire l'état
					.withLoggingEnabled(new HashMap<>())
			);
	}

	/**
	 * Méthode createLeaderboardStream() - Crée un flux de leaderboard formaté
	 * 
	 * OBJECTIF:
	 * Transforme le flux de comptages en flux de classements (leaderboards) formatés en JSON.
	 * Utilise un processeur personnalisé (LeaderboardProcessor) qui:
	 * - Accède en lecture au state store complète
	 * - Calcule périodiquement (toutes les secondes) le top N des hashtags
	 * - Formate les résultats en JSON
	 * 
	 * @param countsStream Flux des comptages (mises à jour du state)
	 * @return Flux contenant les leaderboards JSON formatés
	 */
	private KStream<String, String> createLeaderboardStream(KStream<String, Long> countsStream) {
		return countsStream.process(
			// Constructeur du processeur - crée une nouvelle instance pour chaque tâche
			() -> new LeaderboardProcessor(streams, gson, nbLeaders, logger),
			
			// Nom du state store à accéder en lecture
			COUNTS_STORE
		);
	}

	/**
	 * Méthode start() - Démarre le traitement du flux Kafka
	 * 
	 * PROCESSUS:
	 * 1. Démarre la topologie KafkaStreams (consommation et traitement)
	 * 2. Lance une boucle infinie pour garder le service actif
	 * 3. Affiche périodiquement les statistiques du state store
	 * 
	 * La boucle continue tant que le flag 'running' est vrai
	 */
	public void start() {
		logger.info("Starting Kafka Streams application");
		
		// Démarre le moteur de traitement Kafka Streams
		// À partir de ce moment, le service consomme les hashtags et produit les leaderboards
		streams.start();
		logger.info("Kafka Streams started - processing hashtags");
		
		// Boucle de maintien du service en vie
		try {
			while (running) {
				// Dort 1 seconde avant chaque vérification
				Thread.sleep(1000);
				
				// Affiche les statistiques du state store périodiquement
				// La condition (% 10000 < 1000) signifie: tous les 10 secondes environ
				if (System.currentTimeMillis() % 10000 < 1000) {
					printStateStoreStats();
				}
			}
		} catch (InterruptedException e) {
			// Si le thread est interrompu, restaure le flag d'interruption
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Méthode printStateStoreStats() - Affiche les statistiques du state store
	 * 
	 * OBJECTIF:
	 * Obtient des informations de diagnostic sur le nombre de hashtags uniques
	 * stockés dans le state store RocksDB
	 * 
	 * NOTE:
	 * Le nombre retourné est "approximé" car compter tous les éléments est coûteux.
	 * RocksDB fournit une estimation basée sur ses métadonnées internes.
	 */
	private void printStateStoreStats() {
		try {
			// Accède au state store en lecture seule
			// Utilise les QueryableStoreTypes pour spécifier qu'on veut un KeyValueStore
			ReadOnlyKeyValueStore<String, Long> store = streams.store(
				StoreQueryParameters.fromNameAndType(COUNTS_STORE, QueryableStoreTypes.keyValueStore())
			);
			
			// Obtient une estimation du nombre d'entrées (hashtags uniques) dans le store
			long count = store.approximateNumEntries();
			
			// Enregistre l'estimation dans les logs
			logger.info(() -> "State store contains approximately " + count + " hashtags");
			
		} catch (Exception e) {
			// Le state store peut ne pas être disponible immédiatement au démarrage
			// ou pendant une rééquilibrage du cluster
			// On ignore silencieusement ces erreurs temporaires
		}
	}

	/**
	 * Méthode shutdown() - Arrête proprement le service Kafka Streams
	 * 
	 * PROCESSUS:
	 * 1. Arrête la boucle de maintien en vie (flag running = false)
	 * 2. Ferme la topologie KafkaStreams proprement
	 * 3. Permet le commit final du state avant la fermeture
	 * 
	 * IMPORTANT:
	 * L'arrêt gracieux garantit que:
	 * - Tous les comptages en attente sont sauvegardés
	 * - Les connexions Kafka sont fermées proprement
	 * - Pas de perte de données
	 */
	public void shutdown() {
		logger.info("Shutting down Kafka Streams");
		
		// Arrête la boucle de maintien en vie (signal au thread start())
		running = false;
		
		// Ferme les ressources Kafka Streams
		if (streams != null) {
			// close(Duration): attend jusqu'à 10 secondes la fin du traitement
			// Si le traitement prend plus long, forcément ferme de toute façon
			streams.close(Duration.ofSeconds(10));
			logger.info("Kafka Streams shutdown complete");
		}
	}

	/**
	 * Méthode main() - Point d'entrée du service
	 * 
	 * USAGE:
	 * java HashtagCounter <bootstrap-servers> <application-id> <top-n>
	 * 
	 * EXEMPLE:
	 * java HashtagCounter localhost:9092 hashtag-counter 10
	 * 
	 * PARAMÈTRES:
	 * @param bootstrapServers Adresses Kafka (ex: "kafka1:9092,kafka2:9092")
	 * @param applicationId ID unique du service (ex: "hashtag-counter-prod")
	 * @param topN Nombre de hashtags dans le leaderboard (ex: 10 pour Top 10)
	 */
	public static void main(String[] args) {
		// === VALIDATION DES PARAMÈTRES ===
		// Vérifie que les 3 paramètres obligatoires sont fournis
		if (args.length < 3) {
			logger.severe("Usage: HashtagCounterWithKafkaStreams <bootstrap-servers> <application-id> <top-n>");
			System.exit(1);  // Quitte avec code d'erreur 1
		}
		
		// === PARSING DES PARAMÈTRES ===
		// Extrait et parse les arguments en ligne de commande
		String bootstrapServers = args[0];  // Adresses des brokers Kafka
		String applicationId = args[1];     // Identifiant unique du service
		int topN = Integer.parseInt(args[2]);  // Nombre de tops (converti de String en int)
		
		// === ENREGISTREMENT DU DÉMARRAGE ===
		logger.info("Starting Kafka Streams Hashtag Counter service");
		logger.info(() -> "Bootstrap servers: " + bootstrapServers);
		logger.info(() -> "Application ID: " + applicationId);
		logger.info(() -> "Top N: " + topN);
		
		try {
			// === CRÉATION ET LANCEMENT DU SERVICE ===
			// Crée une instance du compteur avec les paramètres spécifiés
			HashtagCounter counter = new HashtagCounter(
				bootstrapServers, applicationId, topN);
			
			// === ENREGISTREMENT DE LA CONFIGURATION ===
			logger.info("Kafka Streams Counter service initialized");
			logger.info("Consuming from: hashtags");      // Topic d'entrée
			logger.info("Publishing to: leaderboard");   // Topic de sortie
			logger.info("Running with global state aggregation");
			
			// === DÉMARRAGE DU TRAITEMENT ===
			// Démarre le traitement (bloquant - continue jusqu'à arrêt)
			counter.start();
			
		} catch (Exception e) {
			// Capture toute exception pendant l'initialisation ou l'exécution
			logger.log(Level.SEVERE, "Kafka Streams Counter service failed", e);
			System.exit(1);  // Quitte avec code d'erreur 1
		}
	}

	/**
	 * Classe interne LeaderboardProcessor - Processeur personnalisé de Kafka Streams
	 * 
	 * RÔLE:
	 * Gère le calcul et la publication périodique du leaderboard des hashtags les plus populaires.
	 * Implémente l'interface Processor pour intégrer un traitement personnalisé dans la topologie.
	 * 
	 * RESPONSABILITÉS:
	 * 1. Reçoit les mises à jour du comptage (bien qu'elles ne soient pas utilisées directement)
	 * 2. Accède en lecture au state store complet pour obtenir tous les comptages
	 * 3. Calcule le top N des hashtags périodiquement (toutes les secondes)
	 * 4. Formate les résultats en JSON
	 * 5. Publie les leaderboards dans le topic de sortie
	 * 
	 * CONCEPT CLÉS:
	 * - Processor: processeur personnalisé qui a accès au contexte Kafka Streams
	 * - Punctuation (ponctuateur): callback périodique déclenché par le temps horloge
	 * - Task 0 uniquement: seul la tâche 0 publie pour éviter les doublons
	 */
	private class LeaderboardProcessor implements Processor<String, Long, String, String> {
		// === PARAMÈTRES DE CONFIGURATION ===
		
		// Contexte Kafka Streams donnant accès aux state stores et à la capacité de forwarder des records
		private ProcessorContext<String, String> context;
		
		// Intervalle entre deux publications du leaderboard (1000ms = 1 seconde)
		private final long publishIntervalMs = 1000;
		
		// Références aux dépendances partagées avec la classe externe
		private final KafkaStreams streams;      // Instance KafkaStreams
		private final Gson gson;                 // Parseur JSON
		private final int nbLeaders;             // Nombre de tops à calculer
		private final Logger logger;             // Logger pour tracer l'exécution

		/**
		 * Constructeur du LeaderboardProcessor
		 * 
		 * @param streams Instance KafkaStreams pour accéder aux state stores
		 * @param gson Parseur JSON pour sérialiser le leaderboard
		 * @param nbLeaders Nombre de hashtags dans le top (ex: 10)
		 * @param logger Logger pour les messages
		 */
		public LeaderboardProcessor(KafkaStreams streams, Gson gson, int nbLeaders, Logger logger) {
			this.streams = streams;
			this.gson = gson;
			this.nbLeaders = nbLeaders;
			this.logger = logger;
		}

		/**
		 * Méthode init() - Initialisation du processeur
		 * 
		 * Appelée une seule fois au démarrage du processeur.
		 * C'est ici qu'on configure les ponctuateurs (callbacks périodiques).
		 * 
		 * @param context Contexte fourni par Kafka Streams
		 */
		@Override
		public void init(ProcessorContext<String, String> context) {
			this.context = context;  // Sauvegarde le contexte pour utilisation ultérieure
			scheduleLeaderboardPublication();  // Configure la publication périodique
		}

		/**
		 * Méthode process() - Traite chaque record reçu
		 * 
		 * Appelée à chaque mise à jour du comptage de hashtag.
		 * Cependant, nous ne tratons pas directement ici car nous utilisons un ponctuateur.
		 * 
		 * @param record Clé et valeur du record (hashtag et son comptage)
		 */
		@Override
		public void process(Record<String, Long> record) {
			// === REMARQUE: NE RIEN FAIRE ICI ===
			// Les mises à jour sont traitées via le ponctuateur (scheduleLeaderboardPublication)
			// qui calcule le leaderboard périodiquement plutôt que à chaque mise à jour
			// Cela évite de publier des leaderboards trop fréquent et réduit la charge réseau
		}

		/**
		 * Méthode close() - Fermeture du processeur
		 * 
		 * Appelée lors de l'arrêt du service.
		 * Permet de libérer les ressources (bien que nous n'en ayons pas ici).
		 */
		@Override
		public void close() {
			// === AUCUNE RESSOURCE À NETTOYER ===
			// Le contexte et les ponctuateurs sont gérés par Kafka Streams
		}

		/**
		 * Méthode scheduleLeaderboardPublication() - Configure la publication périodique
		 * 
		 * STRATÉGIE:
		 * Seule la tâche 0 publie le leaderboard pour éviter les doublons.
		 * Si multiple tâches publiaient, on aurait plusieurs copies du même leaderboard.
		 * 
		 * PONCTUATEUR:
		 * Configure un callback qui se déclenche toutes les 1000ms (1 seconde) basé sur l'horloge système.
		 */
		private void scheduleLeaderboardPublication() {
			// Vérifie si cette instance est la tâche 0 (partition 0)
			// La partition est déterminée par Kafka Streams lors du déploiement
			if (context.taskId().partition() == 0) {
				// Enregistre un ponctuateur: callback périodique
				context.schedule(
					Duration.ofMillis(publishIntervalMs),  // Intervalle: 1000ms
					PunctuationType.WALL_CLOCK_TIME,       // Basé sur l'horloge système (pas sur les événements)
					timestamp -> {                         // Lambda appelée à chaque déclenchement
						publishLeaderboard();              // Publie le leaderboard à jour
					}
				);
				logger.info(() -> "Task 0 will publish leaderboard every " + publishIntervalMs + "ms");
			} else {
				// Les autres tâches ignorent la publication pour éviter les doublons
				logger.info(() -> "Task " + context.taskId().partition() + " skipped (only task 0 publishes)");
			}
		}

		/**
		 * Méthode publishLeaderboard() - Publie le leaderboard mis à jour
		 * 
		 * PROCESSUS:
		 * 1. Récupère tous les comptages du state store
		 * 2. Sélectionne les top N hashtags
		 * 3. Formate en JSON
		 * 4. Envoie vers le topic de sortie
		 */
		private void publishLeaderboard() {
			try {
				// Étape 1: Récupère tous les comptages de hashtags depuis le state store
				Map<String, Long> allCounts = fetchAllHashtagCounts();
				
				// Étape 2: Sélectionne les N hashtags les plus populaires et les formate
				List<Map<String, Object>> topHashtags = buildTopHashtags(allCounts);
				
				// Étape 3: Crée le JSON du leaderboard avec timestamp et top N
				String jsonLeaderboard = buildLeaderboardJson(topHashtags);
				
				// Étape 4: Envoie (forward) le leaderboard au topic de sortie via le contexte
				context.forward(new Record<>(
					LEADERBOARD_JSON_KEY,      // Clé: "leaderboard"
					jsonLeaderboard,           // Valeur: JSON du leaderboard
					System.currentTimeMillis()  // Timestamp actuel
				));
				
				logger.info(() -> "Published leaderboard: " + jsonLeaderboard);
				
			} catch (Exception e) {
				// En cas d'erreur, enregistre l'erreur mais continue
				logger.log(Level.SEVERE, "Error publishing leaderboard", e);
			}
		}

		/**
		 * Méthode fetchAllHashtagCounts() - Récupère tous les comptages du state store
		 * 
		 * PROCESSUS:
		 * 1. Accède au state store RocksDB en lecture seule
		 * 2. Itère sur toutes les entrées (tous les hashtags)
		 * 3. Filtre les valeurs null
		 * 4. Retourne une map complète
		 * 
		 * @return Map des hashtags et leurs comptages
		 */
		private Map<String, Long> fetchAllHashtagCounts() {
			Map<String, Long> allCounts = new HashMap<>();
			
			// Accède au state store en mode lecture seule (ReadOnly)
			ReadOnlyKeyValueStore<String, Long> globalStore = streams.store(
				StoreQueryParameters.fromNameAndType(COUNTS_STORE, QueryableStoreTypes.keyValueStore())
			);
			
			// Itère sur toutes les entrées du store avec try-with-resources pour fermer l'iterator
			try (KeyValueIterator<String, Long> iterator = globalStore.all()) {
				while (iterator.hasNext()) {
					// Récupère l'entrée suivante (clé = hashtag, valeur = comptage)
					KeyValue<String, Long> entry = iterator.next();
					
					// Filtre les valeurs null (par sécurité)
					if (entry.value != null) {
						// Ajoute à la map : hashtag -> nombre d'occurrences
						allCounts.put(entry.key, entry.value);
					}
				}
			}
			return allCounts;
		}

		/**
		 * Méthode buildTopHashtags() - Sélectionne et formate les top N hashtags
		 * 
		 * PROCESSUS (utilisant les streams Java):
		 * 1. Convertit la map en stream d'entrées
		 * 2. Trie par comptage décroissant (du plus grand au plus petit)
		 * 3. Limite à N éléments
		 * 4. Formate chacun en Map {hashtag, count}
		 * 5. Collecte en liste
		 * 
		 * EXEMPLE:
		 * Entrée: {#Java:100, #Python:150, #Kotlin:50, #Rust:75}
		 * Sortie (N=2): [{hashtag:#Python, count:150}, {hashtag:#Java, count:100}]
		 * 
		 * @param allCounts Tous les comptages
		 * @return Liste des top N hashtags formatés
		 */
		private List<Map<String, Object>> buildTopHashtags(Map<String, Long> allCounts) {
			return allCounts.entrySet().stream()
				// sorted: trie les entrées
				// comparingByValue().reversed(): du plus grand au plus petit comptage
				.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
				
				// limit: garde seulement les N premiers (ex: top 10)
				.limit(nbLeaders)
				
				// map: transforme chaque entrée en Map {hashtag, count}
				.map(entry -> {
					Map<String, Object> item = new HashMap<>();
					item.put(HASHTAG_JSON_KEY, entry.getKey());      // Clé: "hashtag"
					item.put(COUNT_JSON_KEY, entry.getValue());      // Clé: "count"
					return item;
				})
				
				// collect: convertit le stream en liste
				.collect(Collectors.toList());
		}

		/**
		 * Méthode buildLeaderboardJson() - Formate le leaderboard en JSON
		 * 
		 * STRUCTURE DU JSON:
		 * {
		 *   "timestamp": 1700000000000,
		 *   "leaderboard": [
		 *     {"hashtag": "#Python", "count": 150},
		 *     {"hashtag": "#Java", "count": 100}
		 *   ]
		 * }
		 * 
		 * @param topHashtags Liste des top N hashtags
		 * @return String JSON sérialisé
		 */
		private String buildLeaderboardJson(List<Map<String, Object>> topHashtags) {
			// Crée la structure racine du leaderboard
			Map<String, Object> leaderboard = new HashMap<>();
			
			// Ajoute le timestamp actuel (millisecondes depuis Unix epoch)
			leaderboard.put(TIMESTAMP_JSON_KEY, System.currentTimeMillis());
			
			// Ajoute le tableau des top N hashtags
			leaderboard.put(LEADERBOARD_JSON_KEY, topHashtags);
			
			// Sérialise la map en JSON string et retourne
			return gson.toJson(leaderboard);
		}
	}
}