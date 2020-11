Obsah

Ve zhruba posledním desetiletí se stále častěji můžeme setkat s takzvanými sloupcovými databázemi neboli column-oriented DBMS popř. columnar DBMS, v nichž se – jak už jejich název naznačuje – data ukládají po sloupcích a nikoli po jednotlivých záznamech (řádcích). Způsob uložení dat má poměrně velký vliv jak na velikost výsledných souborů (lze totiž použít efektivní metody kódování a popř. i komprimace údajů), tak i na rychlost přístupu k datům ve chvíli, kdy potřebujeme zpracovat pouze jeden či několik sloupců – a to bývá v některých oblastech velmi častý požadavek („vypočti průměrnou útratu“ atd.). Nejdůležitější vlastnosti sloupcových databází byly popsány v pěkném článku Několik poznámek ke sloupcovým databázím od Pavla Stěhuleho a právě s těmito vlastnostmi se setkáme při studiu resp. při používání Parquet souborů (viz navazující kapitoly).

Poznámka: uložení do sloupců lze realizovat i v některých klasických relačních databázích – ostatně to, že je databáze relační vlastně nijak neurčuje fyzický způsob uložení dat.

Dnes se zaměříme na přímou práci s Parquet soubory, a to s využitím programovacího jazyka Go a balíčku go-parquet. Jedná se o souborový formát určený právě pro použití ve sloupcových databázích. Ovšem Parquet soubory lze vytvářet a popř. i číst i bez toho, aby nad nimi byla vytvořena další mezivrstva (nebo chcete-li rozhraní) ve formě SQL či jiného podobně koncipovaného DSL. Příklad použití můžeme vidět na prvním obrázku:

Obrázek 1: Parquet soubory mohou vznikat například na konci nějaké „pipeline“, v níž se shromažďují a transformují nějaká vstupní data (události). V takovém případě lze do souborů zapisovat přímo, bez použití DSL.

2. Souborový formát Parquet

Formát Parquet souborů byl navržen s ohledem na to, aby bylo data možné kódovat a zmenšit tak jejich velikost. Podporována je i komprimace na úrovni jednotlivých sloupců (které jsou ovšem rozděleny do oddílů). Zmenšení velikosti souborů nevede jen k úsporám místa na médiu (to již nemusí být kritické), ale (společně se zápisem po sloupcích) i k výraznému urychlení přístupu k datům – méně načtených bloků z disku, méně výpadků L1 a L2 cache a v případě Parquet formátu se jeho tvůrci zaměřili i na to, aby programový kód obsahoval co nejméně rozeskoků (a tím pádem potenciálních výpadků instrukční pipeline).

Parquet formát podporuje následující tzv. primitivní typy hodnot:

# Typ Popis Poznámka 1 BOOLEAN 1 bit osm bitů ve sloupci zakódováno do jediného bajtu 2 INT32 32 bitů lze využít DELTA kódování 3 INT64 64 bitů lze využít DELTA kódování 4 INT96 96 bitů z historických důvodů 5 FLOAT 32 bitů podle IEEE 754 6 DOUBLE 64 bitů podle IEEE 754 7 BYTE_ARRAY libovolná délka pro řetězce, seznamy atd. 8 FIXED_LEN_BYTE_ARRAY specifikovaná délka pro řetězce, seznamy atd.

Tyto primitivní typy jsou namapovány na logické typy následujícím způsobem:

Parquet Type Primitive Type Go Type BOOLEAN BOOLEAN bool INT32 INT32 int32 INT64 INT64 int64 INT96 INT96 string FLOAT FLOAT float32 DOUBLE DOUBLE float64 BYTE_ARRAY BYTE_ARRAY string FIXED_LEN_BYTE_ARRAY FIXED_LEN_BYTE_ARRAY string UTF8 BYTE_ARRAY string INT 8 INT32 int8 INT 16 INT32 int16 INT 32 INT32 int32 INT 64 INT64 int64 UINT 8 INT32 uint8 UINT 16 INT32 uint16 UINT 32 INT32 uint32 UINT 64 INT64 uint64 DATE INT32 int32 TIME_MILLIS INT32 int32 TIME_MICROS INT64 int64 TIMESTAMP_MILLIS INT64 int64 TIMESTAMP_MICROS INT64 int64 INTERVAL FIXED_LEN_BYTE_ARRAY string DECIMAL INT32,INT64,FIXED_LEN_BYTE_AR­RAY,BYTE_ARRAY int32, int64, string, string LIST slice MAP map

Poznámka: dnes se zaměříme na práci s typy BOOLEAN, INT32/INT64 a UTF8, tedy na ukládání pravdivostních hodnot, celých čísel a řetězců. Příště si vysvětlíme složitější datové typy a taktéž způsob zpracování hodnoty NULL.

Obrázek 2: I když to může vypadat podivně, setkáme se i s následující pipeline, která kombinuje Apache Kafku (tedy proud událostí), relační databázi uchovávající aktuální stav „světa“ a Parquet soubory.

3. Zápis záznamů do Parquet souborů

Nejprve se podívejme na způsob zápisu záznamů do Parquet souborů. K tomuto účelu použijeme knihovnu nazvanou parquet-go. Vytvoříme si kostru projektu:

$ go mod init parquet-writer

V souboru go.mod buď přímo upravíme seznam potřebných balíčků:

module parquet-generator go 1.13 require ( github.com/xitongsys/parquet-go v1.5.4 github.com/xitongsys/parquet-go-source v0.0.0-20201108113611-f372b7d813be )

Nebo přímo začneme psát zdrojový kód s tím, že se seznam závislých balíčků nastaví automaticky při překladu aplikace:

package main import ( "log" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" )

V projektu vytvoříme běžnou datovou strukturu, u které v komentáři popíšeme mapování na sloupce v Parquet souboru, datový typ sloupce a popř. i kódování (viz další kapitoly):

type Record struct { Id int64 `parquet:"name=id, type=INT64"` Name string `parquet:"name=name, type=UTF8, encoding=PLAIN"` Surname string `parquet:"name=surname, type=UTF8, encoding=PLAIN"` Active bool `parquet:"name=active, type=BOOLEAN"` Remark string `parquet:"name=remark, type=UTF8, encoding=PLAIN"` }

Poznámka: podobným způsobem jsme již popisovali mapování mezi prvky struktury a soubory JSON. Nejedná se o ideální způsob, protože případné chyby nejsou odhaleny překladačem, ovšem současná verze jazyka Go nám lepší způsob neumožňuje.

Dále vytvoříme nový soubor a následně zkonstruujeme instanci objektu typu ParquetWriter. Můžeme taktéž nastavit velikost bloků a použitý komprimační algoritmus:

w, err := os.Create("flat.parquet") if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = parquet.CompressionCodec_SNAPPY defer stopWrite(pw)

Poznámka: povšimněte si volání stopWrite na konci práce s Parquet souborem. Potřebujeme totiž explicitně zjistit případné chyby – ty se mnohdy hlásí až při uzavírání souboru! (například špatné mapování atd.)

Samotný zápis je realizován ve funkci writeRecord, která je součástí úplného zdrojového kódu tohoto demonstračního příkladu:

package main import ( "log" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) const defaultOutputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { Id int64 `parquet:"name=id, type=INT64"` Name string `parquet:"name=name, type=UTF8, encoding=PLAIN"` Surname string `parquet:"name=surname, type=UTF8, encoding=PLAIN"` Active bool `parquet:"name=active, type=BOOLEAN"` Remark string `parquet:"name=remark, type=UTF8, encoding=PLAIN"` } func writeRecord(pw *writer.ParquetWriter) { // create report structure to be stored in Parquet file record := Record{ Id: int64(1), Name: "Pepa", Surname: "Vyskoč", Active: false, Remark: "foo bar baz", } // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func main() { w, err := os.Create("flat.parquet") if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = parquet.CompressionCodec_SNAPPY defer stopWrite(pw) writeRecord(pw) log.Println("Write Finished") }

4. Přečtení záznamu z Parquet souboru

Ukažme si i způsob přečtení záznamů z Parquet souboru. Na tomto místě je však vhodné poznamenat, že čtení po záznamech a nikoli po sloupcích nebude efektivní, zejména v případech, kdy postupně budeme číst jednotlivé záznamy a nikoli celé bloky. To si ostatně ukážeme v posledních kapitolách věnovaných jednoduchým benchmarkům.

Otevření souboru pro čtení probíhá podobným způsobem, jako jeho otevření pro zápis. Zajímavé je, že lze určit počet gorutin s programovým kódem, který ze souborů čte. V našem případě je sice počet gorutin nastaven na 4, ovšem využije se gorutina jediná:

const parallelNumber = 4 fileReader, err := local.NewLocalFileReader(fileName) if err != nil { log.Fatal("Can't open file", err) return } // fileReader needs to be closed properly defer closeReader(fileReader) // initializa Parquet file reader parquetReader, err := reader.NewParquetReader(fileReader, new(Record), parallelNumber) if err != nil { log.Fatal("Can't create parquet reader", err) return } // parquetReader needs to be stopped defer parquetReader.ReadStop()

Přečtení záznamů tím nejméně efektivním způsobem, tedy po jednotlivých záznamech (tím se zcela zbavujeme výhod sloupcových databází!). Nejprve vytvoříme řez s jediným prvkem, který následně načteme:

recordCount := int(parquetReader.GetNumRows()) // try to read and display all records for i := 0; i < recordCount; i++ { record := make([]Record, 1) // try to read record err := parquetReader.Read(&record) if err != nil { log.Println("Read error", err) } else { // and display it log.Println(record) } }

Následuje výpis úplného zdrojového kódu tohoto demonstračního příkladu:

// This tool is able to read all records stored in selected Parquet file. // Currently, only records with the structure `Record` is read correctly. Name // of input Parquet file needs to be selected from command line. package main import ( "log" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/source" ) const defaultInputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { Id int64 `parquet:"name=id, type=INT64"` Name string `parquet:"name=name, type=UTF8, encoding=PLAIN"` Surname string `parquet:"name=surname, type=UTF8, encoding=PLAIN"` Active bool `parquet:"name=active, type=BOOLEAN"` Remark string `parquet:"name=remark, type=UTF8, encoding=PLAIN"` } // closeReader tries to close the given Parquet file reader func closeReader(reader source.ParquetFile) { err := reader.Close() if err != nil { log.Println("close reader:", err) } } func displayContentOfParquetFile(fileName string) { const parallelNumber = 4 // construct the file reader and try to open the Parquet file for // reading fileReader, err := local.NewLocalFileReader(fileName) if err != nil { log.Fatal("Can't open file", err) return } // fileReader needs to be closed properly defer closeReader(fileReader) // initializa Parquet file reader parquetReader, err := reader.NewParquetReader(fileReader, new(Record), parallelNumber) if err != nil { log.Fatal("Can't create parquet reader", err) return } // parquetReader needs to be stopped defer parquetReader.ReadStop() displayRecords(parquetReader) } // displayRecords function lists all records from Parquet file func displayRecords(parquetReader *reader.ParquetReader) { recordCount := int(parquetReader.GetNumRows()) // try to read and display all records for i := 0; i < recordCount; i++ { record := make([]Record, 1) // try to read record err := parquetReader.Read(&record) if err != nil { log.Println("Read error", err) } else { // and display it log.Println(record) } } } func main() { displayContentOfParquetFile(defaultInputFile) }

5. Ukládání pravdivostních hodnot

Ve druhé kapitole jsme si řekli, že pravdivostní hodnoty jsou ukládány takovým způsobem, že se vždy osm hodnot umístí do jediného bajtu. Takové sloupce – a ty bývají v OLAP poměrně časté – jsou tak ukládány velmi efektivním způsobem. Ukažme si to na příkladu se záznamy obsahujícími jediný prvek – tím pádem bude výsledný Parquet soubor obsahovat jediný sloupec:

package main import ( "log" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) const defaultOutputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { Active bool `parquet:"name=active, type=BOOLEAN"` } func writeRecords(pw *writer.ParquetWriter, n int) { // create report structure to be stored in Parquet file record := Record{ Active: false, } for i := 0; i < n; i++ { record.Active = i%2 == 0 // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func createAndWriteIntoParquetFile(filename string, records int) { w, err := os.Create(filename) if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = parquet.CompressionCodec_UNCOMPRESSED defer stopWrite(pw) writeRecords(pw, records) log.Println("Write Finished") } func main() { createAndWriteIntoParquetFile("0records.parquet", 0) createAndWriteIntoParquetFile("1record.parquet", 1) createAndWriteIntoParquetFile("10records.parquet", 10) createAndWriteIntoParquetFile("100records.parquet", 100) createAndWriteIntoParquetFile("1000records.parquet", 1000) createAndWriteIntoParquetFile("10000records.parquet", 10000) createAndWriteIntoParquetFile("100000records.parquet", 100000) }

Poznámka: po spuštění příkladu se vytvoří osm souborů s 0, 1, 10, 100, 1000, 10000 a 100000 prvky v jediném sloupci.

6. Porovnání výsledků – rozdíl mezi velikostí čistých dat a velikostí Parquet souboru

Formát Parquet souborů je popsán zde. V souborech se kromě vlastních dat nachází i další pomocné údaje, hlavičky atd., takže (pokud nezapneme komprimaci sloupců) bude velikost souboru vždy větší, než teoretická velikost, kterou získáme, pokud vynásobíme šířku dat v každém sloupci počtem záznamů. Ostatně se můžeme podívat, jaká je velikost souborů s jediným sloupcem hodnot typu bool. Teoretická velikost je vypočtena jako celkový počet záznamů podělených osmi, protože každá pravdivostní hodnota je reprezentována jediným bitem:

# Počet záznamů Teoretická velikost Skutečná velikost Rozdíl 1 0 0 91 91 2 1 1 175 174 3 10 2 176 174 4 100 13 191 178 5 1000 126 308 182 6 10000 1250 1471 221 7 100000 12500 13472 972

Vidíme, že v tomto případě s rostoucím počtem záznamů klesá poměr dalších údajů na cca 7%.

7. Ukládání celočíselných hodnot

Ve čtvrtém demonstračním příkladu je ukázáno ukládání celočíselných hodnot typu uint8. Takové hodnoty nejsou v Parquet formátu nativně podporovány (opět viz druhou kapitolu), takže se ve skutečnosti budou ukládat 32bitové hodnoty, což může být dosti neefektivní (pokud tedy použijeme výchozí kódování PLAIN – viz podtržený text):

package main import ( "log" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) const defaultOutputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { ID uint8 `parquet:"name=id, type=UINT_8, encoding=PLAIN"` } func writeRecords(pw *writer.ParquetWriter, n int) { // create report structure to be stored in Parquet file record := Record{} for i := 0; i < n; i++ { record.ID = uint8(i % 256) // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func createAndWriteIntoParquetFile(filename string, records int) { w, err := os.Create(filename) if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = parquet.CompressionCodec_UNCOMPRESSED defer stopWrite(pw) writeRecords(pw, records) log.Println("Write Finished") } func main() { createAndWriteIntoParquetFile("0records.parquet", 0) createAndWriteIntoParquetFile("1record.parquet", 1) createAndWriteIntoParquetFile("10records.parquet", 10) createAndWriteIntoParquetFile("100records.parquet", 100) createAndWriteIntoParquetFile("1000records.parquet", 1000) createAndWriteIntoParquetFile("10000records.parquet", 10000) createAndWriteIntoParquetFile("100000records.parquet", 100000) }

Efektivita uložení malých celočíselných hodnot se může razantně zvýšit při použití kódování DELTA_BINARY_PACKED, což je ostatně jediná změna, kterou jsme provedli v pořadí již pátém demonstračním příkladu (viz podtržená část):

package main import ( "log" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) const defaultOutputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { ID uint8 `parquet:"name=id, type=UINT_8, encoding=DELTA_BINARY_PACKED"` } func writeRecords(pw *writer.ParquetWriter, n int) { // create report structure to be stored in Parquet file record := Record{} for i := 0; i < n; i++ { record.ID = uint8(i % 256) // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func createAndWriteIntoParquetFile(filename string, records int) { w, err := os.Create(filename) if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = parquet.CompressionCodec_UNCOMPRESSED defer stopWrite(pw) writeRecords(pw, records) log.Println("Write Finished") } func main() { createAndWriteIntoParquetFile("0records.parquet", 0) createAndWriteIntoParquetFile("1record.parquet", 1) createAndWriteIntoParquetFile("10records.parquet", 10) createAndWriteIntoParquetFile("100records.parquet", 100) createAndWriteIntoParquetFile("1000records.parquet", 1000) createAndWriteIntoParquetFile("10000records.parquet", 10000) createAndWriteIntoParquetFile("100000records.parquet", 100000) }

8. Porovnání výsledků – přímé uložení celočíselných hodnot vs. změna hodnot

Opět se podívejme na rozdíl mezi přímým uložením celočíselných hodnot (čtyři bajty na každou hodnotu) a uložením pouze změn mezi dvěma sousedícími záznamy:

# Počet záznamů PLAIN DELTA_BINARY_PACKED % 1 0 89 89 100% 2 1 196 197 101% 3 10 235 202 86% 4 100 604 206 34% 5 1000 4201 682 16% 6 10000 40485 6545 16% 7 100000 403015 63807 16%

Poznámka: kódování DELTA_, tedy uložení rozdílů, je podporováno i u časových razítek, kde je velmi efektivní, například při práci s logy atd. To si ostatně ukážeme příště.

9. Nastavení komprimačního algoritmu

Sloupce v Parquet souborech je možné buď ukládat přímo tak, jak byl vypočítán jejich obsah (přímé popř. delta kódování + ukládání bajtů po osmicích), nebo je možné obsah sloupců zkomprimovat. Komprimační algoritmus se nastavuje následovně:

pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) pw.RowGroupSize = 128 * 1024 * 1024 //128M

Volit je možné mezi přímým zápisem (žádný algoritmus), algoritmem Snappy založeným na slavném LZ77 (viz https://en.wikipedia.org/wi­ki/Snappy_(compression)) a klasickým GZIPem:

# Algoritmus 1 parquet.CompressionCodec_UNCOMPRESSED 2 parquet.CompressionCodec_SNAPPY 3 parquet.CompressionCodec_GZIP

Poznámka: v naprosté většině případů nemá komprimace zásadní negativní vliv na rychlost zápisu (spíše naopak při použití pevných disků nebo vzdáleného úložiště), ovšem může ovlivnit náhodné čtení z Parquet souborů.

Podívejme se nyní na úplný zdrojový kód příkladu, v němž se provádí zápis stejných dat, ovšem pokaždé s jiným komprimačním algoritmem:

package main import ( "log" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) const defaultOutputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { Active bool `parquet:"name=active, type=BOOLEAN"` } func writeRecords(pw *writer.ParquetWriter, n int) { // create report structure to be stored in Parquet file record := Record{ Active: false, } for i := 0; i < n; i++ { record.Active = i%2 == 0 // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func createAndWriteIntoParquetFile(filename string, records int, compression parquet.CompressionCodec) { w, err := os.Create(filename) if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = compression defer stopWrite(pw) writeRecords(pw, records) log.Println("Write Finished") } func main() { createAndWriteIntoParquetFile("1000000records_compression_none.parquet", 1000000, parquet.CompressionCodec_UNCOMPRESSED) createAndWriteIntoParquetFile("1000000records_compression_snappy.parquet", 1000000, parquet.CompressionCodec_SNAPPY) createAndWriteIntoParquetFile("1000000records_compression_gzip.parquet", 1000000, parquet.CompressionCodec_GZIP) }

10. Porovnání výsledků – vliv komprimačního algoritmu na výslednou velikost souboru

Opět si porovnejme výsledky velikostí souborů při použití různých komprimačních algoritmů. V následující tabulce je zobrazena jak výsledná velikost souboru, tak i poměr velikosti souboru vůči nezkomprimované variantě (první řádek):

# Algoritmus Velikost % 1 UNCOMPRESSED 133452 100% 2 SNAPPY 15045 11% 3 GZIP 17056 13%

Poznámka: ve skutečnosti lze volit komprimační algoritmus pro každý sloupec zvlášť, což se hodí při práci s opravdu rozsáhlými soubory, u nichž se vyplatí provádět měření a analýzy.

11. Ukládání řetězců

V Parquet souborech lze pochopitelně pracovat i se sloupci, které obsahují řetězce. V tomto případě můžeme volit mezi uložením řetězců v původní podobě (typicky se jedná o UTF-8) nebo o algoritmus, který namísto řetězců ukládá do souborů index do slovníku. Soubor tedy na začátku obsahuje slovník hodnot a ve sloupci jsou jen odkazy do tohoto slovníku, což vede (většinou) ke značné úspoře místa, ovšem za předpokladu, že sloupec obsahuje relativně malé množství hodnot (v mnoha případech se tedy jedná o nevhodný způsob).

Nejprve se podívejme, jak se řetězce ukládají v původní podobě (bez použití slovníku). I tyto řetězce lze pochopitelně komprimovat. V následujícím demonstračním příkladu se do jediného sloupce ukládají názvy barev generované touto funkcí:

func generateColor() string { var colors []string = []string{ "black", "blue", "red", "magenta", "green", "cyan", "yellow", "white", } return colors[rand.Int()%len(colors)] }

Poznámka: uhodneme, odkud byla získána sekvence barev v předchozí funkci?

Struktura záznamu, resp. jediného sloupce:

// Record represents one record stored in Parquet file type Record struct { Color string `parquet:"name=color, type=UTF8, encoding=PLAIN"` }

Celý zdrojový kód tohoto příkladu vypadá následovně:

package main import ( "log" "math/rand" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) // Record represents one record stored in Parquet file type Record struct { Color string `parquet:"name=color, type=UTF8, encoding=PLAIN"` } func generateColor() string { var colors []string = []string{ "black", "blue", "red", "magenta", "green", "cyan", "yellow", "white", } return colors[rand.Int()%len(colors)] } func writeRecords(pw *writer.ParquetWriter, n int) { // create report structure to be stored in Parquet file record := Record{} for i := 0; i < n; i++ { record.Color = generateColor() // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func createAndWriteIntoParquetFile(filename string, records int, compression parquet.CompressionCodec) { w, err := os.Create(filename) if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = compression defer stopWrite(pw) writeRecords(pw, records) log.Println("Write Finished") } func main() { createAndWriteIntoParquetFile("10000records_compression_none.parquet", 10000, parquet.CompressionCodec_UNCOMPRESSED) createAndWriteIntoParquetFile("10000records_compression_snappy.parquet", 10000, parquet.CompressionCodec_SNAPPY) createAndWriteIntoParquetFile("10000records_compression_gzip.parquet", 10000, parquet.CompressionCodec_GZIP) }

Řetězce jsou skutečně ukládány přímo tak, jak jsou zapsány:

$ xxd -g 1 10000records_compression_none.parquet | head -n 20 00000000: 50 41 52 31 15 00 15 ec e8 01 15 ec e8 01 2c 15 PAR1..........,. 00000010: 98 1a 15 00 15 06 15 06 1c 18 06 79 65 6c 6c 6f ...........yello 00000020: 77 18 05 62 6c 61 63 6b 38 06 79 65 6c 6c 6f 77 w..black8.yellow 00000030: 18 05 62 6c 61 63 6b 00 00 00 03 00 00 00 72 65 ..black.......re 00000040: 64 05 00 00 00 77 68 69 74 65 04 00 00 00 63 79 d....white....cy 00000050: 61 6e 07 00 00 00 6d 61 67 65 6e 74 61 04 00 00 an....magenta... 00000060: 00 62 6c 75 65 05 00 00 00 62 6c 61 63 6b 06 00 .blue....black.. 00000070: 00 00 79 65 6c 6c 6f 77 05 00 00 00 67 72 65 65 ..yellow....gree 00000080: 6e 05 00 00 00 62 6c 61 63 6b 04 00 00 00 62 6c n....black....bl 00000090: 75 65 05 00 00 00 67 72 65 65 6e 05 00 00 00 77 ue....green....w 000000a0: 68 69 74 65 06 00 00 00 79 65 6c 6c 6f 77 05 00 hite....yellow.. 000000b0: 00 00 67 72 65 65 6e 07 00 00 00 6d 61 67 65 6e ..green....magen 000000c0: 74 61 04 00 00 00 62 6c 75 65 05 00 00 00 62 6c ta....blue....bl 000000d0: 61 63 6b 07 00 00 00 6d 61 67 65 6e 74 61 06 00 ack....magenta.. 000000e0: 00 00 79 65 6c 6c 6f 77 07 00 00 00 6d 61 67 65 ..yellow....mage 000000f0: 6e 74 61 04 00 00 00 62 6c 75 65 05 00 00 00 67 nta....blue....g 00000100: 72 65 65 6e 05 00 00 00 77 68 69 74 65 04 00 00 reen....white... 00000110: 00 63 79 61 6e 03 00 00 00 72 65 64 04 00 00 00 .cyan....red.... 00000120: 63 79 61 6e 05 00 00 00 62 6c 61 63 6b 03 00 00 cyan....black... 00000130: 00 72 65 64 05 00 00 00 67 72 65 65 6e 07 00 00 .red....green...

12. Použití slovníků

V dalším demonstračním příkladu budeme řetězce ukládat ve formě indexů do slovníku. To je mnohem výhodnější, minimálně v tomto případě, protože počet hodnot ve sloupci je jasně omezen – jedná se o osm jmen barev.

Struktura záznamu se specifikací formátu uložení:

// Record represents one record stored in Parquet file type Record struct { Color string `parquet:"name=color, type=UTF8, encoding=PLAIN_DICTIONARY"` }

Opět následuje výpis úplného zdrojového kódu tohoto demonstračního příkladu:

package main import ( "log" "math/rand" "os" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) const defaultOutputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { Color string `parquet:"name=color, type=UTF8, encoding=PLAIN_DICTIONARY"` } func generateColor() string { var colors []string = []string{ "black", "blue", "red", "magenta", "green", "cyan", "yellow", "white", } return colors[rand.Int()%len(colors)] } func writeRecords(pw *writer.ParquetWriter, n int) { // create report structure to be stored in Parquet file record := Record{} for i := 0; i < n; i++ { record.Color = generateColor() // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func createAndWriteIntoParquetFile(filename string, records int, compression parquet.CompressionCodec) { w, err := os.Create(filename) if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = compression defer stopWrite(pw) writeRecords(pw, records) log.Println("Write Finished") } func main() { createAndWriteIntoParquetFile("10000records_compression_none.parquet", 10000, parquet.CompressionCodec_UNCOMPRESSED) createAndWriteIntoParquetFile("10000records_compression_snappy.parquet", 10000, parquet.CompressionCodec_SNAPPY) createAndWriteIntoParquetFile("10000records_compression_gzip.parquet", 10000, parquet.CompressionCodec_GZIP) }

Aniž bychom museli přesně znát interní formát Parquet souborů, je již na první pohled zřejmé, že se nejdříve uložil slovník (všech osm jmen barev) a následně soubor obsahuje pouze indexy do tohoto slovníku. Zvýrazněn je jeden z indexů, který je čtyřbajtový:

$ xxd -g 1 10000records_compression_none.parquet | head -n 20 00000000: 50 41 52 31 15 04 15 8e 01 15 8e 01 4c 15 10 15 PAR1........L... 00000010: 00 00 00 03 00 00 00 72 65 64 05 00 00 00 77 68 .......red....wh 00000020: 69 74 65 04 00 00 00 63 79 61 6e 07 00 00 00 6d ite....cyan....m 00000030: 61 67 65 6e 74 61 04 00 00 00 62 6c 75 65 05 00 agenta....blue.. 00000040: 00 00 62 6c 61 63 6b 06 00 00 00 79 65 6c 6c 6f ..black....yello 00000050: 77 05 00 00 00 67 72 65 65 6e 15 00 15 8c 73 15 w....green....s. 00000060: 8c 73 2c 15 96 1a 15 04 15 06 15 06 00 00 20 02 .s,........... . 00000070: 00 00 00 00 02 01 00 00 00 02 02 00 00 00 02 03 ................ 00000080: 00 00 00 02 04 00 00 00 02 05 00 00 00 02 06 00 ................ 00000090: 00 00 02 07 00 00 00 02 05 00 00 00 02 04 00 00 ................ 000000a0: 00 02 07 00 00 00 02 01 00 00 00 02 06 00 00 00 ................ 000000b0: 02 07 00 00 00 02 03 00 00 00 02 04 00 00 00 02 ................ 000000c0: 05 00 00 00 02 03 00 00 00 02 06 00 00 00 02 03 ................ 000000d0: 00 00 00 02 04 00 00 00 02 07 00 00 00 02 01 00 ................ 000000e0: 00 00 02 02 00 00 00 02 00 00 00 00 02 02 00 00 ................ 000000f0: 00 02 05 00 00 00 02 00 00 00 00 02 07 00 00 00 ................ 00000100: 02 03 00 00 00 02 05 00 00 00 02 03 00 00 00 02 ................ 00000110: 01 00 00 00 0a 03 00 00 00 04 07 00 00 00 02 03 ................ 00000120: 00 00 00 04 07 00 00 00 02 01 00 00 00 02 07 00 ................ 00000130: 00 00 02 03 00 00 00 02 01 00 00 00 04 06 00 00 ................

13. Porovnání výsledků – přímé uložení řetězců versus použití slovníku

Použití slovníků při práci s řetězci má potenciálně poměrně velký vliv na velikost výsledných souborů, o čemž se můžeme velmi snadno přesvědčit při pohledu na následující tabulku se soubory, z nichž každý obsahuje 10000 záznamů:

# Algoritmus Přímé uložení Použití slovníku % 1 UNCOMPRESSED 89242 44332 49% 2 SNAPPY 24915 17038 68% 3 GZIP 13445 8181 60%

Poznámka: jména barev jsou tvořena velmi krátkými řetězci. Ušetření místa by bylo výraznější ve chvíli, kdyby se jednalo o delší texty.

14. Rychlost zápisu záznamů

V deváté kapitole jsme si řekli, že při zápisu sloupců je možné povolit komprimaci dat. Ta má pochopitelně vliv na velikost výsledných souborů a taktéž (i když většinou v malé míře) na rychlost zápisu. To si ostatně můžeme snadno ověřit velmi jednoduchým benchmarkem, který otestuje rychlost zápisu jednoho milionu (!) záznamů do sloupcové databáze:

package main import ( "log" "math/rand" "os" "time" "github.com/bxcodec/faker/v3" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" ) const defaultOutputFile = "flat.parquet" // Record represents one record stored in Parquet file type Record struct { ID uint64 `parquet:"name=id, type=UINT_64, encoding=PLAIN"` Name string `parquet:"name=name, type=UTF8, encoding=PLAIN_DICTIONARY"` Surname string `parquet:"name=surname, type=UTF8, encoding=PLAIN"` Email string `parquet:"name=email, type=UTF8, encoding=PLAIN"` Active bool `parquet:"name=active, type=BOOLEAN"` Color string `parquet:"name=color, type=UTF8, encoding=PLAIN_DICTIONARY"` } func generateColor() string { var colors []string = []string{ "black", "blue", "red", "magenta", "green", "cyan", "yellow", "white", } return colors[rand.Int()%len(colors)] } func writeRecords(pw *writer.ParquetWriter, n int) { // create report structure to be stored in Parquet file record := Record{} for i := 0; i < n; i++ { record.ID = uint64(i) record.Name = faker.FirstName() record.Surname = faker.LastName() record.Email = faker.Email() record.Active = i%2 == 0 record.Color = generateColor() // write the record structure into Parquet file err := pw.Write(record) if err != nil { log.Println("Write into Parquet error", err) } } } // stopWrite function stop writing into Parquet file func stopWrite(pw *writer.ParquetWriter) { err := pw.WriteStop() // most write errors are caught at this time if err != nil { log.Println("WriteStop error", err) } } func createAndWriteIntoParquetFile(filename string, records int, compression parquet.CompressionCodec) { t1 := time.Now() w, err := os.Create(filename) if err != nil { log.Println("Can't create local file", err) return } defer w.Close() // initialize Parquet file writer pw, err := writer.NewParquetWriterFromWriter(w, new(Record), 1) if err != nil { log.Println("Can't create parquet writer", err) return } pw.RowGroupSize = 128 * 1024 * 1024 //128M pw.CompressionType = compression defer stopWrite(pw) writeRecords(pw, records) log.Println("Write Finished") // compute and print duration t2 := time.Now() since := time.Since(t1) log.Println("Start time: ", t1) log.Println("End time: ", t2) log.Println("Duration: ", since) } func main() { createAndWriteIntoParquetFile("1000000records_compression_none.parquet", 1000000, parquet.CompressionCodec_UNCOMPRESSED) createAndWriteIntoParquetFile("1000000records_compression_snappy.parquet", 1000000, parquet.CompressionCodec_SNAPPY) createAndWriteIntoParquetFile("1000000records_compression_gzip.parquet", 1000000, parquet.CompressionCodec_GZIP) }

15. Výsledky benchmarku

Podívejme se nyní na výsledky měření, a to konkrétně při použití netobookového SSD a ramdisku (v případě potřeby je pochopitelně možné měření provést i na serverovém „železe“).

Zápis na SSD:

2020/11/14 16:21:55 Write Finished 2020/11/14 16:21:55 Start time: 2020-11-14 16:21:52.018633135 +0100 CET m=+0.001051941 2020/11/14 16:21:55 End time: 2020-11-14 16:21:55.172638037 +0100 CET m=+3.155056813 2020/11/14 16:21:55 Duration: 3.154004978s 2020/11/14 16:21:58 Write Finished 2020/11/14 16:21:58 Start time: 2020-11-14 16:21:55.227638659 +0100 CET m=+3.210057475 2020/11/14 16:21:58 End time: 2020-11-14 16:21:58.53302545 +0100 CET m=+6.515444235 2020/11/14 16:21:58 Duration: 3.305386841s 2020/11/14 16:22:02 Write Finished 2020/11/14 16:22:02 Start time: 2020-11-14 16:21:58.575295122 +0100 CET m=+6.557713938 2020/11/14 16:22:02 End time: 2020-11-14 16:22:02.65420359 +0100 CET m=+10.636622367 2020/11/14 16:22:02 Duration: 4.07890851s

Zápis do ramdisku:

2020/11/14 16:22:21 Write Finished 2020/11/14 16:22:21 Start time: 2020-11-14 16:22:18.382414375 +0100 CET m=+0.001018949 2020/11/14 16:22:21 End time: 2020-11-14 16:22:21.496799932 +0100 CET m=+3.115404464 2020/11/14 16:22:21 Duration: 3.114385625s 2020/11/14 16:22:24 Write Finished 2020/11/14 16:22:24 Start time: 2020-11-14 16:22:21.52651968 +0100 CET m=+3.145124247 2020/11/14 16:22:24 End time: 2020-11-14 16:22:24.81071525 +0100 CET m=+6.429319812 2020/11/14 16:22:24 Duration: 3.284195685s 2020/11/14 16:22:28 Write Finished 2020/11/14 16:22:28 Start time: 2020-11-14 16:22:24.835851362 +0100 CET m=+6.454455962 2020/11/14 16:22:28 End time: 2020-11-14 16:22:28.88592985 +0100 CET m=+10.504534394 2020/11/14 16:22:28 Duration: 4.050078532s

Poznámka: povšimněte si, že u takto malých objemů dat je SSD (s případným bufferováním) prakticky stejně rychlý jako ramdisk. U větších objemů se již budou rozdíly mezi výsledky projevovat výraznějším způsobem.

16. Rychlost čtení záznamů

Otestovat si můžeme i rychlost čtení celých záznamů. Opět je nutné upozornit na to, že se jedná o velmi neefektivní způsob práce se sloupcovou databází a pokud je primárním účelem vaší aplikace přístup k datům po řádcích a nikoli po sloupcích, nebude výkon (rychlost čtení) nijak oslňující:

// This tool is able to read all records stored in selected Parquet file. // Currently, only records with the structure `Record` is read correctly. Name // of input Parquet file needs to be selected from command line. package main import ( "log" "time" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/source" ) // Record represents one record stored in Parquet file type Record struct { ID uint64 `parquet:"name=id, type=UINT_64, encoding=PLAIN"` Name string `parquet:"name=name, type=UTF8, encoding=PLAIN_DICTIONARY"` Surname string `parquet:"name=surname, type=UTF8, encoding=PLAIN"` Email string `parquet:"name=email, type=UTF8, encoding=PLAIN"` Active bool `parquet:"name=active, type=BOOLEAN"` Color string `parquet:"name=color, type=UTF8, encoding=PLAIN_DICTIONARY"` } // closeReader tries to close the given Parquet file reader func closeReader(reader source.ParquetFile) { err := reader.Close() if err != nil { log.Println("close reader:", err) } } func readParquetFile(fileName string) { t1 := time.Now() const parallelNumber = 1 // construct the file reader and try to open the Parquet file for // reading fileReader, err := local.NewLocalFileReader(fileName) if err != nil { log.Fatal("Can't open file", err) return } // fileReader needs to be closed properly defer closeReader(fileReader) // initializa Parquet file reader parquetReader, err := reader.NewParquetReader(fileReader, new(Record), parallelNumber) if err != nil { log.Fatal("Can't create parquet reader", err) return } // parquetReader needs to be stopped defer parquetReader.ReadStop() readRecords(parquetReader) // compute and print duration t2 := time.Now() since := time.Since(t1) log.Println("Start time: ", t1) log.Println("End time: ", t2) log.Println("Duration: ", since) } func readRecords(parquetReader *reader.ParquetReader) { recordCount := int(parquetReader.GetNumRows()) log.Println("Records to read", recordCount) record := make([]Record, 1) records := 0 // try to read and display all records for i := 0; i < recordCount; i++ { // try to read record err := parquetReader.Read(&record) if err != nil { log.Println("Read error", err) continue } else { records++ } } log.Println("Read", records, "records") } func main() { readParquetFile("1000000records_compression_none.parquet") readParquetFile("1000000records_compression_snappy.parquet") readParquetFile("1000000records_compression_gzip.parquet") }

17. Výsledky benchmarku

Podívejme se nyní na dosažené výsledky. Rychlost čtení (po jednotlivých záznamech) je mnohem pomalejší, než samotný zápis do sloupcové databáze! Podrobnosti si vysvětlíme příště:

2020/11/14 16:46:53 Records to read 1000000 2020/11/14 16:47:17 Read 1000000 records 2020/11/14 16:47:17 Start time: 2020-11-14 16:46:53.80851109 +0100 CET m=+0.000895204 2020/11/14 16:47:17 End time: 2020-11-14 16:47:17.695641899 +0100 CET m=+23.888025988 2020/11/14 16:47:17 Duration: 23.887130935s 2020/11/14 16:47:17 Records to read 1000000 2020/11/14 16:47:41 Read 1000000 records 2020/11/14 16:47:41 Start time: 2020-11-14 16:47:17.695696876 +0100 CET m=+23.888080959 2020/11/14 16:47:41 End time: 2020-11-14 16:47:41.460809934 +0100 CET m=+47.653194032 2020/11/14 16:47:41 Duration: 23.765113146s 2020/11/14 16:47:41 Records to read 1000000 2020/11/14 16:48:05 Read 1000000 records 2020/11/14 16:48:05 Start time: 2020-11-14 16:47:41.460860147 +0100 CET m=+47.653244228 2020/11/14 16:48:05 End time: 2020-11-14 16:48:05.50961075 +0100 CET m=+71.701994837 2020/11/14 16:48:05 Duration: 24.048750743s

18. Obsah navazující části seriálu

Dnes jsme si popsali pouze základy práce s formátem Parquet. Příště si ukážeme, jak se přistupuje k datům po sloupcích, což je ostatně obecně doporučovaný přístup využívaný v mnoha analytických aplikacích, které v daný okamžik potřebují zpracovat údaje pouze z několika sloupců mnohdy velmi široké tabulky (taková tabulka může mít desítky popř. i stovky sloupců, což je pro mnohé klasické relační databáze zcela nevhodný přístup).

19. Repositář s demonstračními příklady

Zdrojové kódy všech dnes použitých demonstračních příkladů byly uloženy do nového Git repositáře, který je dostupný na adrese https://github.com/tisnik/go-root (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má přibližně stovku kilobajtů), můžete namísto toho použít odkazy na jednotlivé demonstrační příklady, které naleznete v následující tabulce:

