aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRagnis Armus <ragnis@aragnis.com>2018-04-15 00:20:28 +0300
committerRagnis Armus <ragnis@aragnis.com>2018-04-15 00:20:28 +0300
commit505aa1cd3e954c05830416b81113067b3071496e (patch)
tree40a2a43b2e99cc98f8aef71c8741bd055ecc6918
parentb9f111747e4d4a04b4163b10ace3a51fa879e584 (diff)
Aggregate searchers in parallel
-rw-r--r--search/search.go49
-rw-r--r--sync.go8
2 files changed, 42 insertions, 15 deletions
diff --git a/search/search.go b/search/search.go
index 97a8e59..7450b3b 100644
--- a/search/search.go
+++ b/search/search.go
@@ -62,29 +62,54 @@ func (ag *Aggregator) AddSearcher(s Searcher) {
// Search performs a search using all searchers and aggregates the results
// removing any duplicates.
-func (ag Aggregator) Search(ctx context.Context, query string) (Results, error) {
+func (ag Aggregator) Search(ctx context.Context, query string) (Results, []error) {
+ var (
+ chRes = make(chan *Result)
+ chErr = make(chan error)
+ chDone = make(chan bool)
+ wait = 0
+ )
+
+ for _, s := range ag.searchers {
+ go func(s Searcher) {
+ list, err := s.Search(ctx, query)
+ if err != nil {
+ chErr <- err
+ } else {
+ for _, r := range list {
+ chRes <- r
+ }
+ }
+ chDone <- true
+ }(s)
+ wait++
+ }
+
var (
ret Results
errs []error
names = make(map[string]bool)
)
- for _, s := range ag.searchers {
- list, err := s.Search(ctx, query)
- if err != nil {
- errs = append(errs, err)
- continue
- }
- for _, r := range list {
+
+ for wait > 0 {
+ select {
+ case r := <-chRes:
if !names[r.Name] {
ret = append(ret, r)
names[r.Name] = true
}
+ case err := <-chErr:
+ errs = append(errs, err)
+ case <-chDone:
+ wait--
+ case <-ctx.Done():
+ if err := ctx.Err(); err != nil {
+ errs = append(errs, err)
+ }
+ wait = 0
}
}
- if ret == nil && len(errs) > 0 {
- return nil, &AggregatorError{"all requests failed", errs}
- }
- return ret, nil
+ return ret, errs
}
// Filter creates a new results list using the filter function
diff --git a/sync.go b/sync.go
index 8c1e6b1..c5adb64 100644
--- a/sync.go
+++ b/sync.go
@@ -108,9 +108,11 @@ func syncShow(show *Show, out chan<- *search.Result, fin chan<- bool) {
query := fmt.Sprintf(show.Query, pointer)
- results, err := aggr.Search(ctx, query)
- if err != nil {
- fmt.Printf("syncing %s: search error: %v\n", show.Name, err)
+ results, errs := aggr.Search(ctx, query)
+ if errs != nil {
+ for _, err := range errs {
+ fmt.Printf("syncing %s: search error: %v\n", show.Name, err)
+ }
break
}
results = results.Filter(func(r *search.Result) bool {