MockTwitterStreamRandom.java
package tweetoscope.tweetsProducer;
import java.time.OffsetDateTime;
import java.util.Random;
import com.twitter.clientlib.model.Tweet;
/**
* PRODUCTEUR 1 : Générateur de tweets aléatoires continus
*
* ============================================================================
* DESCRIPTION GÉNÉRALE
* ============================================================================
*
* MockTwitterStreamRandom simule un flux continu de tweets aléatoires. C'est utile
* pour tester le pipeline de traitement sans dépendre de l'API Twitter réelle,
* qui a des limites de débit et peut être indisponible.
*
* Ce producteur :
* 1. Génère des tweets de manière aléatoire et continue (boucle infinie)
* 2. Sélectionne les hashtags à partir d'une liste prédéfinie (12 hashtags)
* 3. Utilise une distribution Gaussienne pour rendre la sélection réaliste
* 4. Varie le nombre de hashtags par tweet (0 à 4 environ)
* 5. Assigne une langue aléatoire (fr, en, ru, es, it)
* 6. Crée des tweets avec ID, texte, langue et timestamp
* 7. Publie chaque tweet sur le topic Kafka "raw-tweets"
* 8. Maintient un débit constant (~100 tweets/seconde avec 10ms de délai)
*
* Cas d'utilisation :
* - Tests du pipeline avec un flux stable et prévisible
* - Développement sans accès à l'API Twitter
* - Démonstration du système avec un flux de données continu
*
* ============================================================================
*/
public final class MockTwitterStreamRandom extends TweetsProducer {
/**
* Générateur de nombres aléatoires (java.util.Random).
* Utilisé pour :
* - Sélectionner les hashtags (avec distribution Gaussienne)
* - Déterminer le nombre de hashtags par tweet (0-4)
* - Choisir la langue aléatoirement
* Initialisé une seule fois au démarrage (seed peut varier).
*/
private final Random random = new Random();
/**
* Constructeur de MockTwitterStreamRandom.
*
* Responsabilités :
* 1. Appelle le constructeur parent TweetsProducer(brokers)
* - Initialise la connexion Kafka
* - Configure le producteur Kafka interne
* - Connecte au topic "raw-tweets"
*
* 2. Initialise l'attribut Random (fait automatiquement à la déclaration)
*
* @param brokers Adresses des brokers Kafka, format "localhost:9092,localhost:9093"
*/
public MockTwitterStreamRandom(String brokers) {
super(brokers);
}
/**
* Exécute le producteur : génère et publie des tweets aléatoires en continu.
*
* ============================================================================
* PROCESSUS GÉNÉRAL
* ============================================================================
*
* La méthode run() exécute une boucle infinie qui :
* 1. Génère un tweet aléatoire (ID, texte, langue, timestamp)
* 2. Le publie sur Kafka (topic "raw-tweets")
* 3. Attend 10ms (maintient le débit de ~100 tweets/seconde)
* 4. Recommence indéfiniment
*
* Cette boucle infinie est intentionnelle pour :
* - Simuler un flux de tweets continu
* - Maintenir le service actif en Docker/Kubernetes
*
* ============================================================================
* COMPOSANTES DU TWEET
* ============================================================================
*
* 1. ID : numéro séquentiel ("1", "2", "3", ...)
* - Commence à 1
* - Incrémente à chaque tweet généré
* - Format : String
*
* 2. TEXTE : combinaison de hashtags sélectionnés aléatoirement
* Exemple : "Tweet 42 #fun #bitCoin #weather"
* - Commence par "Tweet <nb>"
* - Ajoute 0 à 4 hashtags sélectionnés aléatoirement
* - Format : String
*
* 3. LANGUE : choisie aléatoirement parmi [fr, en, ru, es, it]
* - Simulation de tweets multlingues
* - Format : code langue ISO 639-1
*
* 4. TIMESTAMP : la date/heure actuelle (OffsetDateTime.now())
* - Horodatage ISO-8601 avec fuseau horaire
* - Mis à jour à chaque tweet
*
* ============================================================================
* LISTE DE HASHTAGS ET DISTRIBUTION GAUSSIENNE
* ============================================================================
*
* Hashtags disponibles (12 au total) :
* { "fun", "bitCoin", "climate", "crypto", "CS", "Metz", "weather", "summer",
* "holidays", "health", "running", "sport" }
*
* Sélection des hashtags avec distribution GAUSSIENNE :
*
* La distribution Gaussienne est utilisée pour rendre les résultats plus réalistes.
* Sans distribution, tous les hashtags seraient équiprobables (irréaliste).
* Avec distribution, les hashtags au CENTRE sont plus probables (réaliste).
*
* Mathématique :
* d = random.nextGaussian() // Normal(0, 1) : valeur aléatoire centrée sur 0
* d = (int) (hashtags.length / 2.0 + d * hashtags.length / 2.0)
* // Transforme la distribution pour être centrée sur l'indice 6 (milieu)
* // La moyenne devient indice 6, écart-type = 2
*
* Exemple (hashtags.length = 12) :
* - 68% des hashtags sélectionnés seront entre indices 4 et 8 (ceux du milieu)
* - 95% seront entre indices 2 et 10
* - Seulement quelques % seront aux extrêmes (indices 0-1 ou 11)
*
* Résultat : les hashtags populaires (#CS, #Metz) apparaissent plus souvent
* que les hashtags rares (#bitCoin, #crypto)
*
* ============================================================================
* NOMBRE DE HASHTAGS PAR TWEET
* ============================================================================
*
* Calcul du nombre de hashtags :
* int numHashtags = (int) (4 * random.nextDouble())
* // random.nextDouble() : [0.0, 1.0) - nombre aléatoire uniforme
* // 4 * valeur : [0.0, 4.0)
* // (int) cast : donne 0, 1, 2 ou 3 (jamais 4)
*
* Distribution :
* - 0 hashtag : 25% des tweets (pas de hashtag)
* - 1 hashtag : 25% des tweets
* - 2 hashtags : 25% des tweets
* - 3 hashtags : 25% des tweets
* - Total : 4 cas équiprobables
*
* Simulation réaliste : beaucoup de tweets n'ont pas de hashtag,
* d'autres en ont 1, 2 ou 3.
*
* ============================================================================
* DÉBIT DE PUBLICATION (TIMING)
* ============================================================================
*
* Thread.sleep(10) : pause 10 millisecondes après chaque tweet
*
* Calcul du débit :
* - 1 seconde = 1000 ms
* - Avec 10 ms par tweet : 1000 / 10 = 100 tweets/seconde
* - Débit stable et prévisible
*
* Utilité :
* - Évite de surcharger Kafka avec trop de tweets
* - Laisse du temps au pipeline pour traiter les données
* - Simule un flux réaliste (pas d'explosion de données)
*
* ============================================================================
*/
@Override
@SuppressWarnings("java:S2189") // This is a producer service - infinite loop is intentional for continuous streaming
public void run() {
// Liste des hashtags disponibles (12 au total)
// Ces hashtags sont les thèmes possibles pour les tweets générés
String[] hashtags = { "fun", "bitCoin", "climate", "crypto", "CS", "Metz", "weather", "summer", "holidays",
"health", "running", "sport" };
// Liste des langues possibles pour les tweets (5 langues)
String[] languages = { "fr", "en", "ru", "es", "it" };
// Variable locale pour stocker le tweet en cours de création
Tweet tweet;
// Compteur de tweets générés (utilisé pour l'ID du tweet)
int nb = 0;
// Texte du tweet en cours de construction
String text;
// ========================================================================
// BOUCLE INFINIE : génération et publication continue de tweets
// ========================================================================
while (true) {
// ÉTAPE 1 : Initialisation du tweet
nb++; // Incrémente le compteur (1, 2, 3, ...)
tweet = new Tweet(); // Crée un nouvel objet Tweet (vierge)
text = "Tweet " + nb; // Commence le texte avec "Tweet <numéro>"
// ÉTAPE 2 : Sélection et ajout aléatoire des hashtags
// Boucle : ajoute de 0 à 3 hashtags au texte (selon (int)(4 * random.nextDouble()))
for (int i = 0; i < (int) (4 * random.nextDouble()); i++) {
double d; // Variable pour stocker l'indice du hashtag sélectionné
// Boucle do-while : assure que d est dans la plage [0, hashtags.length - 1]
do {
// Génère une valeur Gaussienne centrée sur 0 (distribution normale)
d = random.nextGaussian();
// Transforme la valeur Gaussienne pour être centrée sur le milieu du tableau
// et ayant une plage acceptable [0, hashtags.length - 1]
// Formule : centre + (valeur Gaussienne * écart-type)
// Résultat : indice biaisé vers le centre (plus réaliste)
d = (int) (hashtags.length / 2.0 + d * hashtags.length / 2.0);
// Répète si d est hors de la plage valide (< 0 ou >= hashtags.length)
} while (d < 0 || d > hashtags.length - 1);
// Ajoute le hashtag sélectionné au texte du tweet
// Format : "#<hashtag> " (avec un espace après)
text += "#" + hashtags[(int) d] + " ";
}
// ÉTAPE 3 : Configuration du tweet avec les attributs générés
tweet.setId("" + nb); // ID = numéro du tweet en String
tweet.setText(text); // Texte = "Tweet <nb> #hashtag1 #hashtag2 ..."
tweet.setLang(languages[(int) (random.nextDouble() * languages.length)]); // Langue aléatoire
tweet.setCreatedAt(OffsetDateTime.now()); // Timestamp = maintenant
// ÉTAPE 4 : Publication du tweet sur Kafka
// publishTweet() : envoie le tweet au topic "raw-tweets"
// Ce tweet sera consommé par HashtagExtractor
publishTweet(tweet);
// ÉTAPE 5 : Pause pour maintenir un débit stable
// 10 ms = ~100 tweets/seconde (1000 ms / 10 ms = 100)
try {
Thread.sleep(10); // Dort 10 millisecondes
} catch (InterruptedException e) {
// InterruptedException : quelqu'un a appelé thread.interrupt()
// Typiquement lors d'un arrêt gracieux (Ctrl+C ou signal SIGTERM)
java.util.logging.Logger.getLogger(MockTwitterStreamRandom.class.getName()).warning("Producer sleep interrupted");
// Signale au thread qu'il a été interrompu
Thread.currentThread().interrupt();
}
}
}
}