Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
stream: added is_sync flag for begin/commit
Part of #TNTP-3334
Closes #366
  • Loading branch information
bigbes committed Jul 3, 2025
commit 57d71e8d7c2047441f54dbfa8b5d156fba957a1b
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Implemented box.session.su request and sugar interface only for current session granting (#426).
- Defined `ErrConcurrentSchemaUpdate` constant for "concurrent schema update" error.
Now you can check this error with `errors.Is(err, tarantool.ErrConcurrentSchemaUpdate)`.
- Implemented support for `IPROTO_IS_SYNC` flag in stream transactions,
added `IsSync(bool)` method for `BeginRequest`/`CommitRequest` (#447).

### Changed

Expand Down
84 changes: 84 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,90 @@ func ExampleBeginRequest_TxnIsolation() {
fmt.Printf("Select after Rollback: response is %#v\n", data)
}

func ExampleBeginRequest_IsSync() {
conn := exampleConnect(dialer, opts)
defer conn.Close()

// Tarantool supports IS_SYNC flag for BeginRequest since version 3.1.0.
isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0)
if err != nil || isLess {
return
}

stream, err := conn.NewStream()
if err != nil {
fmt.Printf("error getting the stream: %s\n", err)
return
}

// Begin transaction with synchronous mode.
req := tarantool.NewBeginRequest().IsSync(true)
resp, err := stream.Do(req).GetResponse()
switch {
case err != nil:
fmt.Printf("error getting the response: %s\n", err)
case resp.Header().Error != tarantool.ErrorNo:
fmt.Printf("response error code: %s\n", resp.Header().Error)
default:
fmt.Println("Success.")
}
}

func ExampleCommitRequest_IsSync() {
conn := exampleConnect(dialer, opts)
defer conn.Close()

// Tarantool supports IS_SYNC flag for CommitRequest since version 3.1.0.
isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0)
if err != nil || isLess {
return
}

var req tarantool.Request

stream, err := conn.NewStream()
if err != nil {
fmt.Printf("error getting the stream: %s\n", err)
return
}

// Begin transaction.
req = tarantool.NewBeginRequest()
resp, err := stream.Do(req).GetResponse()
switch {
case err != nil:
fmt.Printf("error getting the response: %s\n", err)
return
case resp.Header().Error != tarantool.ErrorNo:
fmt.Printf("response error code: %s\n", resp.Header().Error)
return
}

// Insert in stream.
req = tarantool.NewReplaceRequest("test").Tuple([]interface{}{1, "test"})
resp, err = stream.Do(req).GetResponse()
switch {
case err != nil:
fmt.Printf("error getting the response: %s\n", err)
return
case resp.Header().Error != tarantool.ErrorNo:
fmt.Printf("response error code: %s\n", resp.Header().Error)
return
}

// Commit transaction in sync mode.
req = tarantool.NewCommitRequest().IsSync(true)
resp, err = stream.Do(req).GetResponse()
switch {
case err != nil:
fmt.Printf("error getting the response: %s\n", err)
case resp.Header().Error != tarantool.ErrorNo:
fmt.Printf("response error code: %s\n", resp.Header().Error)
default:
fmt.Println("Success.")
}
}

func ExampleErrorNo() {
conn := exampleConnect(dialer, opts)
defer conn.Close()
Expand Down
59 changes: 57 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type BeginRequest struct {
baseRequest
txnIsolation TxnIsolationLevel
timeout time.Duration
isSync bool
isSyncSet bool
}

// NewBeginRequest returns a new BeginRequest.
Expand All @@ -59,12 +61,19 @@ func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequ
return req
}

// WithTimeout allows to set up a timeout for call BeginRequest.
// Timeout allows to set up a timeout for call BeginRequest.
func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest {
req.timeout = timeout
return req
}

// IsSync allows to set up a IsSync flag for call BeginRequest.
func (req *BeginRequest) IsSync(isSync bool) *BeginRequest {
req.isSync = isSync
req.isSyncSet = true
return req
}

// Body fills an msgpack.Encoder with the begin request body.
func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
var (
Expand All @@ -81,6 +90,10 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
mapLen++
}

if req.isSyncSet {
mapLen++
}

if err := enc.EncodeMapLen(mapLen); err != nil {
return err
}
Expand All @@ -105,6 +118,16 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
}
}

if req.isSyncSet {
if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil {
return err
}

if err := enc.EncodeBool(req.isSync); err != nil {
return err
}
}

return nil
}

Expand All @@ -124,6 +147,9 @@ func (req *BeginRequest) Context(ctx context.Context) *BeginRequest {
// Commit request can not be processed out of stream.
type CommitRequest struct {
baseRequest

isSync bool
isSyncSet bool
}

// NewCommitRequest returns a new CommitRequest.
Expand All @@ -133,9 +159,38 @@ func NewCommitRequest() *CommitRequest {
return req
}

// IsSync allows to set up a IsSync flag for call BeginRequest.
func (req *CommitRequest) IsSync(isSync bool) *CommitRequest {
req.isSync = isSync
req.isSyncSet = true
return req
}

// Body fills an msgpack.Encoder with the commit request body.
func (req *CommitRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
return enc.EncodeMapLen(0)
var (
mapLen = 0
)

if req.isSyncSet {
mapLen++
}

if err := enc.EncodeMapLen(mapLen); err != nil {
return err
}

if req.isSyncSet {
if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil {
return err
}

if err := enc.EncodeBool(req.isSync); err != nil {
return err
}
}

return nil
}

// Context sets a passed context to the request.
Expand Down
69 changes: 69 additions & 0 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4199,6 +4199,75 @@ func TestFdDialer(t *testing.T) {
require.Equal(t, int8(0), resp[0])
}

const (
errNoSyncTransactionQueue = "The synchronous transaction queue doesn't belong to any instance"
)

func TestDoBeginRequest_IsSync(t *testing.T) {
test_helpers.SkipIfIsSyncUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

stream, err := conn.NewStream()
require.NoError(t, err)

_, err = stream.Do(NewBeginRequest().IsSync(true)).Get()
assert.Nil(t, err)

_, err = stream.Do(
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
).Get()
require.Nil(t, err)

_, err = stream.Do(NewCommitRequest()).Get()
require.NotNil(t, err)
assert.Contains(t, err.Error(), errNoSyncTransactionQueue)
}

func TestDoCommitRequest_IsSync(t *testing.T) {
test_helpers.SkipIfIsSyncUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

stream, err := conn.NewStream()
require.NoError(t, err)

_, err = stream.Do(NewBeginRequest()).Get()
require.Nil(t, err)

_, err = stream.Do(
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
).Get()
require.Nil(t, err)

_, err = stream.Do(NewCommitRequest().IsSync(true)).Get()
require.NotNil(t, err)
assert.Contains(t, err.Error(), errNoSyncTransactionQueue)
}

func TestDoCommitRequest_NoSync(t *testing.T) {
test_helpers.SkipIfIsSyncUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

stream, err := conn.NewStream()
require.NoError(t, err)

_, err = stream.Do(NewBeginRequest()).Get()
require.Nil(t, err)

_, err = stream.Do(
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
).Get()
require.Nil(t, err)

_, err = stream.Do(NewCommitRequest()).Get()
assert.Nil(t, err)
}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
Expand Down
8 changes: 8 additions & 0 deletions test_helpers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ func SkipIfCrudSpliceBroken(t *testing.T) {
SkipIfFeatureUnsupported(t, "crud update splice", 2, 0, 0)
}

// SkipIfIsSyncUnsupported skips test run if Tarantool without
// IS_SYNC support is used.
func SkipIfIsSyncUnsupported(t *testing.T) {
t.Helper()

SkipIfFeatureUnsupported(t, "is sync", 3, 1, 0)
}

// IsTcsSupported checks if Tarantool supports centralized storage.
// Tarantool supports centralized storage with Enterprise since 3.3.0 version.
func IsTcsSupported() (bool, error) {
Expand Down
Loading