Theorems and definitions
PACELC theorem
Abadi’s “Consistency Tradeoffs in Modern Distributed Database System Design”, 2012.
Theorem: in case of network Partitioning, the system has to trade Availability against Consistency; Else (normal operation), it has to trade low Latency against Consistency.
- P = (network) Partitioning = A subset of the nodes becomes unreachable.
- A = Availability = Requests always get a response, based on the most recent state that is reachable (possibly stale).
- C = Consistency = A request result is either up-to-date or an error.
- E = Else
- L = Latency = A request gets its result from the most quickly accessible state, even if it is not the most recent one.
| | PC | PA |
|---|---|---|
EC | MongoDB, BigTable/HBase and fully ACID$^{[1]}$ systems: VoltDB/H-Store, Megastore, MySQL Cluster | Most in-memory datagrids (Apache Ignite, Hazelcast IMDG) |
EL | PNUTS | DynamoDB, Cassandra, Riak, Cosmos DB |
PACELC is an extension of the CAP theorem.
ACID DataBase properties (in short)
A transaction is a sequence of database operations that satisfies the following rules (a minimal sketch follows the list):
- [Atomicity] A transaction is managed as an atomic unit that is fully rolled back if it fails.
- [Consistency] A transaction is consistent in that it cannot be committed if it violates database system rules.
- [Isolation] Independent transactions are isolated in that running them concurrently or sequentially leads to the same database system state.
- [Durability] A transaction is durable in that once it is fully committed, it remains committed even in case of system failures (power off, network partitioning…).
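A minimal sketch of atomicity and consistency in action, using Python's standard-library sqlite3 (the table, names and amounts are made up); the CHECK constraint plays the role of a consistency rule, and the failed transfer is rolled back as a whole:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "  name TEXT PRIMARY KEY,"
    "  balance INTEGER NOT NULL CHECK (balance >= 0)"  # a database consistency rule
    ")"
)
conn.execute("INSERT INTO accounts VALUES ('alice', 100), ('bob', 0)")
conn.commit()

try:
    with conn:  # one transaction: committed on success, rolled back on error (Atomicity)
        conn.execute("UPDATE accounts SET balance = balance - 200 WHERE name = 'alice'")
        conn.execute("UPDATE accounts SET balance = balance + 200 WHERE name = 'bob'")
except sqlite3.IntegrityError:
    pass  # the CHECK constraint was violated: neither UPDATE is kept (Consistency)

print(dict(conn.execute("SELECT name, balance FROM accounts")))  # {'alice': 100, 'bob': 0}
```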
Frameworks’ notes
Spark
Hadoop’s MapReduce
Execution steps of a MapReduce job with one Mapper and one Reducer; several steps correspond to hook classes exposed to the user for extension (a minimal Hadoop Streaming example follows the list):
- Map phase on mapper nodes:
- RecordReader:
- reads the input file’s blocks from HDFS
- divides them into key->value units called records
- each block’s records will be passed to the same mapper
- Mapper: there are by default as many mappers as the input file has blocks.
- maps through records
- produces 0 to n output record(s) for each input record
- Partitioner:
- organizes the mapper’s output records into partitions, based on their keys.
- each partition is intended to be fetched by exactly one reducer node (a different one per partition).
- Sort: sorts records by key within each partition
- (optional) “Combiner” Reducer: an intermediate reducer called on each mapper node.
- within each sorted partition, for each distinct key, the combiner produces an output record of the same type as the Mapper’s output records
- the combiner reduces the size of the partitions and saves network bandwidth
- Reduce phase on reducer nodes:
- Fetch: each reducer node gathers its partitions (written by the map phase to the mapper nodes’ local disks), mainly through network connections.
- Sort-Merge: Merge pre-sorted partition chunks into a final sorted reducer input partition.
- Reducer: produces the reduce output records for each key. It leverages the fact that its input is sorted by key.
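As an illustration of the Mapper and Reducer hooks above, here is a minimal word-count sketch written for Hadoop Streaming (script names are illustrative; the Java API exposes the equivalent Mapper/Reducer classes). The mapper emits key<TAB>value records; Hadoop partitions and sorts them by key before handing them to the reducer.

```python
#!/usr/bin/env python3
# mapper.py -- Hadoop Streaming mapper: reads input records (lines) from stdin
# and emits 0..n "key<TAB>value" output records per input record.
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")
```

```python
#!/usr/bin/env python3
# reducer.py -- Hadoop Streaming reducer: its input arrives sorted by key,
# so counts can be accumulated key by key in a single pass.
import sys

current_word, count = None, 0
for line in sys.stdin:
    word, value = line.rstrip("\n").split("\t", 1)
    if word != current_word:
        if current_word is not None:
            print(f"{current_word}\t{count}")
        current_word, count = word, 0
    count += int(value)
if current_word is not None:
    print(f"{current_word}\t{count}")
```

These two scripts would be passed to the hadoop-streaming jar as its -mapper and -reducer options.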
ElasticSearch
1h Elasticsearch Tutorial & Getting Started
Parallels with distributed relational databases
Elasticsearch | Relational database |
---|---|
Cluster | DataBase engine & servers |
Index | Database |
Type | Table |
Document | Record |
Property | Column |
Node | Node |
Index Shard | DB Partition |
Shard’s Replica | Partition’s Replica |
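A small sketch of those parallels using the official Python client (8.x style API), assuming a local cluster; the index name and fields are made up:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")      # Cluster

es.index(                                        # Index ~ database, Document ~ record
    index="products",
    id="1",
    document={"name": "keyboard", "price": 25},  # Properties ~ columns
)

resp = es.search(index="products", query={"match": {"name": "keyboard"}})
print(resp["hits"]["total"])
```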
Hadoop
JAVA_HOME said to be missing but is not
Getting "ERROR: JAVA_HOME is not set and could not be found." even though the variable is exported in your .bashrc and/or .profile? You also need to set it in Hadoop itself, by uncommenting and filling the "export JAVA_HOME=..." line in <hadoop home>/etc/hadoop/hadoop-env.sh.
Google Cloud Platform
Submit and monitor a Dataproc spark job (YARN)
Submit
- Submit your Spark app from your terminal using the gcloud CLI:
gcloud dataproc jobs submit spark --cluster=<dataproc-cluster-name> --region=<e.g. europe-west1> --jars gs://<path-to-jarname>.jar --class com.package.name.to.MainClassName --properties 'spark.executor.cores=2,[...]' -- <arg1 for MainClass> <arg2 for MainClass> [...]
- Visit the page describing your master node VM in Google Cloud Console at:
https://console.cloud.google.com/compute/instancesDetail/zones/<region, e.g.: europe-west1-c>/instances/<dataproc cluster name>-m
- Copy the ephemeral external IP address of the master node VM; we will use 104.155.87.75 as the example address from now on.
- Open an SSH connection to that master VM, opening a dynamic SOCKS proxy on local port 1080:
ssh -A -D 1080 104.155.87.75
- Launch a Chrome/Chromium instance configured to use that SOCKS proxy on port 1080 (its traffic then goes through the SSH connection):
chromium-browser --user-data-dir --proxy-server="socks5://127.0.0.1:1080"
- Use this browser to access the YARN cluster UI at
http://<dataproc cluster name>-m:8088/cluster/
or the Spark History Server at http://<dataproc cluster name>-m:18080
BigQuery vs BigTable
- BigQuery excels at OLAP (OnLine Analytical Processing): scalable and efficient analytic querying over unchanging (or append-only) data.
- BigTable excels at OLTP (OnLine Transaction Processing): scalable and efficient reads and writes.
BigQuery
- Storage Level:
- distributed file system: Google Colossus (on hard drives), high throughput thanks to compression
- columnar storage format: Google Capacitor
- Intermediate state storage: in-memory shuffler component (separated from compute resources)
- Compute Engine: Dremel X (Successor to Dremel)
See: Google’s BigQuery Under the Hood podcast and blog article
Run a query using the bq CLI
cat /path/to/query.sql | bq query --format json > path/to/output
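An equivalent sketch with the BigQuery Python client, assuming google-cloud-bigquery is installed and application default credentials are configured (paths are illustrative):

```python
from google.cloud import bigquery

client = bigquery.Client()
with open("/path/to/query.sql") as f:
    query = f.read()

for row in client.query(query).result():  # result() waits for the job to finish
    print(dict(row.items()))
```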
Find and cancel job
bq ls -j -a --max_results=1000
bq cancel --location=europe-west1 "{JOB_ID}"
Get and Update table schema
- retrieve it:
bq show --format prettyjson project_id:dataset.table | jq ".schema.fields"
- edit schema_file.json
- update the table:
bq update project_id:dataset.table schema_file.json
Check the slot time consumed by your own last 10 queries
SELECT
job_id
, (CASE WHEN TIMESTAMP_DIFF(end_time,start_time,MILLISECOND) = 0 THEN NULL ELSE total_slot_ms / TIMESTAMP_DIFF(end_time,start_time,MILLISECOND) END) as num_slot
, total_slot_ms
, *
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE user_email='foo@bar'
ORDER BY creation_time DESC
LIMIT 10
Set partition expiration to 7 days
bq update --time_partitioning_expiration 604800 --time_partitioning_type DAY project:dataset.table
Delta Lake
DeltaLog & ACID guarantees
0) the DeltaLog
Deltalog = Delta Lake’s transaction log.
The deltalog is a collection of ordered JSON files. It acts as a single source of truth, giving users access to the latest version of a DeltaTable’s state.
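A quick sketch of this, assuming a SparkSession already configured with the delta-spark package (the path is illustrative): each commit adds one new ordered JSON file to the _delta_log directory.

```python
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumed to be Delta-enabled already

path = "/tmp/delta/events"
spark.range(0, 5).write.format("delta").mode("overwrite").save(path)
spark.range(5, 10).write.format("delta").mode("append").save(path)

# One JSON file per commit, in order: 00000000000000000000.json, 00000000000000000001.json, ...
print(sorted(f for f in os.listdir(os.path.join(path, "_delta_log")) if f.endswith(".json")))
```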
1) Atomicity
- Delta Lake breaks down every operation performed by a user into commits, themselves composed of actions.
- A commit is recorded in the deltalog only once each of its actions has successfully completed (else it is reverted and restarted or an error is thrown), ensuring its atomicity.
2) Consistency
The consistency of a DeltaTable is guaranteed by its strong schema checking.
3) Isolation
Concurrency of commits is managed to ensure their isolation; an optimistic concurrency control is applied (a schematic sketch follows the list):
- When a commit execution starts, the thread snapshots the current deltalog.
- When the commit’s actions have completed, the thread checks whether the deltalog has been updated by another commit in the meantime:
- If not, it records the commit in the deltalog.
- Else, it updates its DeltaTable view and attempts to register the commit again, after a reprocessing step if needed.
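A schematic sketch of that optimistic loop (hypothetical log object, not Delta Lake's actual code):

```python
def commit(log, prepare_actions, max_attempts=10):
    for _ in range(max_attempts):
        snapshot_version = log.latest_version()      # snapshot the current deltalog
        actions = prepare_actions(snapshot_version)  # run the commit's actions against that view
        # try to atomically create version N+1; fails if another writer already created it
        if log.try_write(snapshot_version + 1, actions):
            return snapshot_version + 1
        # lost the race: loop again with a refreshed DeltaTable view (reprocessing if needed)
    raise RuntimeError("too many concurrent commits")
```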
4) Durability
Commits containing actions that mutate the DeltaTable’s data need to finish their writes/deletions on the underlying Parquet files (stored on the filesystem) before being considered successfully completed, which makes them durable.
___
Further readings:
Diving Into Delta Lake: Unpacking The Transaction Log
Useful ports
service | port |
---|---|
YARN jobs UI | 8088 |
Spark UI | 4040 |
Spark History Server (Applications level) | 18080 |
Jupyter notebook | 8888 |
File formats
Parquet
Sources:
- Blog post by Mridul Verma
- Twitter engineering blog post
- SO post by Artem Ignatiev
- Blog post by Bartosz Konieczny
Hierarchy
- Root Folder
- File
- Row group
- Column chunk
- Pages: a column chunk is composed of 1 or more data page(s) + optional pages like a dictionary page (only 1 dictionary page per column chunk)
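This hierarchy can be inspected with pyarrow, assuming it is installed and pointed at an existing Parquet file (the path is illustrative):

```python
import pyarrow.parquet as pq

meta = pq.ParquetFile("/path/to/file.parquet").metadata  # file-level metadata
print(meta.num_row_groups, meta.num_columns)

rg = meta.row_group(0)    # one row group
col = rg.column(0)        # one column chunk of that row group
print(col.compression, col.encodings, col.total_compressed_size)
```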
Compression
Parquet applies a compression algorithm at the page level (the page is the compression unit).
Here are the supported algorithms, as set through the Spark configuration "spark.sql.parquet.compression.codec" (a write example follows the list):
- snappy (default)
- gzip
- lzo
- brotli
- lz4
- zstd
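A minimal PySpark write sketch selecting one of the codecs above (path and DataFrame are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")  # instead of the default "snappy"

spark.range(0, 1000).write.mode("overwrite").parquet("/tmp/example_zstd_parquet")
```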
Encodings
See https://github.com/apache/parquet-format/blob/master/Encodings.md
The Parquet framework automatically finds out which encoding to use for each column, depending on the data type (which must be supported by the encoding) and on the characteristics of the values (rules decide whether a given encoding is worth using for the actual data). Dictionary encoding is a default that Parquet tries on each column: it is considered the most efficient encoding when its trigger condition is met (a small number of distinct values).
Dictionary encoding
If dictionary encoding is enabled, a Parquet row group is laid out as follows:
Column chunks’ data pages consist of a bunch of references into the column chunk’s dictionary page.
The dictionary page holds the distinct values, encoded using plain encoding.
There is a fallback to plain encoding if it turns out that there are too many unique values to leverage this encoding.
In Spark, one can deactivate dictionary encoding with the config "parquet.enable.dictionary" -> "false". That can be useful because dictionary encoding may take the place of a more efficient encoding (for example delta string encoding on BYTE_ARRAYs).
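A toy illustration of the idea (conceptual only, not Parquet's actual on-disk layout): the data pages keep small integer references into a plain-encoded dictionary page.

```python
values = ["FR", "US", "FR", "FR", "DE", "US"]

dictionary = list(dict.fromkeys(values))             # the "dictionary page": ['FR', 'US', 'DE']
references = [dictionary.index(v) for v in values]   # the "data pages":      [0, 1, 0, 0, 2, 1]
```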
Delta encoding
Supported types are INT32 and INT64.
This encoding leverages the similarity between successive values, for instance when integers represent timestamps.
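A toy illustration (conceptual only): close values such as timestamps are stored as a first value plus small deltas.

```python
timestamps = [1_700_000_000, 1_700_000_007, 1_700_000_009, 1_700_000_015]

deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]  # [7, 2, 6]
encoded = [timestamps[0]] + deltas                            # much smaller values to store
```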
Delta strings encoding
Supported type is BYTE_ARRAY.
Also known as incremental encoding, it is another delta encoding that avoids rewriting the prefix a value shares with the previous one; it may lead to nice compression for similar values such as close HTML pages.
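A toy illustration (conceptual only): each value stores the length of the prefix it shares with the previous value, plus its remaining suffix.

```python
import os

urls = ["https://example.com/a", "https://example.com/ab", "https://example.com/b"]

encoded, prev = [], ""
for s in urls:
    prefix_len = len(os.path.commonprefix([prev, s]))
    encoded.append((prefix_len, s[prefix_len:]))  # (shared prefix length, new suffix)
    prev = s
# encoded == [(0, 'https://example.com/a'), (21, 'b'), (20, 'b')]
```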
Data warehouse vs RDBMS vs Key-value store
| | RDBMS | Data warehouse | Key-Value Store |
|---|---|---|---|
targeted workload | OLTP | OLAP | OLTP |
support relational model | yes | yes | no |
main feature | performant random access using indexes | decouples scaling of storage and processing resources | in memory distributed storage for super fast and scalable key retrieval |
typical data model | 3NF | Dimensional model | key-value pairs |
data format | row-based | columnar | hashmap |
examples | MySQL, Postgres, Oracle | BigQuery, Snowflake, Redshift | Aerospike, Redis, Ignite |
Cloud-based Data Warehouse solutions
Solutions of the Snowflake / BigQuery type are built from these basic bricks:
- infinite cloud file storage capabilities (S3, GCS, Colossus, …)
- a columnar PAX file format providing good space efficiency and query performance (parquet, Capacitor, …)
- table format providing ACID transactions (Iceberg, Delta, …)
- Scalable query execution capabilities (Athena/ Redshift Spectrum, Spark, Hive, Presto, …)
- A user-friendly webapp on top of that
Lakehouse
If you have all of this and you use an open table format or open file format, it is called a lakehouse architecture: ML applications can then work with the raw files directly, without going through the analytics stack.
The Databricks lakehouse offer is built from these bricks:
- Cloud storage from provider
- parquet
- delta lake
- spark
- notebooks
Fun
One ZB (zettabyte, 10**21 bytes, i.e. one trillion GB) of data stored for one month in archive cloud storage costs about one billion dollars (see the back-of-the-envelope check below).
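A back-of-the-envelope check, assuming an archive tier price in the order of $0.0012 per GB per month (the exact price depends on the provider and region):

```python
zb_in_bytes = 10**21
gb = zb_in_bytes / 10**9                  # 1e12 GB, i.e. one trillion GB
monthly_cost = gb * 0.0012                # assumed $/GB/month for archive storage
print(f"${monthly_cost:,.0f} per month")  # ~ $1,200,000,000
```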