From f07efbb6fc7a63055a8424799ce03a5f37539873 Mon Sep 17 00:00:00 2001 From: Mitchell Riedstra Date: Fri, 20 Jan 2023 00:35:09 -0500 Subject: wip --- tasks/main.go | 236 +++++++++++++++++++++++++++++++++++++++++++++++++++++ tasks/main_test.go | 138 +++++++++++++++++++++++++++++++ 2 files changed, 374 insertions(+) create mode 100644 tasks/main.go create mode 100644 tasks/main_test.go (limited to 'tasks') diff --git a/tasks/main.go b/tasks/main.go new file mode 100644 index 0000000..13832c4 --- /dev/null +++ b/tasks/main.go @@ -0,0 +1,236 @@ +// Package tasks is a simple package for tracking the state of particular asynchronous, or +// background tasks. Simply define a "Run" method to satisfy the Task interface, then +// they can be added to a group for tracking. +package tasks + +import ( + "errors" + "fmt" + "io" + "log" + "reflect" + "strings" + "sync" +) + +var ( + EAlreadyRegistered = errors.New("a task has already been registered with this name") + EAlreadyStarted = errors.New("specified task has already been started") + ENotRegistered = errors.New("task has not been registered") + + // logger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) + logger = log.New(io.Discard, "", 0) +) + +const ( + _ taskState = 1 << iota + sTodo + sRunning + sFinished + sError +) + +type taskState int + +// Task interface is wraps anything with a Run(), it's assumed +// that the Run() is blocking and will return when the task +// is complete. +type Task interface { + Run() error +} + +// Group represents a group of tasks, and will keep track of their various states +// as well as any errors. While this library does not concurrently access the +// tasks it's likely you'll want them to be safe for concurrent use if you're +// accessing the tasks elsewhere. +// +// Tasks can be run more than once if Start or Run is called more than once. +// no effort is made to prevent that other than not allowing jobs to stack. +// ( They will return an error if they're currently running ) +type Group struct { + tasks map[string]*taskEntry + m *sync.RWMutex +} + +func NewGroup() *Group { + return &Group{ + tasks: map[string]*taskEntry{}, + m: &sync.RWMutex{}, + } +} + +type TaskList map[string]Task + +func (tl TaskList) String() string { + bld := &strings.Builder{} + for name, entry := range tl { + bld.Write([]byte{' '}) + bld.WriteString(fmt.Sprintf("%s->%s(%+v)", name, + reflect.TypeOf(entry), entry)) + } + return bld.String() +} + +// Running returns all the running Tasks. These are not copies, so care must +// be taken if they're not thread safe. +func (g *Group) Running() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sRunning > 0 + }) +} + +// Todo returns all the tasks in the Todo state. Run or Start simply has not +// been called. +func (g *Group) Todo() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sTodo > 0 + }) +} + +func (g *Group) Finished() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sFinished > 0 + }) +} + +func (g *Group) Error() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sError > 0 + }) +} + +func (g *Group) GetError(name string) error { + var err error + g.m.RLock() + defer g.m.RUnlock() + entry, ok := g.tasks[name] + if !ok { + return fmt.Errorf("task: %s not found: %w", name, ENotRegistered) + } + err = entry.Error + return err +} + +// Registered returns all the registered tasks +func (g *Group) Registered() TaskList { + return g.getForCondition( + func(state taskState) bool { + return true + }) +} + +func (g *Group) getForCondition(condition func(state taskState) bool) TaskList { + g.m.Lock() + defer g.m.Unlock() + out := TaskList{} + for name, entry := range g.tasks { + if condition(entry.State) { + out[name] = entry.Task + } + } + return out +} + +// AddAndStart does exactly what ti says on the tin +func (g *Group) AddAndStart(name string, task Task) error { + err := g.Add(name, task) + if err != nil { + return err + } + return g.Start(name) +} + +// AddAndRun does exactly what it says on the tin +func (g *Group) AddAndRun(name string, task Task) error { + err := g.Add(name, task) + if err != nil { + return err + } + return g.Run(name) +} + +// Add will register a task with the group, however it does not run it +// immediately. Allowing you to call Run or Start separately. +func (g *Group) Add(name string, task Task) error { + g.m.Lock() + defer g.m.Unlock() + _, ok := g.tasks[name] + if ok { + return fmt.Errorf("%s already registered: %w", name, EAlreadyRegistered) + } + g.tasks[name] = &taskEntry{ + Task: task, + State: sTodo, + } + return nil +} + +// Start will start the task but not wait around for it to finish, returns +// an error if the task isn't registered or is already started. +func (g *Group) Start(name string) error { + g.m.Lock() + entry, ok := g.tasks[name] + if !ok { + g.m.Unlock() + return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) + } + logger.Printf("Start %s -> state: %d condition: %v", + name, entry.State, entry.State&sRunning > 0) + if entry.State&sRunning > 0 { + g.m.Unlock() + return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) + } + go func() { + g.m.Unlock() + _ = entry.run(g.m, name) + }() + return nil +} + +// Run will do the same thing as start, except that it will block and +// wait for the task to finish, and return the task's error. +func (g *Group) Run(name string) error { + g.m.Lock() + entry, ok := g.tasks[name] + g.m.Unlock() + if !ok { + return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) + } + return entry.run(g.m, name) +} + +type taskEntry struct { + Task Task + State taskState + Error error +} + +func (e *taskEntry) getState(m *sync.RWMutex) taskState { + m.RLock() + defer m.RUnlock() + return e.State +} + +func (e *taskEntry) run(m *sync.RWMutex, name string) error { + s := e.getState(m) + if s&sRunning > 0 { + return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) + } + m.Lock() + e.State = sRunning + m.Unlock() + + err := e.Task.Run() + m.Lock() + e.State = sFinished + e.Error = err + if err != nil { + e.State = e.State | sError + } + m.Unlock() + return err +} diff --git a/tasks/main_test.go b/tasks/main_test.go new file mode 100644 index 0000000..5c69a81 --- /dev/null +++ b/tasks/main_test.go @@ -0,0 +1,138 @@ +package tasks + +import ( + "errors" + "log" + "os" + "testing" + "time" +) + +type tShortTask struct{} + +func (t *tShortTask) Run() error { + time.Sleep(time.Second * 1) + return nil +} + +type tLongTask struct{} + +func (t *tLongTask) Run() error { + time.Sleep(time.Second * 2) + return nil +} + +type tErrorTask struct{} + +func (t *tErrorTask) Run() error { + time.Sleep(time.Second * 2) + return eErrorTask +} + +var ( + eErrorTask = errors.New("tErrorTask testing error") + + testLogger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) +) + +func TestIntegration(t *testing.T) { + logger = testLogger + + tg := NewGroup() + + pairs := map[string]Task{ + "tShortTask": &tShortTask{}, + "tLongTask": &tLongTask{}, + "tErrorTask": &tErrorTask{}, + } + + done := []string{} + for name, task := range pairs { + err := tg.Add(name, task) + if err != nil { + t.Fatalf("while adding: %s", err) + } + testLogger.Println("Added: ", name) + + registered := tg.Registered() + testLogger.Println("Currently registered: ", registered) + for _, n := range done { + _, ok := registered[n] + if !ok { + t.Fatalf("%s is supposed to be registered", n) + } + } + } + + reg := tg.Registered() + testLogger.Println("Registered tasks: ", reg) + + err := tg.Add("tLongTask", &tLongTask{}) + if !errors.Is(err, EAlreadyRegistered) { + t.Errorf("tLongTask should already be registered got: %s", err) + } + + err = tg.Start("tLongTask") + if err != nil { + t.Errorf("starting tLongTask err should've been nil: %s", err) + } + testLogger.Println("tried starting tLongTask again") + + err = tg.Run("tShortTask") + if err != nil { + t.Errorf("running tShortTask err should've been nil: %s", err) + } + + finished := tg.Finished() + testLogger.Println("Finished tasks: ", finished) + _, ok := finished["tShortTask"] + if !ok { + t.Errorf("tShortTask should've been finished and isn't") + } + + _, ok = finished["tLongTask"] + if ok { + t.Errorf("tLongTask is finished and it shouldn't be") + } + + err = tg.Start("tLongTask") + if !errors.Is(err, EAlreadyStarted) { + t.Errorf("tLongTask should've already been started got: %s", err) + } + + err = tg.Run("tLongTask") + if !errors.Is(err, EAlreadyStarted) { + t.Errorf("tLongTask should've already been started got: %s", err) + } + + running := tg.Running() + testLogger.Println("Running tasks: ", running) + + err = tg.Run("tErrorTask") + if err == nil { + t.Error("tErrorTask should have produced an error") + } + + withErrors := tg.Error() + testLogger.Println("Errored tasks: ", withErrors) + _, ok = withErrors["tErrorTask"] + if !ok { + t.Error("tErrorTask should've been found within tg.Error()") + } + + err = tg.GetError("tErrorTask") + if !errors.Is(err, eErrorTask) { + t.Errorf("tErrorTask should've returned an error got: %s", err) + } + + finished = tg.Finished() + testLogger.Println("Finished tasks: ", finished) + + for name := range pairs { + _, ok := finished[name] + if !ok { + t.Errorf("'%s' should be finished and isn't", name) + } + } + +} -- cgit v1.2.3