Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

history #37

Merged
merged 8 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kt/basictest.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func testBasic(setup *setupParams) {
primitive.Assume(!err2)

adtrs := mkRpcClients(setup.adtrAddrs)
updAdtrs(upd0, adtrs)
updAdtrs(upd1, adtrs)
updAdtrsOnce(upd0, adtrs)
updAdtrsOnce(upd1, adtrs)

// bob get.
bob := newClient(bobUid, setup.servAddr, setup.servSigPk, setup.servVrfPk)
Expand All @@ -80,7 +80,7 @@ func mkRpcClients(addrs []uint64) []*advrpc.Client {
return c
}

func updAdtrs(upd *UpdateProof, adtrs []*advrpc.Client) {
func updAdtrsOnce(upd *UpdateProof, adtrs []*advrpc.Client) {
for _, cli := range adtrs {
err := callAdtrUpdate(cli, upd)
primitive.Assume(!err)
Expand Down
21 changes: 21 additions & 0 deletions kt/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kt

type HistEntry struct {
Epoch uint64
HistVal []byte
}

// GetHist searches hist at the epoch and rets the latest val, or false
// if there's no registered val.
func GetHist(o []*HistEntry, epoch uint64) (bool, []byte) {
var isReg bool
var val []byte
// entries inv: ordered by epoch field.
for _, e := range o {
if e.Epoch <= epoch {
isReg = true
val = e.HistVal
}
}
return isReg, val
}
7 changes: 3 additions & 4 deletions kt/kt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
)

func TestAll(t *testing.T) {
serverAddr := makeUniqueAddr()
adtr0Addr := makeUniqueAddr()
adtr1Addr := makeUniqueAddr()
testAll(serverAddr, adtr0Addr, adtr1Addr)
servAddr := makeUniqueAddr()
adtrAddrs := []uint64{makeUniqueAddr(), makeUniqueAddr()}
testAllFull(servAddr, adtrAddrs)
}

func TestBasic(t *testing.T) {
Expand Down
146 changes: 49 additions & 97 deletions kt/test.go
Original file line number Diff line number Diff line change
@@ -1,152 +1,104 @@
package kt

// set global timing such that:
// - chaos interlaces enough with alice.
// - chaos mostly has up-to-date audits.
// - bob queries somewhere around halfway thru alice's puts.
// - before alice and bob finally check keys, the auditor has caught up.

import (
"github.com/goose-lang/primitive"
"github.com/goose-lang/std"
"github.com/mit-pdos/pav/advrpc"
"github.com/mit-pdos/pav/cryptoffi"
"sync"
)

const (
aliceUid uint64 = 0
bobUid uint64 = 1
charlieUid uint64 = 2
aliceUid uint64 = 0
bobUid uint64 = 1
)

func testAll(servAddr, adtr0Addr, adtr1Addr uint64) {
// start server and auditors.
serv, servSigPk, servVrfPk := newServer()
servRpc := newRpcServer(serv)
servRpc.Serve(servAddr)
adtr0, adtr0Pk := newAuditor(servSigPk)
adtr0Rpc := newRpcAuditor(adtr0)
adtr0Rpc.Serve(adtr0Addr)
adtr1, adtr1Pk := newAuditor(servSigPk)
adtr1Rpc := newRpcAuditor(adtr1)
adtr1Rpc.Serve(adtr1Addr)
primitive.Sleep(1_000_000)
func testAllFull(servAddr uint64, adtrAddrs []uint64) {
testAll(setup(servAddr, adtrAddrs))
}

// run background threads.
go func() {
charlie := newClient(charlieUid, servAddr, servSigPk, servVrfPk)
chaos(charlie, adtr0Addr, adtr1Addr, adtr0Pk, adtr1Pk)
}()
go func() {
syncAdtr(servAddr, adtr0Addr, adtr1Addr)
}()
func testAll(setup *setupParams) {
aliceCli := newClient(aliceUid, setup.servAddr, setup.servSigPk, setup.servVrfPk)
alice := &alice{cli: aliceCli}
bobCli := newClient(bobUid, setup.servAddr, setup.servSigPk, setup.servVrfPk)
bob := &bob{cli: bobCli}

// run alice and bob.
alice := &alice{}
aliceMu := new(sync.Mutex)
aliceMu.Lock()
aliceCli := newClient(aliceUid, servAddr, servSigPk, servVrfPk)
wg := new(sync.WaitGroup)
wg.Add(1)
wg.Add(1)
// alice does a bunch of puts.
go func() {
alice.run(aliceCli)
aliceMu.Unlock()
alice.run()
wg.Done()
}()
bob := &bob{}
bobMu := new(sync.Mutex)
bobMu.Lock()
bobCli := newClient(bobUid, servAddr, servSigPk, servVrfPk)
// bob does a get at some time in the middle of alice's puts.
go func() {
bob.run(bobCli)
bobMu.Unlock()
bob.run()
wg.Done()
}()
wg.Wait()

// wait for alice and bob to finish.
aliceMu.Lock()
bobMu.Lock()

// alice SelfMon + Audit. bob Audit. ordering irrelevant across clients.
primitive.Sleep(1000_000_000)
selfMonEp, err0 := aliceCli.SelfMon()
// alice self monitor. in real world, she'll come on-line at times and do this.
selfMonEp, err0 := alice.cli.SelfMon()
primitive.Assume(!err0.err)
// could also state this as bob.epoch <= last epoch in TS.
// this last self monitor will be our history bound.
primitive.Assume(bob.epoch <= selfMonEp)
err1 := aliceCli.Audit(adtr0Addr, adtr0Pk)
primitive.Assume(!err1.err)
err2 := aliceCli.Audit(adtr1Addr, adtr1Pk)
primitive.Assume(!err2.err)
err3 := bobCli.Audit(adtr0Addr, adtr0Pk)
primitive.Assume(!err3.err)
err4 := bobCli.Audit(adtr1Addr, adtr1Pk)
primitive.Assume(!err4.err)

// sync auditors. in real world, this'll happen periodically.
updAdtrsAll(setup.servAddr, setup.adtrAddrs)

// alice and bob audit. ordering irrelevant across clients.
doAudits(alice.cli, setup.adtrAddrs, setup.adtrPks)
doAudits(bob.cli, setup.adtrAddrs, setup.adtrPks)

// final check. bob got the right key.
isReg, aliceKey := GetTimeSeries(alice.pks, bob.epoch)
isReg, aliceKey := GetHist(alice.hist, bob.epoch)
primitive.Assert(isReg == bob.isReg)
if isReg {
primitive.Assert(std.BytesEqual(aliceKey, bob.alicePk))
}
}

type alice struct {
pks []*TimeSeriesEntry
cli *Client
hist []*HistEntry
}

func (a *alice) run(cli *Client) {
for i := byte(0); i < byte(20); i++ {
primitive.Sleep(50_000_000)
pk := []byte{i}
epoch, err0 := cli.Put(pk)
func (a *alice) run() {
for i := uint64(0); i < uint64(20); i++ {
primitive.Sleep(5_000_000)
pk := []byte{byte(i)}
epoch, err0 := a.cli.Put(pk)
primitive.Assume(!err0.err)
a.pks = append(a.pks, &TimeSeriesEntry{Epoch: epoch, TSVal: pk})
a.hist = append(a.hist, &HistEntry{Epoch: epoch, HistVal: pk})
}
}

type bob struct {
cli *Client
epoch uint64
isReg bool
alicePk []byte
}

func (b *bob) run(cli *Client) {
primitive.Sleep(550_000_000)
isReg, pk, epoch, err0 := cli.Get(aliceUid)
func (b *bob) run() {
primitive.Sleep(120_000_000)
isReg, pk, epoch, err0 := b.cli.Get(aliceUid)
primitive.Assume(!err0.err)
b.epoch = epoch
b.isReg = isReg
b.alicePk = pk
}

// chaos from Charlie running all the ops.
func chaos(charlie *Client, adtr0Addr, adtr1Addr uint64, adtr0Pk, adtr1Pk cryptoffi.PublicKey) {
for {
primitive.Sleep(40_000_000)
pk := []byte{2}
_, err0 := charlie.Put(pk)
primitive.Assume(!err0.err)
_, _, _, err1 := charlie.Get(aliceUid)
primitive.Assume(!err1.err)
_, err2 := charlie.SelfMon()
primitive.Assume(!err2.err)
charlie.Audit(adtr0Addr, adtr0Pk)
charlie.Audit(adtr1Addr, adtr1Pk)
}
}

func syncAdtr(servAddr, adtr0Addr, adtr1Addr uint64) {
func updAdtrsAll(servAddr uint64, adtrAddrs []uint64) {
servCli := advrpc.Dial(servAddr)
adtr0Cli := advrpc.Dial(adtr0Addr)
adtr1Cli := advrpc.Dial(adtr1Addr)
adtrs := mkRpcClients(adtrAddrs)
var epoch uint64
for {
primitive.Sleep(1_000_000)
upd, err0 := callServAudit(servCli, epoch)
if err0 {
continue
upd, err := callServAudit(servCli, epoch)
if err {
break
}
err1 := callAdtrUpdate(adtr0Cli, upd)
primitive.Assume(!err1)
err2 := callAdtrUpdate(adtr1Cli, upd)
primitive.Assume(!err2)
updAdtrsOnce(upd, adtrs)
epoch++
}
}
21 changes: 0 additions & 21 deletions kt/timeseries.go

This file was deleted.