MockTwitterStreamScenario.java
package tweetoscope.tweetsProducer;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import com.twitter.clientlib.model.Tweet;
/**
* Classe MockTwitterStreamScenario
*
* OBJECTIF:
* Cette classe crée et publie un scénario pré-défini de 10 tweets dans Kafka.
* Elle simule un flux Twitter avec des tweets test qui contiennent des hashtags
* spécifiques (notamment #travail, #work, #success, #failure) pour faciliter
* le testing du système de comptage de hashtags.
*
* CAS D'USAGE:
* - Tests et démonstrations du système de filtrage et comptage de tweets
* - Validation du pipeline de traitement sans dépendre de l'API Twitter réelle
* - Scénarios de test reproductibles avec données déterministes
*
* DONNÉES INCLUSES:
* Le scénario inclut 10 tweets multilingues (FR/EN) avec différents auteurs,
* conversations et hashtags pour couvrir divers cas d'usage du système.
*
*/
public final class MockTwitterStreamScenario extends TweetsProducer {
// Logger statique pour tracer les événements et erreurs durant l'exécution
// Utilisé pour enregistrer l'avancement du scénario et les interruptions
private static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(MockTwitterStreamScenario.class.getName());
/**
* Constructeur du producteur de scénario
*
* @param brokers Adresse des serveurs Kafka (bootstrap servers) où les tweets seront publiés
* Format: "localhost:9092" ou "kafka-broker-1:9092,kafka-broker-2:9092"
*/
public MockTwitterStreamScenario(String brokers) {
super(brokers); // Initialise le producteur parent avec l'adresse des serveurs Kafka
}
/**
* Méthode run() - Exécute le scénario de test complet
*
* PROCESSUS:
* 1. Crée un tableau de 10 tweets pré-définis
* 2. Configure chaque tweet avec ses propriétés (ID, texte, auteur, date, langue, etc.)
* 3. Publie tous les tweets dans le topic Kafka
* 4. Enregistre les résultats dans les logs
* 5. Maintient le conteneur actif indéfiniment
*
* STRUCTURE DES TWEETS:
* Chaque tweet contient: ID unique, date/heure précise, contenu textuel,
* ID d'auteur, ID de conversation, langue et hashtags spécifiques
*/
@Override
public void run() {
// === CRÉATION DU TABLEAU DE TWEETS ===
// Initialise un tableau de 10 éléments pour stocker les tweets du scénario
// Le nombre 10 est choisi pour couvrir plusieurs cas de test différents
Tweet[] tweets = new Tweet[10];
// === TWEET #1 - EN FRANÇAIS SUR LE TRAVAIL ===
// Crée le premier tweet du scénario
tweets[0] = new Tweet();
// ID unique du tweet pour l'identifier dans le système
tweets[0].setId("001");
// Date et heure précise de création: 20 juin 2021 à 11h46:23 UTC
// Utilisée pour trier chronologiquement et analyser les tendances temporelles
tweets[0].setCreatedAt(OffsetDateTime.of(2021, 6, 20, 11, 46, 23, 0, ZoneOffset.UTC));
// Texte du tweet contenant le hashtag #travail (travail en français)
// Ce hashtag sera extrait et compté par le système de comptage de hashtags
tweets[0].setText("Choisissez un #travail que vous aimez et vous n'aurez pas à travailler un seul jour de votre vie");
// ID de l'auteur qui a écrit ce tweet (l'auteur est l'utilisateur 31)
tweets[0].setAuthorId("31");
// ID de conversation: groupe de tweets liés (conversation 01 avec plusieurs tweets)
tweets[0].setConversationId("01");
// Récupère les informations géographiques du tweet (optionnel, peut être null)
tweets[0].getGeo();
// Langue du tweet: FR pour French (français)
// Important pour le filtrage par langue et l'analyse linguistique
tweets[0].setLang("fr");
// === TWEET #2 - EN FRANÇAIS, MÊME CONVERSATION QUE TWEET #1 ===
// Crée le deuxième tweet du même auteur (31) dans la même conversation (01)
tweets[1] = new Tweet();
tweets[1].setId("002");
tweets[1].setCreatedAt(OffsetDateTime.of(2021, 6, 20, 11, 48, 20, 0, ZoneOffset.UTC));
tweets[1].setText("Si on travaille pour gagner sa vie, pourquoi se tuer au #travail ?");
tweets[1].setAuthorId("31"); // Même auteur que tweet #0
tweets[1].setConversationId("01"); // Même conversation pour montrer un thread/fil de discussion
tweets[1].getGeo();
tweets[1].setLang("fr");
// === TWEET #3 - EN ANGLAIS AVEC HASHTAGS #Failure ET #success ===
// Nouvel auteur (32), mais même conversation (01), langue anglaise
// Teste le comptage de multiples hashtags dans un même tweet
tweets[2] = new Tweet();
tweets[2].setId("003");
tweets[2].setCreatedAt(OffsetDateTime.of(2022, 6, 24, 6, 6, 13, 0, ZoneOffset.UTC));
tweets[2].setText("#Failure is not the opposite of #success: it's part of success.");
tweets[2].setAuthorId("32"); // Auteur différent
tweets[2].setConversationId("01");
tweets[2].getGeo();
tweets[2].setLang("en"); // Langue anglaise
// === TWEET #4 - EN ANGLAIS AVEC HASHTAG #work ===
// Auteur 34, toujours dans la conversation 01, contient le hashtag #work
tweets[3] = new Tweet();
tweets[3].setId("004");
tweets[3].setCreatedAt(OffsetDateTime.of(2022, 7, 2, 4, 44, 17, 0, ZoneOffset.UTC));
tweets[3].setText("You are not your resume, you are your #work.");
tweets[3].setAuthorId("34");
tweets[3].setConversationId("01");
tweets[3].getGeo();
tweets[3].setLang("en");
// === TWEET #5 - EN ANGLAIS, NOUVELLE CONVERSATION ===
// Auteur 35, nouvelle conversation (03), pas de hashtag spécifique
// Teste la gestion des tweets sans les hashtags principaux (#work, #success, #failure)
tweets[4] = new Tweet();
tweets[4].setId("005");
tweets[4].setCreatedAt(OffsetDateTime.of(2022, 8, 20, 10, 49, 23, 0, ZoneOffset.UTC));
tweets[4].setText("People who wonder if the glass is half empty or half full miss the point. The glass is refillable.");
tweets[4].setAuthorId("35");
tweets[4].setConversationId("03"); // Conversation différente
tweets[4].getGeo();
tweets[4].setLang("en");
// === TWEET #6 - EN ANGLAIS, CONVERSATION 04 ===
// Auteur 33, conversation 04, sans hashtag ciblé
tweets[5] = new Tweet();
tweets[5].setId("006");
tweets[5].setCreatedAt(OffsetDateTime.of(2022, 9, 20, 11, 23, 31, 0, ZoneOffset.UTC));
tweets[5].setText("If you think you are too small to make a difference, try sleeping with a mosquito.");
tweets[5].setAuthorId("33");
tweets[5].setConversationId("04"); // Nouvelle conversation
tweets[5].getGeo();
tweets[5].setLang("en");
// === TWEET #7 - EN ANGLAIS AVEC HASHTAG #work, CONVERSATION 03 ===
// Auteur 34 (déjà vu avant), conversation 03 (partagée avec tweet #5)
// Teste l'incrémentation du compteur #work
tweets[6] = new Tweet();
tweets[6].setId("007");
tweets[6].setCreatedAt(OffsetDateTime.of(2022, 11, 30, 2, 15, 0, 0, ZoneOffset.UTC));
tweets[6].setText("Nothing will #work unless you do.");
tweets[6].setAuthorId("34");
tweets[6].setConversationId("03"); // Retour à la conversation 03
tweets[6].getGeo();
tweets[6].setLang("en");
// === TWEET #8 - EN ANGLAIS, CONVERSATION 05 ===
// Auteur 35, conversation 05 (nouvelle), pas de hashtag ciblé
tweets[7] = new Tweet();
tweets[7].setId("008");
tweets[7].setCreatedAt(OffsetDateTime.of(2022, 12, 1, 8, 30, 20, 0, ZoneOffset.UTC));
tweets[7].setText("If you get tired, learn to rest, not to quit.");
tweets[7].setAuthorId("35");
tweets[7].setConversationId("05"); // Nouvelle conversation
tweets[7].getGeo();
tweets[7].setLang("en");
// === TWEET #9 - EN ANGLAIS AVEC #Failure ET #success, CONVERSATION 02 ===
// Auteur 33, conversation 02, contient deux hashtags cibles (#Failure, #success)
// Teste la détection de plusieurs hashtags différents dans un seul tweet
tweets[8] = new Tweet();
tweets[8].setId("009");
tweets[8].setCreatedAt(OffsetDateTime.of(2023, 1, 2, 7, 55, 56, 0, ZoneOffset.UTC));
tweets[8].setText("#Failure is #success in progress.");
tweets[8].setAuthorId("33");
tweets[8].setConversationId("02"); // Conversation 02
tweets[8].getGeo();
tweets[8].setLang("en");
// === TWEET #10 - EN ANGLAIS AVEC #success ET #work, CONVERSATION 07 ===
// Auteur 36, conversation 07 (dernière conversation), deux hashtags (#success, #work)
// Tweet final du scénario
tweets[9] = new Tweet();
tweets[9].setId("010");
tweets[9].setCreatedAt(OffsetDateTime.of(2023, 8, 3, 12, 40, 25, 0, ZoneOffset.UTC));
tweets[9].setText("The only place #success comes before #work is in the dictionary.");
tweets[9].setAuthorId("36"); // Nouvel auteur
tweets[9].setConversationId("07"); // Nouvelle conversation
tweets[9].getGeo();
tweets[9].setLang("en");
// === PUBLICATION DE TOUS LES TWEETS ===
// Boucle qui itère sur tous les 10 tweets du tableau
// publishTweet() est une méthode héritée du parent qui envoie chaque tweet à Kafka
for (Tweet tweet : tweets) {
// Publie le tweet courant dans le topic Kafka configuré
publishTweet(tweet);
}
// === ENREGISTREMENT DES RÉSULTATS ===
// Enregistre dans les logs que tous les tweets ont été publiés avec succès
logger.info("Scenario completed - all 10 tweets published.");
// Indique que le producteur a terminé son travail mais reste actif
logger.info("Producer finished. Container will remain active.");
// === MAINTIEN DU CONTENEUR EN VIE ===
// Appelle la méthode keepContainerAlive() pour exécuter une boucle infinie
// Cela empêche le programme de terminer et le conteneur Docker de s'arrêter
keepContainerAlive();
// NOTE: close() n'est jamais appelée pour garder le conteneur en exécution
// Si close() était appelée, les ressources Kafka seraient libérées et le conteneur s'arrêterait
}
/**
* Méthode keepContainerAlive() - Maintient le conteneur Docker actif
*
* OBJECTIF ARCHITECTURAL:
* Dans une architecture containerisée (Docker/Kubernetes), si le processus principal
* se termine, le conteneur s'arrête automatiquement. Cependant, nous voulons que le
* producteur Kafka reste connecté au cluster Kafka de manière permanente pour:
* - Maintenir la connexion avec le broker Kafka
* - Garder les ressources réseau ouvertes
* - Assurer une réactivité instantanée en cas de redémarrage du système
*
* SOLUTION:
* Une boucle infinie qui dort 60 secondes à chaque itération garde le thread vivant
* sans consommer beaucoup de ressources CPU (le thread dort, il n'utilise pas le CPU)
*
* NOTE DE SÉCURITÉ:
* L'annotation @SuppressWarnings("java:S2189") supprime l'avertissement SonarQube
* qui détecte les boucles infinies. C'est intentionnel et nécessaire ici.
*/
@SuppressWarnings("java:S2189") // Infinite loop is intentional to keep container running
private void keepContainerAlive() {
try {
// Boucle infinie - continue indéfiniment
while (true) {
// Fait dormir le thread pendant 60 000 millisecondes = 60 secondes
// Pendant cette période, le thread ne consomme pratiquement pas de CPU (état "sleep")
// Le thread sera réveillé automatiquement après 60 secondes ou en cas d'interruption
Thread.sleep(60000); // Sleep forever to keep container running
}
} catch (InterruptedException e) {
// Capture l'exception si le thread est interrompu de l'extérieur
// Cela peut survenir lors d'un arrêt gracieux du conteneur Docker
logger.warning("Producer interrupted"); // Enregistre l'interruption
// Restaure l'état d'interruption du thread pour que le code appelant le sache
// C'est une bonne pratique en Java quand on catch une InterruptedException
Thread.currentThread().interrupt();
}
}
}