package ipc
import (
"encoding/binary"
"fmt"
"io"
"sync/atomic"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
"github.com/apache/arrow-go/v18/arrow/memory"
)
// MetadataVersion is the ipc package's own named type for the metadata
// version enumeration declared in the flatbuf package.
type MetadataVersion flatbuf.MetadataVersion

// Metadata versions. Each constant is the flatbuf value with the same
// version suffix, converted to MetadataVersion.
const (
	MetadataV1 MetadataVersion = MetadataVersion(flatbuf.MetadataVersionV1)
	MetadataV2 MetadataVersion = MetadataVersion(flatbuf.MetadataVersionV2)
	MetadataV3 MetadataVersion = MetadataVersion(flatbuf.MetadataVersionV3)
	MetadataV4 MetadataVersion = MetadataVersion(flatbuf.MetadataVersionV4)
	MetadataV5 MetadataVersion = MetadataVersion(flatbuf.MetadataVersionV5)
)
// String returns the name registered for m in
// flatbuf.EnumNamesMetadataVersion. Values without an entry are rendered as
// "MetadataVersion(N)", where N is the numeric value of m.
func (m MetadataVersion) String() string {
	name, known := flatbuf.EnumNamesMetadataVersion[flatbuf.MetadataVersion(m)]
	if !known {
		return fmt.Sprintf("MetadataVersion(%d)", int16(m))
	}
	return name
}
// MessageType is the ipc package's own named type for the message header
// enumeration declared in the flatbuf package.
type MessageType flatbuf.MessageHeader

// Message types. Each constant is the flatbuf message header value of the
// same name, converted to MessageType.
const (
	MessageNone            MessageType = MessageType(flatbuf.MessageHeaderNONE)
	MessageSchema          MessageType = MessageType(flatbuf.MessageHeaderSchema)
	MessageDictionaryBatch MessageType = MessageType(flatbuf.MessageHeaderDictionaryBatch)
	MessageRecordBatch     MessageType = MessageType(flatbuf.MessageHeaderRecordBatch)
	MessageTensor          MessageType = MessageType(flatbuf.MessageHeaderTensor)
	MessageSparseTensor    MessageType = MessageType(flatbuf.MessageHeaderSparseTensor)
)
// String returns the name registered for m in
// flatbuf.EnumNamesMessageHeader. Values without an entry are rendered as
// "MessageType(N)", where N is the numeric value of m.
func (m MessageType) String() string {
	name, known := flatbuf.EnumNamesMessageHeader[flatbuf.MessageHeader(m)]
	if !known {
		return fmt.Sprintf("MessageType(%d)", int(m))
	}
	return name
}
// Message pairs a flatbuf-encoded metadata header with its body buffer and
// is reference counted through Retain and Release.
type Message struct {
	// refCount is the number of outstanding references; the buffers are
	// released when it drops to zero.
	refCount atomic.Int64

	// msg is the flatbuf accessor used to answer Version, Type and BodyLen.
	msg *flatbuf.Message

	// meta is the buffer associated with the metadata, body the buffer
	// holding the message body.
	meta *memory.Buffer
	body *memory.Buffer
}
// NewMessage builds a Message from a metadata buffer and a body buffer.
//
// Both buffers are retained by the new Message, and the metadata bytes are
// interpreted as a flatbuf Message rooted at offset 0. The returned Message
// starts with a reference count of one; call Release when done with it.
//
// NewMessage panics if either buffer is nil.
func NewMessage(meta, body *memory.Buffer) *Message {
	if meta == nil || body == nil {
		panic("arrow/ipc: nil buffers")
	}

	// Take our own references before decoding, so the buffers stay alive for
	// as long as the Message does.
	meta.Retain()
	body.Retain()

	root := flatbuf.GetRootAsMessage(meta.Bytes(), 0)
	created := &Message{msg: root, meta: meta, body: body}
	created.refCount.Add(1)
	return created
}
// newMessageFromFB builds a Message from an already-decoded flatbuf header
// and a body buffer.
//
// The body is retained by the new Message. The metadata buffer is created
// with memory.NewBufferBytes over the byte slice of the flatbuf table behind
// meta. The returned Message starts with a reference count of one.
//
// newMessageFromFB panics if meta or body is nil.
func newMessageFromFB(meta *flatbuf.Message, body *memory.Buffer) *Message {
	if meta == nil || body == nil {
		panic("arrow/ipc: nil buffers")
	}

	body.Retain()

	metaBuf := memory.NewBufferBytes(meta.Table().Bytes)
	created := &Message{msg: meta, meta: metaBuf, body: body}
	created.refCount.Add(1)
	return created
}
// Retain adds one reference to the Message. Every call should be matched by
// a later call to Release.
func (msg *Message) Retain() {
	msg.refCount.Add(1)
}
// Release drops one reference to the Message. When the count reaches zero
// the metadata and body buffers are released and the Message's internal
// pointers are cleared.
func (msg *Message) Release() {
	debug.Assert(msg.refCount.Load() > 0, "too many releases")

	if msg.refCount.Add(-1) != 0 {
		// Other references remain; nothing to free yet.
		return
	}

	msg.meta.Release()
	msg.body.Release()
	msg.msg, msg.meta, msg.body = nil, nil, nil
}
// Version reports the metadata version recorded in the message header.
func (msg *Message) Version() MetadataVersion {
	raw := msg.msg.Version()
	return MetadataVersion(raw)
}
// Type reports the header type recorded in the message header.
func (msg *Message) Type() MessageType {
	header := msg.msg.HeaderType()
	return MessageType(header)
}
// BodyLen reports the body length declared in the message header. The value
// comes from the header, not from the size of the body buffer.
func (msg *Message) BodyLen() int64 {
	declared := msg.msg.BodyLength()
	return declared
}
// MessageReader produces Messages one at a time and is itself reference
// counted.
type MessageReader interface {
	// Message returns the next Message, or an error if none could be read.
	Message() (*Message, error)

	// Retain adds one reference to the reader.
	Retain()

	// Release drops one reference to the reader.
	Release()
}
// messageReader is the MessageReader returned by NewMessageReader; it reads
// messages from an io.Reader.
type messageReader struct {
	// r is the stream messages are read from.
	r io.Reader

	// refCount is the number of outstanding references to the reader.
	refCount atomic.Int64

	// msg is the Message most recently returned by Message, kept so it can be
	// released on the next read or when the reader itself is released.
	msg *Message

	// mem is the allocator used for message body buffers.
	mem memory.Allocator
}
// NewMessageReader returns a MessageReader that reads messages from r.
//
// The options are applied in order to a fresh configuration, from which the
// reader takes its allocator. The returned reader starts with a reference
// count of one; call Release when done with it.
func NewMessageReader(r io.Reader, opts ...Option) MessageReader {
	cfg := newConfig()
	for i := range opts {
		opts[i](cfg)
	}

	reader := &messageReader{
		r:   r,
		mem: cfg.alloc,
	}
	reader.refCount.Add(1)
	return reader
}
// Retain adds one reference to the reader. Every call should be matched by a
// later call to Release.
func (r *messageReader) Retain() {
	r.refCount.Add(1)
}
// Release drops one reference to the reader. When the count reaches zero,
// the Message most recently handed out by Message (if any) is released.
func (r *messageReader) Release() {
	debug.Assert(r.refCount.Load() > 0, "too many releases")

	// The decrement is evaluated first and unconditionally; the nil check
	// only matters once the last reference is gone.
	if r.refCount.Add(-1) != 0 || r.msg == nil {
		return
	}

	r.msg.Release()
	r.msg = nil
}
// Message reads the next message from the underlying stream.
//
// Framing, as implemented here: the first 4 bytes are read as a
// little-endian uint32. A value of 0 signals end of stream. A value equal to
// kIPCContToken is followed by a second 4-byte little-endian field holding
// the metadata length (0 again meaning end of stream). Any other value is
// itself taken as the metadata length. The metadata is followed by a body of
// the length declared in the metadata.
//
// The returned Message is owned by the reader: it is released on the next
// call to Message and when the reader's last reference is released. Callers
// that need it longer must Retain it.
//
// Message returns io.EOF on an end-of-stream marker. Read failures are
// wrapped with %w, so errors.Is still matches the underlying error. A
// negative metadata or body length is reported as an error instead of
// causing a runtime panic.
func (r *messageReader) Message() (*Message, error) {
	buf := make([]byte, 4)
	if _, err := io.ReadFull(r.r, buf); err != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read continuation indicator: %w", err)
	}

	var (
		cid    = binary.LittleEndian.Uint32(buf)
		msgLen int32
	)
	switch cid {
	case 0:
		return nil, io.EOF
	case kIPCContToken:
		if _, err := io.ReadFull(r.r, buf); err != nil {
			return nil, fmt.Errorf("arrow/ipc: could not read message length: %w", err)
		}
		msgLen = int32(binary.LittleEndian.Uint32(buf))
		if msgLen == 0 {
			return nil, io.EOF
		}
	default:
		// No continuation token: the word just read is the length itself.
		msgLen = int32(cid)
	}

	// The length comes straight off the wire as a uint32; anything with the
	// top bit set turns negative here and would make the allocation below
	// panic.
	if msgLen < 0 {
		return nil, fmt.Errorf("arrow/ipc: invalid message metadata length %d", msgLen)
	}

	buf = make([]byte, msgLen)
	if _, err := io.ReadFull(r.r, buf); err != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read message metadata: %w", err)
	}

	meta := flatbuf.GetRootAsMessage(buf, 0)
	bodyLen := meta.BodyLength()
	// The body length is likewise untrusted input; a negative value cannot
	// describe a valid body.
	if bodyLen < 0 {
		return nil, fmt.Errorf("arrow/ipc: invalid message body length %d", bodyLen)
	}

	body := memory.NewResizableBuffer(r.mem)
	// newMessageFromFB retains the body, so dropping the local reference on
	// return is correct on both the success and the error paths.
	defer body.Release()
	body.Resize(int(bodyLen))

	if _, err := io.ReadFull(r.r, body.Bytes()); err != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read message body: %w", err)
	}

	// Only replace the previously returned message once the new one has been
	// read in full.
	if r.msg != nil {
		r.msg.Release()
		r.msg = nil
	}
	r.msg = newMessageFromFB(meta, body)

	return r.msg, nil
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.