MockTwitterStreamScenario.java

package tweetoscope.tweetsProducer;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;

import com.twitter.clientlib.model.Tweet;

/**
 * Classe MockTwitterStreamScenario
 * 
 * OBJECTIF:
 * Cette classe crée et publie un scénario pré-défini de 10 tweets dans Kafka.
 * Elle simule un flux Twitter avec des tweets test qui contiennent des hashtags
 * spécifiques (notamment #travail, #work, #success, #failure) pour faciliter
 * le testing du système de comptage de hashtags.
 * 
 * CAS D'USAGE:
 * - Tests et démonstrations du système de filtrage et comptage de tweets
 * - Validation du pipeline de traitement sans dépendre de l'API Twitter réelle
 * - Scénarios de test reproductibles avec données déterministes
 * 
 * DONNÉES INCLUSES:
 * Le scénario inclut 10 tweets multilingues (FR/EN) avec différents auteurs,
 * conversations et hashtags pour couvrir divers cas d'usage du système.
 * 
 */
public final class MockTwitterStreamScenario extends TweetsProducer {
	// Logger statique pour tracer les événements et erreurs durant l'exécution
	// Utilisé pour enregistrer l'avancement du scénario et les interruptions
	private static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(MockTwitterStreamScenario.class.getName());

	/**
	 * Constructeur du producteur de scénario
	 * 
	 * @param brokers Adresse des serveurs Kafka (bootstrap servers) où les tweets seront publiés
	 *                Format: "localhost:9092" ou "kafka-broker-1:9092,kafka-broker-2:9092"
	 */
	public MockTwitterStreamScenario(String brokers) {
		super(brokers);  // Initialise le producteur parent avec l'adresse des serveurs Kafka
	}

	/**
	 * Méthode run() - Exécute le scénario de test complet
	 * 
	 * PROCESSUS:
	 * 1. Crée un tableau de 10 tweets pré-définis
	 * 2. Configure chaque tweet avec ses propriétés (ID, texte, auteur, date, langue, etc.)
	 * 3. Publie tous les tweets dans le topic Kafka
	 * 4. Enregistre les résultats dans les logs
	 * 5. Maintient le conteneur actif indéfiniment
	 * 
	 * STRUCTURE DES TWEETS:
	 * Chaque tweet contient: ID unique, date/heure précise, contenu textuel,
	 * ID d'auteur, ID de conversation, langue et hashtags spécifiques
	 */
	@Override
	public void run() {
		// === CRÉATION DU TABLEAU DE TWEETS ===
		// Initialise un tableau de 10 éléments pour stocker les tweets du scénario
		// Le nombre 10 est choisi pour couvrir plusieurs cas de test différents
		Tweet[] tweets = new Tweet[10];
		
		// === TWEET #1 - EN FRANÇAIS SUR LE TRAVAIL ===
		// Crée le premier tweet du scénario
		tweets[0] = new Tweet();
		
		// ID unique du tweet pour l'identifier dans le système
		tweets[0].setId("001");
		
		// Date et heure précise de création: 20 juin 2021 à 11h46:23 UTC
		// Utilisée pour trier chronologiquement et analyser les tendances temporelles
		tweets[0].setCreatedAt(OffsetDateTime.of(2021, 6, 20, 11, 46, 23, 0, ZoneOffset.UTC));
		
		// Texte du tweet contenant le hashtag #travail (travail en français)
		// Ce hashtag sera extrait et compté par le système de comptage de hashtags
		tweets[0].setText("Choisissez un #travail que vous aimez et vous n'aurez pas à travailler un seul jour de votre vie");
		
		// ID de l'auteur qui a écrit ce tweet (l'auteur est l'utilisateur 31)
		tweets[0].setAuthorId("31");
		
		// ID de conversation: groupe de tweets liés (conversation 01 avec plusieurs tweets)
		tweets[0].setConversationId("01");
		
		// Récupère les informations géographiques du tweet (optionnel, peut être null)
		tweets[0].getGeo();
		
		// Langue du tweet: FR pour French (français)
		// Important pour le filtrage par langue et l'analyse linguistique
		tweets[0].setLang("fr");
		
		// === TWEET #2 - EN FRANÇAIS, MÊME CONVERSATION QUE TWEET #1 ===
		// Crée le deuxième tweet du même auteur (31) dans la même conversation (01)
		tweets[1] = new Tweet();
		tweets[1].setId("002");
		tweets[1].setCreatedAt(OffsetDateTime.of(2021, 6, 20, 11, 48, 20, 0, ZoneOffset.UTC));
		tweets[1].setText("Si on travaille pour gagner sa vie, pourquoi se tuer au #travail ?");
		tweets[1].setAuthorId("31");  // Même auteur que tweet #0
		tweets[1].setConversationId("01");  // Même conversation pour montrer un thread/fil de discussion
		tweets[1].getGeo();
		tweets[1].setLang("fr");
		
		// === TWEET #3 - EN ANGLAIS AVEC HASHTAGS #Failure ET #success ===
		// Nouvel auteur (32), mais même conversation (01), langue anglaise
		// Teste le comptage de multiples hashtags dans un même tweet
		tweets[2] = new Tweet();
		tweets[2].setId("003");
		tweets[2].setCreatedAt(OffsetDateTime.of(2022, 6, 24, 6, 6, 13, 0, ZoneOffset.UTC));
		tweets[2].setText("#Failure is not the opposite of #success: it's part of success.");
		tweets[2].setAuthorId("32");  // Auteur différent
		tweets[2].setConversationId("01");
		tweets[2].getGeo();
		tweets[2].setLang("en");  // Langue anglaise

		// === TWEET #4 - EN ANGLAIS AVEC HASHTAG #work ===
		// Auteur 34, toujours dans la conversation 01, contient le hashtag #work
		tweets[3] = new Tweet();
		tweets[3].setId("004");
		tweets[3].setCreatedAt(OffsetDateTime.of(2022, 7, 2, 4, 44, 17, 0, ZoneOffset.UTC));
		tweets[3].setText("You are not your resume, you are your #work.");
		tweets[3].setAuthorId("34");
		tweets[3].setConversationId("01");
		tweets[3].getGeo();
		tweets[3].setLang("en");

		// === TWEET #5 - EN ANGLAIS, NOUVELLE CONVERSATION ===
		// Auteur 35, nouvelle conversation (03), pas de hashtag spécifique
		// Teste la gestion des tweets sans les hashtags principaux (#work, #success, #failure)
		tweets[4] = new Tweet();
		tweets[4].setId("005");
		tweets[4].setCreatedAt(OffsetDateTime.of(2022, 8, 20, 10, 49, 23, 0, ZoneOffset.UTC));
		tweets[4].setText("People who wonder if the glass is half empty or half full miss the point. The glass is refillable.");
		tweets[4].setAuthorId("35");
		tweets[4].setConversationId("03");  // Conversation différente
		tweets[4].getGeo();
		tweets[4].setLang("en");

		// === TWEET #6 - EN ANGLAIS, CONVERSATION 04 ===
		// Auteur 33, conversation 04, sans hashtag ciblé
		tweets[5] = new Tweet();
		tweets[5].setId("006");
		tweets[5].setCreatedAt(OffsetDateTime.of(2022, 9, 20, 11, 23, 31, 0, ZoneOffset.UTC));
		tweets[5].setText("If you think you are too small to make a difference, try sleeping with a mosquito.");
		tweets[5].setAuthorId("33");
		tweets[5].setConversationId("04");  // Nouvelle conversation
		tweets[5].getGeo();
		tweets[5].setLang("en");

		// === TWEET #7 - EN ANGLAIS AVEC HASHTAG #work, CONVERSATION 03 ===
		// Auteur 34 (déjà vu avant), conversation 03 (partagée avec tweet #5)
		// Teste l'incrémentation du compteur #work
		tweets[6] = new Tweet();
		tweets[6].setId("007");
		tweets[6].setCreatedAt(OffsetDateTime.of(2022, 11, 30, 2, 15, 0, 0, ZoneOffset.UTC));
		tweets[6].setText("Nothing will #work unless you do.");
		tweets[6].setAuthorId("34");
		tweets[6].setConversationId("03");  // Retour à la conversation 03
		tweets[6].getGeo();
		tweets[6].setLang("en");

		// === TWEET #8 - EN ANGLAIS, CONVERSATION 05 ===
		// Auteur 35, conversation 05 (nouvelle), pas de hashtag ciblé
		tweets[7] = new Tweet();
		tweets[7].setId("008");
		tweets[7].setCreatedAt(OffsetDateTime.of(2022, 12, 1, 8, 30, 20, 0, ZoneOffset.UTC));
		tweets[7].setText("If you get tired, learn to rest, not to quit.");
		tweets[7].setAuthorId("35");
		tweets[7].setConversationId("05");  // Nouvelle conversation
		tweets[7].getGeo();
		tweets[7].setLang("en");

		// === TWEET #9 - EN ANGLAIS AVEC #Failure ET #success, CONVERSATION 02 ===
		// Auteur 33, conversation 02, contient deux hashtags cibles (#Failure, #success)
		// Teste la détection de plusieurs hashtags différents dans un seul tweet
		tweets[8] = new Tweet();
		tweets[8].setId("009");
		tweets[8].setCreatedAt(OffsetDateTime.of(2023, 1, 2, 7, 55, 56, 0, ZoneOffset.UTC));
		tweets[8].setText("#Failure is #success in progress.");
		tweets[8].setAuthorId("33");
		tweets[8].setConversationId("02");  // Conversation 02
		tweets[8].getGeo();
		tweets[8].setLang("en");

		// === TWEET #10 - EN ANGLAIS AVEC #success ET #work, CONVERSATION 07 ===
		// Auteur 36, conversation 07 (dernière conversation), deux hashtags (#success, #work)
		// Tweet final du scénario
		tweets[9] = new Tweet();
		tweets[9].setId("010");
		tweets[9].setCreatedAt(OffsetDateTime.of(2023, 8, 3, 12, 40, 25, 0, ZoneOffset.UTC));
		tweets[9].setText("The only place #success comes before #work is in the dictionary.");
		tweets[9].setAuthorId("36");  // Nouvel auteur
		tweets[9].setConversationId("07");  // Nouvelle conversation
		tweets[9].getGeo();
		tweets[9].setLang("en");

		// === PUBLICATION DE TOUS LES TWEETS ===
		// Boucle qui itère sur tous les 10 tweets du tableau
		// publishTweet() est une méthode héritée du parent qui envoie chaque tweet à Kafka
		for (Tweet tweet : tweets) {
			// Publie le tweet courant dans le topic Kafka configuré
			publishTweet(tweet);
		}
		
	// === ENREGISTREMENT DES RÉSULTATS ===
	// Enregistre dans les logs que tous les tweets ont été publiés avec succès
	logger.info("Scenario completed - all 10 tweets published.");
	
	// Indique que le producteur a terminé son travail mais reste actif
	logger.info("Producer finished. Container will remain active.");
	
	// === MAINTIEN DU CONTENEUR EN VIE ===
	// Appelle la méthode keepContainerAlive() pour exécuter une boucle infinie
	// Cela empêche le programme de terminer et le conteneur Docker de s'arrêter
	keepContainerAlive();
	
	// NOTE: close() n'est jamais appelée pour garder le conteneur en exécution
	// Si close() était appelée, les ressources Kafka seraient libérées et le conteneur s'arrêterait
	}
	
	/**
	 * Méthode keepContainerAlive() - Maintient le conteneur Docker actif
	 * 
	 * OBJECTIF ARCHITECTURAL:
	 * Dans une architecture containerisée (Docker/Kubernetes), si le processus principal
	 * se termine, le conteneur s'arrête automatiquement. Cependant, nous voulons que le
	 * producteur Kafka reste connecté au cluster Kafka de manière permanente pour:
	 * - Maintenir la connexion avec le broker Kafka
	 * - Garder les ressources réseau ouvertes
	 * - Assurer une réactivité instantanée en cas de redémarrage du système
	 * 
	 * SOLUTION:
	 * Une boucle infinie qui dort 60 secondes à chaque itération garde le thread vivant
	 * sans consommer beaucoup de ressources CPU (le thread dort, il n'utilise pas le CPU)
	 * 
	 * NOTE DE SÉCURITÉ:
	 * L'annotation @SuppressWarnings("java:S2189") supprime l'avertissement SonarQube
	 * qui détecte les boucles infinies. C'est intentionnel et nécessaire ici.
	 */
	@SuppressWarnings("java:S2189")  // Infinite loop is intentional to keep container running
	private void keepContainerAlive() {
		try {
			// Boucle infinie - continue indéfiniment
			while (true) {
				// Fait dormir le thread pendant 60 000 millisecondes = 60 secondes
				// Pendant cette période, le thread ne consomme pratiquement pas de CPU (état "sleep")
				// Le thread sera réveillé automatiquement après 60 secondes ou en cas d'interruption
				Thread.sleep(60000); // Sleep forever to keep container running
			}
		} catch (InterruptedException e) {
			// Capture l'exception si le thread est interrompu de l'extérieur
			// Cela peut survenir lors d'un arrêt gracieux du conteneur Docker
			logger.warning("Producer interrupted");  // Enregistre l'interruption
			
			// Restaure l'état d'interruption du thread pour que le code appelant le sache
			// C'est une bonne pratique en Java quand on catch une InterruptedException
			Thread.currentThread().interrupt();
		}
	}
}