diff options
Diffstat (limited to 'tasks/main.go')
| -rw-r--r-- | tasks/main.go | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/tasks/main.go b/tasks/main.go new file mode 100644 index 0000000..13832c4 --- /dev/null +++ b/tasks/main.go @@ -0,0 +1,236 @@ +// Package tasks is a simple package for tracking the state of particular asynchronous, or +// background tasks. Simply define a "Run" method to satisfy the Task interface, then +// they can be added to a group for tracking. +package tasks + +import ( + "errors" + "fmt" + "io" + "log" + "reflect" + "strings" + "sync" +) + +var ( + EAlreadyRegistered = errors.New("a task has already been registered with this name") + EAlreadyStarted = errors.New("specified task has already been started") + ENotRegistered = errors.New("task has not been registered") + + // logger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) + logger = log.New(io.Discard, "", 0) +) + +const ( + _ taskState = 1 << iota + sTodo + sRunning + sFinished + sError +) + +type taskState int + +// Task interface is wraps anything with a Run(), it's assumed +// that the Run() is blocking and will return when the task +// is complete. +type Task interface { + Run() error +} + +// Group represents a group of tasks, and will keep track of their various states +// as well as any errors. While this library does not concurrently access the +// tasks it's likely you'll want them to be safe for concurrent use if you're +// accessing the tasks elsewhere. +// +// Tasks can be run more than once if Start or Run is called more than once. +// no effort is made to prevent that other than not allowing jobs to stack. +// ( They will return an error if they're currently running ) +type Group struct { + tasks map[string]*taskEntry + m *sync.RWMutex +} + +func NewGroup() *Group { + return &Group{ + tasks: map[string]*taskEntry{}, + m: &sync.RWMutex{}, + } +} + +type TaskList map[string]Task + +func (tl TaskList) String() string { + bld := &strings.Builder{} + for name, entry := range tl { + bld.Write([]byte{' '}) + bld.WriteString(fmt.Sprintf("%s->%s(%+v)", name, + reflect.TypeOf(entry), entry)) + } + return bld.String() +} + +// Running returns all the running Tasks. These are not copies, so care must +// be taken if they're not thread safe. +func (g *Group) Running() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sRunning > 0 + }) +} + +// Todo returns all the tasks in the Todo state. Run or Start simply has not +// been called. +func (g *Group) Todo() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sTodo > 0 + }) +} + +func (g *Group) Finished() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sFinished > 0 + }) +} + +func (g *Group) Error() TaskList { + return g.getForCondition( + func(state taskState) bool { + return state&sError > 0 + }) +} + +func (g *Group) GetError(name string) error { + var err error + g.m.RLock() + defer g.m.RUnlock() + entry, ok := g.tasks[name] + if !ok { + return fmt.Errorf("task: %s not found: %w", name, ENotRegistered) + } + err = entry.Error + return err +} + +// Registered returns all the registered tasks +func (g *Group) Registered() TaskList { + return g.getForCondition( + func(state taskState) bool { + return true + }) +} + +func (g *Group) getForCondition(condition func(state taskState) bool) TaskList { + g.m.Lock() + defer g.m.Unlock() + out := TaskList{} + for name, entry := range g.tasks { + if condition(entry.State) { + out[name] = entry.Task + } + } + return out +} + +// AddAndStart does exactly what ti says on the tin +func (g *Group) AddAndStart(name string, task Task) error { + err := g.Add(name, task) + if err != nil { + return err + } + return g.Start(name) +} + +// AddAndRun does exactly what it says on the tin +func (g *Group) AddAndRun(name string, task Task) error { + err := g.Add(name, task) + if err != nil { + return err + } + return g.Run(name) +} + +// Add will register a task with the group, however it does not run it +// immediately. Allowing you to call Run or Start separately. +func (g *Group) Add(name string, task Task) error { + g.m.Lock() + defer g.m.Unlock() + _, ok := g.tasks[name] + if ok { + return fmt.Errorf("%s already registered: %w", name, EAlreadyRegistered) + } + g.tasks[name] = &taskEntry{ + Task: task, + State: sTodo, + } + return nil +} + +// Start will start the task but not wait around for it to finish, returns +// an error if the task isn't registered or is already started. +func (g *Group) Start(name string) error { + g.m.Lock() + entry, ok := g.tasks[name] + if !ok { + g.m.Unlock() + return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) + } + logger.Printf("Start %s -> state: %d condition: %v", + name, entry.State, entry.State&sRunning > 0) + if entry.State&sRunning > 0 { + g.m.Unlock() + return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) + } + go func() { + g.m.Unlock() + _ = entry.run(g.m, name) + }() + return nil +} + +// Run will do the same thing as start, except that it will block and +// wait for the task to finish, and return the task's error. +func (g *Group) Run(name string) error { + g.m.Lock() + entry, ok := g.tasks[name] + g.m.Unlock() + if !ok { + return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) + } + return entry.run(g.m, name) +} + +type taskEntry struct { + Task Task + State taskState + Error error +} + +func (e *taskEntry) getState(m *sync.RWMutex) taskState { + m.RLock() + defer m.RUnlock() + return e.State +} + +func (e *taskEntry) run(m *sync.RWMutex, name string) error { + s := e.getState(m) + if s&sRunning > 0 { + return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) + } + m.Lock() + e.State = sRunning + m.Unlock() + + err := e.Task.Run() + m.Lock() + e.State = sFinished + e.Error = err + if err != nil { + e.State = e.State | sError + } + m.Unlock() + return err +} |
