diff options
| author | Mitchell Riedstra <mitch@riedstra.dev> | 2023-01-20 00:35:09 -0500 |
|---|---|---|
| committer | Mitchell Riedstra <mitch@riedstra.dev> | 2023-01-20 00:35:09 -0500 |
| commit | f07efbb6fc7a63055a8424799ce03a5f37539873 (patch) | |
| tree | ff5983b2cae4cc9b8f2f346a47cb3eb23b2f79ae | |
| parent | cbfd82db8a20be32ffa82a1afa860729f3097de6 (diff) | |
| download | steam-export-dev-wip.tar.gz steam-export-dev-wip.tar.xz | |
wipdev-wip
| -rw-r--r-- | cmd/web/handlers.go | 2 | ||||
| -rw-r--r-- | demo/Dockerfile | 8 | ||||
| -rwxr-xr-x | demo/entrypoint.sh | 2 | ||||
| -rw-r--r-- | go.mod | 3 | ||||
| -rw-r--r-- | go.sum | 2 | ||||
| -rw-r--r-- | jobStatus/main.go | 1 | ||||
| -rw-r--r-- | steam/delete.go | 18 | ||||
| -rw-r--r-- | steam/extract.go | 15 | ||||
| -rw-r--r-- | steam/game.go | 8 | ||||
| -rw-r--r-- | steam/package.go | 13 | ||||
| -rw-r--r-- | steam/status.go | 313 | ||||
| -rw-r--r-- | steam/steam.go | 53 | ||||
| -rw-r--r-- | tasks/main.go | 236 | ||||
| -rw-r--r-- | tasks/main_test.go | 138 |
14 files changed, 451 insertions, 361 deletions
diff --git a/cmd/web/handlers.go b/cmd/web/handlers.go index 182b100..a8db3bc 100644 --- a/cmd/web/handlers.go +++ b/cmd/web/handlers.go @@ -83,6 +83,8 @@ func (a *App) HandleDownload(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-type", "application/tar") w.Header().Add("Estimated-size", fmt.Sprintf("%d", g.Size)) + // Content-length is not being sent down intentionally as we do not + // know the exact size Logger.Printf("Client %s is downloading: %s", r.RemoteAddr, game) diff --git a/demo/Dockerfile b/demo/Dockerfile index 5b1101a..92a18dc 100644 --- a/demo/Dockerfile +++ b/demo/Dockerfile @@ -1,7 +1,7 @@ -FROM golang:1.16-alpine +FROM docker.io/golang:1.19-alpine -RUN apk update -RUN apk add nginx +# RUN apk update +# RUN apk add nginx RUN mkdir /code /steam-lib @@ -13,5 +13,7 @@ RUN go build -ldflags="-X 'main.Version=Demo'" -o /bin/steam-export ./cmd/web COPY demo/entrypoint.sh / +RUN chmod +x /entrypoint.sh + ENTRYPOINT /entrypoint.sh diff --git a/demo/entrypoint.sh b/demo/entrypoint.sh index e060597..3c65e54 100755 --- a/demo/entrypoint.sh +++ b/demo/entrypoint.sh @@ -1,6 +1,6 @@ #!/bin/sh USER_SHELL="${USER_SHELL:-/bin/ash}" -# UID and GID used by the `git` user inside of the container +# UID and GID used by the user inside of the container USER_UID="${USER_UID:-3500}" USER_GID="${USER_GID:-3500}" @@ -1,8 +1,9 @@ module riedstra.dev/mitch/steam-export -go 1.16 +go 1.19 require ( + github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df // indirect github.com/gorilla/mux v1.8.0 github.com/swaggo/http-swagger v1.1.1 github.com/swaggo/swag v1.7.0 @@ -6,6 +6,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df h1:GSoSVRLoBaFpOOds6QyY1L8AX7uoY+Ln3BHc22W40X0= +github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df/go.mod h1:hiVxq5OP2bUGBRNS3Z/bt/reCLFNbdcST6gISi1fiOM= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/jobStatus/main.go b/jobStatus/main.go new file mode 100644 index 0000000..9d5f955 --- /dev/null +++ b/jobStatus/main.go @@ -0,0 +1 @@ +package jobStatus diff --git a/steam/delete.go b/steam/delete.go index aa78e22..eb70a50 100644 --- a/steam/delete.go +++ b/steam/delete.go @@ -3,9 +3,25 @@ package steam import ( "os" "path/filepath" + "sync" + + "riedstra.dev/mitch/steam-export/tasks" ) -// Delete removes all of the game files and the ACF +type DeleteJob struct { + delFunc tasks.TaskFunc + m sync.Mutex +} + +func newDeleteJob(f delFunc) { + +} + +func (l *Library) Delete(game string) error { + l.status.Add("") +} + +// Delete removes all the game files and the ACF func (l *Library) Delete(game string) error { g, ok := l.games[game] if !ok { diff --git a/steam/extract.go b/steam/extract.go index 45a760b..4f340fe 100644 --- a/steam/extract.go +++ b/steam/extract.go @@ -2,7 +2,6 @@ package steam import ( "archive/tar" - "errors" "fmt" "io" "net/url" @@ -20,11 +19,11 @@ const updateEveryNBytes = 10 * 1024 * 1024 // 10mb // // For example the following forms are accepted: // -// ExtractSmart("http://127.0.0.1/some-archive") -// ExtractSmart("https://example.com/some-archive") -// ExtractSmart("file:///some/local/file/path/to/archive.tar") -// ExtractSmart("/direct/path/to/archive.tar") -// ExtractSmart("C:\Users\user\Downloads\archive.tar") +// ExtractSmart("http://127.0.0.1/some-archive") +// ExtractSmart("https://example.com/some-archive") +// ExtractSmart("file:///some/local/file/path/to/archive.tar") +// ExtractSmart("/direct/path/to/archive.tar") +// ExtractSmart("C:\Users\user\Downloads\archive.tar") func (l *Library) ExtractSmart(uri string) (*Game, error) { if strings.HasPrefix(uri, "http") { _, err := url.Parse(uri) @@ -45,7 +44,7 @@ func (l *Library) ExtractSmart(uri string) (*Game, error) { // ExtractFile is a wrapper around Extract that handles local files. this // spawns an "extractFile" on the library. Status will be updated there as this -// goes along. Non fatal and fatal errors will be populated there +// goes along. Non-fatal and fatal errors will be populated there func (l *Library) ExtractFile(fn string) (*Game, error) { g := &Game{} j := newJob("extractFile", g) @@ -125,7 +124,7 @@ func (l *Library) extractUpdate(j *Job, g *Game, rdr io.Reader) (*Game, error) { estSize := j.GetSize() if estSize == nil { - j.addError(errors.New("Expected an estimated size, got nil")) + j.addError(E_NoEstimatedSize) continue } diff --git a/steam/game.go b/steam/game.go index dd4a297..8c387d2 100644 --- a/steam/game.go +++ b/steam/game.go @@ -5,6 +5,14 @@ import ( "path/filepath" ) +// Game represents an actual game in the steam Library. The purpose is only +// to provide info on a game. +type Game struct { + Name string `json:"Name" example:"Doom"` + LibraryPath string `json:"LibraryPath" example:"C:\\Program Files (x86)\\Steam\\steamapps"` + Size int64 `json:"Size" example:"12345"` +} + // GetSizeBytes returns the size in bytes, calling SetSizeInfo info Size is // currently == 0 func (g *Game) GetSizeBytes() int64 { diff --git a/steam/package.go b/steam/package.go index db3d6cf..3be5ccf 100644 --- a/steam/package.go +++ b/steam/package.go @@ -2,7 +2,6 @@ package steam import ( "archive/tar" - "errors" "io" "path/filepath" "time" @@ -14,7 +13,9 @@ const ( ) func (l *Library) Package(game string, wr io.Writer) error { + l.m.Lock() if _, ok := l.Games()[game]; !ok { + l.m.Unlock() return E_GameDoesNotExist } @@ -26,7 +27,7 @@ func (l *Library) Package(game string, wr io.Writer) error { j.setSize(g.GetSizeBytes()) - // Invert the writer so we can break up the copy and get progress + // Invert the writer, so we can break up the copy and get progress // information in here rdr, pwrtr := io.Pipe() go func() { @@ -51,7 +52,7 @@ func (l *Library) Package(game string, wr io.Writer) error { total += n j.setTransferred(total) - elapsedSeconds := float64(time.Since(*j.StartTime()).Seconds()) + elapsedSeconds := time.Since(*j.StartTime()).Seconds() rate := float64(total) / elapsedSeconds @@ -60,14 +61,14 @@ func (l *Library) Package(game string, wr io.Writer) error { estSize := j.GetSize() if estSize == nil { - j.addError(errors.New("Expected an estimated size, got nil")) + j.addError(E_NoEstimatedSize) continue } remainingBytes := float64(*estSize - total) // fmt.Println("remaining bytes: ", formatBytes(int64(remainingBytes))) - seconds := (remainingBytes / rate) + seconds := remainingBytes / rate duration := time.Duration(seconds * 1000 * 1000 * 1000) // fmt.Println("Raw duration: ", duration) @@ -79,7 +80,7 @@ func (l *Library) Package(game string, wr io.Writer) error { } // Package writes the package to wr, returning errors if any -func (l *Library) packagePrimitive(j *Job, g *Game, wr io.WriteCloser) error { +func (l *Library) packagePrimitive(g *Game, wr io.WriteCloser) error { acf, err := FindACF(l.folder, g.Name) if err != nil { diff --git a/steam/status.go b/steam/status.go deleted file mode 100644 index 6b0d874..0000000 --- a/steam/status.go +++ /dev/null @@ -1,313 +0,0 @@ -package steam - -import ( - "encoding/json" - "sync" - "time" - - "fmt" - "os" -) - -var debuglogging = false - -func debugLogJob(s string, args ...interface{}) { - if debuglogging { - fmt.Fprintf(os.Stderr, s, args...) - } -} - -func debugLogJobs(s string, args ...interface{}) { - if debuglogging { - fmt.Fprintf(os.Stderr, s, args...) - } -} - -// JobStatus provides specific information about an individual job -type Job struct { - action string - target *Game - running bool - start *time.Time - errors []error - - // If applicablle - size *int64 - transferred *int64 - eta *time.Duration - - m sync.Mutex -} - -type JobStatusJson struct { - Action string `json:"Action" example:"extractHTTP,delete"` // What action is being run? - Target *Game `json:"Target" example:"Doom"` // Name of the target game - Running bool `json:"Running" example:"false"` // Whether or not the job is running - Start *time.Time `json:"Start" example:"1629855616"` // Start time as a unix timestamp - Errors []string `json:"Errors"` // List of all errors encountered through the course of the job - - // If applicablle - Size *int64 `json:"Size" example:"12345"` // Game size in bytes - Transferred *int64 `json:"Transferred" example:"1234"` // Bytes transferred - Eta *time.Duration `json:"ETA" example:"1234"` // Time in seconds until it finishes -} - -func (j Job) MarshalJSON() ([]byte, error) { - - errs := []string{} - for _, e := range j.errors { - errs = append(errs, e.Error()) - } - - return json.Marshal( - &JobStatusJson{ - Action: j.action, - Target: j.target, - Running: j.running, - Start: j.start, - Errors: errs, - Size: j.size, - Transferred: j.transferred, - Eta: j.eta, - }) -} - -// Action is a short string describing the action, i.e. "packaging", "deleting" -func (j *Job) Action() string { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("Action on: '%s'\n", *j) - return j.action -} - -// Target returns the game that is the target of the action -func (j *Job) Target() *Game { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("Target on: '%s'\n", *j) - return j.target -} - -// IsRunning returns true if a job is currently running, otherwise false -func (j *Job) IsRunning() bool { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("IsRunning on: '%s'\n", *j) - return j.running -} - -// StartTime returns the time in which the job started -func (j *Job) StartTime() *time.Time { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("StartTime on: '%s'\n", *j) - return j.start -} - -func (j *Job) Errors() []error { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("errors on: '%s'\n", *j) - return j.errors -} - -// newJob sets up a job of action for the target Game -func newJob(action string, target *Game) *Job { - debugLogJob("New job: '%s' target: '%s'\n", action, target) - t := time.Now() - return &Job{ - action: action, - target: target, - running: true, - start: &t, - } -} - -func (j *Job) setSize(size int64) { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("setSize on: '%s'\n", *j) - j.size = &size -} - -// GetSize returns the size set if applicable for the operation -func (j *Job) GetSize() *int64 { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("getSize on: '%s'\n", *j) - return j.size -} - -func (j *Job) setTransferred(transferred int64) { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("setTransferred on: '%s'\n", *j) - j.transferred = &transferred -} - -// GetTransferred returns the transferred set if applicable for the operation -func (j *Job) GetTransferred() *int64 { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("GetTransferred on: '%s'\n", *j) - return j.transferred -} - -// setETA sets the eta to the speicifed duration -func (j *Job) setETA(d time.Duration) { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("setETA on: '%s'\n", *j) - j.eta = &d -} - -// GetETA returns the ETA to completion as a *time.Duration. nil represents -// when -func (j *Job) GetETA() *time.Duration { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("GetETA on: '%s'\n", *j) - return j.eta -} - -// done sets running to false -func (j *Job) done() { - j.m.Lock() - defer j.m.Unlock() - j.running = false -} - -// addError appends an error to the internal slice of errors -func (j *Job) addError(err error) { - j.m.Lock() - defer j.m.Unlock() - debugLogJob("add error on: '%s'\n", *j) - j.errors = append(j.errors, err) -} - -// Jobs is by the Library to determine whether or not we currently have any -// jobs running on this library, as well as to give some history as to -// what jobs have been run previously -type Jobs struct { - running []*Job - previous []*Job - - m sync.Mutex -} - -func (jobs Jobs) MarshalJSON() ([]byte, error) { - jobs.scan() - return json.Marshal( - struct { - Running []*Job `json:"Running"` - Previous []*Job `json:"Previous"` - }{ - Running: jobs.running, - Previous: jobs.previous, - }) -} - -func (jobs Jobs) String() string { - b, err := json.Marshal(jobs) - if err != nil { - panic(err) - } - return string(b) -} - -func (jobs *Jobs) scan() { - jobs.m.Lock() - defer jobs.m.Unlock() - debugLogJobs("scan on: '%s'\n", *jobs) - running := []*Job{} - notrunning := []*Job{} - - for _, job := range jobs.running { - if job == nil { - continue - } - - if job.IsRunning() == true { - running = append(running, job) - } else { - notrunning = append(notrunning, job) - } - } - - if len(notrunning) > 0 { - jobs.previous = append(jobs.previous, notrunning...) - } - - jobs.running = running -} - -// otherThanCurrent will return true if there's another job running on the -// game specified. It's the caller's responsibility to check that the provided -// job has a game of not nil, otherwise a panic will occur -func (jobs *Jobs) otherThanCurrent(j *Job) bool { - for _, job := range jobs.GetRunningJobs() { - if job == j { - continue - } - - g := job.Target() - - if g == nil { - continue - } - - if g.Name == j.Target().Name { - return true - } - } - - return false -} - -// Running returns true if any job is currently running, otherwise false -func (jobs *Jobs) Running() bool { - jobs.scan() - jobs.m.Lock() - defer jobs.m.Unlock() - if len(jobs.running) == 0 { - return false - } - debugLogJobs("running on: '%s'\n", *jobs) - return true -} - -// GetJobs returns all of the jobs regardless of their state -func (jobs *Jobs) GetJobs() []*Job { - jobs.scan() - jobs.m.Lock() - defer jobs.m.Unlock() - debugLogJobs("GetJobs on: '%s'\n", *jobs) - return append(jobs.running, jobs.previous...) -} - -// GetRunningJobs returns all of the running jobs -func (jobs *Jobs) GetRunningJobs() []*Job { - jobs.scan() - jobs.m.Lock() - defer jobs.m.Unlock() - debugLogJobs("GetRunningJobs on: '%s'\n", *jobs) - return jobs.running -} - -// GetStoppedJobs returns all of the stopped jobs -func (jobs *Jobs) GetStoppedJobs() []*Job { - jobs.scan() - jobs.m.Lock() - defer jobs.m.Unlock() - debugLogJobs("GetStoppedJobs on: '%s'\n", *jobs) - return jobs.previous -} - -// addJob adds a job to the internal slice -func (jobs *Jobs) addJob(j *Job) { - jobs.m.Lock() - jobs.running = append(jobs.running, j) - debugLogJobs("addJob on: '%s'\n", *jobs) - jobs.m.Unlock() - jobs.scan() -} diff --git a/steam/steam.go b/steam/steam.go index 03fa51e..418cbf4 100644 --- a/steam/steam.go +++ b/steam/steam.go @@ -6,15 +6,21 @@ package steam import ( "errors" "fmt" - "io/ioutil" + "os" "regexp" + "riedstra.dev/mitch/steam-export/tasks" "sync" + + "github.com/barkimedes/go-deepcopy" ) var ( - E_GameDoesNotExist = errors.New("Game does not exist") - E_BadURI = errors.New("The URI supplied is not understood") - E_OperationConflict = errors.New("Another conflicting job is running on this game right now") + E_GameDoesNotExist = errors.New("game does not exist") + E_BadURI = errors.New("the URI supplied is not understood") + E_OperationConflict = errors.New("another conflicting job is running on this game right now") + E_NoEstimatedSize = errors.New("expected an estimated size, got nil") + E_LibraryLocked = errors.New("cannot process library with actions running, library is locked") + E_LibraryNoCommon = errors.New("no common directory") ) // Library is used to represent the steam library, the Games map is populated @@ -26,19 +32,11 @@ var ( type Library struct { folder string games map[string]*Game - status *Jobs + status *tasks.Group m sync.Mutex } -// Game represents an actual game in the steam Library. The purpose is only -// to provide info on a game. -type Game struct { - Name string `json:"Name" example:"Doom"` - LibraryPath string `json:"LibraryPath" example:"C:\\Program Files (x86)\\Steam\\steamapps"` - Size int64 `json:"Size" example:"12345"` -} - var slugregexp = regexp.MustCompile(`[^-0-9A-Za-z_:.]`) // Slug returns a safer version of the name with spaces and other chars @@ -51,10 +49,7 @@ func (g Game) Slug() string { // if any func NewLibrary(path string) (*Library, error) { l := &Library{ - status: &Jobs{ - running: make([]*Job, 0), - previous: make([]*Job, 0), - }, + status: tasks.NewGroup(), } err := l.ProcessLibrary(path) if err != nil { @@ -84,16 +79,18 @@ func (l *Library) Folder() string { func (l *Library) Games() map[string]*Game { l.m.Lock() defer l.m.Unlock() - return l.games + g := deepcopy.MustAnything(l.games) + return g.(map[string]*Game) } -// Jobs returns the current *Jobs struct which can be used to keep track -// of any long running operations on the library as well as any errors +// Status returns the current Jobs struct which can be used to keep track +// of any long-running operations on the library as well as any errors // encountered along the way -func (l *Library) Status() Jobs { +func (l *Library) Status() *tasks.Group { l.m.Lock() defer l.m.Unlock() - return *l.status + s2 := deepcopy.MustAnything(l.status) + return s2.(*tasks.Group) } // Refresh simply calls ProcessLibrary to refresh the entire contents of the @@ -105,19 +102,19 @@ func (l *Library) Refresh() error { // ProcessLibrary Populates the "Folder" and "Games" fields based on the // provided directory. Returns an error if any jobs are currently running func (s *Library) ProcessLibrary(r string) error { - if s.status.Running() { - return errors.New("Cannot process library with actions running") + if len(s.status.Running()) > 0 { + return E_LibraryLocked } if !hasCommon(r) { - return fmt.Errorf("No common directory in: %s", r) + return fmt.Errorf("in: '%s': %w", r, E_LibraryNoCommon) } s.m.Lock() defer s.m.Unlock() s.games = make(map[string]*Game) - dirs, err := ioutil.ReadDir(r + "/common") + dirs, err := os.ReadDir(r + "/common") if err != nil { return err } @@ -128,7 +125,7 @@ func (s *Library) ProcessLibrary(r string) error { Name: f.Name(), LibraryPath: r, } - g.SetSizeInfo() + _ = g.SetSizeInfo() s.games[f.Name()] = g } @@ -147,7 +144,7 @@ func (s *Library) String() (str string) { } func hasCommon(d string) bool { - dirs, err := ioutil.ReadDir(d) + dirs, err := os.ReadDir(d) if err != nil { return false } diff --git a/tasks/main.go b/tasks/main.go new file mode 100644 index 0000000..13832c4 --- /dev/null +++ b/tasks/main.go @@ -0,0 +1,236 @@ +// Package tasks is a simple package for tracking the state of particular asynchronous, or +// background tasks. Simply define a "Run" method to satisfy the Task interface, then +// they can be added to a group for tracking. +package tasks + +import ( + "errors" + "fmt" + "io" + "log" + "reflect" + "strings" + "sync" +) + +var ( + EAlreadyRegistered = errors.New("a task has already been registered with this name") + EAlreadyStarted = errors.New("specified task has already been started") + ENotRegistered = errors.New("task has not been registered") + + // logger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) + logger = log.New(io.Discard, "", 0) +) + +const ( + _ taskState = 1 << iota + sTodo + sRunning + sFinished + sError +) + +type taskState int + +// Task interface is wraps anything with a Run(), it's assumed +// that the Run() is blocking and will return when the task +// is complete. +type Task interface { + Run() error +} + +// Group represents a group of tasks, and will keep track of their various states +// as well as any errors. While this library does not concurrently access the +// tasks it's likely you'll want them to be safe for concurrent use if you're +// accessing the tasks elsewhere. +// +// Tasks can be run more than once if Start or Run is called more than once. +// no effort is made to prevent that other than not allowing jobs to stack. +// ( They will return an error if they're currently running ) +type Group struct { + tasks map[string]*taskEntry + m *sync.RWMutex +} + +func NewGroup() *Group { + return &Group{ + tasks: map[string]*taskEntry{}, + m: &sync.RWMutex{}, + } +} + +type TaskList map[string]Task + +func (tl TaskList) String() string { + bld := &strings.Builder{} + for name, entry := range tl { + bld.Write([]byte{' '}) + bld.WriteString(fmt.Sprintf("%s->%s(%+v)", name, + reflect.TypeOf(entry), entry)) + } + return bld.String() +} + +// Running returns all the running Tasks. These are not copies, so care must +// be taken if they're not thread safe. +func (g *Group) Running() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sRunning > 0 + }) +} + +// Todo returns all the tasks in the Todo state. Run or Start simply has not +// been called. +func (g *Group) Todo() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sTodo > 0 + }) +} + +func (g *Group) Finished() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sFinished > 0 + }) +} + +func (g *Group) Error() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sError > 0 + }) +} + +func (g *Group) GetError(name string) error { + var err error + g.m.RLock() + defer g.m.RUnlock() + entry, ok := g.tasks[name] + if !ok { + return fmt.Errorf("task: %s not found: %w", name, ENotRegistered) + } + err = entry.Error + return err +} + +// Registered returns all the registered tasks +func (g *Group) Registered() TaskList { + return g.getForCondition( + func(state taskState) bool { + return true + }) +} + +func (g *Group) getForCondition(condition func(state taskState) bool) TaskList { + g.m.Lock() + defer g.m.Unlock() + out := TaskList{} + for name, entry := range g.tasks { + if condition(entry.State) { + out[name] = entry.Task + } + } + return out +} + +// AddAndStart does exactly what ti says on the tin +func (g *Group) AddAndStart(name string, task Task) error { + err := g.Add(name, task) + if err != nil { + return err + } + return g.Start(name) +} + +// AddAndRun does exactly what it says on the tin +func (g *Group) AddAndRun(name string, task Task) error { + err := g.Add(name, task) + if err != nil { + return err + } + return g.Run(name) +} + +// Add will register a task with the group, however it does not run it +// immediately. Allowing you to call Run or Start separately. +func (g *Group) Add(name string, task Task) error { + g.m.Lock() + defer g.m.Unlock() + _, ok := g.tasks[name] + if ok { + return fmt.Errorf("%s already registered: %w", name, EAlreadyRegistered) + } + g.tasks[name] = &taskEntry{ + Task: task, + State: sTodo, + } + return nil +} + +// Start will start the task but not wait around for it to finish, returns +// an error if the task isn't registered or is already started. +func (g *Group) Start(name string) error { + g.m.Lock() + entry, ok := g.tasks[name] + if !ok { + g.m.Unlock() + return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) + } + logger.Printf("Start %s -> state: %d condition: %v", + name, entry.State, entry.State&sRunning > 0) + if entry.State&sRunning > 0 { + g.m.Unlock() + return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) + } + go func() { + g.m.Unlock() + _ = entry.run(g.m, name) + }() + return nil +} + +// Run will do the same thing as start, except that it will block and +// wait for the task to finish, and return the task's error. +func (g *Group) Run(name string) error { + g.m.Lock() + entry, ok := g.tasks[name] + g.m.Unlock() + if !ok { + return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) + } + return entry.run(g.m, name) +} + +type taskEntry struct { + Task Task + State taskState + Error error +} + +func (e *taskEntry) getState(m *sync.RWMutex) taskState { + m.RLock() + defer m.RUnlock() + return e.State +} + +func (e *taskEntry) run(m *sync.RWMutex, name string) error { + s := e.getState(m) + if s&sRunning > 0 { + return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) + } + m.Lock() + e.State = sRunning + m.Unlock() + + err := e.Task.Run() + m.Lock() + e.State = sFinished + e.Error = err + if err != nil { + e.State = e.State | sError + } + m.Unlock() + return err +} diff --git a/tasks/main_test.go b/tasks/main_test.go new file mode 100644 index 0000000..5c69a81 --- /dev/null +++ b/tasks/main_test.go @@ -0,0 +1,138 @@ +package tasks + +import ( + "errors" + "log" + "os" + "testing" + "time" +) + +type tShortTask struct{} + +func (t *tShortTask) Run() error { + time.Sleep(time.Second * 1) + return nil +} + +type tLongTask struct{} + +func (t *tLongTask) Run() error { + time.Sleep(time.Second * 2) + return nil +} + +type tErrorTask struct{} + +func (t *tErrorTask) Run() error { + time.Sleep(time.Second * 2) + return eErrorTask +} + +var ( + eErrorTask = errors.New("tErrorTask testing error") + + testLogger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) +) + +func TestIntegration(t *testing.T) { + logger = testLogger + + tg := NewGroup() + + pairs := map[string]Task{ + "tShortTask": &tShortTask{}, + "tLongTask": &tLongTask{}, + "tErrorTask": &tErrorTask{}, + } + + done := []string{} + for name, task := range pairs { + err := tg.Add(name, task) + if err != nil { + t.Fatalf("while adding: %s", err) + } + testLogger.Println("Added: ", name) + + registered := tg.Registered() + testLogger.Println("Currently registered: ", registered) + for _, n := range done { + _, ok := registered[n] + if !ok { + t.Fatalf("%s is supposed to be registered", n) + } + } + } + + reg := tg.Registered() + testLogger.Println("Registered tasks: ", reg) + + err := tg.Add("tLongTask", &tLongTask{}) + if !errors.Is(err, EAlreadyRegistered) { + t.Errorf("tLongTask should already be registered got: %s", err) + } + + err = tg.Start("tLongTask") + if err != nil { + t.Errorf("starting tLongTask err should've been nil: %s", err) + } + testLogger.Println("tried starting tLongTask again") + + err = tg.Run("tShortTask") + if err != nil { + t.Errorf("running tShortTask err should've been nil: %s", err) + } + + finished := tg.Finished() + testLogger.Println("Finished tasks: ", finished) + _, ok := finished["tShortTask"] + if !ok { + t.Errorf("tShortTask should've been finished and isn't") + } + + _, ok = finished["tLongTask"] + if ok { + t.Errorf("tLongTask is finished and it shouldn't be") + } + + err = tg.Start("tLongTask") + if !errors.Is(err, EAlreadyStarted) { + t.Errorf("tLongTask should've already been started got: %s", err) + } + + err = tg.Run("tLongTask") + if !errors.Is(err, EAlreadyStarted) { + t.Errorf("tLongTask should've already been started got: %s", err) + } + + running := tg.Running() + testLogger.Println("Running tasks: ", running) + + err = tg.Run("tErrorTask") + if err == nil { + t.Error("tErrorTask should have produced an error") + } + + withErrors := tg.Error() + testLogger.Println("Errored tasks: ", withErrors) + _, ok = withErrors["tErrorTask"] + if !ok { + t.Error("tErrorTask should've been found within tg.Error()") + } + + err = tg.GetError("tErrorTask") + if !errors.Is(err, eErrorTask) { + t.Errorf("tErrorTask should've returned an error got: %s", err) + } + + finished = tg.Finished() + testLogger.Println("Finished tasks: ", finished) + + for name := range pairs { + _, ok := finished[name] + if !ok { + t.Errorf("'%s' should be finished and isn't", name) + } + } + +} |
