Skip to content

Commit

Permalink
Merge branch 'master' into work-be1-etienne-comments-previous-pr
Browse files Browse the repository at this point in the history
  • Loading branch information
emonnin-epfl authored Jun 30, 2024
2 parents a6a9eca + 144e13c commit 02d74fc
Show file tree
Hide file tree
Showing 90 changed files with 2,281 additions and 191 deletions.
7 changes: 4 additions & 3 deletions be1-go/internal/database/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (s *SQLite) StoreRumor(rumorID int, sender string, timestamp mrumor.RumorTi
if err != nil {
return poperrors.NewDatabaseTransactionBeginErrorMsg(err.Error())
}
defer tx.Rollback()

timestampBuf, err := json.Marshal(timestamp)
if err != nil {
Expand Down Expand Up @@ -314,14 +315,14 @@ func (s *SQLite) AddMessageToMyRumor(messageID string) (int, error) {

defer tx.Rollback()

_, err = s.database.Exec(insertMessageToMyRumor, messageID, serverKeysPath)
_, err = tx.Exec(insertMessageToMyRumor, messageID, serverKeysPath)
if err != nil {
return -1, poperrors.NewDatabaseInsertErrorMsg("message to the current rumor: %v", err)
}

var count int

err = s.database.QueryRow(selectCountMyRumor, serverKeysPath).Scan(&count)
err = tx.QueryRow(selectCountMyRumor, serverKeysPath).Scan(&count)
if err != nil {
return -1, poperrors.NewDatabaseSelectErrorMsg("number of messages in the current rumor: %v", err)
}
Expand All @@ -345,7 +346,7 @@ func (s *SQLite) GetAndIncrementMyRumor() (bool, mrumor.ParamsRumor, error) {

defer tx.Rollback()

rows, err := s.database.Query(selectMyRumorMessages, true, serverKeysPath, serverKeysPath)
rows, err := tx.Query(selectMyRumorMessages, true, serverKeysPath, serverKeysPath)
if err != nil {
return false, mrumor.ParamsRumor{}, poperrors.NewDatabaseSelectErrorMsg("current rumor params: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion be1-go/internal/database/sqlite/sqlite_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ const (
insertKeys = `INSERT INTO key (channelPath, publicKey, secretKey) VALUES (?, ?, ?)`
insertPublicKey = `INSERT INTO key (channelPath, publicKey) VALUES (?, ?)`
insertRumor = `INSERT INTO rumor (ID, sender, timestamp) VALUES (?, ?, ?)`
insertUnprocessedMessage = `INSERT INTO unprocessedMessage (messageID, channelPath, message) VALUES (?, ?, ?)`
insertUnprocessedMessage = `INSERT OR IGNORE INTO unprocessedMessage (messageID, channelPath, message) VALUES (?, ?, ?)`
insertUnprocessedMessageRumor = `INSERT INTO unprocessedMessageRumor (messageID, rumorID, sender) VALUES (?, ?, ?)`
insertMessageRumor = `INSERT INTO messageRumor (messageID, rumorID, sender) VALUES (?, ?, ?)`
tranferUnprocessedMessageRumor = `INSERT INTO messageRumor (messageID, rumorID, sender) SELECT messageID, rumorID, sender FROM unprocessedMessageRumor WHERE messageID = ?`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,34 @@ func New(hub Hub, subs Subscribers, sockets Sockets, conf Config,
}
}

func (h *Handler) Handle(channelPath string, msg mmessage.Message,
func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
jsonData, err := base64.URLEncoding.DecodeString(msg.Data)
if err != nil {
return errors.NewInvalidMessageFieldError("failed to decode message data: %v", err)
}

err = h.schema.VerifyJSON(jsonData, validation.Data)
if err != nil {
return err
}

object, action, err := channel.GetObjectAndAction(jsonData)
if err != nil {
return err
}

if object != channel.FederationObject {
return errors.NewInvalidMessageFieldError("invalid object %v", object)
}

if action != channel.FederationActionTokensExchange {
return errors.NewInvalidActionError("invalid action %v", action)
}

return h.handleTokensExchange(msg, channelPath)
}

func (h *Handler) HandleWithSocket(channelPath string, msg mmessage.Message,
socket socket.Socket) error {
err := msg.VerifyMessage()
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func Test_handleChannelFederation(t *testing.T) {

for _, arg := range args {
t.Run(arg.name, func(t *testing.T) {
err = federationHandler.Handle(arg.channelPath, arg.msg, &fakeSocket)
err = federationHandler.HandleWithSocket(arg.channelPath, arg.msg, &fakeSocket)
if arg.isError {
require.Error(t, err, arg.contains)
} else {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 21 additions & 10 deletions be1-go/internal/handler/method/publish/hpublish/publish.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hpublish

import (
"encoding/base64"
"encoding/json"
"github.com/rs/zerolog"
"popstellar/internal/errors"
Expand Down Expand Up @@ -28,7 +29,7 @@ type MessageHandler interface {
}

type FederationHandler interface {
Handle(channelPath string, msg mmessage.Message, socket socket.Socket) error
HandleWithSocket(channelPath string, msg mmessage.Message, socket socket.Socket) error
}

type Handler struct {
Expand Down Expand Up @@ -58,24 +59,34 @@ func (h *Handler) Handle(socket socket.Socket, msg []byte) (*int, error) {
}

// The federation handler need to have access to the socket and are not
// using rumors
// using rumors, except for tokens exchange
if strings.Contains(publish.Params.Channel, channel.Federation) {
err = h.federationHandler.Handle(publish.Params.Channel, publish.Params.Message, socket)
if err != nil {
return &publish.ID, err
}

socket.SendResult(publish.ID, nil, nil)
return nil, nil
err = h.federationHandler.HandleWithSocket(publish.Params.Channel, publish.Params.Message, socket)
} else {
err = h.messageHandler.Handle(publish.Params.Channel, publish.Params.Message, false)
}

err = h.messageHandler.Handle(publish.Params.Channel, publish.Params.Message, false)
if err != nil {
return &publish.ID, err
}

socket.SendResult(publish.ID, nil, nil)

jsonData, err := base64.URLEncoding.DecodeString(publish.Params.Message.Data)
if err != nil {
return &publish.ID, errors.NewInvalidMessageFieldError(
"failed to decode message data: %v", err)
}

object, action, err := channel.GetObjectAndAction(jsonData)
if err != nil {
return &publish.ID, err
}

if object == channel.FederationObject && action != channel.FederationActionTokensExchange {
return nil, nil
}

logger.Logger.Debug().Msgf("sender rumor need to add message %s", publish.Params.Message.MessageID)
nbMessagesInsideRumor, err := h.db.AddMessageToMyRumor(publish.Params.Message.MessageID)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func Test_Handle(t *testing.T) {
publishBuf = generator.NewPublishQuery(t, queryID, channelPath, msg)

//messageHandler.On("Handle", channelPath, msg, false).Return(nil).Once()
fHandler.On("Handle", channelPath, msg, socket).Return(nil).Once()
fHandler.On("HandleWithSocket", channelPath, msg, socket).Return(nil).Once()
db.On("AddMessageToMyRumor", msg.MessageID).Return(thresholdMessagesByRumor-1, nil).Once()

id, err = publishHandler.Handle(socket, publishBuf)
require.Nil(t, id)
Expand Down
21 changes: 11 additions & 10 deletions be1-go/internal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ func New(dbPath string, ownerPubKey kyber.Point, clientAddress, serverAddress st
return nil, err
}

// Create the greetserver handler
greetserverHandler := hgreetserver.New(conf, peers, log)

// Create the rumor state handler
rumorStateHandler := hrumorstate.New(queries, sockets, &db, log)

// Create the federation handler
federationHandler := hfederation.New(hubParams, subs, sockets, conf, &db,
rumorStateHandler, greetserverHandler, schemaValidator, log)

// Create the message channel handlers
channelHandlers := make(hmessage.ChannelHandlers)
channelHandlers[channel.RootObject] = hroot.New(conf, &db, subs, peers, schemaValidator, log)
Expand All @@ -175,23 +185,14 @@ func New(dbPath string, ownerPubKey kyber.Point, clientAddress, serverAddress st
channelHandlers[channel.ChirpObject] = hchirp.New(conf, subs, &db, schemaValidator, log)
channelHandlers[channel.ReactionObject] = hreaction.New(subs, &db, schemaValidator, log)
channelHandlers[channel.CoinObject] = hcoin.New(subs, &db, schemaValidator, log)
channelHandlers[channel.FederationObject] = federationHandler

// Create the message handler
msgHandler := hmessage.New(&db, channelHandlers, log)

// Create the greetserver handler
greetserverHandler := hgreetserver.New(conf, peers, log)

// Create the rumor handler
rumorHandler := hrumor.New(queries, sockets, &db, msgHandler, log)

// Create the rumor state handler
rumorStateHandler := hrumorstate.New(queries, sockets, &db, log)

// Create the federation handler
federationHandler := hfederation.New(hubParams, subs, sockets, conf, &db,
rumorStateHandler, greetserverHandler, schemaValidator, log)

// Create the query handler
methodHandlers := make(hquery.MethodHandlers)
methodHandlers[mquery.MethodCatchUp] = hcatchup.New(&db, log)
Expand Down
10 changes: 2 additions & 8 deletions be1-go/internal/network/socket/socket_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,13 @@ func (s *baseSocket) WritePump() {
return
}

w, err := s.conn.NextWriter(websocket.TextMessage)
err := s.conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
s.log.Err(err).Msg("failed to retrieve writer")
s.log.Error().Err(err).Send()
return
}

w.Write(message)
s.log.Info().RawJSON("sent", message).Msg("")

if err := w.Close(); err != nil {
s.log.Err(err).Msg("failed to close writer")
return
}
case <-ticker.C:
s.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := s.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,36 @@ object MessageDataProtocol extends DefaultJsonProtocol {
}
}

implicit object FederationTokensExchangeFormat extends JsonFormat[FederationTokensExchange] {
final private val PARAM_LAO_ID = "lao_id"
final private val PARAM_ROLL_CALL_ID = "roll_call_id"
final private val PARAM_TOKENS = "tokens"
final private val PARAM_TIMESTAMP = "timestamp"

override def read(json: JsValue): FederationTokensExchange = json.asJsObject().getFields(PARAM_LAO_ID, PARAM_ROLL_CALL_ID, PARAM_TOKENS, PARAM_TIMESTAMP) match {
case Seq(laoId @ JsString(_), rollCallId @ JsString(_), JsArray(tokens), timestamp @ JsNumber(_)) =>
FederationTokensExchange(
laoId.convertTo[Hash],
rollCallId.convertTo[Hash],
tokens.map(_.convertTo[PublicKey]).toList,
timestamp.convertTo[Timestamp]
)
case _ => throw new IllegalArgumentException(s"Can't parse json value $json to a FederationTokensExchange object")
}

override def write(obj: FederationTokensExchange): JsValue = {
var jsObjectContent: ListMap[String, JsValue] = ListMap[String, JsValue](
PARAM_OBJECT -> JsString(obj._object.toString),
PARAM_ACTION -> JsString(obj.action.toString),
PARAM_LAO_ID -> obj.laoId.toJson,
PARAM_ROLL_CALL_ID -> obj.rollCallId.toJson,
PARAM_TOKENS -> obj.tokens.toJson,
PARAM_TIMESTAMP -> obj.timestamp.toJson
)
JsObject(jsObjectContent)
}
}

implicit object NumberOfChirpsReactionsDataFormat extends JsonFormat[NumberOfChirpsReactionsData] {
final private val PARAM_NUMBER_OF_CHIRPS_REACTIONS: String = "numberOfChirpsReactions"

Expand All @@ -622,6 +652,7 @@ object MessageDataProtocol extends DefaultJsonProtocol {
override def write(obj: NumberOfChirpsReactionsData): JsValue = JsObject(
PARAM_NUMBER_OF_CHIRPS_REACTIONS -> obj.numberOfChirpsReactions.toJson
)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ enum ActionType(val action: String):
case expect extends ActionType("expect")
case challenge_request extends ActionType("challenge_request")
case challenge extends ActionType("challenge")
case tokens_exchange extends ActionType("tokens_exchange")
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ch.epfl.pop.model.network.method.message.data.federation

import ch.epfl.pop.json.MessageDataProtocol.*
import ch.epfl.pop.model.network.Parsable
import ch.epfl.pop.model.network.method.message.data.{ActionType, MessageData, ObjectType}
import ch.epfl.pop.model.objects.{Hash, PublicKey, Timestamp}
import spray.json.*

final case class FederationTokensExchange(
laoId: Hash,
rollCallId: Hash,
tokens: List[PublicKey],
timestamp: Timestamp
) extends MessageData {

override val _object: ObjectType = ObjectType.federation
override val action: ActionType = ActionType.tokens_exchange

}

object FederationTokensExchange extends Parsable {
def apply(laoId: Hash, rollCallId: Hash, tokens: List[PublicKey], timestamp: Timestamp): FederationTokensExchange = {
new FederationTokensExchange(laoId, rollCallId, tokens, timestamp)
}

override def buildFromJson(payload: String): FederationTokensExchange = payload.parseJson.asJsObject.convertTo[FederationTokensExchange]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.pop.pubsub
import ch.epfl.pop.model.network.JsonRpcRequest
import ch.epfl.pop.model.network.method.message.data.coin.PostTransaction
import ch.epfl.pop.model.network.method.message.data.election.*
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationChallengeRequest, FederationExpect, FederationInit, FederationResult}
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationChallengeRequest, FederationExpect, FederationInit, FederationResult, FederationTokensExchange}
import ch.epfl.pop.model.network.method.message.data.lao.{CreateLao, GreetLao, StateLao, UpdateLao}
import ch.epfl.pop.model.network.method.message.data.meeting.{CreateMeeting, StateMeeting}
import ch.epfl.pop.model.network.method.message.data.popcha.Authenticate
Expand Down Expand Up @@ -285,6 +285,14 @@ object MessageRegistry {
FederationHandler.handleFederationResult
)

register.add(
(ObjectType.federation, ActionType.tokens_exchange),
createSchemaVerifier("dataFederationTokensExchange.json"),
FederationTokensExchange.buildFromJson,
FederationValidator.validateFederationTokensExchange,
FederationHandler.handleFederationTokensExchange
)

new MessageRegistry(register.get)
}
}
Loading

0 comments on commit 02d74fc

Please sign in to comment.