aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMitchell Riedstra <mitch@riedstra.dev>2023-01-20 00:35:09 -0500
committerMitchell Riedstra <mitch@riedstra.dev>2023-01-20 00:35:09 -0500
commitf07efbb6fc7a63055a8424799ce03a5f37539873 (patch)
treeff5983b2cae4cc9b8f2f346a47cb3eb23b2f79ae
parentcbfd82db8a20be32ffa82a1afa860729f3097de6 (diff)
downloadsteam-export-dev-wip.tar.gz
steam-export-dev-wip.tar.xz
-rw-r--r--cmd/web/handlers.go2
-rw-r--r--demo/Dockerfile8
-rwxr-xr-xdemo/entrypoint.sh2
-rw-r--r--go.mod3
-rw-r--r--go.sum2
-rw-r--r--jobStatus/main.go1
-rw-r--r--steam/delete.go18
-rw-r--r--steam/extract.go15
-rw-r--r--steam/game.go8
-rw-r--r--steam/package.go13
-rw-r--r--steam/status.go313
-rw-r--r--steam/steam.go53
-rw-r--r--tasks/main.go236
-rw-r--r--tasks/main_test.go138
14 files changed, 451 insertions, 361 deletions
diff --git a/cmd/web/handlers.go b/cmd/web/handlers.go
index 182b100..a8db3bc 100644
--- a/cmd/web/handlers.go
+++ b/cmd/web/handlers.go
@@ -83,6 +83,8 @@ func (a *App) HandleDownload(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-type", "application/tar")
w.Header().Add("Estimated-size", fmt.Sprintf("%d", g.Size))
+ // Content-length is not being sent down intentionally as we do not
+ // know the exact size
Logger.Printf("Client %s is downloading: %s", r.RemoteAddr, game)
diff --git a/demo/Dockerfile b/demo/Dockerfile
index 5b1101a..92a18dc 100644
--- a/demo/Dockerfile
+++ b/demo/Dockerfile
@@ -1,7 +1,7 @@
-FROM golang:1.16-alpine
+FROM docker.io/golang:1.19-alpine
-RUN apk update
-RUN apk add nginx
+# RUN apk update
+# RUN apk add nginx
RUN mkdir /code /steam-lib
@@ -13,5 +13,7 @@ RUN go build -ldflags="-X 'main.Version=Demo'" -o /bin/steam-export ./cmd/web
COPY demo/entrypoint.sh /
+RUN chmod +x /entrypoint.sh
+
ENTRYPOINT /entrypoint.sh
diff --git a/demo/entrypoint.sh b/demo/entrypoint.sh
index e060597..3c65e54 100755
--- a/demo/entrypoint.sh
+++ b/demo/entrypoint.sh
@@ -1,6 +1,6 @@
#!/bin/sh
USER_SHELL="${USER_SHELL:-/bin/ash}"
-# UID and GID used by the `git` user inside of the container
+# UID and GID used by the user inside of the container
USER_UID="${USER_UID:-3500}"
USER_GID="${USER_GID:-3500}"
diff --git a/go.mod b/go.mod
index c8fb988..54fc4b4 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,9 @@
module riedstra.dev/mitch/steam-export
-go 1.16
+go 1.19
require (
+ github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df // indirect
github.com/gorilla/mux v1.8.0
github.com/swaggo/http-swagger v1.1.1
github.com/swaggo/swag v1.7.0
diff --git a/go.sum b/go.sum
index f2b0616..7cc1a4c 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df h1:GSoSVRLoBaFpOOds6QyY1L8AX7uoY+Ln3BHc22W40X0=
+github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df/go.mod h1:hiVxq5OP2bUGBRNS3Z/bt/reCLFNbdcST6gISi1fiOM=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/jobStatus/main.go b/jobStatus/main.go
new file mode 100644
index 0000000..9d5f955
--- /dev/null
+++ b/jobStatus/main.go
@@ -0,0 +1 @@
+package jobStatus
diff --git a/steam/delete.go b/steam/delete.go
index aa78e22..eb70a50 100644
--- a/steam/delete.go
+++ b/steam/delete.go
@@ -3,9 +3,25 @@ package steam
import (
"os"
"path/filepath"
+ "sync"
+
+ "riedstra.dev/mitch/steam-export/tasks"
)
-// Delete removes all of the game files and the ACF
+type DeleteJob struct {
+ delFunc tasks.TaskFunc
+ m sync.Mutex
+}
+
+func newDeleteJob(f delFunc) {
+
+}
+
+func (l *Library) Delete(game string) error {
+ l.status.Add("")
+}
+
+// Delete removes all the game files and the ACF
func (l *Library) Delete(game string) error {
g, ok := l.games[game]
if !ok {
diff --git a/steam/extract.go b/steam/extract.go
index 45a760b..4f340fe 100644
--- a/steam/extract.go
+++ b/steam/extract.go
@@ -2,7 +2,6 @@ package steam
import (
"archive/tar"
- "errors"
"fmt"
"io"
"net/url"
@@ -20,11 +19,11 @@ const updateEveryNBytes = 10 * 1024 * 1024 // 10mb
//
// For example the following forms are accepted:
//
-// ExtractSmart("http://127.0.0.1/some-archive")
-// ExtractSmart("https://example.com/some-archive")
-// ExtractSmart("file:///some/local/file/path/to/archive.tar")
-// ExtractSmart("/direct/path/to/archive.tar")
-// ExtractSmart("C:\Users\user\Downloads\archive.tar")
+// ExtractSmart("http://127.0.0.1/some-archive")
+// ExtractSmart("https://example.com/some-archive")
+// ExtractSmart("file:///some/local/file/path/to/archive.tar")
+// ExtractSmart("/direct/path/to/archive.tar")
+// ExtractSmart("C:\Users\user\Downloads\archive.tar")
func (l *Library) ExtractSmart(uri string) (*Game, error) {
if strings.HasPrefix(uri, "http") {
_, err := url.Parse(uri)
@@ -45,7 +44,7 @@ func (l *Library) ExtractSmart(uri string) (*Game, error) {
// ExtractFile is a wrapper around Extract that handles local files. this
// spawns an "extractFile" on the library. Status will be updated there as this
-// goes along. Non fatal and fatal errors will be populated there
+// goes along. Non-fatal and fatal errors will be populated there
func (l *Library) ExtractFile(fn string) (*Game, error) {
g := &Game{}
j := newJob("extractFile", g)
@@ -125,7 +124,7 @@ func (l *Library) extractUpdate(j *Job, g *Game, rdr io.Reader) (*Game, error) {
estSize := j.GetSize()
if estSize == nil {
- j.addError(errors.New("Expected an estimated size, got nil"))
+ j.addError(E_NoEstimatedSize)
continue
}
diff --git a/steam/game.go b/steam/game.go
index dd4a297..8c387d2 100644
--- a/steam/game.go
+++ b/steam/game.go
@@ -5,6 +5,14 @@ import (
"path/filepath"
)
+// Game represents an actual game in the steam Library. The purpose is only
+// to provide info on a game.
+type Game struct {
+ Name string `json:"Name" example:"Doom"`
+ LibraryPath string `json:"LibraryPath" example:"C:\\Program Files (x86)\\Steam\\steamapps"`
+ Size int64 `json:"Size" example:"12345"`
+}
+
// GetSizeBytes returns the size in bytes, calling SetSizeInfo info Size is
// currently == 0
func (g *Game) GetSizeBytes() int64 {
diff --git a/steam/package.go b/steam/package.go
index db3d6cf..3be5ccf 100644
--- a/steam/package.go
+++ b/steam/package.go
@@ -2,7 +2,6 @@ package steam
import (
"archive/tar"
- "errors"
"io"
"path/filepath"
"time"
@@ -14,7 +13,9 @@ const (
)
func (l *Library) Package(game string, wr io.Writer) error {
+ l.m.Lock()
if _, ok := l.Games()[game]; !ok {
+ l.m.Unlock()
return E_GameDoesNotExist
}
@@ -26,7 +27,7 @@ func (l *Library) Package(game string, wr io.Writer) error {
j.setSize(g.GetSizeBytes())
- // Invert the writer so we can break up the copy and get progress
+ // Invert the writer, so we can break up the copy and get progress
// information in here
rdr, pwrtr := io.Pipe()
go func() {
@@ -51,7 +52,7 @@ func (l *Library) Package(game string, wr io.Writer) error {
total += n
j.setTransferred(total)
- elapsedSeconds := float64(time.Since(*j.StartTime()).Seconds())
+ elapsedSeconds := time.Since(*j.StartTime()).Seconds()
rate := float64(total) / elapsedSeconds
@@ -60,14 +61,14 @@ func (l *Library) Package(game string, wr io.Writer) error {
estSize := j.GetSize()
if estSize == nil {
- j.addError(errors.New("Expected an estimated size, got nil"))
+ j.addError(E_NoEstimatedSize)
continue
}
remainingBytes := float64(*estSize - total)
// fmt.Println("remaining bytes: ", formatBytes(int64(remainingBytes)))
- seconds := (remainingBytes / rate)
+ seconds := remainingBytes / rate
duration := time.Duration(seconds * 1000 * 1000 * 1000)
// fmt.Println("Raw duration: ", duration)
@@ -79,7 +80,7 @@ func (l *Library) Package(game string, wr io.Writer) error {
}
// Package writes the package to wr, returning errors if any
-func (l *Library) packagePrimitive(j *Job, g *Game, wr io.WriteCloser) error {
+func (l *Library) packagePrimitive(g *Game, wr io.WriteCloser) error {
acf, err := FindACF(l.folder, g.Name)
if err != nil {
diff --git a/steam/status.go b/steam/status.go
deleted file mode 100644
index 6b0d874..0000000
--- a/steam/status.go
+++ /dev/null
@@ -1,313 +0,0 @@
-package steam
-
-import (
- "encoding/json"
- "sync"
- "time"
-
- "fmt"
- "os"
-)
-
-var debuglogging = false
-
-func debugLogJob(s string, args ...interface{}) {
- if debuglogging {
- fmt.Fprintf(os.Stderr, s, args...)
- }
-}
-
-func debugLogJobs(s string, args ...interface{}) {
- if debuglogging {
- fmt.Fprintf(os.Stderr, s, args...)
- }
-}
-
-// JobStatus provides specific information about an individual job
-type Job struct {
- action string
- target *Game
- running bool
- start *time.Time
- errors []error
-
- // If applicablle
- size *int64
- transferred *int64
- eta *time.Duration
-
- m sync.Mutex
-}
-
-type JobStatusJson struct {
- Action string `json:"Action" example:"extractHTTP,delete"` // What action is being run?
- Target *Game `json:"Target" example:"Doom"` // Name of the target game
- Running bool `json:"Running" example:"false"` // Whether or not the job is running
- Start *time.Time `json:"Start" example:"1629855616"` // Start time as a unix timestamp
- Errors []string `json:"Errors"` // List of all errors encountered through the course of the job
-
- // If applicablle
- Size *int64 `json:"Size" example:"12345"` // Game size in bytes
- Transferred *int64 `json:"Transferred" example:"1234"` // Bytes transferred
- Eta *time.Duration `json:"ETA" example:"1234"` // Time in seconds until it finishes
-}
-
-func (j Job) MarshalJSON() ([]byte, error) {
-
- errs := []string{}
- for _, e := range j.errors {
- errs = append(errs, e.Error())
- }
-
- return json.Marshal(
- &JobStatusJson{
- Action: j.action,
- Target: j.target,
- Running: j.running,
- Start: j.start,
- Errors: errs,
- Size: j.size,
- Transferred: j.transferred,
- Eta: j.eta,
- })
-}
-
-// Action is a short string describing the action, i.e. "packaging", "deleting"
-func (j *Job) Action() string {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("Action on: '%s'\n", *j)
- return j.action
-}
-
-// Target returns the game that is the target of the action
-func (j *Job) Target() *Game {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("Target on: '%s'\n", *j)
- return j.target
-}
-
-// IsRunning returns true if a job is currently running, otherwise false
-func (j *Job) IsRunning() bool {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("IsRunning on: '%s'\n", *j)
- return j.running
-}
-
-// StartTime returns the time in which the job started
-func (j *Job) StartTime() *time.Time {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("StartTime on: '%s'\n", *j)
- return j.start
-}
-
-func (j *Job) Errors() []error {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("errors on: '%s'\n", *j)
- return j.errors
-}
-
-// newJob sets up a job of action for the target Game
-func newJob(action string, target *Game) *Job {
- debugLogJob("New job: '%s' target: '%s'\n", action, target)
- t := time.Now()
- return &Job{
- action: action,
- target: target,
- running: true,
- start: &t,
- }
-}
-
-func (j *Job) setSize(size int64) {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("setSize on: '%s'\n", *j)
- j.size = &size
-}
-
-// GetSize returns the size set if applicable for the operation
-func (j *Job) GetSize() *int64 {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("getSize on: '%s'\n", *j)
- return j.size
-}
-
-func (j *Job) setTransferred(transferred int64) {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("setTransferred on: '%s'\n", *j)
- j.transferred = &transferred
-}
-
-// GetTransferred returns the transferred set if applicable for the operation
-func (j *Job) GetTransferred() *int64 {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("GetTransferred on: '%s'\n", *j)
- return j.transferred
-}
-
-// setETA sets the eta to the speicifed duration
-func (j *Job) setETA(d time.Duration) {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("setETA on: '%s'\n", *j)
- j.eta = &d
-}
-
-// GetETA returns the ETA to completion as a *time.Duration. nil represents
-// when
-func (j *Job) GetETA() *time.Duration {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("GetETA on: '%s'\n", *j)
- return j.eta
-}
-
-// done sets running to false
-func (j *Job) done() {
- j.m.Lock()
- defer j.m.Unlock()
- j.running = false
-}
-
-// addError appends an error to the internal slice of errors
-func (j *Job) addError(err error) {
- j.m.Lock()
- defer j.m.Unlock()
- debugLogJob("add error on: '%s'\n", *j)
- j.errors = append(j.errors, err)
-}
-
-// Jobs is by the Library to determine whether or not we currently have any
-// jobs running on this library, as well as to give some history as to
-// what jobs have been run previously
-type Jobs struct {
- running []*Job
- previous []*Job
-
- m sync.Mutex
-}
-
-func (jobs Jobs) MarshalJSON() ([]byte, error) {
- jobs.scan()
- return json.Marshal(
- struct {
- Running []*Job `json:"Running"`
- Previous []*Job `json:"Previous"`
- }{
- Running: jobs.running,
- Previous: jobs.previous,
- })
-}
-
-func (jobs Jobs) String() string {
- b, err := json.Marshal(jobs)
- if err != nil {
- panic(err)
- }
- return string(b)
-}
-
-func (jobs *Jobs) scan() {
- jobs.m.Lock()
- defer jobs.m.Unlock()
- debugLogJobs("scan on: '%s'\n", *jobs)
- running := []*Job{}
- notrunning := []*Job{}
-
- for _, job := range jobs.running {
- if job == nil {
- continue
- }
-
- if job.IsRunning() == true {
- running = append(running, job)
- } else {
- notrunning = append(notrunning, job)
- }
- }
-
- if len(notrunning) > 0 {
- jobs.previous = append(jobs.previous, notrunning...)
- }
-
- jobs.running = running
-}
-
-// otherThanCurrent will return true if there's another job running on the
-// game specified. It's the caller's responsibility to check that the provided
-// job has a game of not nil, otherwise a panic will occur
-func (jobs *Jobs) otherThanCurrent(j *Job) bool {
- for _, job := range jobs.GetRunningJobs() {
- if job == j {
- continue
- }
-
- g := job.Target()
-
- if g == nil {
- continue
- }
-
- if g.Name == j.Target().Name {
- return true
- }
- }
-
- return false
-}
-
-// Running returns true if any job is currently running, otherwise false
-func (jobs *Jobs) Running() bool {
- jobs.scan()
- jobs.m.Lock()
- defer jobs.m.Unlock()
- if len(jobs.running) == 0 {
- return false
- }
- debugLogJobs("running on: '%s'\n", *jobs)
- return true
-}
-
-// GetJobs returns all of the jobs regardless of their state
-func (jobs *Jobs) GetJobs() []*Job {
- jobs.scan()
- jobs.m.Lock()
- defer jobs.m.Unlock()
- debugLogJobs("GetJobs on: '%s'\n", *jobs)
- return append(jobs.running, jobs.previous...)
-}
-
-// GetRunningJobs returns all of the running jobs
-func (jobs *Jobs) GetRunningJobs() []*Job {
- jobs.scan()
- jobs.m.Lock()
- defer jobs.m.Unlock()
- debugLogJobs("GetRunningJobs on: '%s'\n", *jobs)
- return jobs.running
-}
-
-// GetStoppedJobs returns all of the stopped jobs
-func (jobs *Jobs) GetStoppedJobs() []*Job {
- jobs.scan()
- jobs.m.Lock()
- defer jobs.m.Unlock()
- debugLogJobs("GetStoppedJobs on: '%s'\n", *jobs)
- return jobs.previous
-}
-
-// addJob adds a job to the internal slice
-func (jobs *Jobs) addJob(j *Job) {
- jobs.m.Lock()
- jobs.running = append(jobs.running, j)
- debugLogJobs("addJob on: '%s'\n", *jobs)
- jobs.m.Unlock()
- jobs.scan()
-}
diff --git a/steam/steam.go b/steam/steam.go
index 03fa51e..418cbf4 100644
--- a/steam/steam.go
+++ b/steam/steam.go
@@ -6,15 +6,21 @@ package steam
import (
"errors"
"fmt"
- "io/ioutil"
+ "os"
"regexp"
+ "riedstra.dev/mitch/steam-export/tasks"
"sync"
+
+ "github.com/barkimedes/go-deepcopy"
)
var (
- E_GameDoesNotExist = errors.New("Game does not exist")
- E_BadURI = errors.New("The URI supplied is not understood")
- E_OperationConflict = errors.New("Another conflicting job is running on this game right now")
+ E_GameDoesNotExist = errors.New("game does not exist")
+ E_BadURI = errors.New("the URI supplied is not understood")
+ E_OperationConflict = errors.New("another conflicting job is running on this game right now")
+ E_NoEstimatedSize = errors.New("expected an estimated size, got nil")
+ E_LibraryLocked = errors.New("cannot process library with actions running, library is locked")
+ E_LibraryNoCommon = errors.New("no common directory")
)
// Library is used to represent the steam library, the Games map is populated
@@ -26,19 +32,11 @@ var (
type Library struct {
folder string
games map[string]*Game
- status *Jobs
+ status *tasks.Group
m sync.Mutex
}
-// Game represents an actual game in the steam Library. The purpose is only
-// to provide info on a game.
-type Game struct {
- Name string `json:"Name" example:"Doom"`
- LibraryPath string `json:"LibraryPath" example:"C:\\Program Files (x86)\\Steam\\steamapps"`
- Size int64 `json:"Size" example:"12345"`
-}
-
var slugregexp = regexp.MustCompile(`[^-0-9A-Za-z_:.]`)
// Slug returns a safer version of the name with spaces and other chars
@@ -51,10 +49,7 @@ func (g Game) Slug() string {
// if any
func NewLibrary(path string) (*Library, error) {
l := &Library{
- status: &Jobs{
- running: make([]*Job, 0),
- previous: make([]*Job, 0),
- },
+ status: tasks.NewGroup(),
}
err := l.ProcessLibrary(path)
if err != nil {
@@ -84,16 +79,18 @@ func (l *Library) Folder() string {
func (l *Library) Games() map[string]*Game {
l.m.Lock()
defer l.m.Unlock()
- return l.games
+ g := deepcopy.MustAnything(l.games)
+ return g.(map[string]*Game)
}
-// Jobs returns the current *Jobs struct which can be used to keep track
-// of any long running operations on the library as well as any errors
+// Status returns the current Jobs struct which can be used to keep track
+// of any long-running operations on the library as well as any errors
// encountered along the way
-func (l *Library) Status() Jobs {
+func (l *Library) Status() *tasks.Group {
l.m.Lock()
defer l.m.Unlock()
- return *l.status
+ s2 := deepcopy.MustAnything(l.status)
+ return s2.(*tasks.Group)
}
// Refresh simply calls ProcessLibrary to refresh the entire contents of the
@@ -105,19 +102,19 @@ func (l *Library) Refresh() error {
// ProcessLibrary Populates the "Folder" and "Games" fields based on the
// provided directory. Returns an error if any jobs are currently running
func (s *Library) ProcessLibrary(r string) error {
- if s.status.Running() {
- return errors.New("Cannot process library with actions running")
+ if len(s.status.Running()) > 0 {
+ return E_LibraryLocked
}
if !hasCommon(r) {
- return fmt.Errorf("No common directory in: %s", r)
+ return fmt.Errorf("in: '%s': %w", r, E_LibraryNoCommon)
}
s.m.Lock()
defer s.m.Unlock()
s.games = make(map[string]*Game)
- dirs, err := ioutil.ReadDir(r + "/common")
+ dirs, err := os.ReadDir(r + "/common")
if err != nil {
return err
}
@@ -128,7 +125,7 @@ func (s *Library) ProcessLibrary(r string) error {
Name: f.Name(),
LibraryPath: r,
}
- g.SetSizeInfo()
+ _ = g.SetSizeInfo()
s.games[f.Name()] = g
}
@@ -147,7 +144,7 @@ func (s *Library) String() (str string) {
}
func hasCommon(d string) bool {
- dirs, err := ioutil.ReadDir(d)
+ dirs, err := os.ReadDir(d)
if err != nil {
return false
}
diff --git a/tasks/main.go b/tasks/main.go
new file mode 100644
index 0000000..13832c4
--- /dev/null
+++ b/tasks/main.go
@@ -0,0 +1,236 @@
+// Package tasks is a simple package for tracking the state of particular asynchronous, or
+// background tasks. Simply define a "Run" method to satisfy the Task interface, then
+// they can be added to a group for tracking.
+package tasks
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "reflect"
+ "strings"
+ "sync"
+)
+
+var (
+ EAlreadyRegistered = errors.New("a task has already been registered with this name")
+ EAlreadyStarted = errors.New("specified task has already been started")
+ ENotRegistered = errors.New("task has not been registered")
+
+ // logger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)
+ logger = log.New(io.Discard, "", 0)
+)
+
+const (
+ _ taskState = 1 << iota
+ sTodo
+ sRunning
+ sFinished
+ sError
+)
+
+type taskState int
+
+// Task interface is wraps anything with a Run(), it's assumed
+// that the Run() is blocking and will return when the task
+// is complete.
+type Task interface {
+ Run() error
+}
+
+// Group represents a group of tasks, and will keep track of their various states
+// as well as any errors. While this library does not concurrently access the
+// tasks it's likely you'll want them to be safe for concurrent use if you're
+// accessing the tasks elsewhere.
+//
+// Tasks can be run more than once if Start or Run is called more than once.
+// no effort is made to prevent that other than not allowing jobs to stack.
+// ( They will return an error if they're currently running )
+type Group struct {
+ tasks map[string]*taskEntry
+ m *sync.RWMutex
+}
+
+func NewGroup() *Group {
+ return &Group{
+ tasks: map[string]*taskEntry{},
+ m: &sync.RWMutex{},
+ }
+}
+
+type TaskList map[string]Task
+
+func (tl TaskList) String() string {
+ bld := &strings.Builder{}
+ for name, entry := range tl {
+ bld.Write([]byte{' '})
+ bld.WriteString(fmt.Sprintf("%s->%s(%+v)", name,
+ reflect.TypeOf(entry), entry))
+ }
+ return bld.String()
+}
+
+// Running returns all the running Tasks. These are not copies, so care must
+// be taken if they're not thread safe.
+func (g *Group) Running() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sRunning > 0
+ })
+}
+
+// Todo returns all the tasks in the Todo state. Run or Start simply has not
+// been called.
+func (g *Group) Todo() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sTodo > 0
+ })
+}
+
+func (g *Group) Finished() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sFinished > 0
+ })
+}
+
+func (g *Group) Error() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sError > 0
+ })
+}
+
+func (g *Group) GetError(name string) error {
+ var err error
+ g.m.RLock()
+ defer g.m.RUnlock()
+ entry, ok := g.tasks[name]
+ if !ok {
+ return fmt.Errorf("task: %s not found: %w", name, ENotRegistered)
+ }
+ err = entry.Error
+ return err
+}
+
+// Registered returns all the registered tasks
+func (g *Group) Registered() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return true
+ })
+}
+
+func (g *Group) getForCondition(condition func(state taskState) bool) TaskList {
+ g.m.Lock()
+ defer g.m.Unlock()
+ out := TaskList{}
+ for name, entry := range g.tasks {
+ if condition(entry.State) {
+ out[name] = entry.Task
+ }
+ }
+ return out
+}
+
+// AddAndStart does exactly what ti says on the tin
+func (g *Group) AddAndStart(name string, task Task) error {
+ err := g.Add(name, task)
+ if err != nil {
+ return err
+ }
+ return g.Start(name)
+}
+
+// AddAndRun does exactly what it says on the tin
+func (g *Group) AddAndRun(name string, task Task) error {
+ err := g.Add(name, task)
+ if err != nil {
+ return err
+ }
+ return g.Run(name)
+}
+
+// Add will register a task with the group, however it does not run it
+// immediately. Allowing you to call Run or Start separately.
+func (g *Group) Add(name string, task Task) error {
+ g.m.Lock()
+ defer g.m.Unlock()
+ _, ok := g.tasks[name]
+ if ok {
+ return fmt.Errorf("%s already registered: %w", name, EAlreadyRegistered)
+ }
+ g.tasks[name] = &taskEntry{
+ Task: task,
+ State: sTodo,
+ }
+ return nil
+}
+
+// Start will start the task but not wait around for it to finish, returns
+// an error if the task isn't registered or is already started.
+func (g *Group) Start(name string) error {
+ g.m.Lock()
+ entry, ok := g.tasks[name]
+ if !ok {
+ g.m.Unlock()
+ return fmt.Errorf("'%s' not registered %w", name, ENotRegistered)
+ }
+ logger.Printf("Start %s -> state: %d condition: %v",
+ name, entry.State, entry.State&sRunning > 0)
+ if entry.State&sRunning > 0 {
+ g.m.Unlock()
+ return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted)
+ }
+ go func() {
+ g.m.Unlock()
+ _ = entry.run(g.m, name)
+ }()
+ return nil
+}
+
+// Run will do the same thing as start, except that it will block and
+// wait for the task to finish, and return the task's error.
+func (g *Group) Run(name string) error {
+ g.m.Lock()
+ entry, ok := g.tasks[name]
+ g.m.Unlock()
+ if !ok {
+ return fmt.Errorf("'%s' not registered %w", name, ENotRegistered)
+ }
+ return entry.run(g.m, name)
+}
+
+type taskEntry struct {
+ Task Task
+ State taskState
+ Error error
+}
+
+func (e *taskEntry) getState(m *sync.RWMutex) taskState {
+ m.RLock()
+ defer m.RUnlock()
+ return e.State
+}
+
+func (e *taskEntry) run(m *sync.RWMutex, name string) error {
+ s := e.getState(m)
+ if s&sRunning > 0 {
+ return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted)
+ }
+ m.Lock()
+ e.State = sRunning
+ m.Unlock()
+
+ err := e.Task.Run()
+ m.Lock()
+ e.State = sFinished
+ e.Error = err
+ if err != nil {
+ e.State = e.State | sError
+ }
+ m.Unlock()
+ return err
+}
diff --git a/tasks/main_test.go b/tasks/main_test.go
new file mode 100644
index 0000000..5c69a81
--- /dev/null
+++ b/tasks/main_test.go
@@ -0,0 +1,138 @@
+package tasks
+
+import (
+ "errors"
+ "log"
+ "os"
+ "testing"
+ "time"
+)
+
+type tShortTask struct{}
+
+func (t *tShortTask) Run() error {
+ time.Sleep(time.Second * 1)
+ return nil
+}
+
+type tLongTask struct{}
+
+func (t *tLongTask) Run() error {
+ time.Sleep(time.Second * 2)
+ return nil
+}
+
+type tErrorTask struct{}
+
+func (t *tErrorTask) Run() error {
+ time.Sleep(time.Second * 2)
+ return eErrorTask
+}
+
+var (
+ eErrorTask = errors.New("tErrorTask testing error")
+
+ testLogger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)
+)
+
+func TestIntegration(t *testing.T) {
+ logger = testLogger
+
+ tg := NewGroup()
+
+ pairs := map[string]Task{
+ "tShortTask": &tShortTask{},
+ "tLongTask": &tLongTask{},
+ "tErrorTask": &tErrorTask{},
+ }
+
+ done := []string{}
+ for name, task := range pairs {
+ err := tg.Add(name, task)
+ if err != nil {
+ t.Fatalf("while adding: %s", err)
+ }
+ testLogger.Println("Added: ", name)
+
+ registered := tg.Registered()
+ testLogger.Println("Currently registered: ", registered)
+ for _, n := range done {
+ _, ok := registered[n]
+ if !ok {
+ t.Fatalf("%s is supposed to be registered", n)
+ }
+ }
+ }
+
+ reg := tg.Registered()
+ testLogger.Println("Registered tasks: ", reg)
+
+ err := tg.Add("tLongTask", &tLongTask{})
+ if !errors.Is(err, EAlreadyRegistered) {
+ t.Errorf("tLongTask should already be registered got: %s", err)
+ }
+
+ err = tg.Start("tLongTask")
+ if err != nil {
+ t.Errorf("starting tLongTask err should've been nil: %s", err)
+ }
+ testLogger.Println("tried starting tLongTask again")
+
+ err = tg.Run("tShortTask")
+ if err != nil {
+ t.Errorf("running tShortTask err should've been nil: %s", err)
+ }
+
+ finished := tg.Finished()
+ testLogger.Println("Finished tasks: ", finished)
+ _, ok := finished["tShortTask"]
+ if !ok {
+ t.Errorf("tShortTask should've been finished and isn't")
+ }
+
+ _, ok = finished["tLongTask"]
+ if ok {
+ t.Errorf("tLongTask is finished and it shouldn't be")
+ }
+
+ err = tg.Start("tLongTask")
+ if !errors.Is(err, EAlreadyStarted) {
+ t.Errorf("tLongTask should've already been started got: %s", err)
+ }
+
+ err = tg.Run("tLongTask")
+ if !errors.Is(err, EAlreadyStarted) {
+ t.Errorf("tLongTask should've already been started got: %s", err)
+ }
+
+ running := tg.Running()
+ testLogger.Println("Running tasks: ", running)
+
+ err = tg.Run("tErrorTask")
+ if err == nil {
+ t.Error("tErrorTask should have produced an error")
+ }
+
+ withErrors := tg.Error()
+ testLogger.Println("Errored tasks: ", withErrors)
+ _, ok = withErrors["tErrorTask"]
+ if !ok {
+ t.Error("tErrorTask should've been found within tg.Error()")
+ }
+
+ err = tg.GetError("tErrorTask")
+ if !errors.Is(err, eErrorTask) {
+ t.Errorf("tErrorTask should've returned an error got: %s", err)
+ }
+
+ finished = tg.Finished()
+ testLogger.Println("Finished tasks: ", finished)
+
+ for name := range pairs {
+ _, ok := finished[name]
+ if !ok {
+ t.Errorf("'%s' should be finished and isn't", name)
+ }
+ }
+
+}