Wednesday, May 6, 2015

Publisher-Subscriber setup with WatchService

I am trying to set up a two-way publisher-subscriber using the WatchService in NIO.

I'm not terribly experienced with threads, so if I'm not making any sense feel free to call me out on it!

This is only a sample to figure out how the library works. The production code will listen for changes to an input file; when the file changes, it will do some calculations and write to an output file. Another program will read that output file, run its own calculations, and write back to the input file, and the cycle continues.

For this test, though, I am making two threads with watchers: the first thread listens on first.txt and writes to second.txt, and the second thread listens on second.txt and writes to first.txt. All I am doing is incrementing a count variable and writing it to each thread's output file. Both threads make blocking calls and filter on the files they actually care about, so I figured the behavior would look like this:

1. Both threads are waiting on the take() call.
2. Change first.txt to start the process.
3. This triggers the first thread to change second.txt.
4. Which then triggers the second thread to change first.txt, and so on.
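
For reference, the lockstep cycle described above can be sketched without the file system at all. This is only an illustration under assumptions: two in-memory queues stand in for first.txt and second.txt, and the names PingPongSketch, worker, and run are made up for this example. Each thread blocks on its own "input", increments, and hands off to the other side, so the two counters never differ by more than one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PingPongSketch {

    // Builds a thread that waits on `input`, bumps its own counter, and
    // notifies `output`, mirroring one Watcher from the question.
    public static Thread worker(BlockingQueue<Integer> input, BlockingQueue<Integer> output,
                                int[] counts, int slot, int rounds) {
        return new Thread(() -> {
            try {
                while (counts[slot] < rounds) {
                    input.take();               // block until "our file" changes
                    counts[slot]++;
                    output.put(counts[slot]);   // "write" to the other side's input
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Runs both workers to completion and returns their final counters.
    public static int[] run(int rounds) {
        // Assumption: queues replace first.txt and second.txt.
        BlockingQueue<Integer> first = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> second = new LinkedBlockingQueue<>();
        int[] counts = new int[2];

        Thread a = worker(first, second, counts, 0, rounds);
        Thread b = worker(second, first, counts, 1, rounds);
        a.start();
        b.start();
        try {
            first.put(0);   // the initial "change first.txt" kick
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = run(1000);
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```

The difference from the file-based version is that a queue delivers exactly one notification per put, which is the property the WatchService version is implicitly relying on.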

Or so I hoped. The end result is that the threads get way out of sync: when I run this with count going up to 1000, one thread is usually behind by more than 50.

Here is the code for the watcher

Watcher(Path input, Path output) throws IOException {
    this.watcher = FileSystems.getDefault().newWatchService();
    this.input = input;
    this.output = output;
    dir = input.getParent();
    dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
}

void watchAndRespond() throws IOException, InterruptedException {

    while (count < 1000) {

        WatchKey key = watcher.take();

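        for (WatchEvent<?> event: key.pollEvents()) {
            // Check for OVERFLOW first: its context may be null, so calling
            // event.context().equals(...) on it could throw.
            WatchEvent.Kind<?> kind = event.kind();
            if (kind == OVERFLOW) {
                continue;
            }

            // Ignore events for any file other than the one we watch.
            if (! input.getFileName().equals(event.context())) {
                continue;
            }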

            count++;

            try (BufferedWriter out = new BufferedWriter(new FileWriter(output.toFile()))) {
                out.write(count + "");
            }
        }
        key.reset();
    }
}
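
One thing worth checking is whether a single write produces more than one event. The WatchService documentation notes that a modification may be reported as a single ENTRY_MODIFY on some implementations and as several on others, and new FileWriter(...) truncates the file before writing, so one logical update could plausibly show up as multiple events. A minimal sketch, assuming that is what is happening: decide once per take() batch whether the input file was touched, instead of counting every event. EventBatch, touches, and FakeEvent are hypothetical names; FakeEvent exists only so the helper can be exercised without a real file system.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.Arrays;
import java.util.List;

public class EventBatch {

    // True if at least one event in the batch refers to `fileName`.
    // OVERFLOW is skipped first because its context may be null.
    public static boolean touches(List<? extends WatchEvent<?>> events, Path fileName) {
        for (WatchEvent<?> event : events) {
            WatchEvent.Kind<?> kind = event.kind();
            if (kind == StandardWatchEventKinds.OVERFLOW) {
                continue;
            }
            if (fileName.equals(event.context())) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical stand-in for the events a real WatchKey would deliver.
    public static final class FakeEvent implements WatchEvent<Path> {
        private final WatchEvent.Kind<Path> kind;
        private final Path context;

        public FakeEvent(WatchEvent.Kind<Path> kind, Path context) {
            this.kind = kind;
            this.context = context;
        }

        @Override public WatchEvent.Kind<Path> kind() { return kind; }
        @Override public int count() { return 1; }
        @Override public Path context() { return context; }
    }

    public static void main(String[] args) {
        Path input = Paths.get("first.txt");
        // Two MODIFY events for what was one logical write.
        List<FakeEvent> batch = Arrays.asList(
                new FakeEvent(StandardWatchEventKinds.ENTRY_MODIFY, input),
                new FakeEvent(StandardWatchEventKinds.ENTRY_MODIFY, input));

        int count = 0;
        if (touches(batch, input)) {
            count++;   // one increment per batch, not per event
        }
        System.out.println(count);
    }
}
```

In watchAndRespond this would replace the inner for loop with a single check on key.pollEvents(). It can still double count when the events for one write arrive in separate batches, so it should reduce the drift rather than rule it out.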

I don't want to have to read the file to decide whether or not the file has changed, because these files in production could potentially be large.
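
File metadata is one way to tell a real change from a duplicate event without reading the contents. A rough sketch, assuming that comparing last-modified time and size is good enough here (it would miss a rewrite that leaves both unchanged, and timestamp resolution varies by file system). ChangeDetector, hasChanged, and demo are hypothetical names. The check would run only when an event arrives, so it is not polling.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;

public class ChangeDetector {
    private FileTime lastModified;   // null until the first check
    private long lastSize = -1;

    // Returns true when the file's size or last-modified time differs from
    // the values seen on the previous call. The contents are never read.
    public boolean hasChanged(Path file) {
        try {
            BasicFileAttributes attrs = Files.readAttributes(file, BasicFileAttributes.class);
            boolean changed = !attrs.lastModifiedTime().equals(lastModified)
                    || attrs.size() != lastSize;
            lastModified = attrs.lastModifiedTime();
            lastSize = attrs.size();
            return changed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Exercises the detector on a temporary file and returns the three answers.
    public static boolean[] demo() {
        try {
            Path file = Files.createTempFile("input", ".txt");
            ChangeDetector detector = new ChangeDetector();

            Files.write(file, "1".getBytes(StandardCharsets.UTF_8));
            boolean firstSighting = detector.hasChanged(file);
            boolean duplicateEvent = detector.hasChanged(file);   // nothing written in between

            Files.write(file, "10".getBytes(StandardCharsets.UTF_8));
            boolean afterRewrite = detector.hasChanged(file);     // size is now different

            Files.delete(file);
            return new boolean[] { firstSighting, duplicateEvent, afterRewrite };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (boolean answer : demo()) {
            System.out.println(answer);
        }
    }
}
```

Reading attributes costs the same regardless of file size, which is the point when the production files are large.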

I feel like maybe this is too complicated and I'm trying to treat a scraped knee with amputation. Am I using this library incorrectly? Am I using the wrong tool for this job, and if so, are there any other file-listening libraries I can use so I don't have to poll for the last-modified time?

EDIT: Oops, here is the test I wrote that sets up the two threads

@Test
public void when_two_watchers_run_together_they_end_up_with_same_number_of_evaluation() throws InterruptedException, IOException {
    //setup
    Path input = environment.loadResourceAt("input.txt").asPath();
    Path output = environment.loadResourceAt("output.txt").asPath();

    if (Files.exists(input)) {
        Files.delete(input);
    }
    if (Files.exists(output)) {
        Files.delete(output);
    }

    Thread thread1 = makeThread(input, output, "watching input");
    Thread thread2 = makeThread(output, input, "watching output");

    //act
    thread1.start();
    thread2.start();

    Thread.sleep(50);

    // Kick off the cycle with the first write; try-with-resources closes the writer.
    try (BufferedWriter out = new BufferedWriter(new FileWriter(input.toFile()))) {
        out.write(0 + "");
    }

    thread1.join();
    thread2.join();

    int inputResult = Integer.parseInt(Files.readAllLines(input).get(0));
    int outputResult = Integer.parseInt(Files.readAllLines(output).get(0));
    //assert
    assertThat(inputResult).describedAs("Expected is output file, Actual is input file").isEqualTo(outputResult);
}

public Thread makeThread(Path input, Path output, String threadName) {
    return new Thread(() ->
    {
        try {
            new Watcher(input, output).watchAndRespond();
        }
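        catch (IOException | InterruptedException e) {
            // Note: this throws on the worker thread, so JUnit will not
            // report it as a failure of the test method itself.
            fail();
        }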

    }, threadName);
}
