MockTwitterStreamRecorded.java

package tweetoscope.tweetsProducer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.OffsetDateTime;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.twitter.clientlib.model.Tweet;

/**
 * Classe MockTwitterStreamRecorded
 * 
 * Cette classe simule un flux de tweets en lisant des tweets pré-enregistrés 
 * depuis un fichier JSON. Elle est particulièrement utile lorsque les limites 
 * de taux de l'API Twitter sont dépassées ou pour tester le système avec 
 * des données contrôlées et reproductibles.
 * 
 * Fonctionnalité principale:
 * - Lit un fichier JSON contenant des tweets enregistrés
 * - Parse les tweets en utilisant la bibliothèque GSON
 * - Publie les tweets dans un topic Kafka
 * - Maintient le conteneur actif après la publication
 * 
 * @author Virginie Galtier
 */
public final class MockTwitterStreamRecorded extends TweetsProducer {
	// Logger statique utilisé pour tracer les événements et les erreurs de cette classe
	// permet de suivre le cycle de vie du producteur et diagnostiquer les problèmes
	private static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(MockTwitterStreamRecorded.class.getName());

	/**
	 * Attribut fileName : chemin vers le fichier JSON contenant les tweets pré-enregistrés
	 * Ce fichier doit être au format JSON avec un tableau "tweets" contenant des objets Tweet
	 */
	protected String fileName;

	/**
	 * Constructeur du producteur MockTwitterStreamRecorded
	 * 
	 * @param brokers Adresse des serveurs Kafka (bootstrap servers) où les tweets seront publiés
	 *                Format typique: "localhost:9092" ou "kafka1:9092,kafka2:9092,kafka3:9092"
	 * @param fileName Chemin absolu ou relatif du fichier JSON contenant les tweets pré-enregistrés
	 *                 Le fichier doit contenir un objet JSON avec un tableau nommé "tweets"
	 */
	public MockTwitterStreamRecorded(String brokers, String fileName) {
		super(brokers);  // Appelle le constructeur parent avec les serveurs Kafka
		this.fileName = fileName;  // Stocke le chemin du fichier pour utilisation ultérieure
	}

	/**
	 * Méthode run() - Point d'entrée principal du producteur
	 * 
	 * Cette méthode exécute le processus suivant:
	 * 1. Lit le fichier JSON contenant les tweets enregistrés
	 * 2. Configure GSON pour parser correctement les dates OffsetDateTime
	 * 3. Parcourt chaque tweet et le publie dans le topic Kafka
	 * 4. Enregistre le nombre de tweets publiés
	 * 5. Maintient le conteneur actif en boucle infinie
	 * 
	 * Gestion des erreurs: Si le fichier n'existe pas ou est inaccessible,
	 * une exception IOException est capturée et enregistrée dans les logs
	 */
	@Override
	public void run() {
		try {
			// === ÉTAPE 1: Lecture du fichier JSON ===
			// Lit l'intégralité du contenu du fichier comme chaîne de caractères
			// Files.readAllBytes() charge le fichier en mémoire et le convertit en String
			String jsonString = new String(Files.readAllBytes(Paths.get(fileName)));
			
			// === ÉTAPE 2: Configuration du parseur JSON (GSON) ===
			// Crée un constructeur GSON personnalisé pour gérer les types personnalisés
			// En particulier, configure la désérialisation des dates au format OffsetDateTime
			Gson gson = new GsonBuilder()
					// Enregistre un désérialiseur personnalisé pour OffsetDateTime
					// Cela permet à GSON de convertir les chaînes de date JSON en objets OffsetDateTime
					// Les dates JSON sont typiquement au format ISO-8601: "2023-11-25T10:30:00Z"
					.registerTypeAdapter(OffsetDateTime.class, (JsonDeserializer<OffsetDateTime>) (json, type,
							context) -> OffsetDateTime.parse(json.getAsString()))
					.create();  // Crée l'instance GSON configurée
			
			// === ÉTAPE 3: Parsing du JSON en objet structure ===
			// Convertit la chaîne JSON en objet JsonObject pour accéder à sa structure
			JsonObject jsonObject = gson.fromJson(jsonString, JsonObject.class);
			
			// === ÉTAPE 4: Extraction du tableau de tweets ===
			// Récupère le tableau "tweets" du fichier JSON
			// Par exemple: {"tweets": [{...}, {...}, ...]}
			JsonArray prerecordedtweets = jsonObject.getAsJsonArray("tweets");
			
			// === ÉTAPE 5: Publication des tweets ===
			// Compteur pour suivre le nombre de tweets publiés
			int count = 0;
			
			// Itération sur chaque élément du tableau de tweets
			for (JsonElement je : prerecordedtweets) {
				// Convertit chaque élément JSON en objet Tweet de l'API Twitter
				Tweet tweet = gson.fromJson(je, Tweet.class);
				
				// Publie le tweet dans le topic Kafka via la méthode héritée
				publishTweet(tweet);
				
				// Incrémente le compteur
				count++;
			}
			
			// === ÉTAPE 6: Enregistrement des résultats ===
			// Enregistre dans les logs le nombre de tweets publiés et le fichier utilisé
			logger.info(String.format("Recorded tweets published - %d tweets from %s", count, fileName));
			logger.info("Producer finished. Container will remain active.");
			
			// === ÉTAPE 7: Maintien du conteneur en vie ===
			// Appelle une méthode qui maintient le producteur actif en permanence
			keepContainerAlive();
			
		} catch (IOException e) {
			// Capture les erreurs d'entrée/sortie (fichier non trouvé, permission refusée, etc.)
			// Enregistre l'erreur au niveau SEVERE dans les logs avec le message et la trace complète
			logger.log(java.util.logging.Level.SEVERE, "Failed to read recorded tweets file: " + fileName, e);
		}
	}
	
	/**
	 * Méthode keepContainerAlive() - Maintient le conteneur Docker actif
	 * 
	 * OBJECTIF:
	 * Cette méthode est essentielle dans une architecture containerisée (Docker/Kubernetes).
	 * Normalement, un processus qui se termine fermera le conteneur. Or, nous voulons que
	 * le conteneur du producteur Kafka reste actif pour rester connecté au cluster Kafka.
	 * 
	 * FONCTIONNEMENT:
	 * La méthode exécute une boucle infinie qui dort 60 secondes à chaque itération.
	 * Cela permet au thread de rester vivant sans consommer beaucoup de ressources CPU.
	 * 
	 * NOTE DE SÉCURITÉ:
	 * L'annotation @SuppressWarnings("java:S2189") supprime l'avertissement SonarQube
	 * qui détecte normalement les boucles infinies comme étant potentiellement problématiques.
	 * Ici, c'est intentionnel et nécessaire pour le bon fonctionnement de l'application.
	 */
	@SuppressWarnings("java:S2189")  // Infinite loop is intentional to keep container running
	private void keepContainerAlive() {
		try {
			// Boucle infinie - continue indéfiniment
			while (true) {
				// Fait dormir le thread pendant 60 000 millisecondes (60 secondes)
				// Cela libère les ressources CPU pendant l'attente, car le thread ne consomme pas de CPU lors du sleep
				Thread.sleep(60000); // Sleep forever to keep container running
			}
		} catch (InterruptedException e) {
			// Capture l'exception si le thread est interrompu de l'extérieur (ex: arrêt du conteneur)
			logger.warning("Producer interrupted");  // Enregistre un avertissement dans les logs
			
			// Restaure l'état d'interruption du thread
			// Cela permet au code appelant de déterminer que le thread a été interrompu
			Thread.currentThread().interrupt();
		}
	}
}