MockTwitterStreamRandom.java

package tweetoscope.tweetsProducer;

import java.time.OffsetDateTime;
import java.util.Random;

import com.twitter.clientlib.model.Tweet;


/**
 * PRODUCTEUR 1 : Générateur de tweets aléatoires continus
 * 
 * ============================================================================
 * DESCRIPTION GÉNÉRALE
 * ============================================================================
 * 
 * MockTwitterStreamRandom simule un flux continu de tweets aléatoires. C'est utile
 * pour tester le pipeline de traitement sans dépendre de l'API Twitter réelle,
 * qui a des limites de débit et peut être indisponible.
 * 
 * Ce producteur :
 * 1. Génère des tweets de manière aléatoire et continue (boucle infinie)
 * 2. Sélectionne les hashtags à partir d'une liste prédéfinie (12 hashtags)
 * 3. Utilise une distribution Gaussienne pour rendre la sélection réaliste
 * 4. Varie le nombre de hashtags par tweet (0 à 4 environ)
 * 5. Assigne une langue aléatoire (fr, en, ru, es, it)
 * 6. Crée des tweets avec ID, texte, langue et timestamp
 * 7. Publie chaque tweet sur le topic Kafka "raw-tweets"
 * 8. Maintient un débit constant (~100 tweets/seconde avec 10ms de délai)
 * 
 * Cas d'utilisation :
 * - Tests du pipeline avec un flux stable et prévisible
 * - Développement sans accès à l'API Twitter
 * - Démonstration du système avec un flux de données continu
 * 
 * ============================================================================
 */
public final class MockTwitterStreamRandom extends TweetsProducer {

	/**
	 * Générateur de nombres aléatoires (java.util.Random).
	 * Utilisé pour :
	 * - Sélectionner les hashtags (avec distribution Gaussienne)
	 * - Déterminer le nombre de hashtags par tweet (0-4)
	 * - Choisir la langue aléatoirement
	 * Initialisé une seule fois au démarrage (seed peut varier).
	 */
	private final Random random = new Random();

	/**
	 * Constructeur de MockTwitterStreamRandom.
	 * 
	 * Responsabilités :
	 * 1. Appelle le constructeur parent TweetsProducer(brokers)
	 *    - Initialise la connexion Kafka
	 *    - Configure le producteur Kafka interne
	 *    - Connecte au topic "raw-tweets"
	 * 
	 * 2. Initialise l'attribut Random (fait automatiquement à la déclaration)
	 * 
	 * @param brokers Adresses des brokers Kafka, format "localhost:9092,localhost:9093"
	 */
	public MockTwitterStreamRandom(String brokers) {
		super(brokers);
	}

	/**
	 * Exécute le producteur : génère et publie des tweets aléatoires en continu.
	 * 
	 * ============================================================================
	 * PROCESSUS GÉNÉRAL
	 * ============================================================================
	 * 
	 * La méthode run() exécute une boucle infinie qui :
	 * 1. Génère un tweet aléatoire (ID, texte, langue, timestamp)
	 * 2. Le publie sur Kafka (topic "raw-tweets")
	 * 3. Attend 10ms (maintient le débit de ~100 tweets/seconde)
	 * 4. Recommence indéfiniment
	 * 
	 * Cette boucle infinie est intentionnelle pour :
	 * - Simuler un flux de tweets continu
	 * - Maintenir le service actif en Docker/Kubernetes
	 * 
	 * ============================================================================
	 * COMPOSANTES DU TWEET
	 * ============================================================================
	 * 
	 * 1. ID : numéro séquentiel ("1", "2", "3", ...)
	 *    - Commence à 1
	 *    - Incrémente à chaque tweet généré
	 *    - Format : String
	 * 
	 * 2. TEXTE : combinaison de hashtags sélectionnés aléatoirement
	 *    Exemple : "Tweet 42 #fun #bitCoin #weather"
	 *    - Commence par "Tweet <nb>"
	 *    - Ajoute 0 à 4 hashtags sélectionnés aléatoirement
	 *    - Format : String
	 * 
	 * 3. LANGUE : choisie aléatoirement parmi [fr, en, ru, es, it]
	 *    - Simulation de tweets multlingues
	 *    - Format : code langue ISO 639-1
	 * 
	 * 4. TIMESTAMP : la date/heure actuelle (OffsetDateTime.now())
	 *    - Horodatage ISO-8601 avec fuseau horaire
	 *    - Mis à jour à chaque tweet
	 * 
	 * ============================================================================
	 * LISTE DE HASHTAGS ET DISTRIBUTION GAUSSIENNE
	 * ============================================================================
	 * 
	 * Hashtags disponibles (12 au total) :
	 * { "fun", "bitCoin", "climate", "crypto", "CS", "Metz", "weather", "summer",
	 *   "holidays", "health", "running", "sport" }
	 * 
	 * Sélection des hashtags avec distribution GAUSSIENNE :
	 * 
	 * La distribution Gaussienne est utilisée pour rendre les résultats plus réalistes.
	 * Sans distribution, tous les hashtags seraient équiprobables (irréaliste).
	 * Avec distribution, les hashtags au CENTRE sont plus probables (réaliste).
	 * 
	 * Mathématique :
	 *   d = random.nextGaussian()           // Normal(0, 1) : valeur aléatoire centrée sur 0
	 *   d = (int) (hashtags.length / 2.0 + d * hashtags.length / 2.0)
	 *      // Transforme la distribution pour être centrée sur l'indice 6 (milieu)
	 *      // La moyenne devient indice 6, écart-type = 2
	 * 
	 * Exemple (hashtags.length = 12) :
	 *   - 68% des hashtags sélectionnés seront entre indices 4 et 8 (ceux du milieu)
	 *   - 95% seront entre indices 2 et 10
	 *   - Seulement quelques % seront aux extrêmes (indices 0-1 ou 11)
	 * 
	 * Résultat : les hashtags populaires (#CS, #Metz) apparaissent plus souvent
	 *            que les hashtags rares (#bitCoin, #crypto)
	 * 
	 * ============================================================================
	 * NOMBRE DE HASHTAGS PAR TWEET
	 * ============================================================================
	 * 
	 * Calcul du nombre de hashtags :
	 *   int numHashtags = (int) (4 * random.nextDouble())
	 *   // random.nextDouble() : [0.0, 1.0) - nombre aléatoire uniforme
	 *   // 4 * valeur : [0.0, 4.0)
	 *   // (int) cast : donne 0, 1, 2 ou 3 (jamais 4)
	 * 
	 * Distribution :
	 *   - 0 hashtag : 25% des tweets (pas de hashtag)
	 *   - 1 hashtag : 25% des tweets
	 *   - 2 hashtags : 25% des tweets
	 *   - 3 hashtags : 25% des tweets
	 *   - Total : 4 cas équiprobables
	 * 
	 * Simulation réaliste : beaucoup de tweets n'ont pas de hashtag,
	 *                       d'autres en ont 1, 2 ou 3.
	 * 
	 * ============================================================================
	 * DÉBIT DE PUBLICATION (TIMING)
	 * ============================================================================
	 * 
	 * Thread.sleep(10) : pause 10 millisecondes après chaque tweet
	 * 
	 * Calcul du débit :
	 *   - 1 seconde = 1000 ms
	 *   - Avec 10 ms par tweet : 1000 / 10 = 100 tweets/seconde
	 *   - Débit stable et prévisible
	 * 
	 * Utilité :
	 * - Évite de surcharger Kafka avec trop de tweets
	 * - Laisse du temps au pipeline pour traiter les données
	 * - Simule un flux réaliste (pas d'explosion de données)
	 * 
	 * ============================================================================
	 */
	@Override
	@SuppressWarnings("java:S2189")  // This is a producer service - infinite loop is intentional for continuous streaming
	public void run() {
		// Liste des hashtags disponibles (12 au total)
		// Ces hashtags sont les thèmes possibles pour les tweets générés
		String[] hashtags = { "fun", "bitCoin", "climate", "crypto", "CS", "Metz", "weather", "summer", "holidays",
				"health", "running", "sport" };
		
		// Liste des langues possibles pour les tweets (5 langues)
		String[] languages = { "fr", "en", "ru", "es", "it" };

		// Variable locale pour stocker le tweet en cours de création
		Tweet tweet;
		
		// Compteur de tweets générés (utilisé pour l'ID du tweet)
		int nb = 0;
		
		// Texte du tweet en cours de construction
		String text;
		
		// ========================================================================
		// BOUCLE INFINIE : génération et publication continue de tweets
		// ========================================================================
		while (true) {
			// ÉTAPE 1 : Initialisation du tweet
			nb++;  // Incrémente le compteur (1, 2, 3, ...)
			tweet = new Tweet();  // Crée un nouvel objet Tweet (vierge)
			text = "Tweet " + nb;  // Commence le texte avec "Tweet <numéro>"
			
			// ÉTAPE 2 : Sélection et ajout aléatoire des hashtags
			// Boucle : ajoute de 0 à 3 hashtags au texte (selon (int)(4 * random.nextDouble()))
			for (int i = 0; i < (int) (4 * random.nextDouble()); i++) {
				double d;  // Variable pour stocker l'indice du hashtag sélectionné
				
				// Boucle do-while : assure que d est dans la plage [0, hashtags.length - 1]
				do {
					// Génère une valeur Gaussienne centrée sur 0 (distribution normale)
					d = random.nextGaussian();
					
					// Transforme la valeur Gaussienne pour être centrée sur le milieu du tableau
					// et ayant une plage acceptable [0, hashtags.length - 1]
					// Formule : centre + (valeur Gaussienne * écart-type)
					// Résultat : indice biaisé vers le centre (plus réaliste)
					d = (int) (hashtags.length / 2.0 + d * hashtags.length / 2.0);
					
				// Répète si d est hors de la plage valide (< 0 ou >= hashtags.length)
				} while (d < 0 || d > hashtags.length - 1);
				
				// Ajoute le hashtag sélectionné au texte du tweet
				// Format : "#<hashtag> " (avec un espace après)
				text += "#" + hashtags[(int) d] + " ";
			}
			
			// ÉTAPE 3 : Configuration du tweet avec les attributs générés
			tweet.setId("" + nb);  // ID = numéro du tweet en String
			tweet.setText(text);  // Texte = "Tweet <nb> #hashtag1 #hashtag2 ..."
			tweet.setLang(languages[(int) (random.nextDouble() * languages.length)]);  // Langue aléatoire
			tweet.setCreatedAt(OffsetDateTime.now());  // Timestamp = maintenant
			
			// ÉTAPE 4 : Publication du tweet sur Kafka
			// publishTweet() : envoie le tweet au topic "raw-tweets"
			// Ce tweet sera consommé par HashtagExtractor
			publishTweet(tweet);
			
			// ÉTAPE 5 : Pause pour maintenir un débit stable
			// 10 ms = ~100 tweets/seconde (1000 ms / 10 ms = 100)
			try {
				Thread.sleep(10);  // Dort 10 millisecondes
			} catch (InterruptedException e) {
				// InterruptedException : quelqu'un a appelé thread.interrupt()
				// Typiquement lors d'un arrêt gracieux (Ctrl+C ou signal SIGTERM)
				java.util.logging.Logger.getLogger(MockTwitterStreamRandom.class.getName()).warning("Producer sleep interrupted");
				// Signale au thread qu'il a été interrompu
				Thread.currentThread().interrupt();
			}
		}
	}
}