Skip to content

Commit

Permalink
Merge pull request #44 from LokiYang/develop
Browse files Browse the repository at this point in the history
fixed heartbeat is not sent in given time interval; improved logging.
  • Loading branch information
pmembrey committed Sep 18, 2015
2 parents a87a016 + 69f951c commit b009352
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions src/molderl_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,28 +100,23 @@ handle_cast({send, Msg, _StartTime}, State) when byte_size(Msg)+22 > ?PACKET_SIZ
% second handle a msg when there's no messages in buffer
handle_cast({send, Msg, StartTime}, {Info, OldState=#state{messages={_,[]}}}) ->
State = OldState#state{messages={StartTime, [Msg]}, buffer_size=byte_size(Msg)+22},
{noreply, {Info, State}};

% third handle if the msg is big enough to promotes the current msgs buffer to a packet
handle_cast({send, Msg, Start}, {Info, OldState=#state{packets=Pckts, messages=Msgs, buffer_size=Size}})
when Size+byte_size(Msg)+2 > ?PACKET_SIZE ->
State = OldState#state{packets=[Msgs|Pckts], messages={Start, [Msg]}, buffer_size=0},
case flush(Info, State) of
{ok, NewState} ->
{noreply, {Info, NewState}};
{error, Reason} ->
{stop, Reason, {Info, State}}
end;

% third handle if the msg is big enough to promotes the current msgs buffer to a packet
handle_cast({send, Msg, Start}, {Info, State=#state{packets=Pckts, messages=Msgs, buffer_size=Size}})
when Size+byte_size(Msg)+2 > ?PACKET_SIZE ->
NewState = State#state{packets=[Msgs|Pckts], messages={{0,0,0}, []}, buffer_size=0},
handle_cast({send, Msg, Start}, {Info, NewState});

% finally handle if the msg is not big enough to promotes the current msgs buffer to a packet
handle_cast({send, Msg, _}, {Info, OldState=#state{messages={Start, Msgs}, buffer_size=Size}}) ->
State = OldState#state{messages={Start, [Msg|Msgs]}, buffer_size=Size+byte_size(Msg)+2},
case flush(Info, State) of
{ok, NewState} ->
{noreply, {Info, NewState}};
{error, Reason} ->
{stop, Reason, {Info, State}}
end;
{noreply, {Info, State}};

handle_cast({sequence_number, SeqNum}, {Info, State}) ->
{noreply, {Info, State#state{sequence_number=SeqNum}}}.
Expand All @@ -133,13 +128,15 @@ handle_info(prod, {Info, State=#state{sequence_number=undefined}}) ->

handle_info(prod, {Info, State=#state{packets=[], messages={_,[]}}}) ->
% Timer triggered a send, but packets/msgs queue empty
lager:debug("[molderl] Sending heartbeat... Stream:~p, SeqNum:~p.~n", [Info#info.stream_name, State#state.sequence_number]),
send_heartbeat(Info, State#state.sequence_number),
TRef = erlang:send_after(Info#info.prod_interval, self(), prod),
{noreply, {Info, State#state{timer_ref=TRef}}};

handle_info(prod, {Info, OldState=#state{packets=Pckts, messages=Msgs}}) ->
% Timer triggered a send, flush packets/msgs buffer
State = OldState#state{packets=[Msgs|Pckts], messages={{0,0,0}, []}},
lager:debug("[molderl] Flushing... Stream:~p, NumPackets:~p.~n", [Info#info.stream_name, length(State#state.packets)]),
case flush(Info, State) of
{ok, NewState} ->
{noreply, {Info, NewState}};
Expand Down Expand Up @@ -188,15 +185,18 @@ flush(Info, SeqNum, [{_Start, []}|Pckts]) -> % empty packet, ignore and go on
flush(Info, SeqNum, Pckts);

flush(Info=#info{stream_name=Name, socket=Socket}, SeqNum, [{Start, Msgs}|Pckts]) ->
lager:debug("[molderl] Encoding and sending mold packet. Stream:~p, SeqNum:~p, Remaining Packets:~p.~n", [Name, SeqNum, length(Pckts)]),
{EncodedMsgs, EncodedMsgsSize, NumMsgs} = molderl_utils:encode_messages(Msgs),
Payload = molderl_utils:gen_messagepacket(Name, SeqNum, NumMsgs, EncodedMsgs),
case gen_udp:send(Socket, Info#info.destination, Info#info.destination_port, Payload) of
ok ->
molderl_recovery:store(Info#info.recovery_service, EncodedMsgs, EncodedMsgsSize, NumMsgs),
statsderl:timing_now(Info#info.statsd_latency_key_out, Start, 0.1),
statsderl:increment(Info#info.statsd_count_key, 1, 0.1),
lager:debug("[molderl] Sent mold packet. Stream:~p, SeqNum:~p, NumMsgs:~p.~n", [Name, SeqNum, NumMsgs]),
flush(Info, SeqNum+NumMsgs, Pckts);
{error, eagain} -> % retry next cycle
lager:error("[molderl] Error sending UDP packets: (eagain) resource temporarily unavailable'. Stream:~p. Retrying...~n", [Name]),
{ok, SeqNum, lists:reverse([{Start, Msgs}|Pckts])};
{error, Reason} ->
Log = "[molderl] Experienced issue ~p (~p) writing to UDP socket. Resetting.",
Expand Down

0 comments on commit b009352

Please sign in to comment.