// Based on https://github.com/temporalio/samples-go/blob/main/fileprocessing///// This example shows a simple file processing workflow going from// DownloadingFile to FileUploaded.//// Sample output (LogLevel == LogChanges)://// === RUN TestFileProcessing// fileprocessing.go:156: [dad73] [state] +DownloadingFile// fileprocessing.go:156: [dad73] [extern:DownloadingF...] Downloading file... foo.txt// fileprocessing.go:169: waiting: DownloadingFile to FileUploaded// fileprocessing.go:156: [dad73] [state] +FileDownloaded -DownloadingFile// fileprocessing.go:156: [dad73] [state:auto] +ProcessingFile// fileprocessing.go:156: [dad73] [extern] processFileActivity succeed /tmp/temporal_sample920978545// fileprocessing.go:156: [dad73] [state] +FileProcessed -ProcessingFile// fileprocessing.go:156: [dad73] [extern:ProcessingFi...] cleanup /tmp/temporal_sample699261295// fileprocessing.go:156: [dad73] [state:auto] +UploadingFile// fileprocessing.go:156: [dad73] [extern] uploadFileActivity begin /tmp/temporal_sample920978545// fileprocessing.go:156: [dad73] [extern] uploadFileActivity succeed /tmp/temporal_sample920978545// fileprocessing.go:156: [dad73] [state] +FileUploaded -UploadingFile// fileprocessing.go:156: [dad73] [extern:UploadingFil...] 
cleanup /tmp/temporal_sample920978545// fileprocessing_test.go:34:// (FileUploaded:1 FileProcessed:1 FileDownloaded:1)// fileprocessing_test.go:35:// (FileDownloaded:1 FileProcessed:1 FileUploaded:1)[DownloadingFile:2 Exception:0 ProcessingFile:2 UploadingFile:2]// fileprocessing_test.go:36:// DownloadingFile:// State: false 2// Remove: FileDownloaded//// Exception:// State: false 0// Multi: true//// FileDownloaded:// State: true 1// Remove: DownloadingFile//// FileProcessed:// State: true 1// Remove: ProcessingFile//// FileUploaded:// State: true 1// Remove: UploadingFile//// ProcessingFile:// State: false 2// Auto: true// Require: FileDownloaded// Remove: FileProcessed//// UploadingFile:// State: false 2// Auto: true// Require: FileProcessed// Remove: FileUploaded////// --- PASS: TestFileProcessing (0.20s)// PASSpackage temporal_fileprocessingimport (ssamhelpam// "github.com/pancsta/asyncmachine-go/pkg/telemetry")typeLoggerfunc(msg string, args ...any)// MachineHandlers is a struct of handlers & their data for the FileProcessing// machine. None of the handlers can block.typeMachineHandlersstruct {// default handler for the build in Exception stateam.ExceptionHandler BlobStore *BlobStore Filename string DownloadedName string ProcessedFileName string}// DownloadingFileState is a _final_ entry handler for the DownloadingFile// state.func ( *MachineHandlers) ( *am.Event) { .Machine().Log("Downloading file... 
%s", .Filename)// read args .Filename = .Args["filename"].(string)// tick-based ctx := .Machine().NewStateCtx(ss.DownloadingFile)// unblockgofunc() {if .Err() != nil {return// expired } := .BlobStore.downloadFile(.Filename) , := saveToTmpFile()if != nil { .Machine().AddErr(, nil)return } .DownloadedName = .Name()// done, next step .Machine().Add1(ss.FileDownloaded, nil) }()}// ProcessingFileState is a _final_ entry handler for the ProcessingFile// state.func ( *MachineHandlers) ( *am.Event) {// tick-based ctx := .Machine().NewStateCtx(ss.ProcessingFile)// unblockgofunc() {if .Err() != nil {return// expired }// read downloaded file , := os.ReadFile(.DownloadedName)if != nil { .Machine().Log("processFileActivity failed to read file %s.", .DownloadedName) .Machine().AddErr(, nil)return }// process the file := transcodeData(, ) , := saveToTmpFile()if != nil { .Machine().Log("processFileActivity failed to save tmp file.") .Machine().AddErr(, nil)return } .ProcessedFileName = .Name() .Machine().Log("processFileActivity succeed %s", .ProcessedFileName)// done, next step .Machine().Add1(ss.FileProcessed, nil) }()}// ProcessingFileEnd is a _final_ exit handler for the ProcessingFile// state.func ( *MachineHandlers) ( *am.Event) {// clean up temp the file .Machine().Log("cleanup %s", .DownloadedName) _ = os.Remove(.DownloadedName)}// UploadingFileState is a _final_ transition handler for the// UploadingFile state.func ( *MachineHandlers) ( *am.Event) {// tick-based ctx := .Machine().NewStateCtx(ss.UploadingFile)// unblockgofunc() {if .Err() != nil {return// expired } .Machine().Log("uploadFileActivity begin %s", .ProcessedFileName) := .BlobStore.uploadFile(, .ProcessedFileName)if != nil { .Machine().Log("uploadFileActivity uploading failed.") .Machine().AddErr(, nil)return } .Machine().Log("uploadFileActivity succeed %s", .ProcessedFileName)// done, next step .Machine().Add1(ss.FileUploaded, nil) }()}// UploadingFileEnd is a _final_ exit handler for the UploadingFile// 
state.func ( *MachineHandlers) ( *am.Event) {// clean up temp the file .Machine().Log("cleanup %s", .ProcessedFileName) _ = os.Remove(.ProcessedFileName)}// FileProcessingFlow is an example of how to use the FileProcessing machine.func ( context.Context, Logger, string) (*am.Machine, error) {// init := am.New(, ss.States, &am.Opts{DontLogId: true, })amhelp.MachDebugEnv()// different log granularity and a custom output .SemLogger().SetLevel(am.LogChanges)// machine.SemLogger().SetLevel(am.LogOps) // machine.SemLogger().SetLevel(am.LogDecisions) // machine.SemLogger().SetLevel(am.LogEverything) .SemLogger().SetLogger(func( am.LogLevel, string, ...any) { (, ...) })// bind handlers := .BindHandlers(&MachineHandlers{})if != nil {return , }// start it up! .Add1(ss.DownloadingFile, am.A{"filename": })// DownloadingFile to FileUploaded ("waiting: DownloadingFile to FileUploaded")select {case<-time.After(5 * time.Second):return , errors.New("timeout")case<-.WhenErr(nil):return , .Err()case<-.When1(ss.FileUploaded, nil): }return , nil}// Helpers (taken from the Temporal sample)typeBlobStorestruct{}func ( *BlobStore) ( string) []byte {// dummy downloader := "dummy content for fileID:" + return []byte()}func ( *BlobStore) ( context.Context, string) error {// dummy uploader , := os.ReadFile()if != nil {return }time.Sleep(100 * time.Millisecond)returnnil}func transcodeData( context.Context, []byte) []byte {// dummy file processor, just do upper case for the data. // in real world case, you would want to avoid load entire file content into // memory at once.time.Sleep(100 * time.Millisecond)return []byte(strings.ToUpper(string()))}func saveToTmpFile( []byte) ( *os.File, error) { , := os.CreateTemp("", "temporal_sample")if != nil {returnnil, } _, = .Write()if != nil { _ = os.Remove(.Name())returnnil, }return , nil}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.