https://www.youtube.com/watch?v=g7Qpnmi0Q-s
https://www.youtube.com/watch?v=WdIiTgYI5QI
Ahogy korábban már láttuk, a Hadoop MapReduce egy keretrendszer, amely lehetővé teszi olyan alkalmazások készítését, amelyek hatalmas mennyiségű adatot (több terra byte-os adathalmazok) párhuzamosan dolgoznak fel nagyméretű hagyományos számítógépekből álló klasztereken (több ezer csomópont) megbízható és hibatűrő módon.
Egy MapReduce job általában felosztja a bemeneti adatkészletet független részekre, amelyeket a leképező (map) feladatok teljesen párhuzamosan dolgoznak fel. A keretrendszer rendezi a leképezések kimeneteit, majd ezután a reduce feladatok ezt feldolgozzák. Általában a feladat bemenete és kimenete egy fájlrendszerben kerül tárolásra. A keretrendszer gondoskodik a feladatok ütemezéséről, megfigyeléséről és a sikertelen feladatok újbóli végrehajtásáról.
Egy minimális MapReduce alkalmazáshoz meg kell adnunk a bemeneti/kimeneti adatok helyét és meg kell valósítanunk a map és reduce függvényeket a megfelelő interfészek és / vagy absztrakt osztályok implementálásával. Ezek és más job paraméterek képezik az ún. job konfigurációt .
A Hadoop job kliens ezután elküldi a feladatot (jar / végrehajtható stb.) és a konfigurációt a ResourceManager-nek, amely kiosztja a szoftvert / konfigurációt a worker node-oknak, elvégzi a feladatok ütemezését és megfigyelését, valamint gondoskodik az állapot- és diagnosztikai információknak a klienshez történő visszacsatolásáról.
Bár a Hadoop keretrendszert Java nyelven írták, a MapReduce alkalmazásokat nem kell feltétlenül Java-ban írni.
Kezdjük a MapReduce programozási modellel való ismerkedést egy klasszikus feladaton keresztül. Adott egy vagy több szövegdokumentum, és a feladat az, hogy számítsuk ki az egyes szavak előfordulási gyakoriságát a teljes szöveghalmazra. Ez egy olyan feladat, amit viszonylag könnyen be tudunk illeszteni a MapReduce filozófiába. Lássuk a feladat megoldását egészben a Hadoop MapReduce keretrendszert felhasználva.
ximport java.io.IOException;import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}A program fordítása és parancssori futtatása az alábbi módon történik (feltétel a Hadoop keretrendszer telepítése vagy docker-ben történő futtatása, lásd az első videó leckét, illetve az Apache Hadoop és HDFS gyakorlati anyagokat):
xxxxxxxxxx$ export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar$ hadoop com.sun.tools.javac.Main WordCount.java$ jar cf wc.jar WordCount*.classFuttatás előtt a megfelelő input szöveg állományokat file01 és file02 másoljuk fel a HDFS fájlrendszerre
xxxxxxxxxx$ hadoop fs -mkdir /input$ hadoop fs -copyFromLocal file01 /input/file01$ hadoop fs -copyFromLocal file02 /input/file02$ hadoop fs -ls /input/input/file01/input/file02
$ hadoop fs -cat /input/file01Hello World Bye World
$ hadoop fs -cat /input/file02Hello Hadoop Goodbye HadoopEzután már futtathatjuk a MapReduce alkalmazásunkat e lefordított jar segítségével:
xxxxxxxxxx$ hadoop jar wc.jar WordCount /input /output
$ hadoop fs -cat /output/part-r-00000Bye 1Goodbye 1Hadoop 2Hello 2World 2xxxxxxxxxxpublic void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); }}A map függvény megvalósítása a Mapper interfész implementálását jelenti, ami soronként dolgozza fel a bemenetet, aminek a típusa Text. Ezután tokenizálja a sort whitespace-k mentén, és minden egyes tokenhez (azaz szóhoz jelen esetben) egy kulcs-érték párt ír ki (emit) a következő formában: < <szó>, 1> Jelen példában az első map eredménye:
xxxxxxxxxx< Hello, 1>< World, 1>< Bye, 1>< World, 1>
Míg a második map eredménye:
xxxxxxxxxx< Hello, 1>< Hadoop, 1>< Goodbye, 1>< Hadoop, 1>
A WordCount példa használ combiner task-ot is, így minden map eredménye keresztülmegy egy lokális combiner folyamaton (ami jelen esetben ugyanaz, mint a reduce), hogy a map eredményeit lokálisan aggregáljuk, miután kulcsok szerint sorba rendeztük őket.
Ez első map eredményei a combine után így alakulnak:
xxxxxxxxxx< Bye, 1>< Hello, 1>< World, 2>
Míg a másodiké így:
xxxxxxxxxx< Goodbye, 1>< Hadoop, 2>< Hello, 1>
Ezek az adatok aztán bekerülnek a reduce függvénybe feldolgozásra:
xxxxxxxxxxpublic void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result);}Figyeljük meg, hogy a reduce függvény kulcsonként kapja meg az értékek listáját, amiket a map (vagy ha használjuk a combiner) függvények állítanak elő (amennyiben combiner-t használunk, az is ily módon kapja meg a bemenetet). Jelen esetben ezeket:
xxxxxxxxxx< Bye, <1> >< Goodbye, <1> >< Hadoop, <2> >< Hello, <1, 1> >< World, <2> >
A reduce végeredménye tehát a következő:
xxxxxxxxxx< Bye, 1>< Goodbye, 1>< Hadoop, 2>< Hello, 2>< World, 2>
Amit a HDFS fájlrendszer /output mappába generált fájl tartalmazza. A program main metódusában számos beállítást adtunk meg:
Készítsünk egy olyan MapReduce programot, amely kiszámítja a daily_csv.csv adatfájlban szereplő összes ország pénznemének USD-hez viszonyított átlagos árfolyamát a napi árfolyam adatok alapján. Az árfolyam adatfájl cím és első adatsora így néz ki:
xxxxxxxxxxDate,Country,Value1971-01-04,Australia,0.8987
Az adatfájl több százezer sornyi napi árfolyam adatot tartalmaz 22 országra. Írjuk meg a fenti példa alapján azt a MapReduce alkalmazást, ami ehhez a 22 országhoz kiszámítja az összes napra vett átlagos árfolyamértéket.
A mapper megvalósítás:
xxxxxxxxxxpublic static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable> {
private final static DoubleWritable currencyPrice = new DoubleWritable(); private final Text country = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString(), "\n"); while (itr.hasMoreTokens()) { String csvLine = itr.nextToken(); if (csvLine.startsWith("Date")) { continue; } String[] csvVals = csvLine.split(","); country.set(csvVals[1]); double val = csvVals.length < 3 ? 0.0 : Double.parseDouble(csvVals[2]); if (val > 0) { currencyPrice.set(val); context.write(country, currencyPrice); } } } }StringTokenizer soronként bontja az inputotA reducer megvalósítás:
xxxxxxxxxxpublic static class AvgReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable> { private final DoubleWritable result = new DoubleWritable();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context ) throws IOException, InterruptedException { double sum = 0.0; int count = 0; for (DoubleWritable val : values) { count++; sum += val.get(); } result.set(sum/count); context.write(key, result); } }Végig megyünk az egy kulcshoz (ország) tartozó értékeken (napi árfolyam adatok) és képezzük azok átlagát
[2] https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
[3] https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
[4] https://hadoop.apache.org/docs/r1.2.1/streaming.html
[5] https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/pipes/package-summary.html
[8] https://data-flair.training/blogs/hadoop-combiner-tutorial