1
0
mirror of https://github.com/schollz/cowyo.git synced 2023-08-10 21:13:00 +03:00
cowyo/vendor/gopkg.in/mgo.v2/txn/sim_test.go
2017-10-03 14:43:55 -04:00

389 lines
8.8 KiB
Go

package txn_test
import (
"flag"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/dbtest"
"gopkg.in/mgo.v2/txn"
. "gopkg.in/check.v1"
"math/rand"
"time"
)
var (
duration = flag.Duration("duration", 200*time.Millisecond, "duration for each simulation")
seed = flag.Int64("seed", 0, "seed for rand")
)
type params struct {
killChance float64
slowdownChance float64
slowdown time.Duration
unsafe bool
workers int
accounts int
changeHalf bool
reinsertCopy bool
reinsertZeroed bool
changelog bool
changes int
}
func (s *S) TestSim1Worker(c *C) {
simulate(c, &s.server, params{
workers: 1,
accounts: 4,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSim4WorkersDense(c *C) {
simulate(c, &s.server, params{
workers: 4,
accounts: 2,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSim4WorkersSparse(c *C) {
simulate(c, &s.server, params{
workers: 4,
accounts: 10,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimHalf1Worker(c *C) {
simulate(c, &s.server, params{
workers: 1,
accounts: 4,
changeHalf: true,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimHalf4WorkersDense(c *C) {
simulate(c, &s.server, params{
workers: 4,
accounts: 2,
changeHalf: true,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimHalf4WorkersSparse(c *C) {
simulate(c, &s.server, params{
workers: 4,
accounts: 10,
changeHalf: true,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimReinsertCopy1Worker(c *C) {
simulate(c, &s.server, params{
workers: 1,
accounts: 10,
reinsertCopy: true,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimReinsertCopy4Workers(c *C) {
simulate(c, &s.server, params{
workers: 4,
accounts: 10,
reinsertCopy: true,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimReinsertZeroed1Worker(c *C) {
simulate(c, &s.server, params{
workers: 1,
accounts: 10,
reinsertZeroed: true,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimReinsertZeroed4Workers(c *C) {
simulate(c, &s.server, params{
workers: 4,
accounts: 10,
reinsertZeroed: true,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
})
}
func (s *S) TestSimChangeLog(c *C) {
simulate(c, &s.server, params{
workers: 4,
accounts: 10,
killChance: 0.01,
slowdownChance: 0.3,
slowdown: 100 * time.Millisecond,
changelog: true,
})
}
type balanceChange struct {
id bson.ObjectId
origin int
target int
amount int
}
func simulate(c *C, server *dbtest.DBServer, params params) {
seed := *seed
if seed == 0 {
seed = time.Now().UnixNano()
}
rand.Seed(seed)
c.Logf("Seed: %v", seed)
txn.SetChaos(txn.Chaos{
KillChance: params.killChance,
SlowdownChance: params.slowdownChance,
Slowdown: params.slowdown,
})
defer txn.SetChaos(txn.Chaos{})
session := server.Session()
defer session.Close()
db := session.DB("test")
tc := db.C("tc")
runner := txn.NewRunner(tc)
tclog := db.C("tc.log")
if params.changelog {
info := mgo.CollectionInfo{
Capped: true,
MaxBytes: 1000000,
}
err := tclog.Create(&info)
c.Assert(err, IsNil)
runner.ChangeLog(tclog)
}
accounts := db.C("accounts")
for i := 0; i < params.accounts; i++ {
err := accounts.Insert(M{"_id": i, "balance": 300})
c.Assert(err, IsNil)
}
var stop time.Time
if params.changes <= 0 {
stop = time.Now().Add(*duration)
}
max := params.accounts
if params.reinsertCopy || params.reinsertZeroed {
max = int(float64(params.accounts) * 1.5)
}
changes := make(chan balanceChange, 1024)
//session.SetMode(mgo.Eventual, true)
for i := 0; i < params.workers; i++ {
go func() {
n := 0
for {
if n > 0 && n == params.changes {
break
}
if !stop.IsZero() && time.Now().After(stop) {
break
}
change := balanceChange{
id: bson.NewObjectId(),
origin: rand.Intn(max),
target: rand.Intn(max),
amount: 100,
}
var old Account
var oldExists bool
if params.reinsertCopy || params.reinsertZeroed {
if err := accounts.FindId(change.origin).One(&old); err != mgo.ErrNotFound {
c.Check(err, IsNil)
change.amount = old.Balance
oldExists = true
}
}
var ops []txn.Op
switch {
case params.reinsertCopy && oldExists:
ops = []txn.Op{{
C: "accounts",
Id: change.origin,
Assert: M{"balance": change.amount},
Remove: true,
}, {
C: "accounts",
Id: change.target,
Assert: txn.DocMissing,
Insert: M{"balance": change.amount},
}}
case params.reinsertZeroed && oldExists:
ops = []txn.Op{{
C: "accounts",
Id: change.target,
Assert: txn.DocMissing,
Insert: M{"balance": 0},
}, {
C: "accounts",
Id: change.origin,
Assert: M{"balance": change.amount},
Remove: true,
}, {
C: "accounts",
Id: change.target,
Assert: txn.DocExists,
Update: M{"$inc": M{"balance": change.amount}},
}}
case params.changeHalf:
ops = []txn.Op{{
C: "accounts",
Id: change.origin,
Assert: M{"balance": M{"$gte": change.amount}},
Update: M{"$inc": M{"balance": -change.amount / 2}},
}, {
C: "accounts",
Id: change.target,
Assert: txn.DocExists,
Update: M{"$inc": M{"balance": change.amount / 2}},
}, {
C: "accounts",
Id: change.origin,
Update: M{"$inc": M{"balance": -change.amount / 2}},
}, {
C: "accounts",
Id: change.target,
Update: M{"$inc": M{"balance": change.amount / 2}},
}}
default:
ops = []txn.Op{{
C: "accounts",
Id: change.origin,
Assert: M{"balance": M{"$gte": change.amount}},
Update: M{"$inc": M{"balance": -change.amount}},
}, {
C: "accounts",
Id: change.target,
Assert: txn.DocExists,
Update: M{"$inc": M{"balance": change.amount}},
}}
}
err := runner.Run(ops, change.id, nil)
if err != nil && err != txn.ErrAborted && err != txn.ErrChaos {
c.Check(err, IsNil)
}
n++
changes <- change
}
changes <- balanceChange{}
}()
}
alive := params.workers
changeLog := make([]balanceChange, 0, 1024)
for alive > 0 {
change := <-changes
if change.id == "" {
alive--
} else {
changeLog = append(changeLog, change)
}
}
c.Check(len(changeLog), Not(Equals), 0, Commentf("No operations were even attempted."))
txn.SetChaos(txn.Chaos{})
err := runner.ResumeAll()
c.Assert(err, IsNil)
n, err := accounts.Count()
c.Check(err, IsNil)
c.Check(n, Equals, params.accounts, Commentf("Number of accounts has changed."))
n, err = accounts.Find(M{"balance": M{"$lt": 0}}).Count()
c.Check(err, IsNil)
c.Check(n, Equals, 0, Commentf("There are %d accounts with negative balance.", n))
globalBalance := 0
iter := accounts.Find(nil).Iter()
account := Account{}
for iter.Next(&account) {
globalBalance += account.Balance
}
c.Check(iter.Close(), IsNil)
c.Check(globalBalance, Equals, params.accounts*300, Commentf("Total amount of money should be constant."))
// Compute and verify the exact final state of all accounts.
balance := make(map[int]int)
for i := 0; i < params.accounts; i++ {
balance[i] += 300
}
var applied, aborted int
for _, change := range changeLog {
err := runner.Resume(change.id)
if err == txn.ErrAborted {
aborted++
continue
} else if err != nil {
c.Fatalf("resuming %s failed: %v", change.id, err)
}
balance[change.origin] -= change.amount
balance[change.target] += change.amount
applied++
}
iter = accounts.Find(nil).Iter()
for iter.Next(&account) {
c.Assert(account.Balance, Equals, balance[account.Id])
}
c.Check(iter.Close(), IsNil)
c.Logf("Total transactions: %d (%d applied, %d aborted)", len(changeLog), applied, aborted)
if params.changelog {
n, err := tclog.Count()
c.Assert(err, IsNil)
// Check if the capped collection is full.
dummy := make([]byte, 1024)
tclog.Insert(M{"_id": bson.NewObjectId(), "dummy": dummy})
m, err := tclog.Count()
c.Assert(err, IsNil)
if m == n+1 {
// Wasn't full, so it must have seen it all.
c.Assert(err, IsNil)
c.Assert(n, Equals, applied)
}
}
}