aboutsummaryrefslogtreecommitdiff
path: root/tasks
diff options
context:
space:
mode:
authorMitchell Riedstra <mitch@riedstra.dev>2023-01-20 00:35:09 -0500
committerMitchell Riedstra <mitch@riedstra.dev>2023-01-20 00:35:09 -0500
commitf07efbb6fc7a63055a8424799ce03a5f37539873 (patch)
treeff5983b2cae4cc9b8f2f346a47cb3eb23b2f79ae /tasks
parentcbfd82db8a20be32ffa82a1afa860729f3097de6 (diff)
downloadsteam-export-dev-wip.tar.gz
steam-export-dev-wip.tar.xz
Diffstat (limited to 'tasks')
-rw-r--r--tasks/main.go236
-rw-r--r--tasks/main_test.go138
2 files changed, 374 insertions, 0 deletions
diff --git a/tasks/main.go b/tasks/main.go
new file mode 100644
index 0000000..13832c4
--- /dev/null
+++ b/tasks/main.go
@@ -0,0 +1,236 @@
+// Package tasks is a simple package for tracking the state of particular asynchronous, or
+// background tasks. Simply define a "Run" method to satisfy the Task interface, then
+// they can be added to a group for tracking.
+package tasks
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "reflect"
+ "strings"
+ "sync"
+)
+
+var (
+ EAlreadyRegistered = errors.New("a task has already been registered with this name")
+ EAlreadyStarted = errors.New("specified task has already been started")
+ ENotRegistered = errors.New("task has not been registered")
+
+ // logger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)
+ logger = log.New(io.Discard, "", 0)
+)
+
+const (
+ _ taskState = 1 << iota
+ sTodo
+ sRunning
+ sFinished
+ sError
+)
+
+type taskState int
+
+// Task interface is wraps anything with a Run(), it's assumed
+// that the Run() is blocking and will return when the task
+// is complete.
+type Task interface {
+ Run() error
+}
+
+// Group represents a group of tasks, and will keep track of their various states
+// as well as any errors. While this library does not concurrently access the
+// tasks it's likely you'll want them to be safe for concurrent use if you're
+// accessing the tasks elsewhere.
+//
+// Tasks can be run more than once if Start or Run is called more than once.
+// no effort is made to prevent that other than not allowing jobs to stack.
+// ( They will return an error if they're currently running )
+type Group struct {
+ tasks map[string]*taskEntry
+ m *sync.RWMutex
+}
+
+func NewGroup() *Group {
+ return &Group{
+ tasks: map[string]*taskEntry{},
+ m: &sync.RWMutex{},
+ }
+}
+
+type TaskList map[string]Task
+
+func (tl TaskList) String() string {
+ bld := &strings.Builder{}
+ for name, entry := range tl {
+ bld.Write([]byte{' '})
+ bld.WriteString(fmt.Sprintf("%s->%s(%+v)", name,
+ reflect.TypeOf(entry), entry))
+ }
+ return bld.String()
+}
+
+// Running returns all the running Tasks. These are not copies, so care must
+// be taken if they're not thread safe.
+func (g *Group) Running() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sRunning > 0
+ })
+}
+
+// Todo returns all the tasks in the Todo state. Run or Start simply has not
+// been called.
+func (g *Group) Todo() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sTodo > 0
+ })
+}
+
+func (g *Group) Finished() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sFinished > 0
+ })
+}
+
+func (g *Group) Error() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return state&sError > 0
+ })
+}
+
+func (g *Group) GetError(name string) error {
+ var err error
+ g.m.RLock()
+ defer g.m.RUnlock()
+ entry, ok := g.tasks[name]
+ if !ok {
+ return fmt.Errorf("task: %s not found: %w", name, ENotRegistered)
+ }
+ err = entry.Error
+ return err
+}
+
+// Registered returns all the registered tasks
+func (g *Group) Registered() TaskList {
+ return g.getForCondition(
+ func(state taskState) bool {
+ return true
+ })
+}
+
+func (g *Group) getForCondition(condition func(state taskState) bool) TaskList {
+ g.m.Lock()
+ defer g.m.Unlock()
+ out := TaskList{}
+ for name, entry := range g.tasks {
+ if condition(entry.State) {
+ out[name] = entry.Task
+ }
+ }
+ return out
+}
+
+// AddAndStart does exactly what ti says on the tin
+func (g *Group) AddAndStart(name string, task Task) error {
+ err := g.Add(name, task)
+ if err != nil {
+ return err
+ }
+ return g.Start(name)
+}
+
+// AddAndRun does exactly what it says on the tin
+func (g *Group) AddAndRun(name string, task Task) error {
+ err := g.Add(name, task)
+ if err != nil {
+ return err
+ }
+ return g.Run(name)
+}
+
+// Add will register a task with the group, however it does not run it
+// immediately. Allowing you to call Run or Start separately.
+func (g *Group) Add(name string, task Task) error {
+ g.m.Lock()
+ defer g.m.Unlock()
+ _, ok := g.tasks[name]
+ if ok {
+ return fmt.Errorf("%s already registered: %w", name, EAlreadyRegistered)
+ }
+ g.tasks[name] = &taskEntry{
+ Task: task,
+ State: sTodo,
+ }
+ return nil
+}
+
+// Start will start the task but not wait around for it to finish, returns
+// an error if the task isn't registered or is already started.
+func (g *Group) Start(name string) error {
+ g.m.Lock()
+ entry, ok := g.tasks[name]
+ if !ok {
+ g.m.Unlock()
+ return fmt.Errorf("'%s' not registered %w", name, ENotRegistered)
+ }
+ logger.Printf("Start %s -> state: %d condition: %v",
+ name, entry.State, entry.State&sRunning > 0)
+ if entry.State&sRunning > 0 {
+ g.m.Unlock()
+ return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted)
+ }
+ go func() {
+ g.m.Unlock()
+ _ = entry.run(g.m, name)
+ }()
+ return nil
+}
+
+// Run will do the same thing as start, except that it will block and
+// wait for the task to finish, and return the task's error.
+func (g *Group) Run(name string) error {
+ g.m.Lock()
+ entry, ok := g.tasks[name]
+ g.m.Unlock()
+ if !ok {
+ return fmt.Errorf("'%s' not registered %w", name, ENotRegistered)
+ }
+ return entry.run(g.m, name)
+}
+
+type taskEntry struct {
+ Task Task
+ State taskState
+ Error error
+}
+
+func (e *taskEntry) getState(m *sync.RWMutex) taskState {
+ m.RLock()
+ defer m.RUnlock()
+ return e.State
+}
+
+func (e *taskEntry) run(m *sync.RWMutex, name string) error {
+ s := e.getState(m)
+ if s&sRunning > 0 {
+ return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted)
+ }
+ m.Lock()
+ e.State = sRunning
+ m.Unlock()
+
+ err := e.Task.Run()
+ m.Lock()
+ e.State = sFinished
+ e.Error = err
+ if err != nil {
+ e.State = e.State | sError
+ }
+ m.Unlock()
+ return err
+}
diff --git a/tasks/main_test.go b/tasks/main_test.go
new file mode 100644
index 0000000..5c69a81
--- /dev/null
+++ b/tasks/main_test.go
@@ -0,0 +1,138 @@
+package tasks
+
+import (
+ "errors"
+ "log"
+ "os"
+ "testing"
+ "time"
+)
+
+type tShortTask struct{}
+
+func (t *tShortTask) Run() error {
+ time.Sleep(time.Second * 1)
+ return nil
+}
+
+type tLongTask struct{}
+
+func (t *tLongTask) Run() error {
+ time.Sleep(time.Second * 2)
+ return nil
+}
+
+type tErrorTask struct{}
+
+func (t *tErrorTask) Run() error {
+ time.Sleep(time.Second * 2)
+ return eErrorTask
+}
+
+var (
+ eErrorTask = errors.New("tErrorTask testing error")
+
+ testLogger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)
+)
+
+func TestIntegration(t *testing.T) {
+ logger = testLogger
+
+ tg := NewGroup()
+
+ pairs := map[string]Task{
+ "tShortTask": &tShortTask{},
+ "tLongTask": &tLongTask{},
+ "tErrorTask": &tErrorTask{},
+ }
+
+ done := []string{}
+ for name, task := range pairs {
+ err := tg.Add(name, task)
+ if err != nil {
+ t.Fatalf("while adding: %s", err)
+ }
+ testLogger.Println("Added: ", name)
+
+ registered := tg.Registered()
+ testLogger.Println("Currently registered: ", registered)
+ for _, n := range done {
+ _, ok := registered[n]
+ if !ok {
+ t.Fatalf("%s is supposed to be registered", n)
+ }
+ }
+ }
+
+ reg := tg.Registered()
+ testLogger.Println("Registered tasks: ", reg)
+
+ err := tg.Add("tLongTask", &tLongTask{})
+ if !errors.Is(err, EAlreadyRegistered) {
+ t.Errorf("tLongTask should already be registered got: %s", err)
+ }
+
+ err = tg.Start("tLongTask")
+ if err != nil {
+ t.Errorf("starting tLongTask err should've been nil: %s", err)
+ }
+ testLogger.Println("tried starting tLongTask again")
+
+ err = tg.Run("tShortTask")
+ if err != nil {
+ t.Errorf("running tShortTask err should've been nil: %s", err)
+ }
+
+ finished := tg.Finished()
+ testLogger.Println("Finished tasks: ", finished)
+ _, ok := finished["tShortTask"]
+ if !ok {
+ t.Errorf("tShortTask should've been finished and isn't")
+ }
+
+ _, ok = finished["tLongTask"]
+ if ok {
+ t.Errorf("tLongTask is finished and it shouldn't be")
+ }
+
+ err = tg.Start("tLongTask")
+ if !errors.Is(err, EAlreadyStarted) {
+ t.Errorf("tLongTask should've already been started got: %s", err)
+ }
+
+ err = tg.Run("tLongTask")
+ if !errors.Is(err, EAlreadyStarted) {
+ t.Errorf("tLongTask should've already been started got: %s", err)
+ }
+
+ running := tg.Running()
+ testLogger.Println("Running tasks: ", running)
+
+ err = tg.Run("tErrorTask")
+ if err == nil {
+ t.Error("tErrorTask should have produced an error")
+ }
+
+ withErrors := tg.Error()
+ testLogger.Println("Errored tasks: ", withErrors)
+ _, ok = withErrors["tErrorTask"]
+ if !ok {
+ t.Error("tErrorTask should've been found within tg.Error()")
+ }
+
+ err = tg.GetError("tErrorTask")
+ if !errors.Is(err, eErrorTask) {
+ t.Errorf("tErrorTask should've returned an error got: %s", err)
+ }
+
+ finished = tg.Finished()
+ testLogger.Println("Finished tasks: ", finished)
+
+ for name := range pairs {
+ _, ok := finished[name]
+ if !ok {
+ t.Errorf("'%s' should be finished and isn't", name)
+ }
+ }
+
+}