Spark
→ Statistics optimisations
By default, Spark does not exploit statistics:
spark.config("spark.sql.cbo.joinReorder.enabled", "true")
spark.config("spark.sql.cbo.enabled","true")
spark.config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
spark.config("spark.sql.statistics.histogram.enabled", "true")
→ Analyze statistics for tables
spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS")
spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS FOR COLUMNS <col1>,<col2>")
Note that when analyzing for columns, the whole-table statistics are also updated, so the first command is redundant when the second one is run.
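To verify that the statistics were actually computed, one possibility (a sketch, assuming Spark >= 2.3 where column-level DESCRIBE is available; <table> and <col1> are placeholders as above) is to inspect them from Spark directly:
// table-level statistics (sizeInBytes, rowCount) appear in the "Statistics" row
spark.sql("DESCRIBE EXTENDED <table>").show(100, false)
// column-level statistics (min, max, distinct_count, ...) for a single column
spark.sql("DESCRIBE EXTENDED <table> <col1>").show(100, false)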
→ Retrieve the statistics from the metastore
select * from "TABLE_PARAMS" where "TBL_ID" = 34900;
34900 | spark.sql.statistics.colStats.person_id.max | 9454641234
34900 | spark.sql.statistics.colStats.person_id.maxLen | 8
34900 | spark.sql.statistics.colStats.person_id.avgLen | 8
34900 | spark.sql.statistics.colStats.person_id.version | 1
34900 | spark.sql.statistics.colStats.person_id.distinctCount | 8119231
34900 | spark.sql.statistics.colStats.person_id.min | 185429806
34900 | spark.sql.statistics.colStats.person_id.nullCount | 0
34900 | spark.sql.statistics.totalSize | 1738777345
34900 | spark.sql.statistics.numRows | 39720020
34900 | numFiles | 0
34900 | totalSize | 0
34900 | spark.sql.sources.schema.numParts | 1
34900 | EXTERNAL | TRUE
34900 | spark.sql.create.version | 2.4.3
34900 | spark.sql.sources.provider | delta
34900 | transient_lastDdlTime | 1583162364
The transient_lastDdlTime holds the timestamp of the last statistics update. It can be converted to a readable timestamp with:
select to_timestamp(cast(1583162364 as bigint))::timestamp
to_timestamp
---------------------
2020-03-02 15:19:24
→ Joins
→ Range join
See efficient range join. The idea is to bucket both columns and add the bucket equality as an extra join condition, as in the sketch below.
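A minimal sketch of the trick, with made-up data and a hypothetical bucketSize: events carry a single value ts, intervals carry [start, end], and both sides get a bucket column so that the join becomes an equi-join plus the original range predicate.
import org.apache.spark.sql.functions._

val bucketSize = 3600L  // assumed bucket width, tune to the data distribution

// dummy event timestamps and dummy [start, end] intervals
val events = spark.range(0, 100000).select((col("id") * 7 % 86400).as("ts"))
val intervals = spark.range(0, 1000).select((col("id") * 86).as("start"), (col("id") * 86 + 60).as("end"))

val eventsB = events.withColumn("bucket", floor(col("ts") / bucketSize))

// duplicate each interval into every bucket it overlaps (sequence needs Spark >= 2.4)
val intervalsB = intervals.withColumn("bucket",
  explode(sequence(floor(col("start") / bucketSize), floor(col("end") / bucketSize))))

// the equi-join on "bucket" avoids the cartesian product,
// the range predicate keeps only the true matches
val joined = eventsB.join(intervalsB, Seq("bucket"))
  .where(col("ts").between(col("start"), col("end")))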
→ Register a delta table in Hive
You can simply save the table like this:
df.write.format("delta").saveAsTable("the_table")
You can also use an external table:
spark.range(10).write.format("delta").save("/tmp/events")
spark.sql("create table events using delta location '/tmp/events'")
spark.sql("select * from events").show(100)
It is also possible to directly write a delta table into the metastore:
df.write.format("delta").saveAsTable("myDeltaTable")
→ Partitioning
This will create one folder per distinct value of the partition column. Later, any filter on that column (in, =, rlike) allows Spark to skip partitions, as in the sketch below.
WARNING: the partition column shall not contain characters that are special to Spark columns, such as upper case or dots. Otherwise, the filter will throw an exception.
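A minimal sketch (the table and column names are made up; df is assumed to be a DataFrame containing a visit_type column):
import org.apache.spark.sql.functions.col

// one folder per distinct value of "visit_type"
df.write.format("delta")
  .partitionBy("visit_type")
  .saveAsTable("encounter_partitioned")

// this filter only reads the matching partition folders
spark.table("encounter_partitioned")
  .where(col("visit_type").isin("urgence", "hospitalisation"))
  .count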
→ Bucketing
It has two benefits and one drawback. The drawback is that the data has to be shuffled and sorted before being written, which makes the table much slower to create. The two benefits are:
- speed-up filters on the column (skips unused buckets)
- speed-up joins on the column
Best is when both tables are bucketed with the same number of buckets and on the same column. It is also possible to join a single bucketed table to an on-the-fly repartitioned table; the latter has to be repartitioned into the same number of buckets and on the same column.
table1.repartition(100) // 20 * 100 parquet files
.write.format("parquet")
.bucketBy(20,"encounter") // 20 buckets
.sortBy("encounter") // sorted
.saveAsTable("bucketTable")
val notBucketTable = table2
.repartition(20,$"encounter")
.sortWithinPartitions("encounter")
spark.table("bucketTable")
.join(notBucketTable, Seq("encounter"))
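To check that the shuffle is actually avoided on the bucketed side, a possible verification (just a sketch) is to look at the physical plan:
// no Exchange should appear under the bucketed side of the join
spark.table("bucketTable")
  .join(notBucketTable, Seq("encounter"))
  .explain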
→ Partitioning + Bucketing
It is possible to use both partitioning and bucketing.
table1.repartition(100)
  .write.mode("overwrite")
  .partitionBy("typesimple")
  .bucketBy(20,"encounter")
  .sortBy("encounter")
  .saveAsTable("edsomop.encounterAphp")
Spark Streaming
A streaming context can be created from a sparkContext:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.implicits._ // needed for rdd.toDF below

val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
val lines = ssc.textFileStream("hdfs:///user/nparis/streamingRepository/*")
lines.foreachRDD{ rdd => { val df = rdd.toDF; df.show } }
ssc.start
The textFileStream gets the text files within a given directory and produces an RDD with one row per file line. A DStream object supports transformations such as filter, count, or map, which return a new DStream. The streaming process only begins when an "output operation" happens on the stream, such as print, foreachRDD, or saveAsTextFile.
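As a small sketch, a transformation chain followed by an output operation (it reuses the ssc and lines defined above; the "ERROR" filter is only an example, and output operations have to be declared before ssc.start):
val errorCounts = lines
  .filter(_.contains("ERROR"))   // transformation: returns a new DStream
  .count()                       // transformation: one count per micro-batch

errorCounts.print()              // output operation: triggers the computation

ssc.start()
ssc.awaitTermination()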
→ Structured streaming
See the scaladoc of structured streaming and the wikibooks page.
Structured Streaming allows spark streaming to manipulate structured input/output such as parquet, csv, json or delta. It can also handle any output datasource thanks to the writeStream.foreach and foreachBatch methods.
// schema is necessary
val schema = spark.read.json("structuredStreaming").schema
val c = spark.readStream.schema(schema).json("structuredStreaming")
// checkpointing is necessary
spark.conf.set("spark.sql.streaming.checkpointLocation","structuredStreamingCheckpoint")
c.writeStream.format("delta").option("path","structuredStreamingDelta").start
// this alternative allows applying arbitrary logic to each micro-batch
c.writeStream.foreachBatch((b,i)=>{println(i);b.show}).start
Each batch adds a new parquet file with the content.
val in = spark.readStream.format("delta").option("ignoreChanges", "true").option("path","structuredStreamingDelta").load
// update the input table with another process
import io.delta.tables.DeltaTable
DeltaTable.forPath("structuredStreamingDelta").updateExpr("query = '91400+11457491401+54+rue+alphonse+daudet'", Map("query" -> "'daudet'"))
// this will resend all the data of every parquet file containing the updated row.
In the case of a delta source, it does not handle deletes. However, it handles updates: every parquet file rewritten by the delta ACID mechanism is sent again to the streamWriter. This causes duplicated records, which have to be handled by the downstream datasource (one way to do so is sketched below). The main advantage of using delta as a datasource for structured streaming is that vacuum processes keep down the number of files, which would otherwise grow batch after batch.
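One possible way to absorb those duplicates, as a sketch assuming the target is itself a delta table with a unique key column (the "id" column and the structuredStreamingTarget path are hypothetical), is to make the sink idempotent with foreachBatch and a MERGE:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

in.writeStream.foreachBatch { (batch: DataFrame, batchId: Long) =>
  // deduplicate the micro-batch, then upsert it into the target delta table
  DeltaTable.forPath("structuredStreamingTarget").as("t")
    .merge(batch.dropDuplicates("id").as("s"), "t.id = s.id")
    .whenMatched.updateAll()
    .whenNotMatched.insertAll()
    .execute()
}.start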
→ RDD Operations
See the pairRDD scaladoc for the RDD transformations. Consider the RDD below:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
→ groupByKey
data
.flatMap(line => line.split("=") )
.map(word => (word,1))
.groupByKey()
.mapValues(_.sum)
+---+---+
| _1| _2|
+---+---+
|bar| 3|
| A| 4|
| B| 1|
|foo| 5|
| C| 1|
| D| 2|
+---+---+
→ reduceByKey
data
.flatMap(line => line.split("=") )
.map(word => (word,1))
.reduceByKey((x,y)=>(x+y))
+---+---+
| _1| _2|
+---+---+
|bar| 3|
| A| 4|
| B| 1|
|foo| 5|
| C| 1|
| D| 2|
+---+---+
→ aggregateByKey
def
aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
Aggregate the values of each key, using given combine functions and a
neutral "zero value". This function can return a different result
type, U, than the type of the values in this RDD, V. Thus, we need one
operation for merging a V into a U and one operation for merging two
U's, as in scala.TraversableOnce. The former operation is used for
merging values within a partition, and the latter is used for merging
values between partitions. To avoid memory allocation, both of these
functions are allowed to modify and return their first argument
instead of creating a new U.
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
+---+---+
| _1| _2|
+---+---+
|bar| 3|
|foo| 5|
+---+---+