From f43129dfad71456f532e4f7d7d68b741c6b541d6 Mon Sep 17 00:00:00 2001 From: Elad Date: Fri, 24 May 2019 17:21:07 +0200 Subject: [PATCH] network/syncer: initial commit add handlers.go file, syncer stream maintainance logic copy subscription diff functionality from old syncer network/syncer: add stream req handler network/syncer: add StreamInfoReq handler network/syncer: create initial StreamInfo message exchange remove peer --- network/syncer/handlers.go | 19 ++++ network/syncer/peer.go | 153 ++++++++++++++++++++++++++ network/syncer/peer_test.go | 116 ++++++++++++++++++++ network/syncer/stream_test.go | 159 +++++++++++++++++++++++++++ network/syncer/syncer.go | 199 ++++++++++++++++++++++++++++++++++ network/syncer/wire.go | 68 ++++++++++++ 6 files changed, 714 insertions(+) create mode 100644 network/syncer/handlers.go create mode 100644 network/syncer/peer.go create mode 100644 network/syncer/peer_test.go create mode 100644 network/syncer/stream_test.go create mode 100644 network/syncer/syncer.go create mode 100644 network/syncer/wire.go diff --git a/network/syncer/handlers.go b/network/syncer/handlers.go new file mode 100644 index 0000000000..70f4d99837 --- /dev/null +++ b/network/syncer/handlers.go @@ -0,0 +1,19 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package syncer + +//StreamInfoReq diff --git a/network/syncer/peer.go b/network/syncer/peer.go new file mode 100644 index 0000000000..e7c4cb847f --- /dev/null +++ b/network/syncer/peer.go @@ -0,0 +1,153 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package syncer + +import ( + "context" + "errors" + "fmt" + + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/network" +) + +// ErrMaxPeerServers will be returned if peer server limit is reached. +// It will be sent in the SubscribeErrorMsg. +var ErrMaxPeerServers = errors.New("max peer servers") + +// Peer is the Peer extension for the streaming protocol +type Peer struct { + *network.BzzPeer + streamCursors map[uint]uint //key: bin, value: session cursor + syncer *SwarmSyncer + + quit chan struct{} +} + +// NewPeer is the constructor for Peer +func NewPeer(peer *network.BzzPeer, s *SwarmSyncer) *Peer { + p := &Peer{ + BzzPeer: peer, + streamCursors: make(map[uint]uint), + syncer: s, + quit: make(chan struct{}), + } + return p +} + +func (p *Peer) Left() { + close(p.quit) +} + +// HandleMsg is the message handler that delegates incoming messages +func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { + switch msg := msg.(type) { + case *StreamInfoReq: + go p.handleStreamInfoReq(ctx, msg) + case *StreamInfoRes: + go p.handleStreamInfoRes(ctx, msg) + + default: + return fmt.Errorf("unknown message type: %T", msg) + } + return nil +} + +func (p *Peer) handleStreamInfoRes(ctx context.Context, msg *StreamInfoRes) { + log.Debug("handleStreamInfoRes", "msg", msg) +} +func (p *Peer) handleStreamInfoReq(ctx context.Context, msg *StreamInfoReq) { + log.Debug("handleStreamInfoReq", "msg", msg) + streamRes := StreamInfoRes{} + + for _, v := range msg.Streams { + streamCursor, err := p.syncer.netStore.LastPullSubscriptionBinID(uint8(v)) + if err != nil { + log.Error("error getting last bin id", "bin", v) + } + descriptor := StreamDescriptor{ + Name: "SYNC", + Cursor: streamCursor, + Bounded: false, + } + streamRes.Streams = append(streamRes.Streams, descriptor) + } + if err := p.Send(ctx, streamRes); err != nil { + log.Error("failed to send StreamInfoRes to client", "requested bins", msg.Streams) + } +} + +// syncSubscriptionsDiff calculates to which proximity order bins a peer +// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth +// change from prevDepth to newDepth. Max argument limits the number of +// proximity order bins. Returned values are slices of integers which represent +// proximity order bins, the first one to which additional subscriptions need to +// be requested and the second one which subscriptions need to be quit. Argument +// prevDepth with value less then 0 represents no previous depth, used for +// initial syncing subscriptions. +func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []uint) { + newStart, newEnd := syncBins(peerPO, newDepth, max) + if prevDepth < 0 { + // no previous depth, return the complete range + // for subscriptions requests and nothing for quitting + return intRange(newStart, newEnd), nil + } + + prevStart, prevEnd := syncBins(peerPO, prevDepth, max) + + if newStart < prevStart { + subBins = append(subBins, intRange(newStart, prevStart)...) + } + + if prevStart < newStart { + quitBins = append(quitBins, intRange(prevStart, newStart)...) + } + + if newEnd < prevEnd { + quitBins = append(quitBins, intRange(newEnd, prevEnd)...) + } + + if prevEnd < newEnd { + subBins = append(subBins, intRange(prevEnd, newEnd)...) + } + + return subBins, quitBins +} + +// syncBins returns the range to which proximity order bins syncing +// subscriptions need to be requested, based on peer proximity and +// kademlia neighbourhood depth. Returned range is [start,end), inclusive for +// start and exclusive for end. +func syncBins(peerPO, depth, max int) (start, end int) { + if peerPO < depth { + // subscribe only to peerPO bin if it is not + // in the nearest neighbourhood + return peerPO, peerPO + 1 + } + // subscribe from depth to max bin if the peer + // is in the nearest neighbourhood + return depth, max + 1 +} + +// intRange returns the slice of integers [start,end). The start +// is inclusive and the end is not. +func intRange(start, end int) (r []uint) { + for i := start; i < end; i++ { + r = append(r, uint(i)) + } + return r +} diff --git a/network/syncer/peer_test.go b/network/syncer/peer_test.go new file mode 100644 index 0000000000..77853efd76 --- /dev/null +++ b/network/syncer/peer_test.go @@ -0,0 +1,116 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package syncer + +import ( + "fmt" + "testing" + + "github.com/ethersphere/swarm/network" +) + +// TestSyncSubscriptionsDiff validates the output of syncSubscriptionsDiff +// function for various arguments. +func TestSyncSubscriptionsDiff(t *testing.T) { + max := network.NewKadParams().MaxProxDisplay + for _, tc := range []struct { + po, prevDepth, newDepth int + subBins, quitBins []int + }{ + { + po: 0, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: -1, newDepth: 1, + subBins: []int{0}, + }, + { + po: 1, prevDepth: -1, newDepth: 1, + subBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 3, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 2, + subBins: []int{1}, + }, + { + po: 0, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 1, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 0, prevDepth: 0, newDepth: 1, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: 0, newDepth: 2, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 1, prevDepth: 1, newDepth: 0, // 1-16 -> 0-16 + subBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16 + quitBins: []int{0, 1, 2, 3}, + }, + { + po: 4, prevDepth: 0, newDepth: 5, // 0-16 -> 4 + quitBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 0, // 4 -> 0-16 + subBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 6, // 4 -> 4 + }, + } { + subBins, quitBins := syncSubscriptionsDiff(tc.po, tc.prevDepth, tc.newDepth, max) + if fmt.Sprint(subBins) != fmt.Sprint(tc.subBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got subBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, subBins, tc.subBins) + } + if fmt.Sprint(quitBins) != fmt.Sprint(tc.quitBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got quitBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, quitBins, tc.quitBins) + } + } +} diff --git a/network/syncer/stream_test.go b/network/syncer/stream_test.go new file mode 100644 index 0000000000..4c1feb87c2 --- /dev/null +++ b/network/syncer/stream_test.go @@ -0,0 +1,159 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package syncer + +import ( + "context" + "errors" + "flag" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/network/simulation" + "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/storage/localstore" + "github.com/ethersphere/swarm/storage/mock" +) + +var ( + loglevel = flag.Int("loglevel", 5, "verbosity of logs") +) + +func init() { + flag.Parse() + + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false)))) +} +func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) { + dir, err := ioutil.TempDir("", "swarm-stream-") + if err != nil { + return nil, nil, err + } + cleanup = func() { + os.RemoveAll(dir) + } + + var mockStore *mock.NodeStore + if globalStore != nil { + mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes())) + } + + localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{ + MockStore: mockStore, + }) + if err != nil { + cleanup() + return nil, nil, err + } + return localStore, cleanup, nil +} + +func TestNodesCanTalk(t *testing.T) { + nodeCount := 2 + + // create a standard sim + sim := simulation.New(map[string]simulation.ServiceFunc{ + "bzz-sync": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + n := ctx.Config.Node() + addr := network.NewAddr(n) + + localStore, localStoreCleanup, err := newTestLocalStore(n.ID(), addr, nil) + if err != nil { + return nil, nil, err + } + + netStore := storage.NewNetStore(localStore, enode.ID{}) + + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + o := NewSwarmSyncer(enode.ID{}, nil, kad, netStore) + cleanup = func() { + localStore.Close() + localStoreCleanup() + } + + return o, cleanup, nil + }, + }) + defer sim.Close() + + // create context for simulation run + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + // defer cancel should come before defer simulation teardown + defer cancel() + _, err := sim.AddNodesAndConnectStar(nodeCount) + if err != nil { + t.Fatal(err) + } + + // setup the filter for SubscribeMsg + msgs := sim.PeerEvents( + context.Background(), + sim.UpNodeIDs(), + simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz-sync"), + ) + + // strategy: listen to all SubscribeMsg events; after every event we wait + // if after `waitDuration` no more messages are being received, we assume the + // subscription phase has terminated! + + // the loop in this go routine will either wait for new message events + // or times out after 1 second, which signals that we are not receiving + // any new subscriptions any more + go func() { + //for long running sims, waiting 1 sec will not be enough + for { + select { + case <-ctx.Done(): + return + case m := <-msgs: // just reset the loop + if m.Error != nil { + log.Error("syncer message errored", "err", m.Error) + continue + } + log.Trace("syncer message", "node", m.NodeID, "peer", m.PeerID) + + } + } + }() + + //run the simulation + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + log.Info("Simulation running") + _ = sim.Net.Nodes + + //wait until all subscriptions are done + select { + case <-ctx.Done(): + return errors.New("Context timed out") + } + + return nil + }) + if result.Error != nil { + t.Fatal(result.Error) + } +} diff --git a/network/syncer/syncer.go b/network/syncer/syncer.go new file mode 100644 index 0000000000..8df41ed8e7 --- /dev/null +++ b/network/syncer/syncer.go @@ -0,0 +1,199 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package syncer + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/p2p/protocols" + "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/storage" +) + +// SwarmSyncer implements node.Service +var _ node.Service = (*SwarmSyncer)(nil) +var pollTime = 1 * time.Second + +var SyncerSpec = &protocols.Spec{ + Name: "bzz-sync", + Version: 8, + MaxMsgSize: 10 * 1024 * 1024, + Messages: []interface{}{ + StreamInfoReq{}, + StreamInfoRes{}, + GetRange{}, + OfferedHashes{}, + WantedHashes{}, + }, +} + +// SwarmSyncer is the base type that handles all client/server operations on a node +// it is instantiated once per stream protocol instance, that is, it should have +// one instance per node +type SwarmSyncer struct { + mtx sync.RWMutex + intervalsStore state.Store //every protocol would make use of this + peers map[enode.ID]*Peer + netStore *storage.NetStore + kad *network.Kademlia + started bool + + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting + + quit chan struct{} // terminates registry goroutines +} + +func NewSwarmSyncer(me enode.ID, intervalsStore state.Store, kad *network.Kademlia, ns *storage.NetStore) *SwarmSyncer { + syncer := &SwarmSyncer{ + intervalsStore: intervalsStore, + peers: make(map[enode.ID]*Peer), + kad: kad, + netStore: ns, + quit: make(chan struct{}), + } + + syncer.spec = SyncerSpec + + return syncer +} + +func (o *SwarmSyncer) addPeer(p *Peer) { + o.mtx.Lock() + defer o.mtx.Unlock() + o.peers[p.ID()] = p +} + +func (o *SwarmSyncer) removePeer(p *Peer) { + o.mtx.Lock() + defer o.mtx.Unlock() + if _, found := o.peers[p.ID()]; found { + delete(o.peers, p.ID()) + p.Left() + + } else { + log.Warn("peer was marked for removal but not found") + panic("shouldnt happen") + } +} + +// Run is being dispatched when 2 nodes connect +func (o *SwarmSyncer) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := protocols.NewPeer(p, rw, o.spec) + bp := network.NewBzzPeer(peer) + sp := NewPeer(bp, o) + o.addPeer(sp) + defer o.removePeer(sp) + go o.CreateStreams(sp) + return peer.Run(sp.HandleMsg) +} + +// CreateStreams creates and maintains the streams per peer. Runs in a separate goroutine +func (s *SwarmSyncer) CreateStreams(p *Peer) { + peerPo := chunk.Proximity(s.kad.BaseAddr(), p.BzzAddr.Address()) + sub, _ := syncSubscriptionsDiff(peerPo, -1, s.kad.NeighbourhoodDepth(), s.kad.MaxProxDisplay) + streamsMsg := StreamInfoReq{Streams: sub} + log.Debug("sending subscriptions message", "bins", sub) + if err := p.Send(context.TODO(), streamsMsg); err != nil { + log.Error("err establishing initial subscription", "err", err) + } +} + +func (o *SwarmSyncer) Protocols() []p2p.Protocol { + return []p2p.Protocol{ + { + Name: "bzz-sync", + Version: 1, + Length: 10 * 1024 * 1024, + Run: o.Run, + }, + } +} + +func (r *SwarmSyncer) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: "bzz-sync", + Version: "1.0", + Service: NewAPI(r), + Public: false, + }, + } +} + +// Additional public methods accessible through API for pss +type API struct { + *SwarmSyncer +} + +func NewAPI(o *SwarmSyncer) *API { + return &API{SwarmSyncer: o} +} + +func (o *SwarmSyncer) Start(server *p2p.Server) error { + log.Info("started getting this done") + o.mtx.Lock() + defer o.mtx.Unlock() + + //if o.started { + //panic("shouldnt happen") + //} + //o.started = true + //go func() { + + //o.started = true + ////kadDepthChanged = false + //for { + //// check kademlia depth + //// polling of peers + //// for each peer, establish streams: + //// - do the stream info query + //// - maintain session cursor somewhere + //// - start get ranges + //v := o.kad.SubscribeToNeighbourhoodDepthChange() + //select { + //case <-v: + ////kadDepthChanged = true + //case <-o.quit: + //return + //case <-time.After(pollTime): + //// go over each peer and for each subprotocol check that each stream is working + //// i.e. for each peer, for each subprotocol, a client should be created (with an infinite loop) + //// fetching the stream + //} + //} + //}() + return nil +} + +func (o *SwarmSyncer) Stop() error { + log.Info("shutting down") + o.mtx.Lock() + defer o.mtx.Unlock() + close(o.quit) + return nil +} diff --git a/network/syncer/wire.go b/network/syncer/wire.go new file mode 100644 index 0000000000..a4257a90d5 --- /dev/null +++ b/network/syncer/wire.go @@ -0,0 +1,68 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package syncer + +type StreamInfoReq struct { + Streams []uint +} + +type StreamInfoRes struct { + Streams []StreamDescriptor +} + +type StreamDescriptor struct { + Name string + Cursor uint64 + Bounded bool +} + +type GetRange struct { + Ruid uint + Stream string + From uint + To uint `rlp:nil` + BatchSize uint + Roundtrip bool +} + +type OfferedHashes struct { + Ruid uint + LastIndex uint + Hashes []byte +} + +type WantedHashes struct { + Ruid uint + BitVector []byte +} + +type ChunkDelivery struct { + Ruid uint + LastIndex uint + Chunks [][]byte +} + +type BatchDone struct { + Ruid uint + Last uint +} + +type StreamState struct { + Stream string + Code uint16 + Message string +}