From e8a9573b7b17c2c20d5c5d7c95f5f930c95ebbc8 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Fri, 22 Sep 2023 13:13:17 +0500 Subject: [PATCH] run cleanup when shutdown received --- srt/packet.go | 3 ++- srt/protocol.go | 7 +++++++ srt/stream_ids.go | 3 ++- srt/tunnel.go | 1 - 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/srt/packet.go b/srt/packet.go index d958e0e..b72cdac 100644 --- a/srt/packet.go +++ b/srt/packet.go @@ -265,7 +265,8 @@ func parse_ctrl_packet(pkt *Packet, buffer []byte) (error) { pkt.packet_type = ACKACK return nil case 5: - return errors.New("Shutdown received") + pkt.packet_type = SHUTDOWN + return nil default: return errors.New("Unexpected control type") } diff --git a/srt/protocol.go b/srt/protocol.go index 2b945f6..80f1f8c 100644 --- a/srt/protocol.go +++ b/srt/protocol.go @@ -14,6 +14,7 @@ const ( INDUCTION uint8 = iota CONCLUSION DATA_LOOP + BROKEN ) type SRTManager struct { @@ -153,6 +154,7 @@ func (agent *SRTManager) process_conclusion(packet *Packet) (*Packet) { return resp_packet } else { agent.output = writer + CleanFiles(agent.stream_key, 0) } } } @@ -253,6 +255,9 @@ func (agent *SRTManager) process_data(packet *Packet) (*Packet) { } case ACKACK: agent.handle_ackack(packet) + case SHUTDOWN: + agent.state = 3 + go CleanFiles(agent.stream_key, 10) default: return nil } @@ -280,6 +285,8 @@ func (agent *SRTManager) Process(packet *Packet) (*Packet, error) { return agent.process_conclusion(packet), nil case DATA_LOOP: return agent.process_data(packet), nil + case BROKEN: + return nil, errors.New("Tunnel shutdown") default: return nil, errors.New("State not implemented") } diff --git a/srt/stream_ids.go b/srt/stream_ids.go index bd18e53..ab5b36c 100644 --- a/srt/stream_ids.go +++ b/srt/stream_ids.go @@ -7,6 +7,7 @@ import ( "stream_server/transcoder" "time" "path/filepath" + "fmt" ) func NewWriter(stream_key string) (io.WriteCloser, error) { @@ -21,7 +22,7 @@ func CleanFiles(stream_key string, delay time.Duration) { time.Sleep(delay * time.Second) base_dir, _ := os.UserHomeDir() stream_dir := base_dir + "/live/" + stream_key - fileinfo, _ := os.Stat(stream_dir + "/stream.m3u8") + fileinfo, _ := os.Stat(stream_dir + "/init.mp4") if time.Now().Sub(fileinfo.ModTime()) > delay * time.Second { leftover_files, _ := filepath.Glob(stream_dir + "/*") for _, file := range leftover_files { diff --git a/srt/tunnel.go b/srt/tunnel.go index 4098098..ac0acbd 100644 --- a/srt/tunnel.go +++ b/srt/tunnel.go @@ -48,7 +48,6 @@ func (tunnel *Tunnel) Shutdown() { tunnel.WritePacket(packet) if tunnel.state.output != nil { tunnel.state.output.Close() - go CleanFiles(tunnel.state.stream_key, 10) } } }