A Hadoop keretrendszer alapértelmezett számítás végrehajtó motorja a MapReduce [1]. Ez kiválóan alkalmas kötegelt feldolgozáshoz, a map és reduce műveletek klaszteren elosztott, párhuzamos végrehajtásához. A hatékonyságot leginkább az által éri el, hogy nem a nagy adatokat mozgatja a klaszter node-jai között, hanem az azokat feldolgozó programkódot másolja arra a csomópontra, ahol az adatok is vannak. Így a műveletek lokális adatokon, párhuzamosan futnak, és a végén az eredmények összegződnek. Vannak azonban bizonyos használati esetek, amikor ez a megközelítés nem alkalmazható. Ilyenek a következők:
https://www.tutorialspoint.com/apache_spark/images/iterative_operations_on_mapreduce.jpg
https://www.tutorialspoint.com/apache_spark/images/interactive_operations_on_mapreduce.jpg
Jól látszik, hogy a fenti két tipikus BigData felhasználási mód nagyságrendekkel gyorsabb adatmegosztást követel a párhuzamosan futó job-ok között. Mivel mind az iteratív mind pedig az interaktív alkalmazások nagyon népszerűek, szükség volt a klasszikus MapReduce-tól eltérő megközelítések alkalmazására és újabb végrehajtó motorok fejlesztésére. A MapReduce két legelterjedtebb alternatívája/utódja/fejlesztése az alábbi:
Az Apache Tez [2] az Apache Hadoop YARN-ra épülő adatfeldolgozó motor, amely képes komplex, körmentes irányított gráfba (DAG) szervezett feladatok hatékony és elosztott végrehajtására. Mivel a job-okat össze lehet fűzni, és egyszerre végrehajtani őket, ami korábban több MapReduce job futtatását igényelte, a Tez jelentős teljesítmény növekedést hoz a MapReduce-hoz képest.
A fenti ábrán jól látszik, hogy ha például egy Hive SQL lekérdezés a fenti módon több különböző forrást használ (pl. tábla JOIN
vagy egyéb összetett lekérdezés miatt), akkor az több MapReduce job-ot eredményez, amiket külön-külön végre kell hajtani, és az egyes job-ok (köztes) eredményei HDFS-en tárolásra kerülnek. Ezzel szemben ha a Tez végrehajtó motort használjuk (Hive teljes körűen támogatja), a fenti irányított körmentes gráfba rendezett feladatokat kapjuk, amelyek egyben futtathatók a köztes eredmények kiírása nélkül. Ez nagy teljesítmény javulást hoz a MapReduce-hoz képest, emlékezzünk vissza a Hive használata során megjelenő figyelmeztető üzenetekre: MapReduce helyett használjuk a Tez vagy a Spark futtató motort, mert az hatékonyabb!
A Tez az alábbi két csoportba sorolható tervezési elveket követi :
Felhasználást megkönnyítő funkciók:
Végrehajtási teljesítmény:
Az alábbi Java programkód [5] egy két node-ból álló Tez DAG létrehozását és futtatását demonstrálja, amely a klasszikus word count problémát oldja meg:
x
package io.github.ouyi.tez;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.*;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.StringTokenizer;
public class HelloWorld extends Configured implements Tool {
private static final String INPUT = "Input";
private static final String TOKENIZER_VERTEX = "TokenizerVertex";
private static final String SUMMATION_VERTEX = "SummationVertex";
private static final String OUTPUT = "Output";
private static final int PARALLELISM = 1;
private static final boolean ENABLE_SPLIT_GROUPING = true;
private static final boolean GENERATE_SPLIT_IN_AM = true;
private static final Logger LOGGER = LoggerFactory.getLogger(HelloWorld.class);
public static class TokenProcessor extends SimpleProcessor {
private final IntWritable one = new IntWritable(1);
private final Text word = new Text();
public TokenProcessor(ProcessorContext context) {
super(context);
}
public void run() throws Exception {
Preconditions.checkArgument(getInputs().size() == 1);
Preconditions.checkArgument(getOutputs().size() == 1);
KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(SUMMATION_VERTEX).getWriter();
while (kvReader.next()) {
StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
kvWriter.write(word, one);
}
}
}
}
public static class SumProcessor extends SimpleMRProcessor {
public SumProcessor(ProcessorContext context) {
super(context);
}
public void run() throws Exception {
Preconditions.checkArgument(getInputs().size() == 1);
Preconditions.checkArgument(getOutputs().size() == 1);
KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
KeyValuesReader kvReader = (KeyValuesReader) getInputs().get(TOKENIZER_VERTEX).getReader();
while (kvReader.next()) {
Text word = (Text) kvReader.getCurrentKey();
int sum = 0;
for (Object value : kvReader.getCurrentValues()) {
sum += ((IntWritable) value).get();
}
kvWriter.write(word, new IntWritable(sum));
}
}
}
public int run(String[] args) throws Exception {
// Parse args
LOGGER.info(Arrays.toString(args));
String inputPath = args[0];
String outputPath = args[1];
boolean localMode = args.length > 2 ? Boolean.parseBoolean(args[2]) : false;
// Setup configs
Configuration conf = Optional.ofNullable(getConf()).orElse(new Configuration());
TezConfiguration tezConf = new TezConfiguration(conf);
if (localMode) {
tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, localMode);
conf.set("fs.default.name", "file:///");
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
}
// Create and run DAG
TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf);
DAG dag = createDAG(inputPath, outputPath, tezConf);
return runDAG(dag, tezClient, tezConf);
}
private DAG createDAG(String inputPath, String outputPath, TezConfiguration tezConf) {
// Create the tokenizer vertex with the input data source and TextInputFormat
DataSourceDescriptor dataSourceDescriptor = MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath)
.groupSplits(ENABLE_SPLIT_GROUPING)
.generateSplitsInAM(GENERATE_SPLIT_IN_AM)
.build();
Vertex tokenizerVertex = Vertex.create(TOKENIZER_VERTEX, ProcessorDescriptor.create(TokenProcessor.class.getName()))
.addDataSource(INPUT, dataSourceDescriptor);
// Create the summation vertex with the output data source and TextOutputFormat
DataSinkDescriptor dataSinkDescriptor = MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath)
.build();
Vertex summationVertex = Vertex.create(SUMMATION_VERTEX, ProcessorDescriptor.create(SumProcessor.class.getName()), PARALLELISM)
.addDataSink(OUTPUT, dataSinkDescriptor);
// Create a key-value edge with Text key type and IntWritable value type
OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig
.newBuilder(Text.class.getName(), IntWritable.class.getName(), HashPartitioner.class.getName())
.setFromConfiguration(tezConf)
.build();
Edge edge = Edge.create(tokenizerVertex, summationVertex, edgeConfig.createDefaultEdgeProperty());
DAG dag = DAG.create("HelloWorld DAG")
.addVertex(tokenizerVertex)
.addVertex(summationVertex)
.addEdge(edge);
return dag;
}
public int runDAG(DAG dag, TezClient tezClient, TezConfiguration tezConf) throws TezException,
InterruptedException, IOException {
try {
tezClient.start();
tezClient.waitTillReady();
// Set up caller context
ApplicationId appId = tezClient.getAppMasterApplicationId();
CallerContext callerContext = CallerContext.create("HelloWorldContext", "Caller id: " + appId, "HelloWorldType", "Tez HelloWorld DAG: " + dag.getName());
dag.setCallerContext(callerContext);
// Submit DAG and wait for completion
DAGClient dagClient = tezClient.submitDAG(dag);
Set<StatusGetOpts> statusGetOpts = Sets.newHashSet(StatusGetOpts.GET_COUNTERS);
DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
// Check status
if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
return 0;
} else {
LOGGER.error("DAG diagnostics: " + dagStatus.getDiagnostics());
return -1;
}
} finally {
tezClient.stop();
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(null, new HelloWorld(), args);
System.exit(res);
}
}
Az Apache Spark [3] egy villámgyors klaszter számítási keretrendszer, amit nagyon gyors adatfeldolgozásra terveztek. A Hadoop MapReduce modellen alapul (koncepcionálisan, nem kód szintjén), de olyan módon általánosítja és terjeszti ki azt, ami lehetővé teszi a hatékony felhasználását interaktív lekérdezések készítéséhez vagy stream feldolgozáshoz is. Az alapvető technika, ami ezt a fajta sebesség növekedést lehetővé teszi, a memóriában tárolt klaszter számítási modell. A Spark (ellentétben a gyakori tévhittel) nem a Hadoop módosított verziója, a Hadoop ökoszisztémát tárolásra (HDFS) használja, de ettől sem függ, mert saját klaszter menedzsmenttel is rendelkezik. A Spark-ot széleskörű használatra tervezték, támogatja többek közt a kötegelt feldolgozást, iteratív algoritmusokat, interaktív lekérdezéseket és a stream feldolgozást is, a fent vázolt MapReduce hiányosságokat az alábbi módon küszöböli ki.
Iteratív alkalmazások felgyorsítása a köztes eredmények memóriában való tárolása által.
https://www.tutorialspoint.com/apache_spark/images/iterative_operations_on_spark_rdd.jpg
Interaktív alkalmazások felgyorsítása az adatok egyszeri elosztott memóriába történő beolvasásával.
https://www.tutorialspoint.com/apache_spark/images/interactive_operations_on_spark_rdd.jpg
A számos felhasználási mód támogatása mellett nagy előnye még az üzemeltetési költségek csökkentése azáltal, hogy a különböző feladatok megoldására szolgáló eszközöket mind tartalmazza, nem kell azokat külön-külön karbantartani. A Spark a Hadoop egy al-projektje, amit 2009-ben kezdett fejleszteni Matei Zaharia a UC Berkeley AMPLab-jában. 2010-ben BSD licensszel nyílt forrásúvá tették, és 2013-ban az Apache szoftver alapítványnak adományozták. Így a Spark 2014-től felső szintű Apache projektté vált.
Az Apache Spark az alábbi főbb jellemzőkkel bír:
map
és reduce
műveleteket támogatja. Beépített SQL lekérdezési lehetőséggel, adat streameléssel, gépi tanulási modulokkal és gráf algoritmusokkal is rendelkezik (lásd lenti ábra).A Spark több módon is integrálható Hadoop-pal. Az alábbi ábra szemlélteti a három alapvető konfigurációt, amellyel a Spark és a Hadoop összekapcsolható.
https://www.tutorialspoint.com/apache_spark/images/spark_built_on_hadoop.jpg
Node
b) Vertex
c) Edge
d) DAG
?[4] http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
[5] https://github.com/ouyi/tez-demo/blob/master/src/main/java/io/github/ouyi/tez/HelloWorld.java
[6] https://www.xplenty.com/blog/apache-spark-vs-tez-comparison/