@@ -96,28 +96,17 @@ impl Watcher {
9696 } ) ;
9797 }
9898
99- async fn fan_in ( mut rx : watch:: Receiver < HashSet < Url > > , mut watcher : impl notify:: Watcher ) {
99+ async fn fan_in (
100+ mut rx : watch:: Receiver < HashSet < Url > > ,
101+ mut watcher : impl notify:: Watcher + Send + ' static ,
102+ ) {
100103 loop {
101- let ( mut to_unwatch, mut to_watch) : ( HashSet < _ > , HashSet < _ > ) = {
104+ let ( to_unwatch, to_watch) : ( HashSet < _ > , HashSet < _ > ) = {
102105 let ( new, old) = ( & * rx. borrow_and_update ( ) , & * WATCHED . read ( ) ) ;
103106 ( old. difference ( new) . cloned ( ) . collect ( ) , new. difference ( old) . cloned ( ) . collect ( ) )
104107 } ;
105108
106- to_unwatch. retain ( |u| match watcher. unwatch ( u) {
107- Ok ( _) => true ,
108- Err ( e) if matches ! ( e. kind, notify:: ErrorKind :: WatchNotFound ) => true ,
109- Err ( e) => {
110- error ! ( "Unwatch failed: {e:?}" ) ;
111- false
112- }
113- } ) ;
114- to_watch. retain ( |u| watcher. watch ( u, RecursiveMode :: NonRecursive ) . is_ok ( ) ) ;
115-
116- {
117- let mut watched = WATCHED . write ( ) ;
118- watched. retain ( |u| !to_unwatch. contains ( u) ) ;
119- watched. extend ( to_watch) ;
120- }
109+ watcher = Self :: sync_watched ( watcher, to_unwatch, to_watch) . await ;
121110
122111 if !rx. has_changed ( ) . unwrap_or ( false ) {
123112 Self :: sync_linked ( ) . await ;
@@ -173,6 +162,35 @@ impl Watcher {
173162 }
174163 }
175164
165+ async fn sync_watched < W > ( mut watcher : W , to_unwatch : HashSet < Url > , to_watch : HashSet < Url > ) -> W
166+ where
167+ W : notify:: Watcher + Send + ' static ,
168+ {
169+ use notify:: ErrorKind :: WatchNotFound ;
170+
171+ if to_unwatch. is_empty ( ) && to_watch. is_empty ( ) {
172+ return watcher;
173+ }
174+
175+ tokio:: task:: spawn_blocking ( move || {
176+ for u in to_unwatch {
177+ match watcher. unwatch ( & u) {
178+ Ok ( ( ) ) => _ = WATCHED . write ( ) . remove ( & u) ,
179+ Err ( e) if matches ! ( e. kind, WatchNotFound ) => _ = WATCHED . write ( ) . remove ( & u) ,
180+ Err ( e) => error ! ( "Unwatch failed: {e:?}" ) ,
181+ }
182+ }
183+ for u in to_watch {
184+ if watcher. watch ( & u, RecursiveMode :: NonRecursive ) . is_ok ( ) {
185+ WATCHED . write ( ) . insert ( u) ;
186+ }
187+ }
188+ watcher
189+ } )
190+ . await
191+ . unwrap ( )
192+ }
193+
176194 async fn sync_linked ( ) {
177195 let mut new = WATCHED . read ( ) . clone ( ) ;
178196
0 commit comments