package ocf

import (
	"bytes"
	"compress/flate"
	"crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/internal/bytesx"
	"github.com/klauspost/compress/zstd"
)

const (
	schemaKey = "avro.schema"
	codecKey  = "avro.codec"
)

var (
	magicBytes = [4]byte{'O', 'b', 'j', 1}

	// HeaderSchema is the Avro schema of a container file header.
	HeaderSchema = avro.MustParse(`{
	"type": "record",
	"name": "org.apache.avro.file.Header",
	"fields": [
		{"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
		{"name": "meta", "type": {"type": "map", "values": "bytes"}},
		{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
	]
}`)

	// DefaultSchemaMarshaler writes the schema's String representation to the header.
	DefaultSchemaMarshaler = defaultMarshalSchema
	// FullSchemaMarshaler marshals the schema with encoding/json, keeping the full definition.
	FullSchemaMarshaler = fullMarshalSchema
)

// Header represents an Avro container file header.
type Header struct {
	Magic [4]byte           `avro:"magic"`
	Meta  map[string][]byte `avro:"meta"`
	Sync  [16]byte          `avro:"sync"`
}

type decoderConfig struct {
	DecoderConfig avro.API
	SchemaCache   *avro.SchemaCache
	CodecOptions  codecOptions
}

// DecoderFunc represents a configuration function for the Decoder.
type DecoderFunc func(cfg *decoderConfig)

// WithDecoderConfig sets the value decoder config on the OCF decoder.
func WithDecoderConfig(wCfg avro.API) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.DecoderConfig = wCfg
	}
}

// WithDecoderSchemaCache sets the schema cache on the OCF decoder.
func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.SchemaCache = cache
	}
}

// WithZStandardDecoderOptions appends zstd decoder options on the OCF decoder.
func WithZStandardDecoderOptions(opts ...zstd.DOption) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.CodecOptions.ZStandardOptions.DOptions = append(cfg.CodecOptions.ZStandardOptions.DOptions, opts...)
	}
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
	reader      *avro.Reader
	resetReader *bytesx.ResetReader
	decoder     *avro.Decoder
	meta        map[string][]byte
	sync        [16]byte
	schema      avro.Schema
	codec       Codec
	count       int64
}

// NewDecoder returns a new decoder that reads from reader r.
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
	cfg := decoderConfig{
		DecoderConfig: avro.DefaultConfig,
		SchemaCache:   avro.DefaultSchemaCache,
		CodecOptions: codecOptions{
			DeflateCompressionLevel: flate.DefaultCompression,
		},
	}
	for _, opt := range opts {
		opt(&cfg)
	}

	reader := avro.NewReader(r, 1024)

	h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
	if err != nil {
		return nil, fmt.Errorf("decoder: %w", err)
	}

	decReader := bytesx.NewResetReader([]byte{})

	return &Decoder{
		reader:      reader,
		resetReader: decReader,
		decoder:     cfg.DecoderConfig.NewDecoder(h.Schema, decReader),
		meta:        h.Meta,
		sync:        h.Sync,
		codec:       h.Codec,
		schema:      h.Schema,
	}, nil
}

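// A minimal decoding sketch (illustrative only: "records.avro" and the Record
// struct are assumptions, not part of this package; the caller also needs the
// "fmt", "log", and "os" imports):
//
//	f, err := os.Open("records.avro")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//
//	dec, err := ocf.NewDecoder(f)
//	if err != nil {
//		log.Fatal(err)
//	}
//	for dec.HasNext() {
//		var rec Record
//		if err := dec.Decode(&rec); err != nil {
//			log.Fatal(err)
//		}
//		fmt.Printf("%+v\n", rec)
//	}
//	if err := dec.Error(); err != nil {
//		log.Fatal(err)
//	}
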
// Metadata returns the header metadata.
func (d *Decoder) Metadata() map[string][]byte {
	return d.meta
}

// Schema returns the schema parsed from the file header.
func (d *Decoder) Schema() avro.Schema {
	return d.schema
}

// HasNext reports whether there is another value to read.
func (d *Decoder) HasNext() bool {
	if d.count <= 0 {
		count := d.readBlock()
		d.count = count
	}

	if d.reader.Error != nil {
		return false
	}

	return d.count > 0
}

// Decode reads the next Avro-encoded value and stores it in the value pointed to by v.
func (d *Decoder) Decode(v any) error {
	if d.count <= 0 {
		return errors.New("decoder: no data found, call HasNext first")
	}

	d.count--

	return d.decoder.Decode(v)
}

// Error returns the last reader error, ignoring EOF.
func (d *Decoder) Error() error {
	if errors.Is(d.reader.Error, io.EOF) {
		return nil
	}

	return d.reader.Error
}

// readBlock reads the next data block: the object count, the byte size of the
// (possibly compressed) block data, the data itself, and the 16-byte sync
// marker. It returns the number of objects in the block.
func (d *Decoder) readBlock() int64 {
	_ = d.reader.Peek()
	if errors.Is(d.reader.Error, io.EOF) {
		return 0
	}

	count := d.reader.ReadLong()
	size := d.reader.ReadLong()

	switch {
	case count > 0:
		data := make([]byte, size)
		d.reader.Read(data)

		data, err := d.codec.Decode(data)
		if err != nil {
			d.reader.Error = err
		}
		d.resetReader.Reset(data)

	case size > 0:
		// Skip the payload of an empty block.
		data := make([]byte, size)
		d.reader.Read(data)
	}

	var sync [16]byte
	d.reader.Read(sync[:])
	if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) {
		d.reader.Error = errors.New("decoder: invalid block")
	}

	return count
}

type encoderConfig struct {
	BlockLength     int
	BlockSize       int
	CodecName       CodecName
	CodecOptions    codecOptions
	Metadata        map[string][]byte
	Sync            [16]byte
	EncodingConfig  avro.API
	SchemaCache     *avro.SchemaCache
	SchemaMarshaler func(avro.Schema) ([]byte, error)
}

// EncoderFunc represents a configuration function for the Encoder.
type EncoderFunc func(cfg *encoderConfig)

// WithBlockLength sets the block length, in objects, on the encoder.
func WithBlockLength(length int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.BlockLength = length
	}
}

// WithBlockSize sets the block size, in bytes, on the encoder.
func WithBlockSize(size int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.BlockSize = size
	}
}

// WithCodec sets the compression codec on the encoder.
func WithCodec(codec CodecName) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecName = codec
	}
}

// WithCompressionLevel sets the codec to deflate and sets its compression level on the encoder.
func WithCompressionLevel(compLvl int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecName = Deflate
		cfg.CodecOptions.DeflateCompressionLevel = compLvl
	}
}

// WithZStandardEncoderOptions appends zstd encoder options on the encoder.
func WithZStandardEncoderOptions(opts ...zstd.EOption) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecOptions.ZStandardOptions.EOptions = append(cfg.CodecOptions.ZStandardOptions.EOptions, opts...)
	}
}

// WithMetadata sets the header metadata on the encoder.
func WithMetadata(meta map[string][]byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Metadata = meta
	}
}

// WithMetadataKeyVal adds a single key-value pair to the header metadata.
func WithMetadataKeyVal(key string, val []byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Metadata[key] = val
	}
}

// WithEncoderSchemaCache sets the schema cache on the encoder.
func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.SchemaCache = cache
	}
}

// WithSchemaMarshaler sets the marshaler used to write the schema to the header.
func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.SchemaMarshaler = m
	}
}

// WithSyncBlock sets the sync marker on the encoder.
func WithSyncBlock(sync [16]byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Sync = sync
	}
}

// WithEncodingConfig sets the value encoder config on the OCF encoder.
func WithEncodingConfig(wCfg avro.API) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.EncodingConfig = wCfg
	}
}

// Encoder writes Avro values to a container file.
type Encoder struct {
	writer      *avro.Writer
	buf         *bytes.Buffer
	encoder     *avro.Encoder
	sync        [16]byte
	codec       Codec
	blockLength int
	count       int
	blockSize   int
}

// NewEncoder returns a new encoder that writes to w using schema s.
//
// If w is an *os.File that already contains data, it is treated as an existing
// container file and new blocks are appended using the schema, codec, and sync
// marker from its header.
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	cfg := computeEncoderConfig(opts)

	schema, err := avro.ParseWithCache(s, "", cfg.SchemaCache)
	if err != nil {
		return nil, err
	}

	return newEncoder(schema, w, cfg)
}

// NewEncoderWithSchema returns a new encoder that writes to w using the given schema.
func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	return newEncoder(schema, w, computeEncoderConfig(opts))
}

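// A minimal encoding sketch (illustrative only: schemaString, records, and
// "out.avro" are assumptions, not part of this package; the caller also needs
// the "log" and "os" imports):
//
//	f, err := os.Create("out.avro")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//
//	enc, err := ocf.NewEncoder(schemaString, f,
//		ocf.WithCodec(ocf.Deflate),
//		ocf.WithBlockLength(100),
//	)
//	if err != nil {
//		log.Fatal(err)
//	}
//	for _, rec := range records {
//		if err := enc.Encode(rec); err != nil {
//			log.Fatal(err)
//		}
//	}
//	if err := enc.Close(); err != nil {
//		log.Fatal(err)
//	}
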
func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, error) {
	switch file := w.(type) {
	case nil:
		return nil, errors.New("writer cannot be nil")

	case *os.File:
		info, err := file.Stat()
		if err != nil {
			return nil, err
		}
		if info.Size() > 0 {
			// Append to an existing container file: reuse its header and
			// skip to the end of the last block.
			reader := avro.NewReader(file, 1024)
			h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
			if err != nil {
				return nil, err
			}
			if err = skipToEnd(reader, h.Sync); err != nil {
				return nil, err
			}

			writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
			buf := &bytes.Buffer{}
			e := &Encoder{
				writer:      writer,
				buf:         buf,
				encoder:     cfg.EncodingConfig.NewEncoder(h.Schema, buf),
				sync:        h.Sync,
				codec:       h.Codec,
				blockLength: cfg.BlockLength,
				blockSize:   cfg.BlockSize,
			}
			return e, nil
		}
	}

	schemaJSON, err := cfg.SchemaMarshaler(schema)
	if err != nil {
		return nil, err
	}

	cfg.Metadata[schemaKey] = schemaJSON
	cfg.Metadata[codecKey] = []byte(cfg.CodecName)
	header := Header{
		Magic: magicBytes,
		Meta:  cfg.Metadata,
	}
	header.Sync = cfg.Sync
	if header.Sync == [16]byte{} {
		// No sync marker was provided; generate a random one.
		_, _ = rand.Read(header.Sync[:])
	}

	codec, err := resolveCodec(cfg.CodecName, cfg.CodecOptions)
	if err != nil {
		return nil, err
	}

	writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
	writer.WriteVal(HeaderSchema, header)
	if err = writer.Flush(); err != nil {
		return nil, err
	}

	buf := &bytes.Buffer{}
	e := &Encoder{
		writer:      writer,
		buf:         buf,
		encoder:     cfg.EncodingConfig.NewEncoder(schema, buf),
		sync:        header.Sync,
		codec:       codec,
		blockLength: cfg.BlockLength,
		blockSize:   cfg.BlockSize,
	}
	return e, nil
}

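// Appending sketch: when the writer handed to NewEncoder is an *os.File that
// already contains data, newEncoder reuses the file's header (schema, codec,
// and sync marker) and appends new blocks after the last one; the schema
// argument is not used for encoding in that case. Illustrative only:
// "out.avro" and schemaString are assumptions, not part of this package.
//
//	f, err := os.OpenFile("out.avro", os.O_RDWR, 0o644)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//
//	enc, err := ocf.NewEncoder(schemaString, f) // appends using the existing header
//	if err != nil {
//		log.Fatal(err)
//	}
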
func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
	cfg := encoderConfig{
		BlockLength: 100,
		CodecName:   Null,
		CodecOptions: codecOptions{
			DeflateCompressionLevel: flate.DefaultCompression,
		},
		Metadata:        map[string][]byte{},
		EncodingConfig:  avro.DefaultConfig,
		SchemaCache:     avro.DefaultSchemaCache,
		SchemaMarshaler: DefaultSchemaMarshaler,
	}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func (e *Encoder) shouldWriteDataBlock() bool {
	blockLengthReached := e.blockLength != 0 && e.count >= e.blockLength
	blockSizeReached := e.blockSize != 0 && e.buf.Len() >= e.blockSize
	return blockLengthReached || blockSizeReached
}

// Write writes p to the internal buffer as a single, already encoded object.
// It bypasses the internal encoder, so the caller is responsible for ensuring
// the bytes conform to the encoder's schema.
func (e *Encoder) Write(p []byte) (n int, err error) {
	n, err = e.buf.Write(p)
	if err != nil {
		return n, err
	}

	e.count++
	if e.shouldWriteDataBlock() {
		if err = e.writerBlock(); err != nil {
			return n, err
		}
	}

	return n, e.writer.Error
}

// Encode writes the Avro encoding of v to the file.
func (e *Encoder) Encode(v any) error {
	if err := e.encoder.Encode(v); err != nil {
		return err
	}

	e.count++
	if e.shouldWriteDataBlock() {
		if err := e.writerBlock(); err != nil {
			return err
		}
	}

	return e.writer.Error
}

// Flush writes any buffered objects as a block and flushes the underlying writer.
func (e *Encoder) Flush() error {
	if e.count == 0 {
		return nil
	}

	if err := e.writerBlock(); err != nil {
		return err
	}

	return e.writer.Error
}

// Close flushes any remaining buffered data to the underlying writer.
func (e *Encoder) Close() error {
	return e.Flush()
}

// writerBlock writes the buffered objects as a single data block: the object
// count, the encoded data size, the encoded data, and the sync marker.
func (e *Encoder) writerBlock() error {
	e.writer.WriteLong(int64(e.count))

	b := e.codec.Encode(e.buf.Bytes())

	e.writer.WriteLong(int64(len(b)))
	_, _ = e.writer.Write(b)
	_, _ = e.writer.Write(e.sync[:])

	e.count = 0
	e.buf.Reset()
	return e.writer.Flush()
}

type ocfHeader struct {
	Schema avro.Schema
	Codec  Codec
	Meta   map[string][]byte
	Sync   [16]byte
}

func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*ocfHeader, error) {
	var h Header
	reader.ReadVal(HeaderSchema, &h)
	if reader.Error != nil {
		return nil, fmt.Errorf("unexpected error: %w", reader.Error)
	}

	if h.Magic != magicBytes {
		return nil, errors.New("invalid avro file")
	}

	schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
	if err != nil {
		return nil, err
	}

	codec, err := resolveCodec(CodecName(h.Meta[codecKey]), codecOpts)
	if err != nil {
		return nil, err
	}

	return &ocfHeader{
		Schema: schema,
		Codec:  codec,
		Meta:   h.Meta,
		Sync:   h.Sync,
	}, nil
}

// skipToEnd reads blocks until EOF so that new blocks can be appended after
// the last valid block.
func skipToEnd(reader *avro.Reader, sync [16]byte) error {
	for {
		_ = reader.ReadLong()
		if errors.Is(reader.Error, io.EOF) {
			return nil
		}
		size := reader.ReadLong()

		reader.SkipNBytes(int(size))
		if reader.Error != nil {
			return reader.Error
		}

		var syncMark [16]byte
		reader.Read(syncMark[:])
		if sync != syncMark && !errors.Is(reader.Error, io.EOF) {
			reader.Error = errors.New("invalid block")
		}
	}
}

func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
	return []byte(schema.String()), nil
}

func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
	return json.Marshal(schema)
}
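
// Schema marshaler sketch: defaultMarshalSchema writes schema.String() to the
// header, while fullMarshalSchema marshals the schema with encoding/json to
// keep the full schema document. Selecting the full marshaler (schemaString
// and w are assumptions, not part of this package):
//
//	enc, err := ocf.NewEncoder(schemaString, w, ocf.WithSchemaMarshaler(ocf.FullSchemaMarshaler))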