Skip to content

Commit

Permalink
make threadpoolsize configurable, fix Fmt2 - Fmt3 | Fmt3 pattern deco…
Browse files Browse the repository at this point in the history
…de bug
  • Loading branch information
YuboLong committed Jan 14, 2020
1 parent 80d5881 commit 9469850
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 44 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# MyLive -- A Rtmp server implemention in java for live streamming
# MyLive -- A Rtmp server implemention in java for live streaming

### Introdution
MyLive is a rtmp server java implementation for live streaming.
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
<version>2.10.1</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/longyb/mylive/server/HttpFlvServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ public class HttpFlvServer {

EventLoopGroup eventLoopGroup;
StreamManager streamManager;
int handlerThreadPoolSize;


public HttpFlvServer(int port, StreamManager sm) {
public HttpFlvServer(int port, StreamManager sm, int threadPoolSize) {
this.port = port;
this.streamManager = sm;
this.handlerThreadPoolSize = threadPoolSize;
}


Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/longyb/mylive/server/MyLiveServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ public static void main(String[] args) throws Exception {
StreamManager streamManager = new StreamManager();

int rtmpPort = MyLiveConfig.INSTANCE.getRtmpPort();
int handlerThreadPoolSize=MyLiveConfig.INSTANCE.getHandlerThreadPoolSize();

RTMPServer rtmpServer = new RTMPServer(rtmpPort, streamManager);
RTMPServer rtmpServer = new RTMPServer(rtmpPort, streamManager,handlerThreadPoolSize);
rtmpServer.run();

if (!MyLiveConfig.INSTANCE.isEnableHttpFlv()) {
return;
}

int httpPort = MyLiveConfig.INSTANCE.getHttpFlvPort();
HttpFlvServer httpFlvServer = new HttpFlvServer(httpPort, streamManager);
HttpFlvServer httpFlvServer = new HttpFlvServer(httpPort, streamManager,handlerThreadPoolSize);
httpFlvServer.run();

}

private static void readConfig() {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {

File file = new File("./mylive.yaml");

MyLiveConfig cfg = mapper.readValue(file, MyLiveConfig.class);
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/longyb/mylive/server/RTMPServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ public class RTMPServer {

EventLoopGroup eventLoopGroup;
StreamManager streamManager;

public RTMPServer(int port, StreamManager sm) {
int handlerThreadPoolSize;


public RTMPServer(int port, StreamManager sm,int threadPoolSize) {
this.port = port;
this.streamManager = sm;
this.handlerThreadPoolSize = threadPoolSize;
}

public void run() throws Exception {
eventLoopGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
DefaultEventExecutorGroup executor = new DefaultEventExecutorGroup(8);// TODO: USE A CONFIG VALUE
DefaultEventExecutorGroup executor = new DefaultEventExecutorGroup(handlerThreadPoolSize);
b.group(eventLoopGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/longyb/mylive/server/entities/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public synchronized void addContent(RtmpMediaMessage msg) {
}

if (vm.isH264KeyFrame()) {
log.info("video key frame in stream :{}", streamName);
log.debug("video key frame in stream :{}", streamName);
content.clear();
}
}
Expand Down Expand Up @@ -259,7 +259,7 @@ private synchronized void broadCastToSubscribers(RtmpMediaMessage msg) {
if (next.isActive()) {
next.writeAndFlush(wrappedBuffer);
} else {
iterator.remove();
httpIte.remove();
}
}
}
Expand Down
58 changes: 29 additions & 29 deletions src/main/java/com/longyb/mylive/server/handlers/ChunkDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

DecodeState state = state();

if(state== null) {
if (state == null) {
state(DecodeState.STATE_HEADER);
}
if (state == DecodeState.STATE_HEADER) {
RtmpHeader rtmpHeader = readHeader(in);
// log.info("rtmpHeader read:{}",rtmpHeader);
log.debug("rtmpHeader read:{}", rtmpHeader);

completeHeader(rtmpHeader);
currentCsid = rtmpHeader.getCsid();

Expand All @@ -58,12 +58,15 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}

currentPayload = inCompletePayload.get(rtmpHeader.getCsid());
if(currentPayload==null) {
//when fmt=3 and previous body completely read, the previous msgLength play the role of length
RtmpHeader previousHeader = prevousHeaders.get(rtmpHeader.getCsid());
currentPayload=Unpooled.buffer(previousHeader.messageLength, previousHeader.messageLength);
if (currentPayload == null) {
// when fmt=3 and previous body completely read, the previous msgLength play the
// role of length
RtmpHeader previousHeader = prevousHeaders.get(rtmpHeader.getCsid());
log.debug("current payload null,previous header:{}", previousHeader);
currentPayload = Unpooled.buffer(previousHeader.getMessageLength(), previousHeader.getMessageLength());
inCompletePayload.put(rtmpHeader.getCsid(), currentPayload);
log.debug("current payload assign as :{}",currentPayload);
}


checkpoint(DecodeState.STATE_PAYLOAD);
} else if (state == DecodeState.STATE_PAYLOAD) {
Expand Down Expand Up @@ -92,7 +95,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
// we need chunksize to decode the chunk
SetChunkSize scs = (SetChunkSize) msg;
clientChunkSize = scs.getChunkSize();
log.info("------------>client set chunkSize to :{}",clientChunkSize);
log.debug("------------>client set chunkSize to :{}", clientChunkSize);
} else {
out.add(msg);
}
Expand All @@ -104,10 +107,10 @@ private RtmpHeader readHeader(ByteBuf in) {
RtmpHeader rtmpHeader = new RtmpHeader();

// alway from the beginning
int headerLength=0;
int headerLength = 0;

byte firstByte = in.readByte();
headerLength+=1;
headerLength += 1;

// CHUNK HEADER is divided into
// BASIC HEADER
Expand All @@ -122,13 +125,13 @@ private RtmpHeader readHeader(ByteBuf in) {
if (csid == 0) {
// 2 byte form
csid = in.readByte() & 0xff + 64;
headerLength+=1;
headerLength += 1;
} else if (csid == 1) {
// 3 byte form
byte secondByte = in.readByte();
byte thirdByte = in.readByte();
csid = (thirdByte & 0xff) << 8 + (secondByte & 0xff) + 64;
headerLength+=2;
headerLength += 2;
} else if (csid >= 2) {
// that's it!
}
Expand All @@ -145,14 +148,13 @@ private RtmpHeader readHeader(ByteBuf in) {
int messageLength = in.readMedium();
short messageTypeId = (short) (in.readByte() & 0xff);
int messageStreamId = in.readIntLE();
headerLength+=11;
if(timestamp==MAX_TIMESTAMP) {
headerLength += 11;
if (timestamp == MAX_TIMESTAMP) {
long extendedTimestamp = in.readInt();
rtmpHeader.setExtendedTimestamp(extendedTimestamp);
headerLength+=4;
headerLength += 4;
}


rtmpHeader.setTimestamp(timestamp);
rtmpHeader.setMessageTypeId(messageTypeId);
rtmpHeader.setMessageStreamId(messageStreamId);
Expand All @@ -165,29 +167,29 @@ private RtmpHeader readHeader(ByteBuf in) {
int messageLength = in.readMedium();
short messageType = (short) (in.readByte() & 0xff);

headerLength+=7;
if(timestampDelta==MAX_TIMESTAMP) {
headerLength += 7;
if (timestampDelta == MAX_TIMESTAMP) {
long extendedTimestamp = in.readInt();
rtmpHeader.setExtendedTimestamp(extendedTimestamp);
headerLength+=4;
headerLength += 4;
}

rtmpHeader.setTimestampDelta(timestampDelta);
rtmpHeader.setMessageLength(messageLength);
rtmpHeader.setMessageTypeId(messageType);
}
break;
case CHUNK_FMT_2: {
int timestampDelta = in.readMedium();
headerLength+=3;
headerLength += 3;
rtmpHeader.setTimestampDelta(timestampDelta);
if(timestampDelta==MAX_TIMESTAMP) {

if (timestampDelta == MAX_TIMESTAMP) {
long extendedTimestamp = in.readInt();
rtmpHeader.setExtendedTimestamp(extendedTimestamp);
headerLength+=4;
headerLength += 4;
}

}
break;

Expand All @@ -201,10 +203,8 @@ private RtmpHeader readHeader(ByteBuf in) {

}

// EXTENDED TIMESTAMP

rtmpHeader.setHeaderLength(headerLength);

return rtmpHeader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.longyb.mylive.server.rtmp.messages.VideoMessage;
import com.longyb.mylive.server.rtmp.messages.WindowAcknowledgementSize;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -271,7 +269,12 @@ private void handleConnect(ChannelHandlerContext ctx, RtmpCommandMessage msg) {
log.info("client connect {} ", msg);

String app = (String) ((Map) msg.getCommand().get(2)).get("app");

Integer clientRequestEncode = (Integer) ((Map) msg.getCommand().get(2)).get("objectEncoding");
if(clientRequestEncode !=null && clientRequestEncode.intValue()==3) {
log.error("client :{} request AMF3 encoding but server currently doesn't support",ctx);
ctx.close();
return ;
}
streamName = new StreamName(app, null);

int ackSize = 5000000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public static RtmpMessage decode(RtmpHeader header, ByteBuf payload) {
break;

default:
log.debug("message type id {} payload {}", messageTypeId, payload);
log.info("message type id {} payload {}", messageTypeId, payload);
break;
}
if (result != null) {
Expand Down

0 comments on commit 9469850

Please sign in to comment.