Nessun risultato. Prova con un altro termine.
Guide
Notizie
Software
Tutorial

Implementazione SPOUT WordReader

Link copiato negli appunti

Per implementare lo SPOUT che abbiamo chiamato WordReader, creiamo una classe nel package SPOUTS e la chiamiamo WordReader. Durante la creazione della classe, dobbiamo avere cura di ereditare la classe BaseRichSPOUT. Per fare ciò, possiamo parametrizzare la maschera di creazione di una classe come nella figura sottostante:

Figura 11. Creazione della classe WordReader
Creazione della classe WordReader

La classe che viene creata ha la struttura seguente:

public class WordReader extends BaseRichSPOUT {
	public void open(Map conf, TOPOLOGYContext context,
		SPOUTOutputCollector collector) {
	}
	public void nextTuple() {
	}
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}

Il metodo open() viene utilizzato per inizializzare la configurazione dello SPOUT. Il parametro conf contiene tutte le informazioni per la configurazione del cluster Storm (compreso ad esempio il nome del file da utilizzare e che dichiareremo nel TOPOLOGY); il parametro context viene utilizzato per ottenere informazioni relative al processo in esecuzione sul cluster; il parametro collector viene utilizzato per emettere le tuple dallo SPOUT ai BOLTS (operazione svolta nel metodo nextTuple()). Possiamo utilizzare il metodo open per istanziare un oggetto di tipo FileReader, per aprire lo stream con il file sul disco, e per configurare il collector.

A tal proposito, dichiariamo due oggetti della classe: uno di tipo FileReader e un altro di tipo SPOUTOutputCollector. Sulla base di tutte queste informazioni quindi, possiamo implementare il metodo come segue:

public void open(Map conf, TOPOLOGYContext context,
	SPOUTOutputCollector collector) {
		try {
			String nomeFile = conf.get("wordsFile").toString();
			this.fileReader = new FileReader(nomeFile);
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Errore nella lettura del file ["+conf.get("wordFile")+"]");
		}
			this.collector = collector;
}

Il metodo nextTuple() ha un duplice ruolo: leggere lo stream continuamente ed incapsulare i dati letti in tuple per poi emetterle verso i BOLTS. La lettura continua dello stream viene gestita invocando il metodo ripetutamente durante tutta l'esecuzione dello SPOUT. Questa esecuzione ripetuta è un'arma a doppio taglio: se da un lato permette di leggere lo stream facilmente, dall'altro genera attesa attiva anche quando non ci sono dati aggiornati da elaborare. Per rendere l'esecuzione del metodo più efficiente, si può mettere in sleep il thread che esegue il metodo per poi risvegliarlo quando serve. Vediamo praticamente:

public void nextTuple() {
	//Prima parte
	if(completed){
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) { }
		return;
	}
	//Seconda parte
	String str;
	BufferedReader reader = new BufferedReader(fileReader);
	try{
		while((str = reader.readLine()) != null){
			this.collector.emit(new Values(str));
		}
	}catch(Exception e){
		throw new RuntimeException("Errore nella lettura della tupla",e);
	}finally{
		completed = true;
	}
}

Nella prima parte del codice, viene gestita l'attesa attiva mettendo in sleep il thread se la variabile booleana completed è true. Questa variabile viene settata a true quando termina il ciclo di lettura del file (seconda parte del codice). Ovviamente è necessario dichiarare questa variabile all'interno della classe. La seconda parte del codice invece effettua la lettura del file e ad ogni riga letta, viene invocato emit(): questo è il metodo che inoltra le tuple all'esterno dello SPOUT. I dati letti dal file vengono incapsulati in una tupla istanziando l'oggetto Values.

Terminata la lettura del file (con successo o meno), la variabile completed viene settata a true e dunque la successiva invocazione del metodo innesca l'invocazione del metodo sleep che blocca l'attesa attiva.

Un particolare che fino ad ora non è stato sottolineato è il fatto che un BOLT può ricevere tuple da più SPOUTS. Come possiamo quindi differenziare le tuple? Con il metodo declareOutputFields() possiamo associare un'etichetta alla tupla che lo SPOUT emette.

public void declareOutputFields(OutputFieldsDeclarer declarer) {
	declarer.declare(new Fields("line"));
}

Possiamo incapsulare l'etichetta in un oggetto Fields e associarla alla tupla con il metodo declare() dell'oggetto OutputFieldsDeclarer.

Ti consigliamo anche