TweetFilter.java
package tweetoscope.tweetsFilter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import com.twitter.clientlib.model.Tweet;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* =====================================================
* CLASSE TWEETFILTER - Filtrage en temps réel de tweets
* =====================================================
*
* Cette classe est responsable de :
* 1. Consommer des tweets bruts depuis Kafka (topic "raw-tweets")
* 2. Appliquer une chaîne de filtres configurables (langue, date, etc.)
* 3. Publier les tweets filtrés vers Kafka (topic "filtered-tweets")
* 4. Suivre les statistiques (tweets traités, filtrés, publiés)
*
* Architecture :
* - Utilise un pattern Strategy pour les filtres (interface FilterCondition)
* - Communique avec Kafka en tant que producteur/consommateur
* - Gère la sérialisation/désérialisation JSON avec Gson
* - Supporte plusieurs types de filtres : langue, date, etc.
*/
public class TweetFilter {
// Logger pour tracer l'exécution et les erreurs
private static final Logger logger = Logger.getLogger(TweetFilter.class.getName());
/**
* CONSTANTES KAFKA :
* Ces constantes définissent les noms des topics Kafka utilisés.
* - INPUT_TOPIC : Topic source contenant les tweets bruts non filtrés
* - OUTPUT_TOPIC : Topic destination pour les tweets après filtrage
*/
private static final String INPUT_TOPIC = "raw-tweets";
private static final String OUTPUT_TOPIC = "filtered-tweets";
/**
* VARIABLES MEMBRES :
* - consumer : Lecteur Kafka qui récupère les tweets bruts du topic INPUT_TOPIC
* - producer : Écrivain Kafka qui envoie les tweets filtrés au topic OUTPUT_TOPIC
* - gson : Outil de sérialisation/désérialisation JSON pour convertir strings <-> objets Tweet
* - filterChain : Liste chaînable de filtres appliqués séquentiellement à chaque tweet
* - running : Drapeau volatile pour arrêter proprement la boucle de traitement
*/
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final Gson gson;
private final List<FilterCondition> filterChain;
private volatile boolean running = true;
/**
* =====================================================
* CLASSE INTERNE : OffsetDateTimeAdapter
* =====================================================
*
* Adaptateur personnalisé pour Gson qui gère la sérialisation/désérialisation
* des objets OffsetDateTime (dates/heures avec fuseau horaire).
*
* Pourquoi ? Gson ne sait pas nativement gérer les OffsetDateTime de Java.
* Cet adaptateur :
* - Convertit OffsetDateTime -> String JSON (format ISO-8601)
* - Convertit String JSON -> OffsetDateTime (parsing ISO-8601)
*
* Exemple :
* Java: OffsetDateTime.parse("2023-11-25T14:30:00+01:00")
* JSON: "2023-11-25T14:30:00+01:00"
*/
private static class OffsetDateTimeAdapter extends TypeAdapter<OffsetDateTime> {
/**
* SÉRIALISATION (Java -> JSON)
* Convertit un objet OffsetDateTime en string JSON.
* Cas particulier : Si la valeur est null, écrit null en JSON.
*/
@Override
public void write(JsonWriter out, OffsetDateTime value) throws IOException {
if (value == null) {
out.nullValue();
return;
}
// Convertit la date en format ISO-8601 standard (e.g., "2023-11-25T14:30:00+01:00")
out.value(value.toString());
}
/**
* DÉSÉRIALISATION (JSON -> Java)
* Convertit un string JSON en objet OffsetDateTime.
* Cas particulier : Si la valeur JSON est null, retourne null en Java.
*/
@Override
public OffsetDateTime read(JsonReader in) throws IOException {
// Vérifie si la valeur JSON est null
if (in.peek() == JsonToken.NULL) {
in.nextNull();
return null;
}
// Parse la string JSON au format ISO-8601 pour créer un OffsetDateTime
return OffsetDateTime.parse(in.nextString());
}
}
/**
* =====================================================
* INTERFACE FilterCondition - Pattern Strategy
* =====================================================
*
* Interface qui définit le contrat pour tous les filtres.
* Elle permet d'implémenter différents types de filtres de manière modulaire.
*
* Pattern utilisé : STRATEGY
* - Chaque filtre implémente la même interface match()
* - Les filtres peuvent être combinés dans une chaîne (filterChain)
* - Facile d'ajouter de nouveaux filtres sans modifier cette classe
*
* Filtres disponibles :
* - EmptyTweetFilter : accepte tous les tweets (filtre neutre)
* - LangTweetFilter : filtre par langue (ex: "en", "fr")
* - RecentTweetFilter : filtre par année (tweets >= année spécifiée)
*/
public interface FilterCondition {
/**
* Teste si un tweet satisfait les conditions du filtre.
*
* @param tweet Le tweet à examiner
* @return true si le tweet passe le filtre (satisfait les conditions)
* false si le tweet est rejeté (ne satisfait pas les conditions)
*
* Exemple avec LangTweetFilter("en") :
* - match(tweet_anglais) -> true (le tweet est en anglais)
* - match(tweet_francais) -> false (le tweet n'est pas en anglais)
*/
boolean match(Tweet tweet);
}
/**
* =====================================================
* CONSTRUCTEUR 1 : Pour les tests unitaires (Mock)
* =====================================================
*
* Ce constructeur est utilisé UNIQUEMENT pour les tests JUnit.
* Il initialise TweetFilter sans Kafka (mode test/hors ligne).
*
* Avantages :
* - Évite de lancer un broker Kafka lors des tests
* - Permet de tester la logique de filtrage isolément
* - Facilite les tests unitaires purs
*
* @param filterChain Liste de filtres à appliquer (peut être null)
*/
//Mock-->Constructeur pour Junit tests
public TweetFilter(List<FilterCondition> filterChain) {
// Pas de connexion Kafka en mode test
this.consumer = null; // pas de Kafka
this.producer = null; // pas de Kafka
// Initialise la chaîne de filtres (null safe)
this.filterChain = filterChain != null ? filterChain : new ArrayList<>();
this.gson = null;
}
/**
* =====================================================
* CONSTRUCTEUR 2 : Pour l'environnement de production
* =====================================================
*
* Ce constructeur initialise TweetFilter avec connexion Kafka complète.
* Il configure le consommateur, le producteur et la sérialisation JSON.
*
* Étapes d'initialisation :
* 1. Configuration du consumer Kafka (lecture depuis raw-tweets)
* 2. Configuration du producer Kafka (écriture vers filtered-tweets)
* 3. Configuration de Gson avec adapter pour OffsetDateTime
* 4. Stockage de la chaîne de filtres
*
* @param bootstrapServers Adresse du broker Kafka (ex: "localhost:9092")
* @param groupId ID du groupe de consommateurs Kafka (pour le suivi)
* @param filterChain Liste de filtres à appliquer
*/
public TweetFilter(String bootstrapServers, String groupId, List<FilterCondition> filterChain) {
// ========== ÉTAPE 1 : Configuration du CONSUMER Kafka ==========
Properties consumerProps = new Properties();
// Serveur Kafka à contacter
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Groupe de consommateurs (permet le suivi du décalage de lecture)
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Les clés des messages sont des strings (non sérialisées)
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Les valeurs des messages sont des strings JSON (non sérialisées)
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// AUTO_OFFSET_RESET_CONFIG : en cas de démarrage, lire depuis le début du topic
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Commit automatique du décalage après chaque batch traité
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Création du consumer avec les propriétés
this.consumer = new KafkaConsumer<>(consumerProps);
// ========== ÉTAPE 2 : Configuration du PRODUCER Kafka ==========
Properties producerProps = new Properties();
// Serveur Kafka à contacter
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Les clés sont sérialisées en strings
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// Les valeurs sont sérialisées en strings
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// Création du producer avec les propriétés
this.producer = new KafkaProducer<>(producerProps);
// ========== ÉTAPE 3 : Configuration de Gson (sérialisation JSON) ==========
// Gson par défaut ne sait pas gérer les OffsetDateTime
// On ajoute un adaptateur personnalisé (OffsetDateTimeAdapter)
// pour convertir OffsetDateTime <-> String JSON
this.gson = new GsonBuilder()
.registerTypeAdapter(OffsetDateTime.class, new OffsetDateTimeAdapter())
.create();
// ========== ÉTAPE 4 : Stockage de la chaîne de filtres ==========
// Initialise à une liste vide si null (null safe)
this.filterChain = filterChain != null ? filterChain : new ArrayList<>();
// ========== Logs d'initialisation ==========
logger.info("TweetFilter initialized");
logger.info(() -> "Consuming from: " + INPUT_TOPIC);
logger.info(() -> "Publishing to: " + OUTPUT_TOPIC);
logger.info(() -> "Filter chain size: " + this.filterChain.size());
}
/**
* =====================================================
* MÉTHODE applyFilterChain()
* =====================================================
*
* Applique TOUS les filtres de la chaîne au tweet.
* Utilise une approche "ET logique" : le tweet doit passer TOUS les filtres.
*
* Flux logique :
* 1. Boucle sur chaque filtre de la chaîne
* 2. Si UN filtre rejette le tweet (retourne false), arrête immédiatement
* 3. Si TOUS les filtres acceptent, retourne true
*
* Optimisation : Court-circuit à la première rejection
* - Arrête dès qu'un filtre rejette = gain de performance
* - Les filtres les plus rapides/restrictifs devraient être en premier
*
* Exemple avec 3 filtres [Langue, Année, Vérification] :
* - Tweet rejeté par le 1er filtre -> retourne false (autres pas testés)
* - Tweet passe 1er et 2eme, rejeté par le 3eme -> retourne false
* - Tweet passe les 3 -> retourne true
*
* @param tweet Le tweet à tester
* @return true si le tweet passe TOUS les filtres, false sinon
*/
private boolean applyFilterChain(Tweet tweet) {
// Itère sur chaque filtre dans la chaîne
for (FilterCondition filter : filterChain) {
// Si le filtre courant rejette le tweet
if (!filter.match(tweet)) {
// Arrête immédiatement et rejette le tweet (court-circuit)
return false;
}
}
// Si on arrive ici, TOUS les filtres ont accepté le tweet
return true;
}
/**
* =====================================================
* MÉTHODE run() - Boucle principale de traitement
* =====================================================
*
* C'est la méthode maître qui lance le traitement en continu.
* Elle bloque indéfiniment jusqu'à ce que stop() soit appelé.
*
* Flux d'exécution :
* 1. S'abonne au topic Kafka "raw-tweets"
* 2. Crée un objet Statistics pour suivre les métriques
* 3. Entre dans une boucle infinie :
* a. Reçoit des messages Kafka (avec timeout de 500ms)
* b. Traite chaque message en lot (batch)
* c. Continue jusqu'à ce que running = false
* 4. En cas d'erreur ou arrêt, nettoie les ressources
*
* Raison du polling avec timeout :
* - 500ms : bon équilibre entre réactivité et charge système
* - Si pas de messages, attend 500ms puis réessaye
* - Si messages arrivent, les traite immédiatement
*/
public void run() {
// S'abonne au topic "raw-tweets" pour recevoir les tweets bruts
consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
logger.info(() -> "TweetFilter started - Consuming from '" + INPUT_TOPIC + "'");
// Crée un objet pour suivre les statistiques
Statistics stats = new Statistics();
try {
// Boucle infinie jusqu'à appel de stop()
while (running) {
// Poll Kafka avec timeout de 500ms
// - Si messages disponibles : les retourne immédiatement
// - Si aucun message : attend 500ms puis retourne (vide)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
// Traite le lot de messages reçu
processRecordBatch(records, stats);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Error in TweetFilter", e);
} finally {
// Arrêt propre : ferme les ressources (consumer, producer)
cleanup();
}
}
/**
* =====================================================
* MÉTHODE processRecordBatch()
* =====================================================
*
* Traite un lot (batch) de messages Kafka reçus.
* Itère simplement sur chaque message et le traite individuellement.
*
* Avantage du traitement par lot :
* - Kafka peut retourner plusieurs messages à la fois
* - Plus efficace que traiter 1 message à la fois
* - Tous les messages du lot sont traités avant le prochain poll
*
* @param records Lot de messages Kafka reçus du consumer
* @param stats Objet pour mettre à jour les statistiques
*/
private void processRecordBatch(ConsumerRecords<String, String> records, Statistics stats) {
// Boucle sur chaque message du lot
for (ConsumerRecord<String, String> record : records) {
// Traite le message individual
processSingleRecord(record, stats);
}
}
/**
* =====================================================
* MÉTHODE processSingleRecord()
* =====================================================
*
* Traite UN message Kafka (UN tweet).
* C'est le cœur du pipeline de filtrage.
*
* Étapes :
* 1. Incrémente le compteur de tweets traités
* 2. Parse le JSON pour obtenir l'objet Tweet
* 3. Applique la chaîne de filtres
* 4. Soit publie le tweet (passe les filtres), soit le rejette (bloquer)
* 5. Gère les erreurs d'analyse JSON
*
* Gestion des erreurs :
* - JsonSyntaxException : le message n'est pas du JSON valide (warn)
* - Autres exceptions : erreur inattendue (severe)
*
* @param record Message Kafka brut à traiter
* @param stats Objet pour mettre à jour les statistiques
*/
private void processSingleRecord(ConsumerRecord<String, String> record, Statistics stats) {
// Incrémente le compteur global de tweets traités
stats.incrementProcessed();
try {
// Parse la string JSON en objet Tweet
// gson.fromJson(JSON, classe) : convertit "{"text":"...", ...}" -> Tweet
Tweet tweet = gson.fromJson(record.value(), Tweet.class);
// Applique tous les filtres du chaîne au tweet
if (applyFilterChain(tweet)) {
// Le tweet passe TOUS les filtres -> le publier
publishTweet(record, tweet, stats);
} else {
// Le tweet est rejeté par au moins 1 filtre -> le bloquer
logFilteredOut(tweet, stats);
}
} catch (JsonSyntaxException e) {
// Le message n'est pas du JSON valide (log warning)
logger.log(Level.WARNING, "Failed to parse tweet JSON: " + record.value(), e);
} catch (Exception e) {
// Erreur inattendue (log severe)
logger.log(Level.SEVERE, "Unexpected error processing tweet", e);
}
}
/**
* =====================================================
* MÉTHODE publishTweet()
* =====================================================
*
* Envoie un tweet qui a passé TOUS les filtres vers Kafka.
*
* Processus :
* 1. Incrémente le compteur de tweets publiés
* 2. Log un message informatif avec les statistiques
* 3. Crée un nouveau message Kafka pour le topic "filtered-tweets"
* 4. Envoie le message via le producer
*
* Note importante :
* - Le message envoyé est IDENTIQUE au message reçu
* - On ne modifie pas le contenu, juste on le redirige vers un autre topic
* - producer.send() est asynchrone (non-bloquant)
*
* Exemple de log :
* "Tweet 1234567890 passed filters, publishing to filtered-tweets [42/50]"
* -> 42 tweets publiés sur 50 traités
*
* @param record Message Kafka original
* @param tweet Objet Tweet parsé (utilisé juste pour l'ID dans le log)
* @param stats Objet pour mettre à jour les statistiques
*/
private void publishTweet(ConsumerRecord<String, String> record, Tweet tweet, Statistics stats) {
// Incrémente le compteur de tweets publiés
stats.incrementPublished();
// Log informatif avec les statistiques
logger.info("Tweet " + tweet.getId() + " passed filters, publishing to " + OUTPUT_TOPIC
+ " [" + stats.getPublished() + "/" + stats.getProcessed() + "]");
// Crée un nouveau message Kafka pour le topic de sortie
// On réutilise la clé originale et la valeur originale (JSON du tweet)
ProducerRecord<String, String> outputRecord = new ProducerRecord<>(OUTPUT_TOPIC, record.key(), record.value());
// Envoie le message au producer Kafka (asynchrone)
producer.send(outputRecord);
}
/**
* =====================================================
* MÉTHODE logFilteredOut()
* =====================================================
*
* Enregistre un tweet qui a été rejeté par au moins 1 filtre.
* Le tweet N'est PAS envoyé au topic "filtered-tweets".
*
* Processus :
* 1. Incrémente le compteur de tweets filtrés
* 2. Log un message informatif avec les statistiques
*
* Exemple de log :
* "Tweet 1234567890 filtered out [8/50]"
* -> 8 tweets rejetés sur 50 traités
*
* @param tweet Objet Tweet (utilisé pour extraire l'ID)
* @param stats Objet pour mettre à jour les statistiques
*/
private void logFilteredOut(Tweet tweet, Statistics stats) {
// Incrémente le compteur de tweets filtrés (rejetés)
stats.incrementFiltered();
// Extrait l'ID du tweet pour le log
String tweetId = tweet.getId();
// Log informatif avec les statistiques
logger.info("Tweet " + tweetId + " filtered out [" + stats.getFiltered() + "/" + stats.getProcessed() + "]");
}
/**
* =====================================================
* CLASSE INTERNE : Statistics
* =====================================================
*
* Classe simple pour suivre les statistiques de traitement.
* Compteurs :
* - processed : nombre TOTAL de tweets traités
* - filtered : nombre de tweets REJETÉS par les filtres
* - published : nombre de tweets ACCEPTÉS et publiés
*
* Logique de validation :
* published + filtered = processed (tout tweet est soit accepté soit rejeté)
*
* Exemple :
* - 100 tweets traités
* - 70 tweets acceptés (published = 70)
* - 30 tweets rejetés (filtered = 30)
* - Vérification : 70 + 30 = 100 ✓
*/
private static class Statistics {
// Total de tweets traités (acceptés + rejetés)
private long processed = 0;
// Tweets rejetés par les filtres
private long filtered = 0;
// Tweets acceptés et publiés
private long published = 0;
/**
* Incrémente le nombre de tweets traités.
* Appelé pour CHAQUE tweet reçu de Kafka.
*/
void incrementProcessed() {
processed++;
}
/**
* Incrémente le nombre de tweets rejetés.
* Appelé quand un tweet échoue au moins 1 filtre.
*/
void incrementFiltered() {
filtered++;
}
/**
* Incrémente le nombre de tweets publiés.
* Appelé quand un tweet passe TOUS les filtres.
*/
void incrementPublished() {
published++;
}
// ===== GETTERS pour accéder aux statistiques =====
/**
* @return Nombre total de tweets traités
*/
long getProcessed() {
return processed;
}
/**
* @return Nombre de tweets rejetés
*/
long getFiltered() {
return filtered;
}
/**
* @return Nombre de tweets publiés
*/
long getPublished() {
return published;
}
}
/**
* =====================================================
* MÉTHODE stop()
* =====================================================
*
* Arrête proprement la boucle de traitement.
*
* Fonctionnement :
* - Met le drapeau 'running' à false (volatile)
* - La boucle while(running) dans run() s'arrête
* - Puis cleanup() ferme les ressources
*
* Utilisation :
* - Appel via un signal (Ctrl+C, SIGTERM, etc.)
* - Ou depuis un autre thread
*/
public void stop() {
running = false;
}
/**
* =====================================================
* MÉTHODE cleanup()
* =====================================================
*
* Nettoie et ferme les ressources Kafka.
* Appelée automatiquement à l'arrêt de run() (finally block).
*
* Étapes :
* 1. Log un message d'arrêt
* 2. Ferme le consumer Kafka (arrête la lecture)
* 3. Ferme le producer Kafka (arrête l'écriture)
* 4. Log un message de succès
* 5. Attrape et log les erreurs de fermeture
*
* Important :
* - Prévient les fuites de ressources
* - Libère les connexions Kafka
* - Commit les messages lus (consumer.close())
*/
private void cleanup() {
logger.info("Shutting down TweetFilter");
try {
// Ferme le consumer Kafka proprement
// - Arrête la lecture du topic
// - Commit les messages lus
// - Libère les ressources
consumer.close();
// Ferme le producer Kafka proprement
// - Flush les messages en attente
// - Libère les ressources
producer.close();
logger.info("TweetFilter shutdown complete");
} catch (Exception e) {
// Log les erreurs rencontrées lors de la fermeture
logger.log(Level.SEVERE, "Error during shutdown", e);
}
}
/**
* =====================================================
* MÉTHODE main() - Point d'entrée du programme
* =====================================================
*
* Fonction principale qui lance le service de filtrage.
* Exécution :
* 1. Valide les arguments en ligne de commande
* 2. Lit les variables d'environnement pour les filtres
* 3. Crée la chaîne de filtres appropriée
* 4. Initialise TweetFilter
* 5. Lance la boucle de traitement (bloquant)
*
* Arguments requis :
* arg[0] : bootstrap-servers (ex: "kafka:9092" ou "localhost:9092")
* arg[1] : consumer-group (ex: "filter-group-1")
*
* Variables d'environnement optionnelles :
* FILTER_TYPE : type de filtre ("lang", "recent", "none" ou absent)
* FILTER_VALUE : paramètre du filtre
* - Pour "lang" : code langue (ex: "en", "fr", "es")
* - Pour "recent" : année de départ (ex: "2022", "2023")
* - Ignoré pour "none"
*
* Exemples d'exécution :
* 1. Aucun filtre (accepte TOUS les tweets)
* $ FILTER_TYPE=none java TweetFilter kafka:9092 filter-group-1
*
* 2. Filtre par langue (anglais uniquement)
* $ FILTER_TYPE=lang FILTER_VALUE=en java TweetFilter kafka:9092 filter-group-1
*
* 3. Filtre par année (tweets depuis 2023)
* $ FILTER_TYPE=recent FILTER_VALUE=2023 java TweetFilter kafka:9092 filter-group-1
*
* @param args Arguments en ligne de commande [bootstrap-servers, consumer-group]
*/
public static void main(String[] args) {
// ========== ÉTAPE 1 : Validation des arguments ==========
if (args.length < 2) {
// Erreur : pas assez d'arguments
logger.severe("Usage: TweetFilter <bootstrap-servers> <consumer-group>");
System.exit(1);
}
// Récupère les arguments en ligne de commande
String bootstrapServers = args[0];
String consumerGroup = args[1];
logger.info("Starting TweetFilter service");
logger.info(() -> "Bootstrap servers: " + bootstrapServers);
logger.info(() -> "Consumer group: " + consumerGroup);
try {
// ========== ÉTAPE 2 : Lecture des variables d'environnement ==========
// FILTER_TYPE : type de filtre à appliquer
String filterType = System.getenv().getOrDefault("FILTER_TYPE", "none");
// FILTER_VALUE : paramètre du filtre (langue, année, etc.)
String filterValue = System.getenv().getOrDefault("FILTER_VALUE", "");
logger.info(() -> "Filter type: " + filterType);
logger.info(() -> "Filter value: " + filterValue);
// ========== ÉTAPE 3 : Création de la chaîne de filtres ==========
List<FilterCondition> filters = new ArrayList<>();
switch (filterType.toLowerCase()) {
// ===== CAS 1 : Filtre par langue =====
case "lang":
case "language":
// Validation : FILTER_VALUE est obligatoire pour ce filtre
if (filterValue.isEmpty()) {
logger.severe("FILTER_VALUE required for language filter (e.g., 'en', 'fr')");
System.exit(1);
}
// Ajoute le filtre de langue à la chaîne
filters.add(new LangTweetFilter(filterValue));
logger.info(() -> "Using Language Filter: " + filterValue);
break;
// ===== CAS 2 : Filtre par année =====
case "recent":
case "year":
// Validation : FILTER_VALUE est obligatoire pour ce filtre
if (filterValue.isEmpty()) {
logger.severe("FILTER_VALUE required for year filter (e.g., '2022', '2023')");
System.exit(1);
}
// Parse la string en entier
int year = Integer.parseInt(filterValue);
// Ajoute le filtre d'année à la chaîne
filters.add(new RecentTweetFilter(year));
logger.info(() -> "Using Recent Tweet Filter: from " + year + " onwards");
break;
// ===== CAS 3 : Pas de filtre (accepte TOUS les tweets) =====
case "none":
case "empty":
default:
// EmptyTweetFilter accepte tous les tweets
filters.add(new EmptyTweetFilter());
logger.info("Using Empty Filter: accepting all tweets");
break;
}
// ========== ÉTAPE 4 : Initialisation de TweetFilter ==========
TweetFilter filter = new TweetFilter(bootstrapServers, consumerGroup, filters);
logger.info("Filter service initialized");
logger.info("Consuming from: raw-tweets");
logger.info("Publishing to: filtered-tweets");
logger.info("Running in headless mode");
// ========== ÉTAPE 5 : Lancement du traitement (bloquant) ==========
// Cette méthode bloque indéfiniment jusqu'à stop() ou Ctrl+C
filter.run();
} catch (NumberFormatException e) {
// Erreur : FILTER_VALUE n'est pas un nombre pour le filtre "year"
logger.severe("Invalid FILTER_VALUE for year filter: must be a number");
System.exit(1);
} catch (Exception e) {
// Erreur inattendue durant l'initialisation ou l'exécution
logger.log(Level.SEVERE, "Filter service failed", e);
System.exit(1);
}
}
}