aboutsummaryrefslogtreecommitdiff
path: root/tasks/main.go
diff options
context:
space:
mode:
authorMitchell Riedstra <mitch@riedstra.dev>2023-01-20 00:35:09 -0500
committerMitchell Riedstra <mitch@riedstra.dev>2023-01-20 00:35:09 -0500
commitf07efbb6fc7a63055a8424799ce03a5f37539873 (patch)
treeff5983b2cae4cc9b8f2f346a47cb3eb23b2f79ae /tasks/main.go
parentcbfd82db8a20be32ffa82a1afa860729f3097de6 (diff)
downloadsteam-export-f07efbb6fc7a63055a8424799ce03a5f37539873.tar.gz
steam-export-f07efbb6fc7a63055a8424799ce03a5f37539873.tar.xz
Diffstat (limited to 'tasks/main.go')
-rw-r--r--tasks/main.go236
1 files changed, 236 insertions, 0 deletions
diff --git a/tasks/main.go b/tasks/main.go
new file mode 100644
index 0000000..13832c4
--- /dev/null
+++ b/tasks/main.go
@@ -0,0 +1,236 @@
+// Package tasks is a simple package for tracking the state of particular asynchronous, or
+// background tasks. Simply define a "Run" method to satisfy the Task interface, then
+// they can be added to a group for tracking.
+package tasks
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "reflect"
+ "strings"
+ "sync"
+)
+
+var (
+ EAlreadyRegistered = errors.New("a task has already been registered with this name")
+ EAlreadyStarted = errors.New("specified task has already been started")
+ ENotRegistered = errors.New("task has not been registered")
+
+ // logger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)
+ logger = log.New(io.Discard, "", 0)
+)
+
+const (
+ _ taskState = 1 << iota
+ sTodo
+ sRunning
+ sFinished
+ sError
+)
+
+type taskState int
+
+// Task interface is wraps anything with a Run(), it's assumed
+// that the Run() is blocking and will return when the task
+// is complete.
+type Task interface {
+ Run() error
+}
+
+// Group represents a group of tasks, and will keep track of their various states
+// as well as any errors. While this library does not concurrently access the
+// tasks it's likely you'll want them to be safe for concurrent use if you're
+// accessing the tasks elsewhere.
+//
+// Tasks can be run more than once if Start or Run is called more than once.
+// no effort is made to prevent that other than not allowing jobs to stack.
+// ( They will return an error if they're currently running )
+type Group struct {
+ tasks map[string]*taskEntry
+ m *sync.RWMutex
+}
+
+func NewGroup() *Group {
+ return &Group{
+ tasks: map[string]*taskEntry{},
+ m: &sync.RWMutex{},
+ }
+}
+
+type TaskList map[string]Task
+
+func (tl TaskList) String() string {
+ bld := &strings.Builder{}
+ for name, entry := range tl {
+ bld.Write([]byte{' '})
+ bld.WriteString(fmt.Sprintf("%s->%s(%+v)", name,
+ reflect.TypeOf(entry), entry))
+ }
+ return bld.String()
+}
+
+// Running returns all the running Tasks. These are not copies, so care must
+// be taken if they're not thread safe.
+func (g *Group) Running() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sRunning > 0
+ })
+}
+
+// Todo returns all the tasks in the Todo state. Run or Start simply has not
+// been called.
+func (g *Group) Todo() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sTodo > 0
+ })
+}
+
+func (g *Group) Finished() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sFinished > 0
+ })
+}
+
+func (g *Group) Error() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sError > 0
+ })
+}
+
+func (g *Group) GetError(name string) error {
+ var err error
+ g.m.RLock()
+ defer g.m.RUnlock()
+ entry, ok := g.tasks[name]
+ if !ok {
+ return fmt.Errorf("task: %s not found: %w", name, ENotRegistered)
+ }
+ err = entry.Error
+ return err
+}
+
+// Registered returns all the registered tasks
+func (g *Group) Registered() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return true
+ })
+}
+
+func (g *Group) getForCondition(condition func(state taskState) bool) TaskList {
+ g.m.Lock()
+ defer g.m.Unlock()
+ out := TaskList{}
+ for name, entry := range g.tasks {
+ if condition(entry.State) {
+ out[name] = entry.Task
+ }
+ }
+ return out
+}
+
+// AddAndStart does exactly what ti says on the tin
+func (g *Group) AddAndStart(name string, task Task) error {
+ err := g.Add(name, task)
+ if err != nil {
+ return err
+ }
+ return g.Start(name)
+}
+
+// AddAndRun does exactly what it says on the tin
+func (g *Group) AddAndRun(name string, task Task) error {
+ err := g.Add(name, task)
+ if err != nil {
+ return err
+ }
+ return g.Run(name)
+}
+
+// Add will register a task with the group, however it does not run it
+// immediately. Allowing you to call Run or Start separately.
+func (g *Group) Add(name string, task Task) error {
+ g.m.Lock()
+ defer g.m.Unlock()
+ _, ok := g.tasks[name]
+ if ok {
+ return fmt.Errorf("%s already registered: %w", name, EAlreadyRegistered)
+ }
+ g.tasks[name] = &taskEntry{
+ Task: task,
+ State: sTodo,
+ }
+ return nil
+}
+
+// Start will start the task but not wait around for it to finish, returns
+// an error if the task isn't registered or is already started.
+func (g *Group) Start(name string) error {
+ g.m.Lock()
+ entry, ok := g.tasks[name]
+ if !ok {
+ g.m.Unlock()
+ return fmt.Errorf("'%s' not registered %w", name, ENotRegistered)
+ }
+ logger.Printf("Start %s -> state: %d condition: %v",
+ name, entry.State, entry.State&sRunning > 0)
+ if entry.State&sRunning > 0 {
+ g.m.Unlock()
+ return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted)
+ }
+ go func() {
+ g.m.Unlock()
+ _ = entry.run(g.m, name)
+ }()
+ return nil
+}
+
+// Run will do the same thing as start, except that it will block and
+// wait for the task to finish, and return the task's error.
+func (g *Group) Run(name string) error {
+ g.m.Lock()
+ entry, ok := g.tasks[name]
+ g.m.Unlock()
+ if !ok {
+ return fmt.Errorf("'%s' not registered %w", name, ENotRegistered)
+ }
+ return entry.run(g.m, name)
+}
+
+type taskEntry struct {
+ Task Task
+ State taskState
+ Error error
+}
+
+func (e *taskEntry) getState(m *sync.RWMutex) taskState {
+ m.RLock()
+ defer m.RUnlock()
+ return e.State
+}
+
+func (e *taskEntry) run(m *sync.RWMutex, name string) error {
+ s := e.getState(m)
+ if s&sRunning > 0 {
+ return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted)
+ }
+ m.Lock()
+ e.State = sRunning
+ m.Unlock()
+
+ err := e.Task.Run()
+ m.Lock()
+ e.State = sFinished
+ e.Error = err
+ if err != nil {
+ e.State = e.State | sError
+ }
+ m.Unlock()
+ return err
+}