// start to spy on queue's time-out data and throw it
func (q *Queue) StartTimeSpying() {
fmt.Println("time supervisor starts")
go q.startTimeSpying()
}
// detail of StartTimeSpying function
func (q *Queue) startTimeSpying() error {
var err = make(chan string, 0)
go func(queue *Queue, er chan string) {
fmt.Println("start time spying, data in the queue can stay for " q.ExpireAfter.String())
for {
if queue.timeSpy == false {
err <- "spying routine stops because: queue's timeSpy is false, make sure the queue is definition by q=TimeQueue(time.Duration,int)"
return
}
select {
case <-queue.flag:
fmt.Println("time spy executing stops")
return
default:
fmt.Print()
}
ok,er:=queue.timingRemove()
if er!=nil{
err <- er.(errorx.Error).StackTrace()
}
if ok {
time.Sleep(queue.timeStep)
}
}
}(q, err)
select {
case msg := <-err:
fmt.Println("time spy supervisor accidentally stops because: ",msg)
return errorx.NewFromString(msg)
case <-q.flag:
fmt.Println("time spy supervisor stops")
return nil
}
}
// remove those time-out data
func (q *Queue) timingRemove() (bool,error) {
if len(q.Data) <1 {
return true,nil
}
head, index, er := q.THead()
if er != nil {
return false, errorx.Wrap(er)
}
if index < 0 {
return false, errorx.NewFromString("queue'length goes 0")
}
now := time.Now().Unix()
created := time.Unix(head.CreatedAt, 0)
//fmt.Println("now:",now)
//fmt.Println("expire:",created.Add(q.ExpireAfter).Unix())
if created.Add(q.ExpireAfter).Unix() < now {
// out of time
_,_,e := q.TPop()
if e!=nil {
return false, errorx.Wrap(e)
}
if len(q.Data) >0 {
return q.timingRemove()
}else{
return true,nil
}
} else{
return true ,nil
}
}