TweetFilter.java

package tweetoscope.tweetsFilter;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import com.twitter.clientlib.model.Tweet;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * =====================================================
 * CLASSE TWEETFILTER - Filtrage en temps réel de tweets
 * =====================================================
 * 
 * Cette classe est responsable de :
 * 1. Consommer des tweets bruts depuis Kafka (topic "raw-tweets")
 * 2. Appliquer une chaîne de filtres configurables (langue, date, etc.)
 * 3. Publier les tweets filtrés vers Kafka (topic "filtered-tweets")
 * 4. Suivre les statistiques (tweets traités, filtrés, publiés)
 * 
 * Architecture :
 * - Utilise un pattern Strategy pour les filtres (interface FilterCondition)
 * - Communique avec Kafka en tant que producteur/consommateur
 * - Gère la sérialisation/désérialisation JSON avec Gson
 * - Supporte plusieurs types de filtres : langue, date, etc.
 */
public class TweetFilter {

    // Logger pour tracer l'exécution et les erreurs
    private static final Logger logger = Logger.getLogger(TweetFilter.class.getName());

    /**
     * CONSTANTES KAFKA :
     * Ces constantes définissent les noms des topics Kafka utilisés.
     * - INPUT_TOPIC : Topic source contenant les tweets bruts non filtrés
     * - OUTPUT_TOPIC : Topic destination pour les tweets après filtrage
     */
    private static final String INPUT_TOPIC = "raw-tweets";
    private static final String OUTPUT_TOPIC = "filtered-tweets";
    
    /**
     * VARIABLES MEMBRES :
     * - consumer : Lecteur Kafka qui récupère les tweets bruts du topic INPUT_TOPIC
     * - producer : Écrivain Kafka qui envoie les tweets filtrés au topic OUTPUT_TOPIC
     * - gson : Outil de sérialisation/désérialisation JSON pour convertir strings <-> objets Tweet
     * - filterChain : Liste chaînable de filtres appliqués séquentiellement à chaque tweet
     * - running : Drapeau volatile pour arrêter proprement la boucle de traitement
     */
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;
    private final Gson gson;
    private final List<FilterCondition> filterChain;
    private volatile boolean running = true;

    /**
     * =====================================================
     * CLASSE INTERNE : OffsetDateTimeAdapter
     * =====================================================
     * 
     * Adaptateur personnalisé pour Gson qui gère la sérialisation/désérialisation
     * des objets OffsetDateTime (dates/heures avec fuseau horaire).
     * 
     * Pourquoi ? Gson ne sait pas nativement gérer les OffsetDateTime de Java.
     * Cet adaptateur :
     * - Convertit OffsetDateTime -> String JSON (format ISO-8601)
     * - Convertit String JSON -> OffsetDateTime (parsing ISO-8601)
     * 
     * Exemple :
     *   Java: OffsetDateTime.parse("2023-11-25T14:30:00+01:00")
     *   JSON: "2023-11-25T14:30:00+01:00"
     */
    private static class OffsetDateTimeAdapter extends TypeAdapter<OffsetDateTime> {
        
        /**
         * SÉRIALISATION (Java -> JSON)
         * Convertit un objet OffsetDateTime en string JSON.
         * Cas particulier : Si la valeur est null, écrit null en JSON.
         */
        @Override
        public void write(JsonWriter out, OffsetDateTime value) throws IOException {
            if (value == null) {
                out.nullValue();
                return;
            }
            // Convertit la date en format ISO-8601 standard (e.g., "2023-11-25T14:30:00+01:00")
            out.value(value.toString());
        }

        /**
         * DÉSÉRIALISATION (JSON -> Java)
         * Convertit un string JSON en objet OffsetDateTime.
         * Cas particulier : Si la valeur JSON est null, retourne null en Java.
         */
        @Override
        public OffsetDateTime read(JsonReader in) throws IOException {
            // Vérifie si la valeur JSON est null
            if (in.peek() == JsonToken.NULL) {
                in.nextNull();
                return null;
            }
            // Parse la string JSON au format ISO-8601 pour créer un OffsetDateTime
            return OffsetDateTime.parse(in.nextString());
        }
    }

    /**
     * =====================================================
     * INTERFACE FilterCondition - Pattern Strategy
     * =====================================================
     * 
     * Interface qui définit le contrat pour tous les filtres.
     * Elle permet d'implémenter différents types de filtres de manière modulaire.
     * 
     * Pattern utilisé : STRATEGY
     * - Chaque filtre implémente la même interface match()
     * - Les filtres peuvent être combinés dans une chaîne (filterChain)
     * - Facile d'ajouter de nouveaux filtres sans modifier cette classe
     * 
     * Filtres disponibles :
     * - EmptyTweetFilter : accepte tous les tweets (filtre neutre)
     * - LangTweetFilter : filtre par langue (ex: "en", "fr")
     * - RecentTweetFilter : filtre par année (tweets >= année spécifiée)
     */
    public interface FilterCondition {
        /**
         * Teste si un tweet satisfait les conditions du filtre.
         * 
         * @param tweet Le tweet à examiner
         * @return true si le tweet passe le filtre (satisfait les conditions)
         *         false si le tweet est rejeté (ne satisfait pas les conditions)
         * 
         * Exemple avec LangTweetFilter("en") :
         *   - match(tweet_anglais) -> true (le tweet est en anglais)
         *   - match(tweet_francais) -> false (le tweet n'est pas en anglais)
         */
        boolean match(Tweet tweet);
    }

    /**
     * =====================================================
     * CONSTRUCTEUR 1 : Pour les tests unitaires (Mock)
     * =====================================================
     * 
     * Ce constructeur est utilisé UNIQUEMENT pour les tests JUnit.
     * Il initialise TweetFilter sans Kafka (mode test/hors ligne).
     * 
     * Avantages :
     * - Évite de lancer un broker Kafka lors des tests
     * - Permet de tester la logique de filtrage isolément
     * - Facilite les tests unitaires purs
     * 
     * @param filterChain Liste de filtres à appliquer (peut être null)
     */
    //Mock-->Constructeur pour Junit tests
    public TweetFilter(List<FilterCondition> filterChain) {
        // Pas de connexion Kafka en mode test
        this.consumer = null;  // pas de Kafka
        this.producer = null;  // pas de Kafka
        // Initialise la chaîne de filtres (null safe)
        this.filterChain = filterChain != null ? filterChain : new ArrayList<>();
        this.gson = null;
    }

    /**
     * =====================================================
     * CONSTRUCTEUR 2 : Pour l'environnement de production
     * =====================================================
     * 
     * Ce constructeur initialise TweetFilter avec connexion Kafka complète.
     * Il configure le consommateur, le producteur et la sérialisation JSON.
     * 
     * Étapes d'initialisation :
     * 1. Configuration du consumer Kafka (lecture depuis raw-tweets)
     * 2. Configuration du producer Kafka (écriture vers filtered-tweets)
     * 3. Configuration de Gson avec adapter pour OffsetDateTime
     * 4. Stockage de la chaîne de filtres
     * 
     * @param bootstrapServers Adresse du broker Kafka (ex: "localhost:9092")
     * @param groupId ID du groupe de consommateurs Kafka (pour le suivi)
     * @param filterChain Liste de filtres à appliquer
     */
    public TweetFilter(String bootstrapServers, String groupId, List<FilterCondition> filterChain) {
        
        // ========== ÉTAPE 1 : Configuration du CONSUMER Kafka ==========
        Properties consumerProps = new Properties();
        // Serveur Kafka à contacter
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Groupe de consommateurs (permet le suivi du décalage de lecture)
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Les clés des messages sont des strings (non sérialisées)
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Les valeurs des messages sont des strings JSON (non sérialisées)
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // AUTO_OFFSET_RESET_CONFIG : en cas de démarrage, lire depuis le début du topic
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Commit automatique du décalage après chaque batch traité
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Création du consumer avec les propriétés
        this.consumer = new KafkaConsumer<>(consumerProps);

        // ========== ÉTAPE 2 : Configuration du PRODUCER Kafka ==========
        Properties producerProps = new Properties();
        // Serveur Kafka à contacter
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Les clés sont sérialisées en strings
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Les valeurs sont sérialisées en strings
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Création du producer avec les propriétés
        this.producer = new KafkaProducer<>(producerProps);

        // ========== ÉTAPE 3 : Configuration de Gson (sérialisation JSON) ==========
        // Gson par défaut ne sait pas gérer les OffsetDateTime
        // On ajoute un adaptateur personnalisé (OffsetDateTimeAdapter)
        // pour convertir OffsetDateTime <-> String JSON
        this.gson = new GsonBuilder()
                .registerTypeAdapter(OffsetDateTime.class, new OffsetDateTimeAdapter())
                .create();
        
        // ========== ÉTAPE 4 : Stockage de la chaîne de filtres ==========
        // Initialise à une liste vide si null (null safe)
        this.filterChain = filterChain != null ? filterChain : new ArrayList<>();

        // ========== Logs d'initialisation ==========
        logger.info("TweetFilter initialized");
        logger.info(() -> "Consuming from: " + INPUT_TOPIC);
        logger.info(() -> "Publishing to: " + OUTPUT_TOPIC);
        logger.info(() -> "Filter chain size: " + this.filterChain.size());
    }

    /**
     * =====================================================
     * MÉTHODE applyFilterChain()
     * =====================================================
     * 
     * Applique TOUS les filtres de la chaîne au tweet.
     * Utilise une approche "ET logique" : le tweet doit passer TOUS les filtres.
     * 
     * Flux logique :
     * 1. Boucle sur chaque filtre de la chaîne
     * 2. Si UN filtre rejette le tweet (retourne false), arrête immédiatement
     * 3. Si TOUS les filtres acceptent, retourne true
     * 
     * Optimisation : Court-circuit à la première rejection
     * - Arrête dès qu'un filtre rejette = gain de performance
     * - Les filtres les plus rapides/restrictifs devraient être en premier
     * 
     * Exemple avec 3 filtres [Langue, Année, Vérification] :
     *   - Tweet rejeté par le 1er filtre -> retourne false (autres pas testés)
     *   - Tweet passe 1er et 2eme, rejeté par le 3eme -> retourne false
     *   - Tweet passe les 3 -> retourne true
     * 
     * @param tweet Le tweet à tester
     * @return true si le tweet passe TOUS les filtres, false sinon
     */
    private boolean applyFilterChain(Tweet tweet) {
        // Itère sur chaque filtre dans la chaîne
        for (FilterCondition filter : filterChain) {
            // Si le filtre courant rejette le tweet
            if (!filter.match(tweet)) {
                // Arrête immédiatement et rejette le tweet (court-circuit)
                return false;
            }
        }
        // Si on arrive ici, TOUS les filtres ont accepté le tweet
        return true;
    }

    /**
     * =====================================================
     * MÉTHODE run() - Boucle principale de traitement
     * =====================================================
     * 
     * C'est la méthode maître qui lance le traitement en continu.
     * Elle bloque indéfiniment jusqu'à ce que stop() soit appelé.
     * 
     * Flux d'exécution :
     * 1. S'abonne au topic Kafka "raw-tweets"
     * 2. Crée un objet Statistics pour suivre les métriques
     * 3. Entre dans une boucle infinie :
     *    a. Reçoit des messages Kafka (avec timeout de 500ms)
     *    b. Traite chaque message en lot (batch)
     *    c. Continue jusqu'à ce que running = false
     * 4. En cas d'erreur ou arrêt, nettoie les ressources
     * 
     * Raison du polling avec timeout :
     * - 500ms : bon équilibre entre réactivité et charge système
     * - Si pas de messages, attend 500ms puis réessaye
     * - Si messages arrivent, les traite immédiatement
     */
    public void run() {
        // S'abonne au topic "raw-tweets" pour recevoir les tweets bruts
        consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
        logger.info(() -> "TweetFilter started - Consuming from '" + INPUT_TOPIC + "'");

        // Crée un objet pour suivre les statistiques
        Statistics stats = new Statistics();

        try {
            // Boucle infinie jusqu'à appel de stop()
            while (running) {
                // Poll Kafka avec timeout de 500ms
                // - Si messages disponibles : les retourne immédiatement
                // - Si aucun message : attend 500ms puis retourne (vide)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // Traite le lot de messages reçu
                processRecordBatch(records, stats);
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error in TweetFilter", e);
        } finally {
            // Arrêt propre : ferme les ressources (consumer, producer)
            cleanup();
        }
    }

    /**
     * =====================================================
     * MÉTHODE processRecordBatch()
     * =====================================================
     * 
     * Traite un lot (batch) de messages Kafka reçus.
     * Itère simplement sur chaque message et le traite individuellement.
     * 
     * Avantage du traitement par lot :
     * - Kafka peut retourner plusieurs messages à la fois
     * - Plus efficace que traiter 1 message à la fois
     * - Tous les messages du lot sont traités avant le prochain poll
     * 
     * @param records Lot de messages Kafka reçus du consumer
     * @param stats Objet pour mettre à jour les statistiques
     */
    private void processRecordBatch(ConsumerRecords<String, String> records, Statistics stats) {
        // Boucle sur chaque message du lot
        for (ConsumerRecord<String, String> record : records) {
            // Traite le message individual
            processSingleRecord(record, stats);
        }
    }

    /**
     * =====================================================
     * MÉTHODE processSingleRecord()
     * =====================================================
     * 
     * Traite UN message Kafka (UN tweet).
     * C'est le cœur du pipeline de filtrage.
     * 
     * Étapes :
     * 1. Incrémente le compteur de tweets traités
     * 2. Parse le JSON pour obtenir l'objet Tweet
     * 3. Applique la chaîne de filtres
     * 4. Soit publie le tweet (passe les filtres), soit le rejette (bloquer)
     * 5. Gère les erreurs d'analyse JSON
     * 
     * Gestion des erreurs :
     * - JsonSyntaxException : le message n'est pas du JSON valide (warn)
     * - Autres exceptions : erreur inattendue (severe)
     * 
     * @param record Message Kafka brut à traiter
     * @param stats Objet pour mettre à jour les statistiques
     */
    private void processSingleRecord(ConsumerRecord<String, String> record, Statistics stats) {
        // Incrémente le compteur global de tweets traités
        stats.incrementProcessed();
        
        try {
            // Parse la string JSON en objet Tweet
            // gson.fromJson(JSON, classe) : convertit "{"text":"...", ...}" -> Tweet
            Tweet tweet = gson.fromJson(record.value(), Tweet.class);
            
            // Applique tous les filtres du chaîne au tweet
            if (applyFilterChain(tweet)) {
                // Le tweet passe TOUS les filtres -> le publier
                publishTweet(record, tweet, stats);
            } else {
                // Le tweet est rejeté par au moins 1 filtre -> le bloquer
                logFilteredOut(tweet, stats);
            }
        } catch (JsonSyntaxException e) {
            // Le message n'est pas du JSON valide (log warning)
            logger.log(Level.WARNING, "Failed to parse tweet JSON: " + record.value(), e);
        } catch (Exception e) {
            // Erreur inattendue (log severe)
            logger.log(Level.SEVERE, "Unexpected error processing tweet", e);
        }
    }

    /**
     * =====================================================
     * MÉTHODE publishTweet()
     * =====================================================
     * 
     * Envoie un tweet qui a passé TOUS les filtres vers Kafka.
     * 
     * Processus :
     * 1. Incrémente le compteur de tweets publiés
     * 2. Log un message informatif avec les statistiques
     * 3. Crée un nouveau message Kafka pour le topic "filtered-tweets"
     * 4. Envoie le message via le producer
     * 
     * Note importante :
     * - Le message envoyé est IDENTIQUE au message reçu
     * - On ne modifie pas le contenu, juste on le redirige vers un autre topic
     * - producer.send() est asynchrone (non-bloquant)
     * 
     * Exemple de log :
     *   "Tweet 1234567890 passed filters, publishing to filtered-tweets [42/50]"
     *   -> 42 tweets publiés sur 50 traités
     * 
     * @param record Message Kafka original
     * @param tweet Objet Tweet parsé (utilisé juste pour l'ID dans le log)
     * @param stats Objet pour mettre à jour les statistiques
     */
    private void publishTweet(ConsumerRecord<String, String> record, Tweet tweet, Statistics stats) {
        // Incrémente le compteur de tweets publiés
        stats.incrementPublished();
        
        // Log informatif avec les statistiques
        logger.info("Tweet " + tweet.getId() + " passed filters, publishing to " + OUTPUT_TOPIC 
                    + " [" + stats.getPublished() + "/" + stats.getProcessed() + "]");
        
        // Crée un nouveau message Kafka pour le topic de sortie
        // On réutilise la clé originale et la valeur originale (JSON du tweet)
        ProducerRecord<String, String> outputRecord = new ProducerRecord<>(OUTPUT_TOPIC, record.key(), record.value());
        
        // Envoie le message au producer Kafka (asynchrone)
        producer.send(outputRecord);
    }

    /**
     * =====================================================
     * MÉTHODE logFilteredOut()
     * =====================================================
     * 
     * Enregistre un tweet qui a été rejeté par au moins 1 filtre.
     * Le tweet N'est PAS envoyé au topic "filtered-tweets".
     * 
     * Processus :
     * 1. Incrémente le compteur de tweets filtrés
     * 2. Log un message informatif avec les statistiques
     * 
     * Exemple de log :
     *   "Tweet 1234567890 filtered out [8/50]"
     *   -> 8 tweets rejetés sur 50 traités
     * 
     * @param tweet Objet Tweet (utilisé pour extraire l'ID)
     * @param stats Objet pour mettre à jour les statistiques
     */
    private void logFilteredOut(Tweet tweet, Statistics stats) {
        // Incrémente le compteur de tweets filtrés (rejetés)
        stats.incrementFiltered();
        
        // Extrait l'ID du tweet pour le log
        String tweetId = tweet.getId();
        
        // Log informatif avec les statistiques
        logger.info("Tweet " + tweetId + " filtered out [" + stats.getFiltered() + "/" + stats.getProcessed() + "]");
    }

    /**
     * =====================================================
     * CLASSE INTERNE : Statistics
     * =====================================================
     * 
     * Classe simple pour suivre les statistiques de traitement.
     * Compteurs :
     * - processed : nombre TOTAL de tweets traités
     * - filtered : nombre de tweets REJETÉS par les filtres
     * - published : nombre de tweets ACCEPTÉS et publiés
     * 
     * Logique de validation :
     *   published + filtered = processed (tout tweet est soit accepté soit rejeté)
     * 
     * Exemple :
     *   - 100 tweets traités
     *   - 70 tweets acceptés (published = 70)
     *   - 30 tweets rejetés (filtered = 30)
     *   - Vérification : 70 + 30 = 100 ✓
     */
    private static class Statistics {
        // Total de tweets traités (acceptés + rejetés)
        private long processed = 0;
        // Tweets rejetés par les filtres
        private long filtered = 0;
        // Tweets acceptés et publiés
        private long published = 0;

        /**
         * Incrémente le nombre de tweets traités.
         * Appelé pour CHAQUE tweet reçu de Kafka.
         */
        void incrementProcessed() {
            processed++;
        }

        /**
         * Incrémente le nombre de tweets rejetés.
         * Appelé quand un tweet échoue au moins 1 filtre.
         */
        void incrementFiltered() {
            filtered++;
        }

        /**
         * Incrémente le nombre de tweets publiés.
         * Appelé quand un tweet passe TOUS les filtres.
         */
        void incrementPublished() {
            published++;
        }

        // ===== GETTERS pour accéder aux statistiques =====
        
        /**
         * @return Nombre total de tweets traités
         */
        long getProcessed() {
            return processed;
        }

        /**
         * @return Nombre de tweets rejetés
         */
        long getFiltered() {
            return filtered;
        }

        /**
         * @return Nombre de tweets publiés
         */
        long getPublished() {
            return published;
        }
    }

    /**
     * =====================================================
     * MÉTHODE stop()
     * =====================================================
     * 
     * Arrête proprement la boucle de traitement.
     * 
     * Fonctionnement :
     * - Met le drapeau 'running' à false (volatile)
     * - La boucle while(running) dans run() s'arrête
     * - Puis cleanup() ferme les ressources
     * 
     * Utilisation :
     * - Appel via un signal (Ctrl+C, SIGTERM, etc.)
     * - Ou depuis un autre thread
     */
    public void stop() {
        running = false;
    }

    /**
     * =====================================================
     * MÉTHODE cleanup()
     * =====================================================
     * 
     * Nettoie et ferme les ressources Kafka.
     * Appelée automatiquement à l'arrêt de run() (finally block).
     * 
     * Étapes :
     * 1. Log un message d'arrêt
     * 2. Ferme le consumer Kafka (arrête la lecture)
     * 3. Ferme le producer Kafka (arrête l'écriture)
     * 4. Log un message de succès
     * 5. Attrape et log les erreurs de fermeture
     * 
     * Important :
     * - Prévient les fuites de ressources
     * - Libère les connexions Kafka
     * - Commit les messages lus (consumer.close())
     */
    private void cleanup() {
        logger.info("Shutting down TweetFilter");
        try {
            // Ferme le consumer Kafka proprement
            // - Arrête la lecture du topic
            // - Commit les messages lus
            // - Libère les ressources
            consumer.close();
            
            // Ferme le producer Kafka proprement
            // - Flush les messages en attente
            // - Libère les ressources
            producer.close();
            
            logger.info("TweetFilter shutdown complete");
        } catch (Exception e) {
            // Log les erreurs rencontrées lors de la fermeture
            logger.log(Level.SEVERE, "Error during shutdown", e);
        }
    }

    /**
     * =====================================================
     * MÉTHODE main() - Point d'entrée du programme
     * =====================================================
     * 
     * Fonction principale qui lance le service de filtrage.
     * Exécution :
     * 1. Valide les arguments en ligne de commande
     * 2. Lit les variables d'environnement pour les filtres
     * 3. Crée la chaîne de filtres appropriée
     * 4. Initialise TweetFilter
     * 5. Lance la boucle de traitement (bloquant)
     * 
     * Arguments requis :
     *   arg[0] : bootstrap-servers (ex: "kafka:9092" ou "localhost:9092")
     *   arg[1] : consumer-group (ex: "filter-group-1")
     * 
     * Variables d'environnement optionnelles :
     *   FILTER_TYPE : type de filtre ("lang", "recent", "none" ou absent)
     *   FILTER_VALUE : paramètre du filtre
     *     - Pour "lang" : code langue (ex: "en", "fr", "es")
     *     - Pour "recent" : année de départ (ex: "2022", "2023")
     *     - Ignoré pour "none"
     * 
     * Exemples d'exécution :
     *   1. Aucun filtre (accepte TOUS les tweets)
     *      $ FILTER_TYPE=none java TweetFilter kafka:9092 filter-group-1
     *   
     *   2. Filtre par langue (anglais uniquement)
     *      $ FILTER_TYPE=lang FILTER_VALUE=en java TweetFilter kafka:9092 filter-group-1
     *   
     *   3. Filtre par année (tweets depuis 2023)
     *      $ FILTER_TYPE=recent FILTER_VALUE=2023 java TweetFilter kafka:9092 filter-group-1
     * 
     * @param args Arguments en ligne de commande [bootstrap-servers, consumer-group]
     */
    public static void main(String[] args) {
        // ========== ÉTAPE 1 : Validation des arguments ==========
        if (args.length < 2) {
            // Erreur : pas assez d'arguments
            logger.severe("Usage: TweetFilter <bootstrap-servers> <consumer-group>");
            System.exit(1);
        }

        // Récupère les arguments en ligne de commande
        String bootstrapServers = args[0];
        String consumerGroup = args[1];

        logger.info("Starting TweetFilter service");
        logger.info(() -> "Bootstrap servers: " + bootstrapServers);
        logger.info(() -> "Consumer group: " + consumerGroup);

        try {
            // ========== ÉTAPE 2 : Lecture des variables d'environnement ==========
            // FILTER_TYPE : type de filtre à appliquer
            String filterType = System.getenv().getOrDefault("FILTER_TYPE", "none");
            // FILTER_VALUE : paramètre du filtre (langue, année, etc.)
            String filterValue = System.getenv().getOrDefault("FILTER_VALUE", "");
            
            logger.info(() -> "Filter type: " + filterType);
            logger.info(() -> "Filter value: " + filterValue);
            
            // ========== ÉTAPE 3 : Création de la chaîne de filtres ==========
            List<FilterCondition> filters = new ArrayList<>();
            
            switch (filterType.toLowerCase()) {
                // ===== CAS 1 : Filtre par langue =====
                case "lang":
                case "language":
                    // Validation : FILTER_VALUE est obligatoire pour ce filtre
                    if (filterValue.isEmpty()) {
                        logger.severe("FILTER_VALUE required for language filter (e.g., 'en', 'fr')");
                        System.exit(1);
                    }
                    // Ajoute le filtre de langue à la chaîne
                    filters.add(new LangTweetFilter(filterValue));
                    logger.info(() -> "Using Language Filter: " + filterValue);
                    break;
                    
                // ===== CAS 2 : Filtre par année =====
                case "recent":
                case "year":
                    // Validation : FILTER_VALUE est obligatoire pour ce filtre
                    if (filterValue.isEmpty()) {
                        logger.severe("FILTER_VALUE required for year filter (e.g., '2022', '2023')");
                        System.exit(1);
                    }
                    // Parse la string en entier
                    int year = Integer.parseInt(filterValue);
                    // Ajoute le filtre d'année à la chaîne
                    filters.add(new RecentTweetFilter(year));
                    logger.info(() -> "Using Recent Tweet Filter: from " + year + " onwards");
                    break;
                    
                // ===== CAS 3 : Pas de filtre (accepte TOUS les tweets) =====
                case "none":
                case "empty":
                default:
                    // EmptyTweetFilter accepte tous les tweets
                    filters.add(new EmptyTweetFilter());
                    logger.info("Using Empty Filter: accepting all tweets");
                    break;
            }
            
            // ========== ÉTAPE 4 : Initialisation de TweetFilter ==========
            TweetFilter filter = new TweetFilter(bootstrapServers, consumerGroup, filters);
            
            logger.info("Filter service initialized");
            logger.info("Consuming from: raw-tweets");
            logger.info("Publishing to: filtered-tweets");
            logger.info("Running in headless mode");
            
            // ========== ÉTAPE 5 : Lancement du traitement (bloquant) ==========
            // Cette méthode bloque indéfiniment jusqu'à stop() ou Ctrl+C
            filter.run();
            
        } catch (NumberFormatException e) {
            // Erreur : FILTER_VALUE n'est pas un nombre pour le filtre "year"
            logger.severe("Invalid FILTER_VALUE for year filter: must be a number");
            System.exit(1);
        } catch (Exception e) {
            // Erreur inattendue durant l'initialisation ou l'exécution
            logger.log(Level.SEVERE, "Filter service failed", e);
            System.exit(1);
        }
    }
}