mirror of
https://github.com/schollz/cowyo.git
synced 2023-08-10 21:13:00 +03:00
779 lines
17 KiB
Go
779 lines
17 KiB
Go
|
package txn_test
|
||
|
|
||
|
import (
|
||
|
"flag"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
. "gopkg.in/check.v1"
|
||
|
"gopkg.in/mgo.v2"
|
||
|
"gopkg.in/mgo.v2/bson"
|
||
|
"gopkg.in/mgo.v2/dbtest"
|
||
|
"gopkg.in/mgo.v2/txn"
|
||
|
)
|
||
|
|
||
|
func TestAll(t *testing.T) {
|
||
|
TestingT(t)
|
||
|
}
|
||
|
|
||
|
type S struct {
|
||
|
server dbtest.DBServer
|
||
|
session *mgo.Session
|
||
|
db *mgo.Database
|
||
|
tc, sc *mgo.Collection
|
||
|
accounts *mgo.Collection
|
||
|
runner *txn.Runner
|
||
|
}
|
||
|
|
||
|
// Register the S suite with gocheck.
var _ = Suite(&S{})
|
||
|
|
||
|
// M is a shorthand for the string-keyed document maps used in the tests.
type M map[string]interface{}
|
||
|
|
||
|
func (s *S) SetUpSuite(c *C) {
|
||
|
s.server.SetPath(c.MkDir())
|
||
|
}
|
||
|
|
||
|
func (s *S) TearDownSuite(c *C) {
|
||
|
s.server.Stop()
|
||
|
}
|
||
|
|
||
|
func (s *S) SetUpTest(c *C) {
|
||
|
s.server.Wipe()
|
||
|
|
||
|
txn.SetChaos(txn.Chaos{})
|
||
|
txn.SetLogger(c)
|
||
|
txn.SetDebug(true)
|
||
|
|
||
|
s.session = s.server.Session()
|
||
|
s.db = s.session.DB("test")
|
||
|
s.tc = s.db.C("tc")
|
||
|
s.sc = s.db.C("tc.stash")
|
||
|
s.accounts = s.db.C("accounts")
|
||
|
s.runner = txn.NewRunner(s.tc)
|
||
|
}
|
||
|
|
||
|
func (s *S) TearDownTest(c *C) {
|
||
|
txn.SetLogger(nil)
|
||
|
txn.SetDebug(false)
|
||
|
s.session.Close()
|
||
|
}
|
||
|
|
||
|
// Account is the struct the tests decode "accounts" documents into.
type Account struct {
	// Id is read from the document's "_id" field.
	Id int `bson:"_id"`
	// Balance is the value the tests compare against expected balances.
	Balance int
}
|
||
|
|
||
|
func (s *S) TestDocExists(c *C) {
|
||
|
err := s.accounts.Insert(M{"_id": 0, "balance": 300})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
exists := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Assert: txn.DocExists,
|
||
|
}}
|
||
|
missing := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Assert: txn.DocMissing,
|
||
|
}}
|
||
|
|
||
|
err = s.runner.Run(exists, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
err = s.runner.Run(missing, "", nil)
|
||
|
c.Assert(err, Equals, txn.ErrAborted)
|
||
|
|
||
|
err = s.accounts.RemoveId(0)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
err = s.runner.Run(exists, "", nil)
|
||
|
c.Assert(err, Equals, txn.ErrAborted)
|
||
|
err = s.runner.Run(missing, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestInsert(c *C) {
|
||
|
err := s.accounts.Insert(M{"_id": 0, "balance": 300})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"balance": 200},
|
||
|
}}
|
||
|
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 300)
|
||
|
|
||
|
ops[0].Id = 1
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
err = s.accounts.FindId(1).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 200)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestInsertStructID(c *C) {
|
||
|
type id struct {
|
||
|
FirstName string
|
||
|
LastName string
|
||
|
}
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: id{FirstName: "John", LastName: "Jones"},
|
||
|
Assert: txn.DocMissing,
|
||
|
Insert: M{"balance": 200},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: id{FirstName: "Sally", LastName: "Smith"},
|
||
|
Assert: txn.DocMissing,
|
||
|
Insert: M{"balance": 800},
|
||
|
}}
|
||
|
|
||
|
err := s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
n, err := s.accounts.Find(nil).Count()
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(n, Equals, 2)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestRemove(c *C) {
|
||
|
err := s.accounts.Insert(M{"_id": 0, "balance": 300})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Remove: true,
|
||
|
}}
|
||
|
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
err = s.accounts.FindId(0).One(nil)
|
||
|
c.Assert(err, Equals, mgo.ErrNotFound)
|
||
|
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestUpdate(c *C) {
|
||
|
var err error
|
||
|
err = s.accounts.Insert(M{"_id": 0, "balance": 200})
|
||
|
c.Assert(err, IsNil)
|
||
|
err = s.accounts.Insert(M{"_id": 1, "balance": 200})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}}
|
||
|
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 300)
|
||
|
|
||
|
ops[0].Id = 1
|
||
|
|
||
|
err = s.accounts.FindId(1).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 200)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestInsertUpdate(c *C) {
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"_id": 0, "balance": 200},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}}
|
||
|
|
||
|
err := s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 300)
|
||
|
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 400)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestUpdateInsert(c *C) {
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"_id": 0, "balance": 200},
|
||
|
}}
|
||
|
|
||
|
err := s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 200)
|
||
|
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 300)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestInsertRemoveInsert(c *C) {
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"_id": 0, "balance": 200},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Remove: true,
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"_id": 0, "balance": 300},
|
||
|
}}
|
||
|
|
||
|
err := s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 300)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestQueueStashing(c *C) {
|
||
|
txn.SetChaos(txn.Chaos{
|
||
|
KillChance: 1,
|
||
|
Breakpoint: "set-applying",
|
||
|
})
|
||
|
|
||
|
opses := [][]txn.Op{{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"balance": 100},
|
||
|
}}, {{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Remove: true,
|
||
|
}}, {{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"balance": 200},
|
||
|
}}, {{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}}}
|
||
|
|
||
|
var last bson.ObjectId
|
||
|
for _, ops := range opses {
|
||
|
last = bson.NewObjectId()
|
||
|
err := s.runner.Run(ops, last, nil)
|
||
|
c.Assert(err, Equals, txn.ErrChaos)
|
||
|
}
|
||
|
|
||
|
txn.SetChaos(txn.Chaos{})
|
||
|
err := s.runner.Resume(last)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 300)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestInfo(c *C) {
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Assert: txn.DocMissing,
|
||
|
}}
|
||
|
|
||
|
id := bson.NewObjectId()
|
||
|
err := s.runner.Run(ops, id, M{"n": 42})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var t struct{ I struct{ N int } }
|
||
|
err = s.tc.FindId(id).One(&t)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(t.I.N, Equals, 42)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestErrors(c *C) {
|
||
|
doc := bson.M{"foo": 1}
|
||
|
tests := []txn.Op{{
|
||
|
C: "c",
|
||
|
Id: 0,
|
||
|
}, {
|
||
|
C: "c",
|
||
|
Id: 0,
|
||
|
Insert: doc,
|
||
|
Remove: true,
|
||
|
}, {
|
||
|
C: "c",
|
||
|
Id: 0,
|
||
|
Insert: doc,
|
||
|
Update: doc,
|
||
|
}, {
|
||
|
C: "c",
|
||
|
Id: 0,
|
||
|
Update: doc,
|
||
|
Remove: true,
|
||
|
}, {
|
||
|
C: "c",
|
||
|
Assert: doc,
|
||
|
}, {
|
||
|
Id: 0,
|
||
|
Assert: doc,
|
||
|
}}
|
||
|
|
||
|
txn.SetChaos(txn.Chaos{KillChance: 1.0})
|
||
|
for _, op := range tests {
|
||
|
c.Logf("op: %v", op)
|
||
|
err := s.runner.Run([]txn.Op{op}, "", nil)
|
||
|
c.Assert(err, ErrorMatches, "error in transaction op 0: .*")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *S) TestAssertNestedOr(c *C) {
|
||
|
// Assert uses $or internally. Ensure nesting works.
|
||
|
err := s.accounts.Insert(M{"_id": 0, "balance": 300})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Assert: bson.D{{"$or", []bson.D{{{"balance", 100}}, {{"balance", 300}}}}},
|
||
|
Update: bson.D{{"$inc", bson.D{{"balance", 100}}}},
|
||
|
}}
|
||
|
|
||
|
err = s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(0).One(&account)
|
||
|
c.Assert(err, IsNil)
|
||
|
c.Assert(account.Balance, Equals, 400)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestVerifyFieldOrdering(c *C) {
|
||
|
// Used to have a map in certain operations, which means
|
||
|
// the ordering of fields would be messed up.
|
||
|
fields := bson.D{{"a", 1}, {"b", 2}, {"c", 3}}
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: fields,
|
||
|
}}
|
||
|
|
||
|
err := s.runner.Run(ops, "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var d bson.D
|
||
|
err = s.accounts.FindId(0).One(&d)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
var filtered bson.D
|
||
|
for _, e := range d {
|
||
|
switch e.Name {
|
||
|
case "a", "b", "c":
|
||
|
filtered = append(filtered, e)
|
||
|
}
|
||
|
}
|
||
|
c.Assert(filtered, DeepEquals, fields)
|
||
|
}
|
||
|
|
||
|
func (s *S) TestChangeLog(c *C) {
|
||
|
chglog := s.db.C("chglog")
|
||
|
s.runner.ChangeLog(chglog)
|
||
|
|
||
|
ops := []txn.Op{{
|
||
|
C: "debts",
|
||
|
Id: 0,
|
||
|
Assert: txn.DocMissing,
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Insert: M{"balance": 300},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 1,
|
||
|
Insert: M{"balance": 300},
|
||
|
}, {
|
||
|
C: "people",
|
||
|
Id: "joe",
|
||
|
Insert: M{"accounts": []int64{0, 1}},
|
||
|
}}
|
||
|
id := bson.NewObjectId()
|
||
|
err := s.runner.Run(ops, id, nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
type IdList []interface{}
|
||
|
type Log struct {
|
||
|
Docs IdList "d"
|
||
|
Revnos []int64 "r"
|
||
|
}
|
||
|
var m map[string]*Log
|
||
|
err = chglog.FindId(id).One(&m)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
c.Assert(m["accounts"], DeepEquals, &Log{IdList{0, 1}, []int64{2, 2}})
|
||
|
c.Assert(m["people"], DeepEquals, &Log{IdList{"joe"}, []int64{2}})
|
||
|
c.Assert(m["debts"], IsNil)
|
||
|
|
||
|
ops = []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 1,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}}
|
||
|
id = bson.NewObjectId()
|
||
|
err = s.runner.Run(ops, id, nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
m = nil
|
||
|
err = chglog.FindId(id).One(&m)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
c.Assert(m["accounts"], DeepEquals, &Log{IdList{0, 1}, []int64{3, 3}})
|
||
|
c.Assert(m["people"], IsNil)
|
||
|
|
||
|
ops = []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Remove: true,
|
||
|
}, {
|
||
|
C: "people",
|
||
|
Id: "joe",
|
||
|
Remove: true,
|
||
|
}}
|
||
|
id = bson.NewObjectId()
|
||
|
err = s.runner.Run(ops, id, nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
m = nil
|
||
|
err = chglog.FindId(id).One(&m)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
c.Assert(m["accounts"], DeepEquals, &Log{IdList{0}, []int64{-4}})
|
||
|
c.Assert(m["people"], DeepEquals, &Log{IdList{"joe"}, []int64{-3}})
|
||
|
}
|
||
|
|
||
|
func (s *S) TestPurgeMissing(c *C) {
|
||
|
txn.SetChaos(txn.Chaos{
|
||
|
KillChance: 1,
|
||
|
Breakpoint: "set-applying",
|
||
|
})
|
||
|
|
||
|
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
|
||
|
c.Assert(err, IsNil)
|
||
|
err = s.accounts.Insert(M{"_id": 1, "balance": 100})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
ops1 := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 3,
|
||
|
Insert: M{"balance": 100},
|
||
|
}}
|
||
|
|
||
|
ops2 := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Remove: true,
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 1,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 2,
|
||
|
Insert: M{"balance": 100},
|
||
|
}}
|
||
|
|
||
|
first := bson.NewObjectId()
|
||
|
c.Logf("---- Running ops1 under transaction %q, to be canceled by chaos", first.Hex())
|
||
|
err = s.runner.Run(ops1, first, nil)
|
||
|
c.Assert(err, Equals, txn.ErrChaos)
|
||
|
|
||
|
last := bson.NewObjectId()
|
||
|
c.Logf("---- Running ops2 under transaction %q, to be canceled by chaos", last.Hex())
|
||
|
err = s.runner.Run(ops2, last, nil)
|
||
|
c.Assert(err, Equals, txn.ErrChaos)
|
||
|
|
||
|
c.Logf("---- Removing transaction %q", last.Hex())
|
||
|
err = s.tc.RemoveId(last)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
c.Logf("---- Disabling chaos and attempting to resume all")
|
||
|
txn.SetChaos(txn.Chaos{})
|
||
|
err = s.runner.ResumeAll()
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
again := bson.NewObjectId()
|
||
|
c.Logf("---- Running ops2 again under transaction %q, to fail for missing transaction", again.Hex())
|
||
|
err = s.runner.Run(ops2, again, nil)
|
||
|
c.Assert(err, ErrorMatches, "cannot find transaction .*")
|
||
|
|
||
|
c.Logf("---- Purging missing transactions")
|
||
|
err = s.runner.PurgeMissing("accounts")
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
c.Logf("---- Resuming pending transactions")
|
||
|
err = s.runner.ResumeAll()
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
expect := []struct{ Id, Balance int }{
|
||
|
{0, -1},
|
||
|
{1, 200},
|
||
|
{2, 100},
|
||
|
{3, 100},
|
||
|
}
|
||
|
var got Account
|
||
|
for _, want := range expect {
|
||
|
err = s.accounts.FindId(want.Id).One(&got)
|
||
|
if want.Balance == -1 {
|
||
|
if err != mgo.ErrNotFound {
|
||
|
c.Errorf("Account %d should not exist, find got err=%#v", err)
|
||
|
}
|
||
|
} else if err != nil {
|
||
|
c.Errorf("Account %d should have balance of %d, but wasn't found", want.Id, want.Balance)
|
||
|
} else if got.Balance != want.Balance {
|
||
|
c.Errorf("Account %d should have balance of %d, got %d", want.Id, want.Balance, got.Balance)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *S) TestTxnQueueStashStressTest(c *C) {
|
||
|
txn.SetChaos(txn.Chaos{
|
||
|
SlowdownChance: 0.3,
|
||
|
Slowdown: 50 * time.Millisecond,
|
||
|
})
|
||
|
defer txn.SetChaos(txn.Chaos{})
|
||
|
|
||
|
// So we can run more iterations of the test in less time.
|
||
|
txn.SetDebug(false)
|
||
|
|
||
|
const runners = 10
|
||
|
const inserts = 10
|
||
|
const repeat = 100
|
||
|
|
||
|
for r := 0; r < repeat; r++ {
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(runners)
|
||
|
for i := 0; i < runners; i++ {
|
||
|
go func(i, r int) {
|
||
|
defer wg.Done()
|
||
|
|
||
|
session := s.session.New()
|
||
|
defer session.Close()
|
||
|
runner := txn.NewRunner(s.tc.With(session))
|
||
|
|
||
|
for j := 0; j < inserts; j++ {
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: fmt.Sprintf("insert-%d-%d", r, j),
|
||
|
Insert: bson.M{
|
||
|
"added-by": i,
|
||
|
},
|
||
|
}}
|
||
|
err := runner.Run(ops, "", nil)
|
||
|
if err != txn.ErrAborted {
|
||
|
c.Check(err, IsNil)
|
||
|
}
|
||
|
}
|
||
|
}(i, r)
|
||
|
}
|
||
|
wg.Wait()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
|
||
|
// This test ensures that PurgeMissing can handle very large
|
||
|
// txn-queue fields. Previous iterations of PurgeMissing would
|
||
|
// trigger a 16MB aggregation pipeline result size limit when run
|
||
|
// against a documents or stashes with large numbers of txn-queue
|
||
|
// entries. PurgeMissing now no longer uses aggregation pipelines
|
||
|
// to work around this limit.
|
||
|
|
||
|
// The pipeline result size limitation was removed from MongoDB in 2.6 so
|
||
|
// this test is only run for older MongoDB version.
|
||
|
build, err := s.session.BuildInfo()
|
||
|
c.Assert(err, IsNil)
|
||
|
if build.VersionAtLeast(2, 6) {
|
||
|
c.Skip("This tests a problem that can only happen with MongoDB < 2.6 ")
|
||
|
}
|
||
|
|
||
|
// Insert a single document to work with.
|
||
|
err = s.accounts.Insert(M{"_id": 0, "balance": 100})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
ops := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 100}},
|
||
|
}}
|
||
|
|
||
|
// Generate one successful transaction.
|
||
|
good := bson.NewObjectId()
|
||
|
c.Logf("---- Running ops under transaction %q", good.Hex())
|
||
|
err = s.runner.Run(ops, good, nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
// Generate another transaction which which will go missing.
|
||
|
missing := bson.NewObjectId()
|
||
|
c.Logf("---- Running ops under transaction %q (which will go missing)", missing.Hex())
|
||
|
err = s.runner.Run(ops, missing, nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
err = s.tc.RemoveId(missing)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
// Generate a txn-queue on the test document that's large enough
|
||
|
// that it used to cause PurgeMissing to exceed MongoDB's pipeline
|
||
|
// result 16MB size limit (MongoDB 2.4 and older only).
|
||
|
//
|
||
|
// The contents of the txn-queue field doesn't matter, only that
|
||
|
// it's big enough to trigger the size limit. The required size
|
||
|
// can also be achieved by using multiple documents as long as the
|
||
|
// cumulative size of all the txn-queue fields exceeds the
|
||
|
// pipeline limit. A single document is easier to work with for
|
||
|
// this test however.
|
||
|
//
|
||
|
// The txn id of the successful transaction is used fill the
|
||
|
// txn-queue because this takes advantage of a short circuit in
|
||
|
// PurgeMissing, dramatically speeding up the test run time.
|
||
|
const fakeQueueLen = 250000
|
||
|
fakeTxnQueue := make([]string, fakeQueueLen)
|
||
|
token := good.Hex() + "_12345678" // txn id + nonce
|
||
|
for i := 0; i < fakeQueueLen; i++ {
|
||
|
fakeTxnQueue[i] = token
|
||
|
}
|
||
|
|
||
|
err = s.accounts.UpdateId(0, bson.M{
|
||
|
"$set": bson.M{"txn-queue": fakeTxnQueue},
|
||
|
})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
// PurgeMissing could hit the same pipeline result size limit when
|
||
|
// processing the txn-queue fields of stash documents so insert
|
||
|
// the large txn-queue there too to ensure that no longer happens.
|
||
|
err = s.sc.Insert(
|
||
|
bson.D{{"c", "accounts"}, {"id", 0}},
|
||
|
bson.M{"txn-queue": fakeTxnQueue},
|
||
|
)
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
c.Logf("---- Purging missing transactions")
|
||
|
err = s.runner.PurgeMissing("accounts")
|
||
|
c.Assert(err, IsNil)
|
||
|
}
|
||
|
|
||
|
// flaky is the -flaky command-line flag; TestTxnQueueStressTest is skipped
// unless it is set.
var flaky = flag.Bool("flaky", false, "Include flaky tests")
|
||
|
|
||
|
func (s *S) TestTxnQueueStressTest(c *C) {
|
||
|
// This fails about 20% of the time on Mongo 3.2 (I haven't tried
|
||
|
// other versions) with account balance being 3999 instead of
|
||
|
// 4000. That implies that some updates are being lost. This is
|
||
|
// bad and we'll need to chase it down in the near future - the
|
||
|
// only reason it's being skipped now is that it's already failing
|
||
|
// and it's better to have the txn tests running without this one
|
||
|
// than to have them not running at all.
|
||
|
if !*flaky {
|
||
|
c.Skip("Fails intermittently - disabling until fixed")
|
||
|
}
|
||
|
txn.SetChaos(txn.Chaos{
|
||
|
SlowdownChance: 0.3,
|
||
|
Slowdown: 50 * time.Millisecond,
|
||
|
})
|
||
|
defer txn.SetChaos(txn.Chaos{})
|
||
|
|
||
|
// So we can run more iterations of the test in less time.
|
||
|
txn.SetDebug(false)
|
||
|
|
||
|
err := s.accounts.Insert(M{"_id": 0, "balance": 0}, M{"_id": 1, "balance": 0})
|
||
|
c.Assert(err, IsNil)
|
||
|
|
||
|
// Run half of the operations changing account 0 and then 1,
|
||
|
// and the other half in the opposite order.
|
||
|
ops01 := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 1}},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 1,
|
||
|
Update: M{"$inc": M{"balance": 1}},
|
||
|
}}
|
||
|
|
||
|
ops10 := []txn.Op{{
|
||
|
C: "accounts",
|
||
|
Id: 1,
|
||
|
Update: M{"$inc": M{"balance": 1}},
|
||
|
}, {
|
||
|
C: "accounts",
|
||
|
Id: 0,
|
||
|
Update: M{"$inc": M{"balance": 1}},
|
||
|
}}
|
||
|
|
||
|
ops := [][]txn.Op{ops01, ops10}
|
||
|
|
||
|
const runners = 4
|
||
|
const changes = 1000
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
wg.Add(runners)
|
||
|
for n := 0; n < runners; n++ {
|
||
|
n := n
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
for i := 0; i < changes; i++ {
|
||
|
err = s.runner.Run(ops[n%2], "", nil)
|
||
|
c.Assert(err, IsNil)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
wg.Wait()
|
||
|
|
||
|
for id := 0; id < 2; id++ {
|
||
|
var account Account
|
||
|
err = s.accounts.FindId(id).One(&account)
|
||
|
if account.Balance != runners*changes {
|
||
|
c.Errorf("Account should have balance of %d, got %d", runners*changes, account.Balance)
|
||
|
}
|
||
|
}
|
||
|
}
|