1
0
mirror of https://github.com/bingohuang/docker-labs.git synced 2025-07-14 01:57:32 +08:00

Periodic tasks refactor (#62)

* Once every second the session run a list of periodic tasks on every
instance concurrently. We use these tasks to do things like:
- Collect mem and cpu stats
- Check if instance is part of a swarm cluster
- Broadcast information to connected clients
This commit is contained in:
Jonathan Leibiusky 2016-12-01 15:57:30 -03:00 committed by GitHub
parent 07fee4c1bf
commit afa47c0bfc
7 changed files with 178 additions and 101 deletions

View File

@ -0,0 +1,8 @@
package services
type broadcastInfoTask struct {
}
func (c *broadcastInfoTask) Run(i *Instance) {
wsServer.BroadcastTo(i.session.Id, "instance stats", i.Name, i.Mem, i.Cpu, i.IsManager)
}

View File

@ -0,0 +1,15 @@
package services
import "github.com/docker/docker/api/types/swarm"
type checkSwarmStatusTask struct {
}
func (c checkSwarmStatusTask) Run(i *Instance) {
if info, err := GetDaemonInfo(i); err == nil {
if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked {
i.IsManager = &info.Swarm.ControlAvailable
}
}
}

View File

@ -0,0 +1,64 @@
package services
import (
"encoding/json"
"fmt"
"log"
"github.com/docker/docker/api/types"
units "github.com/docker/go-units"
)
type collectStatsTask struct {
mem float64
memLimit float64
memPercent float64
v *types.StatsJSON
cpuPercent float64
previousCPU uint64
previousSystem uint64
}
func (c *collectStatsTask) Run(i *Instance) {
reader, err := GetContainerStats(i.Name)
if err != nil {
log.Println("Error while trying to collect instance stats", err)
return
}
dec := json.NewDecoder(reader)
e := dec.Decode(&c.v)
if e != nil {
log.Println("Error while trying to collect instance stats", e)
return
}
// Memory
if c.v.MemoryStats.Limit != 0 {
c.memPercent = float64(c.v.MemoryStats.Usage) / float64(c.v.MemoryStats.Limit) * 100.0
}
c.mem = float64(c.v.MemoryStats.Usage)
c.memLimit = float64(c.v.MemoryStats.Limit)
i.Mem = fmt.Sprintf("%.2f%% (%s / %s)", c.memPercent, units.BytesSize(c.mem), units.BytesSize(c.memLimit))
// cpu
c.previousCPU = c.v.PreCPUStats.CPUUsage.TotalUsage
c.previousSystem = c.v.PreCPUStats.SystemUsage
c.cpuPercent = calculateCPUPercentUnix(c.previousCPU, c.previousSystem, c.v)
i.Cpu = fmt.Sprintf("%.2f%%", c.cpuPercent)
}
func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
var (
cpuPercent = 0.0
// calculate the change for the cpu usage of the container in between readings
cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
// calculate the change for the entire system between readings
systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
)
if systemDelta > 0.0 && cpuDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
}
return cpuPercent
}

View File

@ -4,9 +4,6 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
@ -34,7 +31,7 @@ func init() {
}
func GetContainerStats(id string) (io.ReadCloser, error) {
stats, err := c.ContainerStats(context.Background(), id, true)
stats, err := c.ContainerStats(context.Background(), id, false)
return stats.Body, err
}
@ -43,20 +40,11 @@ func GetContainerInfo(id string) (types.ContainerJSON, error) {
return c.ContainerInspect(context.Background(), id)
}
func GetDaemonInfo(host string) (types.Info, error) {
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 1 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext}
cli := &http.Client{
Transport: transport,
func GetDaemonInfo(i *Instance) (types.Info, error) {
if i.dockerClient == nil {
return types.Info{}, fmt.Errorf("Docker client for DinD (%s) is not ready", i.IP)
}
c, err := client.NewClient(host, client.DefaultVersion, cli, nil)
if err != nil {
return types.Info{}, err
}
return c.Info(context.Background())
return i.dockerClient.Info(context.Background())
}
func CreateNetwork(name string) error {

View File

@ -2,8 +2,6 @@ package services
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
@ -12,20 +10,23 @@ import (
"golang.org/x/text/encoding"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
units "github.com/docker/go-units"
"github.com/docker/docker/client"
)
var rw sync.Mutex
type Instance struct {
session *Session `json:"-"`
Name string `json:"name"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
conn *types.HijackedResponse `json:"-"`
ctx context.Context `json:"-"`
statsReader io.ReadCloser `json:"-"`
session *Session `json:"-"`
Name string `json:"name"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
conn *types.HijackedResponse `json:"-"`
ctx context.Context `json:"-"`
statsReader io.ReadCloser `json:"-"`
dockerClient *client.Client `json:"-"`
IsManager *bool `json:"is_manager"`
Mem string `json:"mem"`
Cpu string `json:"cpu"`
}
func (i *Instance) IsConnected() bool {
@ -75,9 +76,6 @@ func NewInstance(session *Session) (*Instance, error) {
wsServer.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname)
// Start collecting stats
go instance.CollectStats()
return instance, nil
}
@ -90,75 +88,6 @@ func (s *sessionWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
func (o *Instance) CollectStats() {
reader, err := GetContainerStats(o.Name)
if err != nil {
log.Println("Error while trying to collect instance stats", err)
return
}
o.statsReader = reader
dec := json.NewDecoder(o.statsReader)
var (
mem = 0.0
memLimit = 0.0
memPercent = 0.0
v *types.StatsJSON
memFormatted = ""
cpuPercent = 0.0
previousCPU uint64
previousSystem uint64
cpuFormatted = ""
)
for {
e := dec.Decode(&v)
if e != nil {
break
}
var isManager *bool
if info, err := GetDaemonInfo(fmt.Sprintf("http://%s:2375", o.IP)); err == nil {
if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked {
isManager = &info.Swarm.ControlAvailable
}
}
// Memory
if v.MemoryStats.Limit != 0 {
memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
}
mem = float64(v.MemoryStats.Usage)
memLimit = float64(v.MemoryStats.Limit)
memFormatted = fmt.Sprintf("%.2f%% (%s / %s)", memPercent, units.BytesSize(mem), units.BytesSize(memLimit))
// cpu
previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
previousSystem = v.PreCPUStats.SystemUsage
cpuPercent = calculateCPUPercentUnix(previousCPU, previousSystem, v)
cpuFormatted = fmt.Sprintf("%.2f%%", cpuPercent)
wsServer.BroadcastTo(o.session.Id, "instance stats", o.Name, memFormatted, cpuFormatted, isManager)
}
}
func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
var (
cpuPercent = 0.0
// calculate the change for the cpu usage of the container in between readings
cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
// calculate the change for the entire system between readings
systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
)
if systemDelta > 0.0 && cpuDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
}
return cpuPercent
}
func (i *Instance) ResizeTerminal(cols, rows uint) error {
return ResizeConnection(i.Name, cols, rows)
}

View File

@ -2,13 +2,17 @@ package services
import (
"encoding/gob"
"fmt"
"log"
"math"
"net"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/docker/docker/client"
"github.com/googollee/go-socket.io"
"github.com/twinj/uuid"
)
@ -22,6 +26,8 @@ type Session struct {
clients []*Client `json:"-"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
scheduled bool `json:"-"`
ticker *time.Ticker `json:"-"`
}
func (s *Session) Lock() {
@ -48,6 +54,49 @@ func (s *Session) AddNewClient(c *Client) {
s.clients = append(s.clients, c)
}
func (s *Session) SchedulePeriodicTasks() {
if s.scheduled {
return
}
go func() {
s.scheduled = true
s.ticker = time.NewTicker(1 * time.Second)
for range s.ticker.C {
var wg = sync.WaitGroup{}
wg.Add(len(s.Instances))
for _, i := range s.Instances {
if i.dockerClient == nil {
// Need to create client to the DinD docker daemon
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 1 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext}
cli := &http.Client{
Transport: transport,
}
c, err := client.NewClient(fmt.Sprintf("http://%s:2375", i.IP), client.DefaultVersion, cli, nil)
if err != nil {
log.Println("Could not connect to DinD docker daemon", err)
} else {
i.dockerClient = c
}
}
go func() {
defer wg.Done()
for _, t := range periodicTasks {
t.Run(i)
}
}()
}
wg.Wait()
}
}()
}
var sessions map[string]*Session
func init() {
@ -72,6 +121,8 @@ func CloseSessionAfter(s *Session, d time.Duration) {
func CloseSession(s *Session) error {
s.rw.Lock()
defer s.rw.Unlock()
s.ticker.Stop()
wsServer.BroadcastTo(s.Id, "session end")
log.Printf("Starting clean up of session [%s]\n", s.Id)
for _, i := range s.Instances {
@ -88,6 +139,11 @@ func CloseSession(s *Session) error {
return err
}
delete(sessions, s.Id)
// We store sessions as soon as we delete one
if err := saveSessionsToDisk(); err != nil {
return err
}
log.Printf("Cleaned up session [%s]\n", s.Id)
return nil
}
@ -135,6 +191,9 @@ func NewSession() (*Session, error) {
}
log.Printf("Connected pwd to network [%s]\n", s.Id)
// Schedule peridic tasks execution
s.SchedulePeriodicTasks()
// We store sessions as soon as we create one so we don't delete new sessions on an api restart
if err := saveSessionsToDisk(); err != nil {
return nil, err
@ -175,8 +234,11 @@ func LoadSessionsFromDisk() error {
for _, i := range s.Instances {
// wire the session back to the instance
i.session = s
go i.CollectStats()
}
// Schedule peridic tasks execution
s.SchedulePeriodicTasks()
}
}
file.Close()

11
services/task.go Normal file
View File

@ -0,0 +1,11 @@
package services
type periodicTask interface {
Run(i *Instance)
}
var periodicTasks []periodicTask
func init() {
periodicTasks = append(periodicTasks, &collectStatsTask{}, &checkSwarmStatusTask{}, &broadcastInfoTask{})
}