// Package tasks is a simple package for tracking the state of particular asynchronous, or // background tasks. Simply define a "Run" method to satisfy the Task interface, then // they can be added to a group for tracking. package tasks import ( "errors" "fmt" "io" "log" "reflect" "strings" "sync" ) var ( EAlreadyRegistered = errors.New("a task has already been registered with this name") EAlreadyStarted = errors.New("specified task has already been started") ENotRegistered = errors.New("task has not been registered") // logger = log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) logger = log.New(io.Discard, "", 0) ) const ( _ taskState = 1 << iota sTodo sRunning sFinished sError ) type taskState int // Task interface is wraps anything with a Run(), it's assumed // that the Run() is blocking and will return when the task // is complete. type Task interface { Run() error } // Group represents a group of tasks, and will keep track of their various states // as well as any errors. While this library does not concurrently access the // tasks it's likely you'll want them to be safe for concurrent use if you're // accessing the tasks elsewhere. // // Tasks can be run more than once if Start or Run is called more than once. // no effort is made to prevent that other than not allowing jobs to stack. // ( They will return an error if they're currently running ) type Group struct { tasks map[string]*taskEntry m *sync.RWMutex } func NewGroup() *Group { return &Group{ tasks: map[string]*taskEntry{}, m: &sync.RWMutex{}, } } type TaskList map[string]Task func (tl TaskList) String() string { bld := &strings.Builder{} for name, entry := range tl { bld.Write([]byte{' '}) bld.WriteString(fmt.Sprintf("%s->%s(%+v)", name, reflect.TypeOf(entry), entry)) } return bld.String() } // Running returns all the running Tasks. These are not copies, so care must // be taken if they're not thread safe. 
func (g *Group) Running() TaskList {
	return g.getForCondition(func(state taskState) bool { return state&sRunning > 0 })
}

// Todo returns all the tasks in the Todo state. Run or Start simply has not
// been called.
func (g *Group) Todo() TaskList {
	return g.getForCondition(func(state taskState) bool { return state&sTodo > 0 })
}

// Finished returns all the tasks whose state carries the finished flag. That
// includes tasks whose run returned an error, since those are marked both
// finished and errored.
func (g *Group) Finished() TaskList {
	return g.getForCondition(func(state taskState) bool { return state&sFinished > 0 })
}

// Error returns all the tasks whose state carries the error flag, i.e. whose
// last completed run returned a non-nil error.
func (g *Group) Error() TaskList {
	return g.getForCondition(func(state taskState) bool { return state&sError > 0 })
}

// GetError returns the error recorded for the named task, which is nil when
// nothing has been recorded. If no task is registered under name, the returned
// error wraps ENotRegistered; use errors.Is to tell the two cases apart.
func (g *Group) GetError(name string) error {
	g.m.RLock()
	defer g.m.RUnlock()

	entry, ok := g.tasks[name]
	if !ok {
		return fmt.Errorf("task: %s not found: %w", name, ENotRegistered)
	}
	return entry.Error
}

// Registered returns all the registered tasks.
func (g *Group) Registered() TaskList {
	return g.getForCondition(func(taskState) bool { return true })
}

// getForCondition collects every task whose current state satisfies
// condition into a fresh TaskList. The Task values themselves are shared with
// the group, not copied.
func (g *Group) getForCondition(condition func(state taskState) bool) TaskList {
	// The scan only reads, so a shared lock is enough and lets concurrent
	// queries proceed in parallel.
	g.m.RLock()
	defer g.m.RUnlock()

	out := TaskList{}
	for name, entry := range g.tasks {
		if condition(entry.State) {
			out[name] = entry.Task
		}
	}
	return out
}

// AddAndStart does exactly what it says on the tin: it registers the task and
// then starts it in the background. A registration error is returned without
// starting anything.
func (g *Group) AddAndStart(name string, task Task) error {
	if err := g.Add(name, task); err != nil {
		return err
	}
	return g.Start(name)
}

// AddAndRun does exactly what it says on the tin: it registers the task and
// then runs it, blocking until it finishes. A registration error is returned
// without running anything.
func (g *Group) AddAndRun(name string, task Task) error {
	if err := g.Add(name, task); err != nil {
		return err
	}
	return g.Run(name)
}

// Add will register a task with the group, however it does not run it
// immediately. Allowing you to call Run or Start separately.
func (g *Group) Add(name string, task Task) error { g.m.Lock() defer g.m.Unlock() _, ok := g.tasks[name] if ok { return fmt.Errorf("%s already registered: %w", name, EAlreadyRegistered) } g.tasks[name] = &taskEntry{ Task: task, State: sTodo, } return nil } // Start will start the task but not wait around for it to finish, returns // an error if the task isn't registered or is already started. func (g *Group) Start(name string) error { g.m.Lock() entry, ok := g.tasks[name] if !ok { g.m.Unlock() return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) } logger.Printf("Start %s -> state: %d condition: %v", name, entry.State, entry.State&sRunning > 0) if entry.State&sRunning > 0 { g.m.Unlock() return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) } go func() { g.m.Unlock() _ = entry.run(g.m, name) }() return nil } // Run will do the same thing as start, except that it will block and // wait for the task to finish, and return the task's error. func (g *Group) Run(name string) error { g.m.Lock() entry, ok := g.tasks[name] g.m.Unlock() if !ok { return fmt.Errorf("'%s' not registered %w", name, ENotRegistered) } return entry.run(g.m, name) } type taskEntry struct { Task Task State taskState Error error } func (e *taskEntry) getState(m *sync.RWMutex) taskState { m.RLock() defer m.RUnlock() return e.State } func (e *taskEntry) run(m *sync.RWMutex, name string) error { s := e.getState(m) if s&sRunning > 0 { return fmt.Errorf("'%s' already running %w", name, EAlreadyStarted) } m.Lock() e.State = sRunning m.Unlock() err := e.Task.Run() m.Lock() e.State = sFinished e.Error = err if err != nil { e.State = e.State | sError } m.Unlock() return err }