forked from nanomsg/mangos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
182 lines (166 loc) · 4.91 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Copyright 2019 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the license at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mangos
import (
"sync"
"sync/atomic"
)
// Message is the unit of data exchanged back and forth. How a message is
// divided between Header and Body depends on the protocol in use. Headers
// added by transport layers (TCP/ethernet headers, and the SP protocol
// independent length headers) are *not* part of Header.
type Message struct {
	// Header holds the protocol (SP) specific header. No user data may
	// be placed here, and applications should neither modify nor use it
	// unless they are operating in Raw mode.
	Header []byte

	// Body holds the message "payload".
	Body []byte

	// Pipe may be filled in on receipt to identify the Pipe the Message
	// arrived on. That Pipe is not guaranteed to still be active, so
	// applications should treat it as informational only.
	Pipe Pipe

	// bbuf and hbuf are the initial backing buffers for Body and Header;
	// NewMessage points Body and Header back at them.
	bbuf, hbuf []byte

	// bsize is the body capacity the message was created with. Free
	// compares it against the cache's size classes to pick a pool.
	bsize int

	// refcnt is the reference count; it is accessed via sync/atomic.
	refcnt int32
}
// msgCacheInfo describes one size class of the message cache.
type msgCacheInfo struct {
	// maxbody is the body capacity of this size class. Free matches a
	// message's bsize against it, and NewMessage compares the requested
	// size against it.
	maxbody int
	// pool holds the recycled messages belonging to this size class.
	pool *sync.Pool
}
// newMsg allocates a brand new Message without consulting the cache. The
// body buffer is empty with capacity sz, the header buffer is empty with
// capacity 32, and sz is recorded as the message's size. Header, Body, Pipe
// and the reference count are left at their zero values.
func newMsg(sz int) *Message {
	return &Message{
		bbuf:  make([]byte, 0, sz),
		hbuf:  make([]byte, 0, 32),
		bsize: sz,
	}
}
// messageCache lists the pooled size classes in ascending order of body
// capacity. Each class owns a pool whose New function creates a message of
// exactly that class's capacity. We can tweak these sizes!
var messageCache = func() []msgCacheInfo {
	sizes := [...]int{64, 128, 256, 512, 1024, 4096, 8192, 65536}
	classes := make([]msgCacheInfo, 0, len(sizes))
	for _, sz := range sizes {
		sz := sz // per-iteration copy captured by the closure below
		classes = append(classes, msgCacheInfo{
			maxbody: sz,
			pool: &sync.Pool{
				New: func() interface{} { return newMsg(sz) },
			},
		})
	}
	return classes
}()
// Free drops one reference to the message. When the last reference goes
// away and the message's size matches one of the cache's size classes, the
// message is handed back to that class's pool so it can be recycled without
// engaging GC, which can have rather substantial benefits for performance.
// Messages of any other size are simply left for the garbage collector.
// Calling Free is never strictly necessary, and calling it on a nil
// message does nothing.
func (m *Message) Free() {
	if m == nil {
		return
	}
	if atomic.AddInt32(&m.refcnt, -1) != 0 {
		// Either still shared, or already released.
		return
	}
	for _, class := range messageCache {
		if class.maxbody == m.bsize {
			class.pool.Put(m)
			return
		}
	}
}
// Clone bumps the reference count on the message, allowing it to be
// shared. Callers of this MUST ensure that the message is never modified.
// If a read-only copy needs to be made "unique", callers can do so by
// using the MakeUnique method. Each Clone should eventually be balanced
// by a call to Free, which drops the reference again.
func (m *Message) Clone() {
	// Atomic increment, mirroring the atomic decrement performed by Free.
	atomic.AddInt32(&m.refcnt, 1)
}
// MakeUnique returns a message that is not shared with anyone else. When
// the reference count is exactly one the receiver itself is returned.
// Otherwise a copy of the message is made with Dup, the caller's reference
// on the original is released with Free, and the copy is returned.
//
// The original message must not be used after this call; callers should
// always write `m = m.MakeUnique()`. Call this whenever the message is
// leaving the control of the caller, such as when passing it to a user
// program.
//
// Transports should always call this on their transmit path if they are
// going to modify the message. (Most do not.)
func (m *Message) MakeUnique() *Message {
	shared := atomic.LoadInt32(&m.refcnt) != 1
	if !shared {
		return m
	}
	private := m.Dup()
	m.Free()
	return private
}
//
// Dup returns a deep copy of the message. The copy is obtained from
// NewMessage, receives its own copies of the body and header bytes, and
// refers to the same Pipe as the original. Because nothing is shared with
// the receiver's buffers, the result is safe to modify.
func (m *Message) Dup() *Message {
	clone := NewMessage(len(m.Body))
	clone.Pipe = m.Pipe
	clone.Header = append(clone.Header, m.Header...)
	clone.Body = append(clone.Body, m.Body...)
	return clone
}
// NewMessage is the supported way to obtain a new Message. It draws from a
// "cache" of size-classed pools, which greatly reduces the load on the
// garbage collector.
//
// sz is the expected body size. The smallest size class able to hold sz
// bytes supplies the message; requests larger than every class are
// allocated directly. The returned message has an empty Body and Header
// (backed by the message's own buffers), a zero Pipe, and a reference count
// of one.
func NewMessage(sz int) *Message {
	var m *Message
	for i := range messageCache {
		// The comparison is inclusive so that a request for exactly a
		// class's size is served by that class; otherwise requests for
		// the largest class size would bypass the cache entirely.
		if sz <= messageCache[i].maxbody {
			m = messageCache[i].pool.Get().(*Message)
			break
		}
	}
	if m == nil {
		m = newMsg(sz)
	}

	// A recycled message still carries state from its previous use, so
	// reset everything a caller can observe.
	var noPipe Pipe // zero value, independent of how Pipe is declared
	m.Body = m.bbuf
	m.Header = m.hbuf
	m.Pipe = noPipe
	atomic.StoreInt32(&m.refcnt, 1)
	return m
}