TweetsProducer.java

package tweetoscope.tweetsProducer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import com.twitter.clientlib.model.Tweet;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * =====================================================
 * CLASSE ABSTRAITE TweetsProducer - Producteur Kafka
 * =====================================================
 * 
 * Cette classe abstraite est la BASE pour tous les producteurs de tweets.
 * Elle gère :
 * 1. La connexion au broker Kafka
 * 2. La sérialisation/désérialisation JSON des tweets
 * 3. L'envoi des tweets vers le topic "raw-tweets"
 * 
 * Architecture :
 * - Classe ABSTRAITE : ne peut pas être instanciée directement
 * - Implémente Runnable : peut être exécutée dans un thread
 * - Classe PARENTE pour les implémentations concrètes :
 *   * MockTwitterStreamRandom : tweets aléatoires
 *   * MockTwitterStreamRecorded : tweets depuis un fichier
 *   * MockTwitterStreamScenario : tweets d'un scénario
 * 
 * Pipeline :
 *   TweetsProducer (classe abstraite)
 *        ↑
 *        └─ Crée des tweets (implémentations concrètes)
 *             ↓
 *   publishTweet() : sérialize en JSON
 *             ↓
 *   KafkaProducer : envoie vers Kafka
 *             ↓
 *   Topic "raw-tweets" : reçoit les tweets bruts
 *             ↓
 *   TweetFilter : lit et filtre
 */
public abstract class TweetsProducer implements Runnable {
    // Logger pour tracer l'exécution, les erreurs et les événements
    private static final Logger logger = Logger.getLogger(TweetsProducer.class.getName());
    
    /**
     * CONSTANTE : TOPIC
     * 
     * Le nom du topic Kafka où seront envoyés les tweets bruts.
     * - "raw-tweets" = topic source pour tous les tweets non filtrés
     * - Les tweets bruts arrivent ici depuis le producteur
     * - Les filtres lisent depuis ce topic et les redistribuent
     * 
     * Exemple de flow :
     *   TweetsProducer → "raw-tweets" (topic) → TweetFilter
     */
    private static final String TOPIC = "raw-tweets";
    
    /**
     * VARIABLE MEMBRE : producer
     * 
     * Client Kafka PRODUCTEUR qui envoie les messages.
     * 
     * Rôle :
     * - Établit la connexion au broker Kafka
     * - Envoie les tweets au topic "raw-tweets"
     * - Gère les callbacks (succès/erreur de livraison)
     * 
     * Type :
     * - KafkaProducer<String, String>
     * - Clé : String (identifiant du message, peut être null)
     * - Valeur : String (JSON du tweet)
     * 
     * Sérialisation :
     * - Les clés et valeurs sont sérialisées en strings
     * - StringSerializer convertit les objets en bytes
     */
    protected final KafkaProducer<String, String> producer;
    
    /**
     * VARIABLE MEMBRE : gson
     * 
     * Outil de sérialisation/désérialisation JSON.
     * 
     * Rôle :
     * - Convertit Tweet → String JSON (pour Kafka)
     * - Convertit String JSON → Tweet (pour lecture)
     * - Gère les types complexes comme OffsetDateTime
     * 
     * Adaptateur personnalisé :
     * - OffsetDateTime nécessite un adapter Gson custom
     * - Car Gson ne sait pas nativement gérer les OffsetDateTime
     */
    private final Gson gson;

    /**
     * =====================================================
     * CONSTRUCTEUR : Initialise le producteur Kafka
     * =====================================================
     * 
     * Étapes d'initialisation :
     * 1. Configure les propriétés Kafka (broker, sérializers)
     * 2. Crée le KafkaProducer
     * 3. Configure Gson avec adapter OffsetDateTime
     * 
     * @param brokers Adresse du broker Kafka (ex: "localhost:9092" ou "kafka:9092")
     */
    public TweetsProducer(String brokers) {
        // ========== ÉTAPE 1 : Configuration du Producer Kafka ==========
        Properties props = new Properties();
        
        // Serveur Kafka à contacter
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        
        // Sérialisation des CLÉS : String → bytes
        // Les clés identifient les messages (peuvent être null)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                  "org.apache.kafka.common.serialization.StringSerializer");
        
        // Sérialisation des VALEURS : String (JSON) → bytes
        // Les valeurs contiennent les tweets sérialisés en JSON
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                  "org.apache.kafka.common.serialization.StringSerializer");
        
        // ========== ÉTAPE 2 : Création du KafkaProducer ==========
        // Initialise le producteur avec les propriétés
        this.producer = new KafkaProducer<>(props);

        // ========== ÉTAPE 3 : Configuration de Gson ==========
        // GsonBuilder permet de configurer Gson avec des adaptateurs custom
        this.gson = new GsonBuilder()
                // Enregistre un adapter pour OffsetDateTime
                // Car Gson ne sait pas nativement gérer ce type
                .registerTypeAdapter(OffsetDateTime.class, new TypeAdapter<OffsetDateTime>() {
                    
                    /**
                     * SÉRIALISATION : OffsetDateTime → JSON
                     * Convertit une date en string au format ISO-8601
                     */
                    @Override
                    public void write(JsonWriter out, OffsetDateTime value) throws IOException {
                        // Cas spécial : si la date est null
                        if (value == null) {
                            out.nullValue();
                        } else {
                            // Convertit la date en string ISO-8601
                            // Exemple : "2023-11-25T14:30:00+02:00"
                            out.value(value.toString());
                        }
                    }

                    /**
                     * DÉSÉRIALISATION : JSON → OffsetDateTime
                     * Convertit un string JSON en OffsetDateTime
                     */
                    @Override
                    public OffsetDateTime read(JsonReader in) throws IOException {
                        // Vérifie si la valeur JSON est null
                        if (in.peek() == JsonToken.NULL) {
                            in.nextNull();
                            return null;
                        } else {
                            // Parse la string au format ISO-8601 en OffsetDateTime
                            return OffsetDateTime.parse(in.nextString());
                        }
                    }
                })
                // Finalise la configuration Gson
                .create();
    }

    /**
     * =====================================================
     * MÉTHODE publishTweet() - Envoie un tweet à Kafka
     * =====================================================
     * 
     * C'est la méthode clé qui publie les tweets.
     * 
     * Flux complet :
     * 1. Valide que le tweet n'est pas null
     * 2. Sérialise le tweet en JSON
     * 3. Crée un ProducerRecord (message Kafka)
     * 4. Envoie le message via producer.send()
     * 5. Gère la réponse (succès ou erreur) via callback
     * 
     * Asynchrone :
     * - producer.send() retourne immédiatement
     * - Le callback s'exécute quand Kafka répond
     * - Non-bloquant = bonne performance
     * 
     * @param tweet L'objet Tweet à publier
     */
    protected void publishTweet(Tweet tweet) {
        // ========== ÉTAPE 1 : Validation du tweet ==========
        // Vérifie que le tweet n'est pas null (données invalides)
        if (tweet == null) {
            logger.warning("Received a null tweet. Skipping.");
            return;  // Arrête la publication
        }

        // ========== ÉTAPE 2 : Sérialisation JSON ==========
        // Convertit l'objet Tweet en string JSON
        // Exemple :
        //   Tweet(id=123, text="Hello world", ...)
        //   ↓ gson.toJson()
        //   {"id":"123", "text":"Hello world", ...}
        String jsonTweet = gson.toJson(tweet);
        
        // ========== ÉTAPE 3 : Création du message Kafka ==========
        // ProducerRecord = message à envoyer à Kafka
        // Format : ProducerRecord<Clé, Valeur>
        // - TOPIC : "raw-tweets" (destination)
        // - null : pas de clé spécifique (Kafka choisit une partition au hasard)
        // - jsonTweet : le JSON du tweet
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, jsonTweet);

        // ========== ÉTAPE 4 : Envoi ASYNCHRONE ==========
        // producer.send() envoie le message et retourne immédiatement
        // Le callback s'exécute quand Kafka répond
        producer.send(record, (metadata, exception) -> {
            // ===== CALLBACK : Réponse de Kafka =====
            
            if (exception != null) {
                // ERREUR : Kafka n'a pas pu livrer le message
                logger.log(Level.SEVERE, "Failed to publish tweet", exception);
            } else {
                // SUCCÈS : Tweet livré à Kafka
                // metadata contient les infos :
                // - partition : la partition Kafka utilisée
                // - offset : la position dans la partition
                // - timestamp : quand Kafka a reçu le message
                logger.info("Published tweet: " + tweet.getText());
            }
        });
    }

    /**
     * =====================================================
     * MÉTHODE close() - Arrête le producteur
     * =====================================================
     * 
     * Ferme proprement la connexion Kafka.
     * 
     * Nettoyage :
     * - Flush les messages en attente
     * - Ferme la connexion au broker
     * - Libère les ressources
     * 
     * Utilisation :
     * - Appeler avant d'arrêter le programme
     * - Garantit que tous les messages sont envoyés
     * - Évite les fuites de ressources
     * 
     * Exemple :
     *   producer.close();  // Arrête proprement
     */
    public void close() {
        producer.close();
    }

    /**
     * =====================================================
     * MÉTHODE ABSTRAITE run() - À implémenter par les sous-classes
     * =====================================================
     * 
     * Chaque sous-classe DOIT implémenter run() pour :
     * 1. Générer les tweets (aléatoires, fichier, scénario, etc.)
     * 2. Appeler publishTweet() pour envoyer à Kafka
     * 3. Gérer la boucle de production (infinie ou finie)
     * 
     * Implémentations :
     * - MockTwitterStreamRandom : tweets aléatoires en boucle
     * - MockTwitterStreamRecorded : tweets d'un fichier
     * - MockTwitterStreamScenario : tweets d'un scénario
     * 
     * Pattern :
     * - Interface Runnable : permet l'exécution dans un thread
     * - Abstract : force les sous-classes à implémenter
     */
    @Override
    public abstract void run();

    /**
     * =====================================================
     * MÉTHODE main() - Point d'entrée du programme
     * =====================================================
     * 
     * Fonction principale qui lance le service de production de tweets.
     * 
     * Flux :
     * 1. Valide les arguments en ligne de commande
     * 2. Lit la source de tweets (variable d'environnement ou argument)
     * 3. Sélectionne l'implémentation appropriée
     * 4. Lance la production (bloquant)
     * 
     * Arguments :
     * - arg[0] : bootstrap-servers (OBLIGATOIRE, ex: "kafka:9092")
     * - arg[1] : source de tweets (OPTIONNEL, ex: "random", "recorded")
     * 
     * Variable d'environnement :
     * - TWEETS_SOURCE : source par défaut si arg[1] absent
     *   Valeurs : "random", "recorded", "scenario", "mini", "large"
     * 
     * @param args Arguments en ligne de commande
     */
    public static void main(String[] args) {
        // ========== ÉTAPE 1 : Validation des arguments ==========
        if (args.length < 1) {
            // Erreur : bootstrap-servers est obligatoire
            logger.severe("Usage: TweetsProducer <bootstrap-servers> [source]");
            System.exit(1);
        }

        // Récupère l'adresse du broker Kafka
        String bootstrapServers = args[0];
        
        // Récupère la source de tweets :
        // - Soit depuis l'argument arg[1]
        // - Soit depuis la variable d'environnement TWEETS_SOURCE
        // - Soit "random" par défaut
        String source = args.length > 1 ? args[1] : System.getenv().getOrDefault("TWEETS_SOURCE", "random");

        // ========== ÉTAPE 2 : Logs d'initialisation ==========
        logger.info("Starting TweetsProducer service");
        logger.info(() -> "Bootstrap servers: " + bootstrapServers);
        logger.info(() -> "Tweet source: " + source);
        
        try {
            // ========== ÉTAPE 3 : Sélection de l'implémentation ==========
            TweetsProducer producer;
            
            // Switch sur la source de tweets
            // Chaque cas sélectionne une implémentation différente
            switch (source.toLowerCase()) {
                
                // ===== CAS 1 : Tweets d'un scénario =====
                case "scenario":
                    logger.info("Using MockTwitterStreamScenario (scenario-based tweets)");
                    // MockTwitterStreamScenario génère des tweets basés sur un scénario
                    producer = new MockTwitterStreamScenario(bootstrapServers);
                    break;
                    
                // ===== CAS 2 : Tweets enregistrés (mini test base) =====
                case "recorded":
                case "mini":
                    logger.info("Using MockTwitterStreamRecorded (mini test base)");
                    // MockTwitterStreamRecorded lit depuis un fichier texte
                    // "TestBases/miniTestBase.txt" = petit ensemble de tweets pour les tests
                    producer = new MockTwitterStreamRecorded(bootstrapServers, "TestBases/miniTestBase.txt");
                    break;
                    
                // ===== CAS 3 : Tweets enregistrés (large test base) =====
                case "large":
                    logger.info("Using MockTwitterStreamRecorded (large test base)");
                    // "TestBases/largeTestBase.txt" = grand ensemble de tweets pour les tests
                    producer = new MockTwitterStreamRecorded(bootstrapServers, "TestBases/largeTestBase.txt");
                    break;
                    
                // ===== CAS 4 : Tweets aléatoires (par défaut) =====
                case "random":
                default:
                    logger.info("Using MockTwitterStreamRandom (random tweets)");
                    // MockTwitterStreamRandom génère des tweets aléatoires en continu
                    producer = new MockTwitterStreamRandom(bootstrapServers);
                    break;
            }
            
            // ========== ÉTAPE 4 : Logs de confirmation ==========
            logger.info("Producer service initialized");
            logger.info("Publishing to topic: raw-tweets");
            logger.info("Running in headless mode");
            
            // ========== ÉTAPE 5 : Lancement de la production ==========
            // producer.run() bloque indéfiniment jusqu'à Ctrl+C ou exception
            // Elle génère continuellement des tweets et les envoie à Kafka
            producer.run();
            
        } catch (Exception e) {
            // En cas d'erreur critique lors de l'initialisation
            logger.log(Level.SEVERE, "Producer service failed", e);
            System.exit(1);
        }
    }
}