diff --git a/services/broadcast_info_task.go b/services/broadcast_info_task.go new file mode 100644 index 0000000..96578f2 --- /dev/null +++ b/services/broadcast_info_task.go @@ -0,0 +1,8 @@ +package services + +type broadcastInfoTask struct { +} + +func (c *broadcastInfoTask) Run(i *Instance) { + wsServer.BroadcastTo(i.session.Id, "instance stats", i.Name, i.Mem, i.Cpu, i.IsManager) +} diff --git a/services/check_swarm_status_task.go b/services/check_swarm_status_task.go new file mode 100644 index 0000000..a892d18 --- /dev/null +++ b/services/check_swarm_status_task.go @@ -0,0 +1,15 @@ +package services + +import "github.com/docker/docker/api/types/swarm" + +type checkSwarmStatusTask struct { +} + +func (c checkSwarmStatusTask) Run(i *Instance) { + if info, err := GetDaemonInfo(i); err == nil { + if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked { + i.IsManager = &info.Swarm.ControlAvailable + } + } + +} diff --git a/services/collect_stats_task.go b/services/collect_stats_task.go new file mode 100644 index 0000000..28ce5a8 --- /dev/null +++ b/services/collect_stats_task.go @@ -0,0 +1,64 @@ +package services + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/docker/docker/api/types" + units "github.com/docker/go-units" +) + +type collectStatsTask struct { + mem float64 + memLimit float64 + memPercent float64 + v *types.StatsJSON + + cpuPercent float64 + previousCPU uint64 + previousSystem uint64 +} + +func (c *collectStatsTask) Run(i *Instance) { + reader, err := GetContainerStats(i.Name) + if err != nil { + log.Println("Error while trying to collect instance stats", err) + return + } + dec := json.NewDecoder(reader) + e := dec.Decode(&c.v) + if e != nil { + log.Println("Error while trying to collect instance stats", e) + return + } + // Memory + if c.v.MemoryStats.Limit != 0 { + c.memPercent = float64(c.v.MemoryStats.Usage) / float64(c.v.MemoryStats.Limit) * 100.0 + } + c.mem = float64(c.v.MemoryStats.Usage) + c.memLimit = float64(c.v.MemoryStats.Limit) + + i.Mem = fmt.Sprintf("%.2f%% (%s / %s)", c.memPercent, units.BytesSize(c.mem), units.BytesSize(c.memLimit)) + + // cpu + c.previousCPU = c.v.PreCPUStats.CPUUsage.TotalUsage + c.previousSystem = c.v.PreCPUStats.SystemUsage + c.cpuPercent = calculateCPUPercentUnix(c.previousCPU, c.previousSystem, c.v) + i.Cpu = fmt.Sprintf("%.2f%%", c.cpuPercent) +} + +func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 { + var ( + cpuPercent = 0.0 + // calculate the change for the cpu usage of the container in between readings + cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU) + // calculate the change for the entire system between readings + systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem) + ) + + if systemDelta > 0.0 && cpuDelta > 0.0 { + cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0 + } + return cpuPercent +} diff --git a/services/docker.go b/services/docker.go index f7c6b52..4c1ad2d 100644 --- a/services/docker.go +++ b/services/docker.go @@ -4,9 +4,6 @@ import ( "fmt" "io" "log" - "net" - "net/http" - "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -34,7 +31,7 @@ func init() { } func GetContainerStats(id string) (io.ReadCloser, error) { - stats, err := c.ContainerStats(context.Background(), id, true) + stats, err := c.ContainerStats(context.Background(), id, false) return stats.Body, err } @@ -43,20 +40,11 @@ func GetContainerInfo(id string) (types.ContainerJSON, error) { return c.ContainerInspect(context.Background(), id) } -func GetDaemonInfo(host string) (types.Info, error) { - transport := &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 1 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext} - cli := &http.Client{ - Transport: transport, +func GetDaemonInfo(i *Instance) (types.Info, error) { + if i.dockerClient == nil { + return types.Info{}, fmt.Errorf("Docker client for DinD (%s) is not ready", i.IP) } - c, err := client.NewClient(host, client.DefaultVersion, cli, nil) - if err != nil { - return types.Info{}, err - } - return c.Info(context.Background()) + return i.dockerClient.Info(context.Background()) } func CreateNetwork(name string) error { diff --git a/services/instance.go b/services/instance.go index 85c4157..67e1382 100644 --- a/services/instance.go +++ b/services/instance.go @@ -2,8 +2,6 @@ package services import ( "context" - "encoding/json" - "fmt" "io" "log" "os" @@ -12,20 +10,23 @@ import ( "golang.org/x/text/encoding" "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/swarm" - units "github.com/docker/go-units" + "github.com/docker/docker/client" ) var rw sync.Mutex type Instance struct { - session *Session `json:"-"` - Name string `json:"name"` - Hostname string `json:"hostname"` - IP string `json:"ip"` - conn *types.HijackedResponse `json:"-"` - ctx context.Context `json:"-"` - statsReader io.ReadCloser `json:"-"` + session *Session `json:"-"` + Name string `json:"name"` + Hostname string `json:"hostname"` + IP string `json:"ip"` + conn *types.HijackedResponse `json:"-"` + ctx context.Context `json:"-"` + statsReader io.ReadCloser `json:"-"` + dockerClient *client.Client `json:"-"` + IsManager *bool `json:"is_manager"` + Mem string `json:"mem"` + Cpu string `json:"cpu"` } func (i *Instance) IsConnected() bool { @@ -75,9 +76,6 @@ func NewInstance(session *Session) (*Instance, error) { wsServer.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname) - // Start collecting stats - go instance.CollectStats() - return instance, nil } @@ -90,75 +88,6 @@ func (s *sessionWriter) Write(p []byte) (n int, err error) { return len(p), nil } -func (o *Instance) CollectStats() { - - reader, err := GetContainerStats(o.Name) - if err != nil { - log.Println("Error while trying to collect instance stats", err) - return - } - o.statsReader = reader - dec := json.NewDecoder(o.statsReader) - var ( - mem = 0.0 - memLimit = 0.0 - memPercent = 0.0 - v *types.StatsJSON - memFormatted = "" - - cpuPercent = 0.0 - previousCPU uint64 - previousSystem uint64 - cpuFormatted = "" - ) - for { - e := dec.Decode(&v) - if e != nil { - break - } - - var isManager *bool - if info, err := GetDaemonInfo(fmt.Sprintf("http://%s:2375", o.IP)); err == nil { - if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked { - isManager = &info.Swarm.ControlAvailable - } - } - - // Memory - if v.MemoryStats.Limit != 0 { - memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 - } - mem = float64(v.MemoryStats.Usage) - memLimit = float64(v.MemoryStats.Limit) - - memFormatted = fmt.Sprintf("%.2f%% (%s / %s)", memPercent, units.BytesSize(mem), units.BytesSize(memLimit)) - - // cpu - previousCPU = v.PreCPUStats.CPUUsage.TotalUsage - previousSystem = v.PreCPUStats.SystemUsage - cpuPercent = calculateCPUPercentUnix(previousCPU, previousSystem, v) - cpuFormatted = fmt.Sprintf("%.2f%%", cpuPercent) - - wsServer.BroadcastTo(o.session.Id, "instance stats", o.Name, memFormatted, cpuFormatted, isManager) - } - -} - -func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 { - var ( - cpuPercent = 0.0 - // calculate the change for the cpu usage of the container in between readings - cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU) - // calculate the change for the entire system between readings - systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem) - ) - - if systemDelta > 0.0 && cpuDelta > 0.0 { - cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0 - } - return cpuPercent -} - func (i *Instance) ResizeTerminal(cols, rows uint) error { return ResizeConnection(i.Name, cols, rows) } diff --git a/services/session.go b/services/session.go index 71f01e1..49f3e24 100644 --- a/services/session.go +++ b/services/session.go @@ -2,13 +2,17 @@ package services import ( "encoding/gob" + "fmt" "log" "math" + "net" + "net/http" "os" "strconv" "sync" "time" + "github.com/docker/docker/client" "github.com/googollee/go-socket.io" "github.com/twinj/uuid" ) @@ -22,6 +26,8 @@ type Session struct { clients []*Client `json:"-"` CreatedAt time.Time `json:"created_at"` ExpiresAt time.Time `json:"expires_at"` + scheduled bool `json:"-"` + ticker *time.Ticker `json:"-"` } func (s *Session) Lock() { @@ -48,6 +54,49 @@ func (s *Session) AddNewClient(c *Client) { s.clients = append(s.clients, c) } +func (s *Session) SchedulePeriodicTasks() { + if s.scheduled { + return + } + + go func() { + s.scheduled = true + + s.ticker = time.NewTicker(1 * time.Second) + for range s.ticker.C { + var wg = sync.WaitGroup{} + wg.Add(len(s.Instances)) + for _, i := range s.Instances { + if i.dockerClient == nil { + // Need to create client to the DinD docker daemon + + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 1 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext} + cli := &http.Client{ + Transport: transport, + } + c, err := client.NewClient(fmt.Sprintf("http://%s:2375", i.IP), client.DefaultVersion, cli, nil) + if err != nil { + log.Println("Could not connect to DinD docker daemon", err) + } else { + i.dockerClient = c + } + } + go func() { + defer wg.Done() + for _, t := range periodicTasks { + t.Run(i) + } + }() + } + wg.Wait() + } + }() +} + var sessions map[string]*Session func init() { @@ -72,6 +121,8 @@ func CloseSessionAfter(s *Session, d time.Duration) { func CloseSession(s *Session) error { s.rw.Lock() defer s.rw.Unlock() + + s.ticker.Stop() wsServer.BroadcastTo(s.Id, "session end") log.Printf("Starting clean up of session [%s]\n", s.Id) for _, i := range s.Instances { @@ -88,6 +139,11 @@ func CloseSession(s *Session) error { return err } delete(sessions, s.Id) + + // We store sessions as soon as we delete one + if err := saveSessionsToDisk(); err != nil { + return err + } log.Printf("Cleaned up session [%s]\n", s.Id) return nil } @@ -135,6 +191,9 @@ func NewSession() (*Session, error) { } log.Printf("Connected pwd to network [%s]\n", s.Id) + // Schedule peridic tasks execution + s.SchedulePeriodicTasks() + // We store sessions as soon as we create one so we don't delete new sessions on an api restart if err := saveSessionsToDisk(); err != nil { return nil, err @@ -175,8 +234,11 @@ func LoadSessionsFromDisk() error { for _, i := range s.Instances { // wire the session back to the instance i.session = s - go i.CollectStats() + } + + // Schedule peridic tasks execution + s.SchedulePeriodicTasks() } } file.Close() diff --git a/services/task.go b/services/task.go new file mode 100644 index 0000000..3beec87 --- /dev/null +++ b/services/task.go @@ -0,0 +1,11 @@ +package services + +type periodicTask interface { + Run(i *Instance) +} + +var periodicTasks []periodicTask + +func init() { + periodicTasks = append(periodicTasks, &collectStatsTask{}, &checkSwarmStatusTask{}, &broadcastInfoTask{}) +}