Skip to content

Commit c85e275

Browse files
committed
[ADDED] Introduce PersistMode for configurable persistence settings in streams (#1943)
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
1 parent 13d3ae2 commit c85e275

1 file changed

Lines changed: 50 additions & 0 deletions

File tree

jetstream/stream_config.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ type (
207207

208208
// AllowAtomicPublish allows atomic batch publishing into the stream.
209209
AllowAtomicPublish bool `json:"allow_atomic,omitempty"`
210+
211+
// PersistMode allows to opt-in to different persistence mode settings.
212+
PersistMode PersistModeType `json:"persist_mode,omitempty"`
210213
}
211214

212215
// StreamSourceInfo shows information about an upstream stream
@@ -428,6 +431,9 @@ type (
428431

429432
// StoreCompression determines how messages are compressed.
430433
StoreCompression uint8
434+
435+
// PersistModeType determines what persistence mode the stream uses.
436+
PersistModeType int
431437
)
432438

433439
const (
@@ -459,6 +465,16 @@ const (
459465
workQueuePolicyString = "workqueue"
460466
)
461467

468+
const (
469+
// DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed.
470+
// The publish acknowledgement will be sent after the persisting completes.
471+
DefaultPersistMode = PersistModeType(iota)
472+
// AsyncPersistMode specifies writes to the stream will be flushed asynchronously.
473+
// The publish acknowledgement may be sent before the persisting completes.
474+
// This means writes could be lost if they weren't flushed prior to a hard kill of the server.
475+
AsyncPersistMode
476+
)
477+
462478
func (rp RetentionPolicy) String() string {
463479
switch rp {
464480
case LimitsPolicy:
@@ -533,6 +549,40 @@ func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
533549
return nil
534550
}
535551

552+
func (pm PersistModeType) String() string {
553+
switch pm {
554+
case DefaultPersistMode:
555+
return "Default"
556+
case AsyncPersistMode:
557+
return "Async"
558+
default:
559+
return "Unknown Persist Mode"
560+
}
561+
}
562+
563+
func (pm PersistModeType) MarshalJSON() ([]byte, error) {
564+
switch pm {
565+
case DefaultPersistMode:
566+
return json.Marshal("default")
567+
case AsyncPersistMode:
568+
return json.Marshal("async")
569+
default:
570+
return nil, fmt.Errorf("nats: can not marshal %v", pm)
571+
}
572+
}
573+
574+
func (pm *PersistModeType) UnmarshalJSON(data []byte) error {
575+
switch strings.ToLower(string(data)) {
576+
case jsonString("default"):
577+
*pm = DefaultPersistMode
578+
case jsonString("async"):
579+
*pm = AsyncPersistMode
580+
default:
581+
return fmt.Errorf("nats: can not unmarshal %q", data)
582+
}
583+
return nil
584+
}
585+
536586
const (
537587
// FileStorage specifies on disk storage. It's the default.
538588
FileStorage StorageType = iota

0 commit comments

Comments
 (0)