MockTwitterStreamRecorded.java
package tweetoscope.tweetsProducer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.OffsetDateTime;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.twitter.clientlib.model.Tweet;
/**
* Classe MockTwitterStreamRecorded
*
* Cette classe simule un flux de tweets en lisant des tweets pré-enregistrés
* depuis un fichier JSON. Elle est particulièrement utile lorsque les limites
* de taux de l'API Twitter sont dépassées ou pour tester le système avec
* des données contrôlées et reproductibles.
*
* Fonctionnalité principale:
* - Lit un fichier JSON contenant des tweets enregistrés
* - Parse les tweets en utilisant la bibliothèque GSON
* - Publie les tweets dans un topic Kafka
* - Maintient le conteneur actif après la publication
*
* @author Virginie Galtier
*/
public final class MockTwitterStreamRecorded extends TweetsProducer {
// Logger statique utilisé pour tracer les événements et les erreurs de cette classe
// permet de suivre le cycle de vie du producteur et diagnostiquer les problèmes
private static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(MockTwitterStreamRecorded.class.getName());
/**
* Attribut fileName : chemin vers le fichier JSON contenant les tweets pré-enregistrés
* Ce fichier doit être au format JSON avec un tableau "tweets" contenant des objets Tweet
*/
protected String fileName;
/**
* Constructeur du producteur MockTwitterStreamRecorded
*
* @param brokers Adresse des serveurs Kafka (bootstrap servers) où les tweets seront publiés
* Format typique: "localhost:9092" ou "kafka1:9092,kafka2:9092,kafka3:9092"
* @param fileName Chemin absolu ou relatif du fichier JSON contenant les tweets pré-enregistrés
* Le fichier doit contenir un objet JSON avec un tableau nommé "tweets"
*/
public MockTwitterStreamRecorded(String brokers, String fileName) {
super(brokers); // Appelle le constructeur parent avec les serveurs Kafka
this.fileName = fileName; // Stocke le chemin du fichier pour utilisation ultérieure
}
/**
* Méthode run() - Point d'entrée principal du producteur
*
* Cette méthode exécute le processus suivant:
* 1. Lit le fichier JSON contenant les tweets enregistrés
* 2. Configure GSON pour parser correctement les dates OffsetDateTime
* 3. Parcourt chaque tweet et le publie dans le topic Kafka
* 4. Enregistre le nombre de tweets publiés
* 5. Maintient le conteneur actif en boucle infinie
*
* Gestion des erreurs: Si le fichier n'existe pas ou est inaccessible,
* une exception IOException est capturée et enregistrée dans les logs
*/
@Override
public void run() {
try {
// === ÉTAPE 1: Lecture du fichier JSON ===
// Lit l'intégralité du contenu du fichier comme chaîne de caractères
// Files.readAllBytes() charge le fichier en mémoire et le convertit en String
String jsonString = new String(Files.readAllBytes(Paths.get(fileName)));
// === ÉTAPE 2: Configuration du parseur JSON (GSON) ===
// Crée un constructeur GSON personnalisé pour gérer les types personnalisés
// En particulier, configure la désérialisation des dates au format OffsetDateTime
Gson gson = new GsonBuilder()
// Enregistre un désérialiseur personnalisé pour OffsetDateTime
// Cela permet à GSON de convertir les chaînes de date JSON en objets OffsetDateTime
// Les dates JSON sont typiquement au format ISO-8601: "2023-11-25T10:30:00Z"
.registerTypeAdapter(OffsetDateTime.class, (JsonDeserializer<OffsetDateTime>) (json, type,
context) -> OffsetDateTime.parse(json.getAsString()))
.create(); // Crée l'instance GSON configurée
// === ÉTAPE 3: Parsing du JSON en objet structure ===
// Convertit la chaîne JSON en objet JsonObject pour accéder à sa structure
JsonObject jsonObject = gson.fromJson(jsonString, JsonObject.class);
// === ÉTAPE 4: Extraction du tableau de tweets ===
// Récupère le tableau "tweets" du fichier JSON
// Par exemple: {"tweets": [{...}, {...}, ...]}
JsonArray prerecordedtweets = jsonObject.getAsJsonArray("tweets");
// === ÉTAPE 5: Publication des tweets ===
// Compteur pour suivre le nombre de tweets publiés
int count = 0;
// Itération sur chaque élément du tableau de tweets
for (JsonElement je : prerecordedtweets) {
// Convertit chaque élément JSON en objet Tweet de l'API Twitter
Tweet tweet = gson.fromJson(je, Tweet.class);
// Publie le tweet dans le topic Kafka via la méthode héritée
publishTweet(tweet);
// Incrémente le compteur
count++;
}
// === ÉTAPE 6: Enregistrement des résultats ===
// Enregistre dans les logs le nombre de tweets publiés et le fichier utilisé
logger.info(String.format("Recorded tweets published - %d tweets from %s", count, fileName));
logger.info("Producer finished. Container will remain active.");
// === ÉTAPE 7: Maintien du conteneur en vie ===
// Appelle une méthode qui maintient le producteur actif en permanence
keepContainerAlive();
} catch (IOException e) {
// Capture les erreurs d'entrée/sortie (fichier non trouvé, permission refusée, etc.)
// Enregistre l'erreur au niveau SEVERE dans les logs avec le message et la trace complète
logger.log(java.util.logging.Level.SEVERE, "Failed to read recorded tweets file: " + fileName, e);
}
}
/**
* Méthode keepContainerAlive() - Maintient le conteneur Docker actif
*
* OBJECTIF:
* Cette méthode est essentielle dans une architecture containerisée (Docker/Kubernetes).
* Normalement, un processus qui se termine fermera le conteneur. Or, nous voulons que
* le conteneur du producteur Kafka reste actif pour rester connecté au cluster Kafka.
*
* FONCTIONNEMENT:
* La méthode exécute une boucle infinie qui dort 60 secondes à chaque itération.
* Cela permet au thread de rester vivant sans consommer beaucoup de ressources CPU.
*
* NOTE DE SÉCURITÉ:
* L'annotation @SuppressWarnings("java:S2189") supprime l'avertissement SonarQube
* qui détecte normalement les boucles infinies comme étant potentiellement problématiques.
* Ici, c'est intentionnel et nécessaire pour le bon fonctionnement de l'application.
*/
@SuppressWarnings("java:S2189") // Infinite loop is intentional to keep container running
private void keepContainerAlive() {
try {
// Boucle infinie - continue indéfiniment
while (true) {
// Fait dormir le thread pendant 60 000 millisecondes (60 secondes)
// Cela libère les ressources CPU pendant l'attente, car le thread ne consomme pas de CPU lors du sleep
Thread.sleep(60000); // Sleep forever to keep container running
}
} catch (InterruptedException e) {
// Capture l'exception si le thread est interrompu de l'extérieur (ex: arrêt du conteneur)
logger.warning("Producer interrupted"); // Enregistre un avertissement dans les logs
// Restaure l'état d'interruption du thread
// Cela permet au code appelant de déterminer que le thread a été interrompu
Thread.currentThread().interrupt();
}
}
}