-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstdio.go
169 lines (148 loc) · 4.59 KB
/
stdio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package mcp
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"log/slog"
)
// StdIO is a transport for MCP communication that exchanges newline-delimited JSON-RPC
// messages over an io.ReadCloser/io.WriteCloser pair such as stdin/stdout. It exposes a
// single persistent session whose ID is "1".
//
// StdIO offers the methods of both a server transport (Sessions) and a client transport
// (Send, StartSession). All of its state lives in the sess field, so instances must be
// created with NewStdIO; the zero value has a nil reader, writer, logger, and channels.
//
// Call Close when the StdIO instance is no longer needed so that the underlying reader
// and writer are closed.
type StdIO struct {
	// sess is the one session served by this transport.
	sess stdIOSession
}
// stdIOSession is the single session behind StdIO. It holds both ends of the stream,
// the logger used to report errors, and the channels that coordinate shutdown.
type stdIOSession struct {
	// reader is the stream incoming messages are scanned from, one per line.
	reader io.ReadCloser
	// writer is the stream outgoing messages are written to.
	writer io.WriteCloser
	// logger receives decode, scan, and close errors.
	logger *slog.Logger
	// done is closed by close to signal session termination to Send and Messages.
	done chan struct{}
	// closed is closed when the Messages iterator returns; close waits on it so that
	// shutdown completes only after reading has stopped.
	closed chan struct{}
}
// NewStdIO returns a StdIO that reads incoming messages from reader and writes outgoing
// messages to writer. The session logs through slog.Default() and receives freshly
// allocated, unbuffered done and closed channels for shutdown coordination.
func NewStdIO(reader io.ReadCloser, writer io.WriteCloser) StdIO {
	session := stdIOSession{
		reader: reader,
		writer: writer,
		logger: slog.Default(),
	}
	// The signalling channels are only ever closed, never sent on.
	session.done = make(chan struct{})
	session.closed = make(chan struct{})

	return StdIO{sess: session}
}
// Sessions implements the ServerTransport interface. The returned iterator yields
// exactly one value, the transport's only session, and then finishes.
func (s StdIO) Sessions() iter.Seq[Session] {
	only := Session(s.sess)
	return func(yield func(Session) bool) {
		// Nothing follows the single session, so yield's verdict does not matter.
		_ = yield(only)
	}
}
// Send implements the ClientTransport interface. It hands msg to the transport's
// single session for transmission and returns that session's result unchanged; ctx
// is passed through so the caller can bound or cancel the send.
func (s StdIO) Send(ctx context.Context, msg JSONRPCMessage) error {
	session := s.sess
	return session.Send(ctx, msg)
}
// StartSession implements the ClientTransport interface. It closes ready right away,
// without sending a value on it, and returns the iterator over messages read from the
// session. The context argument is unused and the returned error is always nil.
func (s StdIO) StartSession(_ context.Context, ready chan<- error) (iter.Seq[JSONRPCMessage], error) {
	// No setup step precedes reading, so readiness is signalled immediately.
	close(ready)

	incoming := s.sess.Messages()
	return incoming, nil
}
// Close shuts down the transport's session, which closes the underlying reader and
// writer and then waits for the session's message iterator to finish.
func (s StdIO) Close() {
	session := s.sess
	session.close()
}
// ID returns the session identifier, which is always the constant "1".
func (s stdIOSession) ID() string {
	const soleSessionID = "1"
	return soleSessionID
}
// Send marshals msg to JSON, appends a newline as the frame delimiter, and writes the
// result to the session's writer.
//
// It returns ctx.Err() when ctx is done and nil (the message is dropped) when the
// session has been closed, whether that is detected before the write starts or while
// it is still in progress. Marshal and write failures are returned wrapped.
//
// The write runs in its own goroutine so that a slow writer cannot keep Send from
// honoring cancellation. If Send returns early, that goroutine lives on until the
// pending Write call returns.
func (s stdIOSession) Send(ctx context.Context, msg JSONRPCMessage) error {
	msgBs, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("failed to marshal message: %w", err)
	}
	// We append newline to maintain message framing protocol
	msgBs = append(msgBs, '\n')

	// select chooses at random among ready cases, so the final select alone cannot
	// give cancellation or termination precedence over a write that finishes quickly.
	// Checking both up front guarantees no write is started for a context that is
	// already done or a session that is already closed.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-s.done:
		return nil
	default:
	}

	// Buffered so the writer goroutine can always deliver its result and exit, even
	// after Send has returned.
	errs := make(chan error, 1)
	go func() {
		// The error stays local to this goroutine instead of being stored in a
		// variable that belongs to the calling goroutine.
		if _, werr := s.writer.Write(msgBs); werr != nil {
			errs <- fmt.Errorf("failed to write message: %w", werr)
			return
		}
		errs <- nil
	}()

	// Whichever happens first wins: cancellation, session termination, or the write
	// finishing.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-s.done:
		return nil
	case werr := <-errs:
		return werr
	}
}
// Messages returns an iterator over the JSON-RPC messages read from the session's
// reader, one message per line.
//
// Blank lines are skipped; lines that fail to decode are logged and skipped.
// Iteration ends when the reader is exhausted or fails, when the session's done
// channel is found closed between two lines, or when yield returns false. On the way
// out the closed channel is closed so that close can stop waiting; running the
// iterator a second time would close that channel again and panic.
func (s stdIOSession) Messages() iter.Seq[JSONRPCMessage] {
	// By default bufio.Scanner refuses lines longer than bufio.MaxScanTokenSize
	// (64 KiB) and stops scanning for good with bufio.ErrTooLong, so one large
	// message would end the whole stream. The limit is raised to 16 MiB; the buffer
	// still starts small and only grows when a long line requires it.
	const maxMessageSize = 16 << 20

	return func(yield func(JSONRPCMessage) bool) {
		defer close(s.closed)

		scanner := bufio.NewScanner(s.reader)
		scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxMessageSize)

		for scanner.Scan() {
			// We check for session termination between each message
			select {
			case <-s.done:
				return
			default:
			}

			line := scanner.Text()
			if line == "" {
				continue
			}

			var msg JSONRPCMessage
			if err := json.Unmarshal([]byte(line), &msg); err != nil {
				// A malformed line is not fatal: report it and keep reading.
				s.logger.Error("failed to unmarshal message", "err", err)
				continue
			}

			// We stop iteration if yield returns false
			if !yield(msg) {
				return
			}
		}

		// We ignore ErrClosedPipe as it's an expected error during shutdown
		if err := scanner.Err(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
			s.logger.Error("scan error", "err", err)
		}
	}
}
// close shuts the session down: it closes the reader and the writer (logging, not
// returning, any error), closes the done channel to signal termination, and then
// blocks until the Messages iterator has returned.
//
// Because it waits on the closed channel, which only the Messages iterator closes,
// close blocks indefinitely if that iterator was never run.
//
// Calling close again after a previous call has closed done is a no-op. The check is
// not synchronized, so it covers repeated calls but not concurrent ones.
func (s stdIOSession) close() {
	// Closing done twice would panic, and the streams are already closed.
	select {
	case <-s.done:
		return
	default:
	}

	if err := s.reader.Close(); err != nil {
		s.logger.Error("failed to close reader", "err", err)
	}
	if err := s.writer.Close(); err != nil {
		s.logger.Error("failed to close writer", "err", err)
	}

	// We signal termination and wait for Messages goroutine to complete
	close(s.done)
	<-s.closed
}