Spark

Statistics optimisations

By default, Spark does not exploit table statistics; the cost-based optimizer has to be enabled explicitly:

spark.config("spark.sql.cbo.joinReorder.enabled", "true")
spark.config("spark.sql.cbo.enabled","true")
spark.config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
spark.config("spark.sql.statistics.histogram.enabled", "true")

Compute the statistics for a table:

spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS")
spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS FOR COLUMNS <col1>,<col2>")

Note that analyzing columns also updates the table-level statistics, so the first statement is redundant when the second one is run.

Retrieve the statistics from the metastore:

select * from "TABLE_PARAMS" where  "TBL_ID" = 34900;

  34900 | spark.sql.statistics.colStats.person_id.max           | 9454641234
  34900 | spark.sql.statistics.colStats.person_id.maxLen        | 8
  34900 | spark.sql.statistics.colStats.person_id.avgLen        | 8
  34900 | spark.sql.statistics.colStats.person_id.version       | 1
  34900 | spark.sql.statistics.colStats.person_id.distinctCount | 8119231
  34900 | spark.sql.statistics.colStats.person_id.min           | 185429806
  34900 | spark.sql.statistics.colStats.person_id.nullCount     | 0
  34900 | spark.sql.statistics.totalSize                        | 1738777345
  34900 | spark.sql.statistics.numRows                          | 39720020
  34900 | numFiles                                              | 0
  34900 | totalSize                                             | 0
  34900 | spark.sql.sources.schema.numParts                     | 1
  34900 | EXTERNAL                                              | TRUE
  34900 | spark.sql.create.version                              | 2.4.3
  34900 | spark.sql.sources.provider                            | delta
  34900 | transient_lastDdlTime                                 | 1583162364

The transient_lastDdlTime entry is the timestamp of the last statistics update. It can be converted to a human-readable timestamp with:

select to_timestamp(cast(1583162364 as bigint))::timestamp 

    to_timestamp  
---------------------
 2020-03-02 15:19:24

Joins

good article about joins

Range join

efficient range join. The idea is to bucket both join columns and add the resulting bucket equality as an extra join condition.
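
A minimal sketch of that idea, assuming time-stamped points joined against intervals (the names points, ranges, bucketSize and all column names are illustrative, not taken from the linked article):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("rangeJoinSketch").getOrCreate()
import spark.implicits._

val bucketSize = 100L
// each point carries a single timestamp and belongs to exactly one bucket
val points = Seq(5L, 42L, 250L).toDF("ts")
  .withColumn("bucket", floor($"ts" / bucketSize))
// an interval may span several buckets, so it is exploded into one row per bucket
val ranges = Seq((0L, 120L), (200L, 300L)).toDF("start", "end")
  .withColumn("bucket",
    explode(sequence(floor($"start" / bucketSize), floor($"end" / bucketSize))))

// the equi-join on "bucket" lets Spark pick a hash or sort-merge join instead of
// a cartesian product; the original range predicate remains as a residual filter
points.join(ranges, Seq("bucket"))
  .where($"ts" >= $"start" && $"ts" <= $"end")
  .show()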

Register delta table in hive

You can simply save a DataFrame as a managed table:

df.write.format("delta").saveAsTable("the_table")

You can also use an external table, registered on top of an existing delta location:

spark.range(10).write.format("delta").save("/tmp/events")
spark.sql("create table events using delta location '/tmp/events'")
spark.sql("select * from events").show(100)

Partitioning

Partitioning creates one folder per distinct value of the partition column. Later, any filter on that column (in, =, rlike) allows Spark to skip the non-matching partitions.

WARNING: the partition column name must not contain characters that Spark treats specially (upper case letters, dots, ...). Otherwise, filtering on it will throw an exception.
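
A minimal sketch of a partitioned write followed by a pruned read, assuming a SparkSession spark with its implicits in scope and a DataFrame df that has a country column (both df and country are illustrative names):

df.write
  .format("delta")
  .partitionBy("country")   // creates one sub-folder per distinct country value
  .save("/tmp/partitionedTable")

// the filter on the partition column lets Spark skip every other folder
spark.read.format("delta").load("/tmp/partitionedTable")
  .where($"country" === "FR")
  .show()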

Bucketing

see

It has two benefits and one drawback. The drawback is that the data has to be shuffled and sorted before being written, which makes the creation of the table much slower. The two benefits are that, when joining on the bucket column, both the shuffle (exchange) and the sort steps of the join can be skipped.

The best case is when both tables are bucketed, with the same number of buckets and on the same bucket column.

It is also possible to join a bucketed table with a table repartitioned on the fly. The latter has to be repartitioned on the same column, into as many partitions as there are buckets:

table1.repartition(100) // 20 * 100 parquet files
.write.format("parquet")
.bucketBy(20,"encounter") // 20 buckets
.sortBy("encounter")      // sorted
.saveAsTable("bucketTable")

val notBucketTable = table2
.repartition(20,$"encounter")
.sortWithinPartitions("encounter")

spark.table("bucketTable")
.join(notBucketTable, Seq("encounter"))

Partitioning + Bucketing

It is possible to use both partitioning and bucketing:

table1.repartition(100)
.write.mode("overwrite")
.partitionBy("typesimple")
.bucketBy(20,"encounter")
.sortBy("encounter")
.saveAsTable("edsomop.encounterAphp")

Spark Streaming

A streaming context can be created from a sparkContext:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.implicits._ // required for rdd.toDF

val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
val lines = ssc.textFileStream("hdfs:///user/nparis/streamingRepository/*")
lines.foreachRDD { rdd => val df = rdd.toDF; df.show }
ssc.start

The textFileStream source picks up new text files appearing in the given directory and produces, for each batch, an RDD with one element per line of those files.

A DStream supports transformations such as filter, count, or map, each of which returns a new DStream.

The streaming computation only starts once an "output operation" is applied to the stream, such as print, foreachRDD, or saveAsTextFile.
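
For instance, a few transformations can be chained on the lines DStream defined above before the output operation triggers the job (the "ERROR" filter and the space-splitting are purely illustrative):

val errorCounts = lines
  .filter(_.contains("ERROR"))           // transformation: returns a new DStream
  .map(line => (line.split(" ")(0), 1))  // pair DStream of (first word, 1)
  .reduceByKey(_ + _)                    // still lazy at this point
errorCounts.print()                      // output operation: the job runs once ssc.start is called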

Structured streaming

scaladoc of structured streaming wikibooks

Structured Streaming lets Spark streaming read and write structured formats such as parquet, csv, json or delta. It can also write to any other sink thanks to the writeStream.foreach and foreachBatch methods.

// schema is necessary
val schema = spark.read.json("structuredStreaming").schema 
val c = spark.readStream.schema(schema).json("structuredStreaming")
// checkpointing is necessary
spark.conf.set("spark.sql.streaming.checkpointLocation","structuredStreamingCheckpoint")
c.writeStream.format("delta").option("path","structuredStreamingDelta").start
// this alternative allows applying arbitrary logic to each micro-batch
c.writeStream.foreachBatch((b,i)=>{println(i);b.show}).start

Each batch adds a new parquet file containing the batch content.

delta structured streaming

val in = spark.readStream.format("delta").option("ignoreChanges", "true").option("path","structuredStreamingDelta").load

// update the input table with another process
import io.delta.tables.DeltaTable
DeltaTable.forPath("structuredStreamingDelta")
  .updateExpr("query = '91400+11457491401+54+rue+alphonse+daudet'", Map("query" -> "'daudet'"))

// this will resend all the data of every parquet file containing the updated row.

With a delta source, deletes are not propagated. However, updates are handled: every parquet file rewritten by the delta ACID mechanism is sent again to the stream writer. This produces duplicated records, which have to be handled by the downstream datasource.

The main(?) advantage of using delta as a datasource for structured streaming is that vacuum processes allow reducing the number of files, which would otherwise grow batch after batch.
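
One possible way to absorb the duplicated records mentioned above is an idempotent upsert per micro-batch, sketched here with a Delta MERGE (the sink path structuredStreamingDeltaSink and the key column id are assumptions, not from the original note):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

in.writeStream.foreachBatch { (batch: DataFrame, batchId: Long) =>
  // drop duplicates within the batch, then merge on the (hypothetical) key column
  DeltaTable.forPath("structuredStreamingDeltaSink").as("t")
    .merge(batch.dropDuplicates("id").as("s"), "t.id = s.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}.start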

RDD Operations

pairRDD scaladoc: RDD transformations. Consider the RDD below:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)

groupByKey

data
.flatMap(line => line.split("=") )
.map(word => (word,1))
.groupByKey()
.mapValues(_.sum)
+---+---+
| _1| _2|
+---+---+
|bar|  3|
|  A|  4|
|  B|  1|
|foo|  5|
|  C|  1|
|  D|  2|
+---+---+

reduceByKey

data
.flatMap(line => line.split("=") )
.map(word => (word,1))
.reduceByKey((x,y)=>(x+y))
+---+---+
| _1| _2|
+---+---+
|bar|  3|
|  A|  4|
|  B|  1|
|foo|  5|
|  C|  1|
|  D|  2|
+---+---+

aggregateByKey

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

Aggregate the values of each key, using given combine functions and a
neutral "zero value". This function can return a different result
type, U, than the type of the values in this RDD, V. Thus, we need one
operation for merging a V into a U and one operation for merging two
U's, as in scala.TraversableOnce. The former operation is used for
merging values within a partition, and the latter is used for merging
values between partitions. To avoid memory allocation, both of these
functions are allowed to modify and return their first argument
instead of creating a new U.

val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
+---+---+
| _1| _2|
+---+---+
|bar|  3|
|foo|  5|
+---+---+

Catalyst

catalyst optimizer explained

SparkSession

spark concurrent fast queries
