package ipc
import (
"encoding/binary"
"fmt"
"io"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/internal/dictutils"
"github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
"github.com/apache/arrow-go/v18/arrow/memory"
)
// PayloadWriter is the sink that the IPC writers emit encoded messages into.
//
// As used by FileWriter in this file: Start is called at most once, before
// any payload is written; WritePayload is called once per encoded message;
// and Close is called once when the output is being finalized.
type PayloadWriter interface {
	// Start writes whatever leading bytes the output format requires.
	Start() error
	// WritePayload writes a single encoded IPC message.
	WritePayload(p Payload) error
	// Close finalizes the output.
	Close() error
}
// fileWriter is the PayloadWriter that FileWriter uses (see NewFileWriter).
// It adds the Arrow file-format framing around the embedded streamWriter:
// Start writes the leading magic bytes, and Close writes a footer that
// indexes every dictionary and record batch message recorded by WritePayload.
type fileWriter struct {
	// streamWriter presumably supplies Write and the running output position
	// (pos) used by the methods below.
	streamWriter

	// schema is serialized into the footer by Close.
	schema *arrow.Schema
	// dicts holds the location of each dictionary batch message, in write order.
	dicts []dataBlock
	// recs holds the location of each record batch message, in write order.
	recs []dataBlock
}
// Start emits the leading Arrow magic bytes, pads the output up to the IPC
// alignment boundary, and then hands off to the embedded streamWriter.
func (w *fileWriter) Start() error {
	if _, err := w.Write(Magic); err != nil {
		return fmt.Errorf("arrow/ipc: could not write magic Arrow bytes: %w", err)
	}
	if err := w.align(kArrowIPCAlignment); err != nil {
		return fmt.Errorf("arrow/ipc: could not align start block: %w", err)
	}
	return w.streamWriter.Start()
}
// WritePayload writes p at the current output position and remembers where
// it landed so Close can index it in the footer. The recorded block holds the
// starting offset, the byte count returned by writeIPCPayload (the message
// metadata portion), and p.size as the body length. Only dictionary and
// record batch messages are indexed; other message kinds are written but not
// recorded.
func (w *fileWriter) WritePayload(p Payload) error {
	start := w.pos
	metaLen, err := writeIPCPayload(w, p)
	if err != nil {
		return err
	}

	blk := fileBlock{offset: start, meta: int32(metaLen), body: p.size}
	switch flatbuf.MessageHeader(p.msg) {
	case flatbuf.MessageHeaderRecordBatch:
		w.recs = append(w.recs, blk)
	case flatbuf.MessageHeaderDictionaryBatch:
		w.dicts = append(w.dicts, blk)
	}
	return nil
}
// Close finalizes the Arrow file. It does the following, in order:
//   - closes the embedded streamWriter;
//   - writes the footer (the schema plus the dictionary and record batch
//     block indexes);
//   - writes the footer length as a little-endian 32-bit integer;
//   - writes the trailing magic bytes.
//
// The Arrow file format declares the footer length as an int32. A footer
// whose size is not in (0, MaxInt32] is rejected with an error instead of
// being silently truncated by the uint32 conversion. A truncated length would
// leave the footer impossible to locate when reading the file back.
func (w *fileWriter) Close() error {
	if err := w.streamWriter.Close(); err != nil {
		return err
	}

	start := w.pos
	if err := writeFileFooter(w.schema, w.dicts, w.recs, w); err != nil {
		return fmt.Errorf("arrow/ipc: could not write file footer: %w", err)
	}

	// Largest footer length representable in the int32 length field.
	const maxFooterSize = 1<<31 - 1
	size := w.pos - start
	if size <= 0 || size > maxFooterSize {
		return fmt.Errorf("arrow/ipc: invalid file footer size (size=%d)", size)
	}

	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], uint32(size))
	if _, err := w.Write(buf[:]); err != nil {
		return fmt.Errorf("arrow/ipc: could not write file footer size: %w", err)
	}
	if _, err := w.Write(Magic); err != nil {
		return fmt.Errorf("arrow/ipc: could not write Arrow magic bytes: %w", err)
	}
	return nil
}
// align pads the output with bytes taken from paddingBytes until the position
// reaches paddedLength(w.pos, align). That is presumably the next multiple of
// align; confirm against paddedLength. Nothing is written when the position
// already equals that value.
func (w *fileWriter) align(align int32) error {
	pad := paddedLength(w.pos, align) - w.pos
	if pad != 0 {
		_, err := w.Write(paddingBytes[:int(pad)])
		return err
	}
	return nil
}
// writeIPCPayload writes p's metadata through writeMessage. It then writes
// each non-nil, non-empty body buffer, followed by padding up to
// bitutil.CeilByte64 of the buffer length (the next multiple of 8 bytes).
//
// The returned int is the count reported by writeMessage for the metadata.
// Body and padding bytes are not included in it.
func writeIPCPayload(w io.Writer, p Payload) (int, error) {
	n, err := writeMessage(p.meta, kArrowIPCAlignment, w)
	if err != nil {
		return n, err
	}

	for _, buf := range p.body {
		if buf == nil {
			continue
		}
		size := int64(buf.Len())
		if size == 0 {
			// Empty buffers need neither data nor padding.
			continue
		}
		if _, werr := w.Write(buf.Bytes()); werr != nil {
			return n, fmt.Errorf("arrow/ipc: could not write payload message body: %w", werr)
		}
		if pad := bitutil.CeilByte64(size) - size; pad > 0 {
			if _, werr := w.Write(paddingBytes[:pad]); werr != nil {
				return n, fmt.Errorf("arrow/ipc: could not write payload message padding: %w", werr)
			}
		}
	}
	return n, nil
}
// Payload is a single encoded IPC message: a metadata buffer plus the body
// buffers that follow it when the message is written (see writeIPCPayload).
type Payload struct {
	// msg is the message kind. fileWriter.WritePayload converts it to
	// flatbuf.MessageHeader to decide which footer index the message goes in.
	msg MessageType
	// meta is the encoded message metadata, written by writeMessage.
	meta *memory.Buffer
	// body holds the message body buffers. Nil entries are skipped when
	// writing.
	body []*memory.Buffer
	// size is recorded as the body length of the message's footer block.
	// Presumably this is the total padded body size; confirm against the
	// encoder.
	size int64
}
// Meta returns the payload's metadata buffer, or nil if there is none.
// A non-nil buffer is retained before it is returned, so the caller holds its
// own reference and should Release it when done.
func (p *Payload) Meta() *memory.Buffer {
	meta := p.meta
	if meta == nil {
		return nil
	}
	meta.Retain()
	return meta
}
// SerializeBody writes only the body buffers of p to w; the metadata is not
// written. Each buffer is followed by the padding needed to reach
// bitutil.CeilByte64 of its length. Nil and zero-length buffers are skipped
// entirely.
func (p *Payload) SerializeBody(w io.Writer) error {
	for i := range p.body {
		buf := p.body[i]
		if buf == nil || buf.Len() == 0 {
			continue
		}
		n := int64(buf.Len())
		if _, err := w.Write(buf.Bytes()); err != nil {
			return fmt.Errorf("arrow/ipc: could not write payload message body: %w", err)
		}
		pad := bitutil.CeilByte64(n) - n
		if pad <= 0 {
			continue
		}
		if _, err := w.Write(paddingBytes[:pad]); err != nil {
			return fmt.Errorf("arrow/ipc: could not write payload message padding bytes: %w", err)
		}
	}
	return nil
}
// WritePayload writes the complete message (metadata followed by the padded
// body buffers) to w. The returned int is the metadata byte count reported
// by writeMessage; body bytes are not counted.
func (p *Payload) WritePayload(w io.Writer) (int, error) {
	payload := *p
	return writeIPCPayload(w, payload)
}
// Release releases the metadata buffer and every non-nil body buffer. It
// clears each field or slot as it goes, so calling Release again does not
// release the same buffer twice.
func (p *Payload) Release() {
	if meta := p.meta; meta != nil {
		meta.Release()
		p.meta = nil
	}
	for i := 0; i < len(p.body); i++ {
		if buf := p.body[i]; buf != nil {
			buf.Release()
			p.body[i] = nil
		}
	}
}
// payloads is a batch of Payload values, such as the schema messages
// returned by payloadFromSchema.
type payloads []Payload

// Release releases every payload in the batch. Elements are addressed by
// index so that each Payload's fields are cleared in the slice itself rather
// than on a loop-variable copy.
func (ps payloads) Release() {
	for i := 0; i < len(ps); i++ {
		(&ps[i]).Release()
	}
}
// FileWriter writes record batches to an io.Writer in the Arrow IPC file
// format.
//
// The header (the payload writer's leading bytes followed by the schema
// messages) is written lazily on the first Write or Close. The footer is
// written by Close.
type FileWriter struct {
	// w is the destination passed to NewFileWriter.
	w io.Writer
	// mem is the allocator handed to the record encoder and payloadFromSchema.
	mem memory.Allocator

	// headerStarted is set as soon as start runs, before the header is
	// actually written, so a failed header is never retried.
	headerStarted bool
	// footerWritten is set once pw.Close has succeeded.
	footerWritten bool
	// pw receives every encoded message; NewFileWriter installs a *fileWriter.
	pw PayloadWriter

	// schema is the schema every written record batch must match.
	schema *arrow.Schema
	// mapper is populated from schema in start and passed to the dictionary
	// writer.
	mapper dictutils.Mapper
	// codec is the body compression codec passed to newRecordEncoder.
	codec flatbuf.CompressionType
	// compressNP is passed to newRecordEncoder and sizes compressors.
	compressNP int
	// compressors is allocated with compressNP slots in NewFileWriter.
	compressors []compressor
	// minSpaceSavings is passed through to newRecordEncoder.
	minSpaceSavings *float64

	// lastWrittenDicts is initialized in start and passed to
	// writeDictionaryPayloads. Presumably it is keyed by dictionary id and
	// tracks the last dictionary written for each; confirm.
	lastWrittenDicts map[int64]arrow.Array
}
// NewFileWriter returns a FileWriter that writes the Arrow IPC file format to
// w. Its schema, allocator, and compression settings come from opts.
//
// Nothing is written to w until the first call to Write or Close. The
// returned error is currently always nil; it is part of the signature for API
// compatibility.
func NewFileWriter(w io.Writer, opts ...Option) (*FileWriter, error) {
	cfg := newConfig(opts...)

	f := &FileWriter{
		w:               w,
		pw:              &fileWriter{streamWriter: streamWriter{w: w}, schema: cfg.schema},
		mem:             cfg.alloc,
		schema:          cfg.schema,
		codec:           cfg.codec,
		compressNP:      cfg.compressNP,
		minSpaceSavings: cfg.minSpaceSavings,
		compressors:     make([]compressor, cfg.compressNP),
	}
	return f, nil
}
// Close finishes the file. If no header has been written yet, it is written
// first, so the output always contains a header even when no record batch
// was written.
//
// The footer is written at most once. After a successful Close, later calls
// return nil without writing anything.
func (f *FileWriter) Close() error {
	if err := f.checkStarted(); err != nil {
		return fmt.Errorf("arrow/ipc: could not write empty file: %w", err)
	}
	if !f.footerWritten {
		if err := f.pw.Close(); err != nil {
			return fmt.Errorf("arrow/ipc: could not close payload writer: %w", err)
		}
		f.footerWritten = true
	}
	return nil
}
// Write appends rec to the file.
//
// rec's schema must equal the writer's schema; otherwise Write returns
// errInconsistentSchema. The file header is written on the first call. Any
// dictionary batches needed by rec are written through
// writeDictionaryPayloads before the record batch itself.
//
// Write returns an error if it is called after a successful Close. By then
// the footer and trailing magic bytes have already been emitted, so any
// further message would land after them and leave the file unreadable.
func (f *FileWriter) Write(rec arrow.RecordBatch) error {
	if f.footerWritten {
		return fmt.Errorf("arrow/ipc: cannot write record batch: file writer already closed")
	}

	schema := rec.Schema()
	if schema == nil || !schema.Equal(f.schema) {
		return errInconsistentSchema
	}

	if err := f.checkStarted(); err != nil {
		return fmt.Errorf("arrow/ipc: could not write header: %w", err)
	}

	const allow64b = true
	var (
		data = Payload{msg: MessageRecordBatch}
		enc  = newRecordEncoder(
			f.mem, 0, kMaxNestingDepth, allow64b, f.codec, f.compressNP, f.minSpaceSavings, f.compressors,
		)
	)
	defer data.Release()

	err := writeDictionaryPayloads(f.mem, rec, true, false, &f.mapper, f.lastWrittenDicts, f.pw, enc)
	if err != nil {
		return fmt.Errorf("arrow/ipc: failure writing dictionary batches: %w", err)
	}

	// The encoder was just used for the dictionary batches; reset it before
	// encoding the record batch itself.
	enc.reset()
	if err := enc.Encode(&data, rec); err != nil {
		return fmt.Errorf("arrow/ipc: could not encode record to payload: %w", err)
	}

	return f.pw.WritePayload(data)
}
// checkStarted writes the file header the first time it is called. Once
// start has run it is never run again, so later calls return nil even if
// that first attempt failed.
func (f *FileWriter) checkStarted() error {
	if f.headerStarted {
		return nil
	}
	return f.start()
}
// start writes the file header and prepares dictionary tracking. It:
//   - starts the payload writer (for fileWriter, the leading magic bytes);
//   - imports the schema into the dictionary mapper;
//   - initializes lastWrittenDicts;
//   - writes every schema message built by payloadFromSchema.
//
// headerStarted is set before anything is written, so a failed header is
// never written a second time.
func (f *FileWriter) start() error {
	f.headerStarted = true

	if err := f.pw.Start(); err != nil {
		return err
	}

	f.mapper.ImportSchema(f.schema)
	f.lastWrittenDicts = make(map[int64]arrow.Array)

	schemaPayloads := payloadFromSchema(f.schema, f.mem, &f.mapper)
	defer schemaPayloads.Release()

	for i := range schemaPayloads {
		if err := f.pw.WritePayload(schemaPayloads[i]); err != nil {
			return err
		}
	}
	return nil
}
These pages were generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
Pull requests and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the QR code on the left) to get the latest news about Golds.