TweetsProducer.java
package tweetoscope.tweetsProducer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import com.google.gson.stream.JsonWriter;
import com.twitter.clientlib.model.Tweet;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* =====================================================
* CLASSE ABSTRAITE TweetsProducer - Producteur Kafka
* =====================================================
*
* Cette classe abstraite est la BASE pour tous les producteurs de tweets.
* Elle gère :
* 1. La connexion au broker Kafka
* 2. La sérialisation/désérialisation JSON des tweets
* 3. L'envoi des tweets vers le topic "raw-tweets"
*
* Architecture :
* - Classe ABSTRAITE : ne peut pas être instanciée directement
* - Implémente Runnable : peut être exécutée dans un thread
* - Classe PARENTE pour les implémentations concrètes :
* * MockTwitterStreamRandom : tweets aléatoires
* * MockTwitterStreamRecorded : tweets depuis un fichier
* * MockTwitterStreamScenario : tweets d'un scénario
*
* Pipeline :
* TweetsProducer (classe abstraite)
* ↑
* └─ Crée des tweets (implémentations concrètes)
* ↓
* publishTweet() : sérialize en JSON
* ↓
* KafkaProducer : envoie vers Kafka
* ↓
* Topic "raw-tweets" : reçoit les tweets bruts
* ↓
* TweetFilter : lit et filtre
*/
public abstract class TweetsProducer implements Runnable {
// Logger pour tracer l'exécution, les erreurs et les événements
private static final Logger logger = Logger.getLogger(TweetsProducer.class.getName());
/**
* CONSTANTE : TOPIC
*
* Le nom du topic Kafka où seront envoyés les tweets bruts.
* - "raw-tweets" = topic source pour tous les tweets non filtrés
* - Les tweets bruts arrivent ici depuis le producteur
* - Les filtres lisent depuis ce topic et les redistribuent
*
* Exemple de flow :
* TweetsProducer → "raw-tweets" (topic) → TweetFilter
*/
private static final String TOPIC = "raw-tweets";
/**
* VARIABLE MEMBRE : producer
*
* Client Kafka PRODUCTEUR qui envoie les messages.
*
* Rôle :
* - Établit la connexion au broker Kafka
* - Envoie les tweets au topic "raw-tweets"
* - Gère les callbacks (succès/erreur de livraison)
*
* Type :
* - KafkaProducer<String, String>
* - Clé : String (identifiant du message, peut être null)
* - Valeur : String (JSON du tweet)
*
* Sérialisation :
* - Les clés et valeurs sont sérialisées en strings
* - StringSerializer convertit les objets en bytes
*/
protected final KafkaProducer<String, String> producer;
/**
* VARIABLE MEMBRE : gson
*
* Outil de sérialisation/désérialisation JSON.
*
* Rôle :
* - Convertit Tweet → String JSON (pour Kafka)
* - Convertit String JSON → Tweet (pour lecture)
* - Gère les types complexes comme OffsetDateTime
*
* Adaptateur personnalisé :
* - OffsetDateTime nécessite un adapter Gson custom
* - Car Gson ne sait pas nativement gérer les OffsetDateTime
*/
private final Gson gson;
/**
* =====================================================
* CONSTRUCTEUR : Initialise le producteur Kafka
* =====================================================
*
* Étapes d'initialisation :
* 1. Configure les propriétés Kafka (broker, sérializers)
* 2. Crée le KafkaProducer
* 3. Configure Gson avec adapter OffsetDateTime
*
* @param brokers Adresse du broker Kafka (ex: "localhost:9092" ou "kafka:9092")
*/
public TweetsProducer(String brokers) {
// ========== ÉTAPE 1 : Configuration du Producer Kafka ==========
Properties props = new Properties();
// Serveur Kafka à contacter
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
// Sérialisation des CLÉS : String → bytes
// Les clés identifient les messages (peuvent être null)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// Sérialisation des VALEURS : String (JSON) → bytes
// Les valeurs contiennent les tweets sérialisés en JSON
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// ========== ÉTAPE 2 : Création du KafkaProducer ==========
// Initialise le producteur avec les propriétés
this.producer = new KafkaProducer<>(props);
// ========== ÉTAPE 3 : Configuration de Gson ==========
// GsonBuilder permet de configurer Gson avec des adaptateurs custom
this.gson = new GsonBuilder()
// Enregistre un adapter pour OffsetDateTime
// Car Gson ne sait pas nativement gérer ce type
.registerTypeAdapter(OffsetDateTime.class, new TypeAdapter<OffsetDateTime>() {
/**
* SÉRIALISATION : OffsetDateTime → JSON
* Convertit une date en string au format ISO-8601
*/
@Override
public void write(JsonWriter out, OffsetDateTime value) throws IOException {
// Cas spécial : si la date est null
if (value == null) {
out.nullValue();
} else {
// Convertit la date en string ISO-8601
// Exemple : "2023-11-25T14:30:00+02:00"
out.value(value.toString());
}
}
/**
* DÉSÉRIALISATION : JSON → OffsetDateTime
* Convertit un string JSON en OffsetDateTime
*/
@Override
public OffsetDateTime read(JsonReader in) throws IOException {
// Vérifie si la valeur JSON est null
if (in.peek() == JsonToken.NULL) {
in.nextNull();
return null;
} else {
// Parse la string au format ISO-8601 en OffsetDateTime
return OffsetDateTime.parse(in.nextString());
}
}
})
// Finalise la configuration Gson
.create();
}
/**
* =====================================================
* MÉTHODE publishTweet() - Envoie un tweet à Kafka
* =====================================================
*
* C'est la méthode clé qui publie les tweets.
*
* Flux complet :
* 1. Valide que le tweet n'est pas null
* 2. Sérialise le tweet en JSON
* 3. Crée un ProducerRecord (message Kafka)
* 4. Envoie le message via producer.send()
* 5. Gère la réponse (succès ou erreur) via callback
*
* Asynchrone :
* - producer.send() retourne immédiatement
* - Le callback s'exécute quand Kafka répond
* - Non-bloquant = bonne performance
*
* @param tweet L'objet Tweet à publier
*/
protected void publishTweet(Tweet tweet) {
// ========== ÉTAPE 1 : Validation du tweet ==========
// Vérifie que le tweet n'est pas null (données invalides)
if (tweet == null) {
logger.warning("Received a null tweet. Skipping.");
return; // Arrête la publication
}
// ========== ÉTAPE 2 : Sérialisation JSON ==========
// Convertit l'objet Tweet en string JSON
// Exemple :
// Tweet(id=123, text="Hello world", ...)
// ↓ gson.toJson()
// {"id":"123", "text":"Hello world", ...}
String jsonTweet = gson.toJson(tweet);
// ========== ÉTAPE 3 : Création du message Kafka ==========
// ProducerRecord = message à envoyer à Kafka
// Format : ProducerRecord<Clé, Valeur>
// - TOPIC : "raw-tweets" (destination)
// - null : pas de clé spécifique (Kafka choisit une partition au hasard)
// - jsonTweet : le JSON du tweet
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, jsonTweet);
// ========== ÉTAPE 4 : Envoi ASYNCHRONE ==========
// producer.send() envoie le message et retourne immédiatement
// Le callback s'exécute quand Kafka répond
producer.send(record, (metadata, exception) -> {
// ===== CALLBACK : Réponse de Kafka =====
if (exception != null) {
// ERREUR : Kafka n'a pas pu livrer le message
logger.log(Level.SEVERE, "Failed to publish tweet", exception);
} else {
// SUCCÈS : Tweet livré à Kafka
// metadata contient les infos :
// - partition : la partition Kafka utilisée
// - offset : la position dans la partition
// - timestamp : quand Kafka a reçu le message
logger.info("Published tweet: " + tweet.getText());
}
});
}
/**
* =====================================================
* MÉTHODE close() - Arrête le producteur
* =====================================================
*
* Ferme proprement la connexion Kafka.
*
* Nettoyage :
* - Flush les messages en attente
* - Ferme la connexion au broker
* - Libère les ressources
*
* Utilisation :
* - Appeler avant d'arrêter le programme
* - Garantit que tous les messages sont envoyés
* - Évite les fuites de ressources
*
* Exemple :
* producer.close(); // Arrête proprement
*/
public void close() {
producer.close();
}
/**
* =====================================================
* MÉTHODE ABSTRAITE run() - À implémenter par les sous-classes
* =====================================================
*
* Chaque sous-classe DOIT implémenter run() pour :
* 1. Générer les tweets (aléatoires, fichier, scénario, etc.)
* 2. Appeler publishTweet() pour envoyer à Kafka
* 3. Gérer la boucle de production (infinie ou finie)
*
* Implémentations :
* - MockTwitterStreamRandom : tweets aléatoires en boucle
* - MockTwitterStreamRecorded : tweets d'un fichier
* - MockTwitterStreamScenario : tweets d'un scénario
*
* Pattern :
* - Interface Runnable : permet l'exécution dans un thread
* - Abstract : force les sous-classes à implémenter
*/
@Override
public abstract void run();
/**
* =====================================================
* MÉTHODE main() - Point d'entrée du programme
* =====================================================
*
* Fonction principale qui lance le service de production de tweets.
*
* Flux :
* 1. Valide les arguments en ligne de commande
* 2. Lit la source de tweets (variable d'environnement ou argument)
* 3. Sélectionne l'implémentation appropriée
* 4. Lance la production (bloquant)
*
* Arguments :
* - arg[0] : bootstrap-servers (OBLIGATOIRE, ex: "kafka:9092")
* - arg[1] : source de tweets (OPTIONNEL, ex: "random", "recorded")
*
* Variable d'environnement :
* - TWEETS_SOURCE : source par défaut si arg[1] absent
* Valeurs : "random", "recorded", "scenario", "mini", "large"
*
* @param args Arguments en ligne de commande
*/
public static void main(String[] args) {
// ========== ÉTAPE 1 : Validation des arguments ==========
if (args.length < 1) {
// Erreur : bootstrap-servers est obligatoire
logger.severe("Usage: TweetsProducer <bootstrap-servers> [source]");
System.exit(1);
}
// Récupère l'adresse du broker Kafka
String bootstrapServers = args[0];
// Récupère la source de tweets :
// - Soit depuis l'argument arg[1]
// - Soit depuis la variable d'environnement TWEETS_SOURCE
// - Soit "random" par défaut
String source = args.length > 1 ? args[1] : System.getenv().getOrDefault("TWEETS_SOURCE", "random");
// ========== ÉTAPE 2 : Logs d'initialisation ==========
logger.info("Starting TweetsProducer service");
logger.info(() -> "Bootstrap servers: " + bootstrapServers);
logger.info(() -> "Tweet source: " + source);
try {
// ========== ÉTAPE 3 : Sélection de l'implémentation ==========
TweetsProducer producer;
// Switch sur la source de tweets
// Chaque cas sélectionne une implémentation différente
switch (source.toLowerCase()) {
// ===== CAS 1 : Tweets d'un scénario =====
case "scenario":
logger.info("Using MockTwitterStreamScenario (scenario-based tweets)");
// MockTwitterStreamScenario génère des tweets basés sur un scénario
producer = new MockTwitterStreamScenario(bootstrapServers);
break;
// ===== CAS 2 : Tweets enregistrés (mini test base) =====
case "recorded":
case "mini":
logger.info("Using MockTwitterStreamRecorded (mini test base)");
// MockTwitterStreamRecorded lit depuis un fichier texte
// "TestBases/miniTestBase.txt" = petit ensemble de tweets pour les tests
producer = new MockTwitterStreamRecorded(bootstrapServers, "TestBases/miniTestBase.txt");
break;
// ===== CAS 3 : Tweets enregistrés (large test base) =====
case "large":
logger.info("Using MockTwitterStreamRecorded (large test base)");
// "TestBases/largeTestBase.txt" = grand ensemble de tweets pour les tests
producer = new MockTwitterStreamRecorded(bootstrapServers, "TestBases/largeTestBase.txt");
break;
// ===== CAS 4 : Tweets aléatoires (par défaut) =====
case "random":
default:
logger.info("Using MockTwitterStreamRandom (random tweets)");
// MockTwitterStreamRandom génère des tweets aléatoires en continu
producer = new MockTwitterStreamRandom(bootstrapServers);
break;
}
// ========== ÉTAPE 4 : Logs de confirmation ==========
logger.info("Producer service initialized");
logger.info("Publishing to topic: raw-tweets");
logger.info("Running in headless mode");
// ========== ÉTAPE 5 : Lancement de la production ==========
// producer.run() bloque indéfiniment jusqu'à Ctrl+C ou exception
// Elle génère continuellement des tweets et les envoie à Kafka
producer.run();
} catch (Exception e) {
// En cas d'erreur critique lors de l'initialisation
logger.log(Level.SEVERE, "Producer service failed", e);
System.exit(1);
}
}
}