.Net FileSystemWatcher
Intro
The sample sourece code demonstrates how to create a watch command
that will observe a file directory for changes. The design of the app allows to get all the file system events as fast as possible
into a Channel
Channels and Workers
The following diagram shows how a single event will be process from the FileSystemWatcher delegates to the final worker that will process it.
---
title: Channels and Workers
---
flowchart TD
A[FileSystemWatcher] --> |Publish Event|B[Channel<FileSystemEventStruct>]
B --> |Consume Event|C[FileSystemStructEventFilterer]
C --> |Accept Event| D[Channel<TextFileChangedEvent>]
C --> |Reject| A
D --> |Consume| E[DiffGenerator]
The above diagram shows the flow of a single event from the FileSystemWatcher to the final worker that will process the event. In this case if an event is rejected by the FileSystemStructEventFilterer it will be discarded and the FileSystemWatcher will continue to listen for new events.
Marille supports IWorkers to throw exceptions in their ConsumeAsync methods. This is useful to be able to track problems when processing an event. Whenever a worker throws an exception the channel will continue publishing and consuming events yet a IErrorWorker will receive a new event with the exception. Each channel must have a IErorWorker that will process the exceptions.
---
title: Success event flow
---
sequenceDiagram
participant FS as FileSystemWatcher
participant CH1 as Channel<FileSystemEventStruct>
participant W1 as FileSystemStructEventFilterer
participant CH2 as Channel<TextFileChangedEvent>
participant W2 as DiffGenerator
participant E1 as ErrorWorker<FileSystemEventStruct>
participant E2 as ErrorWorker<TextFileChangedEvent>
alt Accept Event
FS->>CH1: Publish Event
activate CH1
CH1->>W1: Consume Event
deactivate CH1
activate W1
W1->>CH2: Publish Event
deactivate W1
activate CH2
CH2->>W2: Consume Event
deactivate CH2
activate W2
W2->>W2: Process Event
deactivate W2
else FileSystemStructEventFilterer Throws Exception
FS->>CH1: Publish Event
activate CH1
CH1->>W1: Consume Event
deactivate CH1
activate W1
W1->>CH1: Throw Exception
deactivate W1
activate CH1
CH1->>E1: Publish ErrorEvent
deactivate CH1
activate E1
E1->>E1: Log Exception
deactivate E1
else DiffGenerator Throws Exception
FS->>CH1: Publish Event
activate CH1
CH1->>W1: Consume Event
deactivate CH1
activate W1
W1->>CH2: Accept Event
deactivate W1
activate CH2
CH2->>W2: Consume Event
deactivate CH2
activate W2
W2->>CH2: Throw Exception
deactivate W2
activate CH2
CH2->>E2: Publish ErrorEvent
deactivate CH2
activate E2
E2->>E2: Log Exception
deactivate E2
end
Creation and Registration
The following code is the one that is used to create the channels and workers.
// workers
var eventFilter = new FileSystemStructEventFilterer (hub);
var diffGenerator = new DiffGenerator (snapshot);
// error workers
var fsEventsErrorHandler = new FileSystemEventStructErrorHandler ();
var textFileChangedErrorHandler = new TextFileChangedErrorHandler ();
// channel configurations
var fsEventsConfig = new TopicConfiguration { Mode = ChannelDeliveryMode.AtLeastOnceSync };
var txtEventsConfig = new TopicConfiguration { Mode = ChannelDeliveryMode.AtMostOnceAsync };
// create the channels and registed the workers
await hub.CreateAsync (nameof(FileSystemWatcher), txtEventsConfig, textFileChangedErrorHandler,
diffGenerator);
await hub.CreateAsync (nameof(FileSystemWatcher), fsEventsConfig, fsEventsErrorHandler, eventFilter);
In the above code we do the following:
- Create a new instance of the FileSystemStructEventFilterer and DiffGenerator. This will be the workers that will process the events.
- Create a new instance of the FileSystemEventStructErrorHandler and TextFileChangedErrorHandler. This will be the error workers that will process any possible exceptions that are thrown by the consuming workers. Each worker will be registered to a specific channel.
- Use the hub to create the channels and workers. The hub will create the channels and workers and will registered the workers to the channels. Each channel will have a specific configuration that will define how the events will be delivered.
FileSystemStructEventFilterer implementation
The FileSystemStructEventFilterers job is to filter the events that are not text files. The worker will use the IsValidMimeType method to verify the MIME type of the file. If the MIME type is not 'text/*' or 'application/json' the event will be rejected and the worker will return false.
IsValidMimeType implementation:
protected bool IsValidMimeType (string path)
{
var mime = MimeGuesser.GuessFileType (path);
Log.Information ("Mime type for {Path} is {Mime}", path, mime.MimeType);
return mime.MimeType switch {
"application/json" => true,
var str when str.StartsWith ("text/") => true,
_ => false,
};
Filtering the event:
TextFileChangedEvent? textFileChangedEvent = default;
switch (currentMessage.ChangeType) {
case WatcherChangeTypes.Changed:
case WatcherChangeTypes.Created: {
if (IsValidMimeType (currentMessage.FullPath))
textFileChangedEvent = TextFileChangedEventFactory.FromFileSystemEventStruct (currentMessage);
break;
}
case WatcherChangeTypes.Deleted:
textFileChangedEvent = TextFileChangedEventFactory.FromFileSystemEventStruct (currentMessage);
break;
case WatcherChangeTypes.Renamed:
if (currentMessage.DestinationFullPath is not null && IsValidMimeType (currentMessage.DestinationFullPath))
textFileChangedEvent = TextFileChangedEventFactory.FromFileSystemEventStruct (currentMessage);
break;
}
Publishing the event if it was accepted:
if (textFileChangedEvent is not null)
await _hub.PublishAsync (nameof(FileSystemWatcher), textFileChangedEvent.Value);
DiffGenerator implementation
The DiffGenerator worker will process the events that are text files. The worker will read the file and will generate a diff of the file. The idea of the worker is to show the differences between the current file and the previous file and perform a CPU intensive operation.
The first thing to note is that the DiffGenerator states that is a CPU intensive operation by implementing the UseBackgroundThread property and returning true. This will allow the hub to run the worker ConsumeAsync method in a background thread as opposed to the current thread, which is what happens when the IWorker is IO bound (the default).
public bool UseBackgroundThread => true;
The ConsumeAsync implementation is more complicated. The first thing it does is to determine what type of event we have received and then will generate a diff based on the event type. For example, if the event is a WatcherChangeTypes.Created the worker will read the file and will generate a diff with the new content, if it is a modify event the worker will compare the current content of the file with the content that it had when it took a snapshot of the file.
public async Task ConsumeAsync (TextFileChangedEvent currentMessage, CancellationToken token = default)
{
Log.Information ("Consuming event {Event}", currentMessage);
// ignore events without a path
if (currentMessage.Path is null)
return;
// depending on the event type we will take different actions
var task = currentMessage.Flags switch {
TextFileChangedType.ItemRemoved => ConsumeRemoved (currentMessage, token),
TextFileChangedType.ItemCreated => ConsumeCreated (currentMessage, token),
TextFileChangedType.ItemModified => ConsumeModified (currentMessage, token),
TextFileChangedType.ItemRenamed => ConsumeRenamed (currentMessage, token),
_ => throw new InvalidOperationException ($"Unhandled event type {currentMessage.Flags}"),
};
await task;
}
As you can see the DiffGenerator worker takes advantage of the Marille Exception support and it throws a InvalidOperationException if the event type is not handled. This will allow the hub to catch the exception and to process it in the appropriate ErrorWorker.
ErrorWorkers
The project contains two different error workers with a very simple implementation. Both workers will log the exception that they receive.
TextFileChangedErrorHandler
using Marille.FileSystem.Events;
using Serilog;
namespace Marille.FileSystem.Workers;
public class TextFileChangedErrorHandler : IErrorWorker<TextFileChangedEvent> {
public bool UseBackgroundThread => false;
public Task ConsumeAsync (TextFileChangedEvent message, Exception exception, CancellationToken token = default)
{
Log.Error (exception, "Error processing event {Event}", message);
return Task.CompletedTask;
}
public void Dispose () { }
public ValueTask DisposeAsync () => ValueTask.CompletedTask;
}
FileSystemEventStructErrorHandler
using Marille;
using Serilog;
namespace FileSystemWatcher.Workers;
public class FileSystemEventStructErrorHandler : IErrorWorker<FileSystemEventStruct> {
public bool UseBackgroundThread => false;
public Task ConsumeAsync (FileSystemEventStruct message, Exception exception, CancellationToken token = default)
{
// log the error and the event that caused it, there is not much we can do about it
Log.Error (exception, "Error processing event {Event}", message);
return Task.CompletedTask;
}
public void Dispose () { }
public ValueTask DisposeAsync () => ValueTask.CompletedTask;
}
Events
Marille is very opinionated about the events that can be used in the system. Because events can be consumed by more than one worker it is important to make sure that workers do not mess with the event data while another worker is consuming it (we allow parallel execution). To prevent this Marille limits events to be struct. As structs are value types they are copied when passed to a worker. This means that the worker can modify the event without affecting the original event.
Because of the above mentioned limitation, the sample needs to convert the FileSystemEventArgs events from the FileSystemWatcher to a struct before it is published to the channel. The following code shows how the FileSystemEventStruct is created.
public static FileSystemEventStruct FromEventArgs (FileSystemEventArgs args)
{
return new(args.ChangeType, args.FullPath);
}
public static FileSystemEventStruct FromRenamedEventArgs (RenamedEventArgs args)
{
return new FileSystemEventStruct (args.OldFullPath, args.FullPath);
}
Main Loop
Marilles consuming loops are executed in the current thread. Because of that it is important to make sure that the consuming loop does not block the thread. The FileSystemWatcher also uses the current thread to raise events. If both the Hub and the FileSystemWatcher are running in the same thread it is possible that the FileSystemWatcher will block the Hub from processing, we can avoid the situation by running the hub in a background thread:
// we need to create several things to be able to get the events:
// 1. Hub: will be used to deliver the events to the consumers.
// 2. Worker: will be used to process the events.
// 3. FSMonitor: will be used to monitor the file system and deliver the events to the hub.
var hub = new Hub ();
// because we are going to start a main loop in the main thread, we need the channel initialization to be
// done in a background thread else we will be blocked.
Task.Run (async () => {
// workers will be disposed by the hub
//var worker = new LogEventToConsole ();
var eventFilter = new FileSystemStructEventFilterer (hub);
var diffGenerator = new DiffGenerator (snapshot);
var fsEventsErrorHandler = new FileSystemEventStructErrorHandler ();
var textFileChangedErrorHandler = new TextFileChangedErrorHandler ();
var fsEventsConfig = new TopicConfiguration { Mode = ChannelDeliveryMode.AtLeastOnceSync };
var txtEventsConfig = new TopicConfiguration { Mode = ChannelDeliveryMode.AtMostOnceAsync };
await hub.CreateAsync (nameof(FileSystemWatcher), txtEventsConfig, textFileChangedErrorHandler,
diffGenerator);
await hub.CreateAsync (nameof(FileSystemWatcher), fsEventsConfig, fsEventsErrorHandler, eventFilter);
Log.Information ("Channels created");
});
To be able to pump events from the FileSystemWatcher we need to register to the different events that the FileSystemWatcher raises. The following code shows how to register to the events:
// create a watcher with the needed delegates that will pump the events to the channels
using var watcher = new System.IO.FileSystemWatcher("/Users/mandel/Xamarin/");
watcher.NotifyFilter = NotifyFilters.Attributes
| NotifyFilters.CreationTime
| NotifyFilters.DirectoryName
| NotifyFilters.FileName
| NotifyFilters.LastAccess
| NotifyFilters.LastWrite
| NotifyFilters.Security
| NotifyFilters.Size;
void OnEventRaised (object _, FileSystemEventArgs args)
{
if (!hub.TryPublish (nameof(FileSystemWatcher), FileSystemEventStruct.FromEventArgs (args))) {
Log.Error ("Could not publish {Event}", args);
}
}
watcher.Changed += OnEventRaised;
watcher.Created += OnEventRaised;
watcher.Deleted += OnEventRaised;
watcher.Renamed += OnEventRaised;
watcher.Error += (_, args) => {
Log.Error ("Error on with event {Args}", args);
};
watcher.Filter = "*.txt";
watcher.IncludeSubdirectories = true;
watcher.EnableRaisingEvents = true;
The TryPusblish method allows to publish an event from a sync method and will return false if the event could not be published, false otherwise. This method will fail if we are trying to publish an event in a bounded channel and the channel is full.