From ef63e9af55fcfe3255dddec3197bd8a807152c66 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 18 Mar 2016 19:37:29 +0100 Subject: Godeps: add github.com/rjeczalik/notify --- .../github.com/rjeczalik/notify/watcher_inotify.go | 396 +++++++++++++++++++++ 1 file changed, 396 insertions(+) create mode 100644 Godeps/_workspace/src/github.com/rjeczalik/notify/watcher_inotify.go (limited to 'Godeps/_workspace/src/github.com/rjeczalik/notify/watcher_inotify.go') diff --git a/Godeps/_workspace/src/github.com/rjeczalik/notify/watcher_inotify.go b/Godeps/_workspace/src/github.com/rjeczalik/notify/watcher_inotify.go new file mode 100644 index 000000000..3ceaa8f3a --- /dev/null +++ b/Godeps/_workspace/src/github.com/rjeczalik/notify/watcher_inotify.go @@ -0,0 +1,396 @@ +// Copyright (c) 2014-2015 The Notify Authors. All rights reserved. +// Use of this source code is governed by the MIT license that can be +// found in the LICENSE file. + +// +build linux + +package notify + +import ( + "bytes" + "errors" + "path/filepath" + "runtime" + "sync" + "sync/atomic" + "syscall" + "unsafe" +) + +// eventBufferSize defines the size of the buffer given to read(2) function. One +// should not depend on this value, since it was arbitrary chosen and may be +// changed in the future. +const eventBufferSize = 64 * (syscall.SizeofInotifyEvent + syscall.PathMax + 1) + +// consumersCount defines the number of consumers in producer-consumer based +// implementation. Each consumer is run in a separate goroutine and has read +// access to watched files map. +const consumersCount = 2 + +const invalidDescriptor = -1 + +// watched is a pair of file path and inotify mask used as a value in +// watched files map. +type watched struct { + path string + mask uint32 +} + +// inotify implements Watcher interface. +type inotify struct { + sync.RWMutex // protects inotify.m map + m map[int32]*watched // watch descriptor to watched object + fd int32 // inotify file descriptor + pipefd []int // pipe's read and write descriptors + epfd int // epoll descriptor + epes []syscall.EpollEvent // epoll events + buffer [eventBufferSize]byte // inotify event buffer + wg sync.WaitGroup // wait group used to close main loop + c chan<- EventInfo // event dispatcher channel +} + +// NewWatcher creates new non-recursive inotify backed by inotify. +func newWatcher(c chan<- EventInfo) watcher { + i := &inotify{ + m: make(map[int32]*watched), + fd: invalidDescriptor, + pipefd: []int{invalidDescriptor, invalidDescriptor}, + epfd: invalidDescriptor, + epes: make([]syscall.EpollEvent, 0), + c: c, + } + runtime.SetFinalizer(i, func(i *inotify) { + i.epollclose() + if i.fd != invalidDescriptor { + syscall.Close(int(i.fd)) + } + }) + return i +} + +// Watch implements notify.watcher interface. +func (i *inotify) Watch(path string, e Event) error { + return i.watch(path, e) +} + +// Rewatch implements notify.watcher interface. +func (i *inotify) Rewatch(path string, _, newevent Event) error { + return i.watch(path, newevent) +} + +// watch adds a new watcher to the set of watched objects or modifies the existing +// one. If called for the first time, this function initializes inotify filesystem +// monitor and starts producer-consumers goroutines. +func (i *inotify) watch(path string, e Event) (err error) { + if e&^(All|Event(syscall.IN_ALL_EVENTS)) != 0 { + return errors.New("notify: unknown event") + } + if err = i.lazyinit(); err != nil { + return + } + iwd, err := syscall.InotifyAddWatch(int(i.fd), path, encode(e)) + if err != nil { + return + } + i.RLock() + wd := i.m[int32(iwd)] + i.RUnlock() + if wd == nil { + i.Lock() + if i.m[int32(iwd)] == nil { + i.m[int32(iwd)] = &watched{path: path, mask: uint32(e)} + } + i.Unlock() + } else { + i.Lock() + wd.mask = uint32(e) + i.Unlock() + } + return nil +} + +// lazyinit sets up all required file descriptors and starts 1+consumersCount +// goroutines. The producer goroutine blocks until file-system notifications +// occur. Then, all events are read from system buffer and sent to consumer +// goroutines which construct valid notify events. This method uses +// Double-Checked Locking optimization. +func (i *inotify) lazyinit() error { + if atomic.LoadInt32(&i.fd) == invalidDescriptor { + i.Lock() + defer i.Unlock() + if atomic.LoadInt32(&i.fd) == invalidDescriptor { + fd, err := syscall.InotifyInit() + if err != nil { + return err + } + i.fd = int32(fd) + if err = i.epollinit(); err != nil { + _, _ = i.epollclose(), syscall.Close(int(fd)) // Ignore errors. + i.fd = invalidDescriptor + return err + } + esch := make(chan []*event) + go i.loop(esch) + i.wg.Add(consumersCount) + for n := 0; n < consumersCount; n++ { + go i.send(esch) + } + } + } + return nil +} + +// epollinit opens an epoll file descriptor and creates a pipe which will be +// used to wake up the epoll_wait(2) function. Then, file descriptor associated +// with inotify event queue and the read end of the pipe are added to epoll set. +// Note that `fd` member must be set before this function is called. +func (i *inotify) epollinit() (err error) { + if i.epfd, err = syscall.EpollCreate1(0); err != nil { + return + } + if err = syscall.Pipe(i.pipefd); err != nil { + return + } + i.epes = []syscall.EpollEvent{ + {Events: syscall.EPOLLIN, Fd: i.fd}, + {Events: syscall.EPOLLIN, Fd: int32(i.pipefd[0])}, + } + if err = syscall.EpollCtl(i.epfd, syscall.EPOLL_CTL_ADD, int(i.fd), &i.epes[0]); err != nil { + return + } + return syscall.EpollCtl(i.epfd, syscall.EPOLL_CTL_ADD, i.pipefd[0], &i.epes[1]) +} + +// epollclose closes the file descriptor created by the call to epoll_create(2) +// and two file descriptors opened by pipe(2) function. +func (i *inotify) epollclose() (err error) { + if i.epfd != invalidDescriptor { + if err = syscall.Close(i.epfd); err == nil { + i.epfd = invalidDescriptor + } + } + for n, fd := range i.pipefd { + if fd != invalidDescriptor { + switch e := syscall.Close(fd); { + case e != nil && err == nil: + err = e + case e == nil: + i.pipefd[n] = invalidDescriptor + } + } + } + return +} + +// loop blocks until either inotify or pipe file descriptor is ready for I/O. +// All read operations triggered by filesystem notifications are forwarded to +// one of the event's consumers. If pipe fd became ready, loop function closes +// all file descriptors opened by lazyinit method and returns afterwards. +func (i *inotify) loop(esch chan<- []*event) { + epes := make([]syscall.EpollEvent, 1) + fd := atomic.LoadInt32(&i.fd) + for { + switch _, err := syscall.EpollWait(i.epfd, epes, -1); err { + case nil: + switch epes[0].Fd { + case fd: + esch <- i.read() + epes[0].Fd = 0 + case int32(i.pipefd[0]): + i.Lock() + defer i.Unlock() + if err = syscall.Close(int(fd)); err != nil && err != syscall.EINTR { + panic("notify: close(2) error " + err.Error()) + } + atomic.StoreInt32(&i.fd, invalidDescriptor) + if err = i.epollclose(); err != nil && err != syscall.EINTR { + panic("notify: epollclose error " + err.Error()) + } + close(esch) + return + } + case syscall.EINTR: + continue + default: // We should never reach this line. + panic("notify: epoll_wait(2) error " + err.Error()) + } + } +} + +// read reads events from an inotify file descriptor. It does not handle errors +// returned from read(2) function since they are not critical to watcher logic. +func (i *inotify) read() (es []*event) { + n, err := syscall.Read(int(i.fd), i.buffer[:]) + if err != nil || n < syscall.SizeofInotifyEvent { + return + } + var sys *syscall.InotifyEvent + nmin := n - syscall.SizeofInotifyEvent + for pos, path := 0, ""; pos <= nmin; { + sys = (*syscall.InotifyEvent)(unsafe.Pointer(&i.buffer[pos])) + pos += syscall.SizeofInotifyEvent + if path = ""; sys.Len > 0 { + endpos := pos + int(sys.Len) + path = string(bytes.TrimRight(i.buffer[pos:endpos], "\x00")) + pos = endpos + } + es = append(es, &event{ + sys: syscall.InotifyEvent{ + Wd: sys.Wd, + Mask: sys.Mask, + Cookie: sys.Cookie, + }, + path: path, + }) + } + return +} + +// send is a consumer function which sends events to event dispatcher channel. +// It is run in a separate goroutine in order to not block loop method when +// possibly expensive write operations are performed on inotify map. +func (i *inotify) send(esch <-chan []*event) { + for es := range esch { + for _, e := range i.transform(es) { + if e != nil { + i.c <- e + } + } + } + i.wg.Done() +} + +// transform prepares events read from inotify file descriptor for sending to +// user. It removes invalid events and these which are no longer present in +// inotify map. This method may also split one raw event into two different ones +// when system-dependent result is required. +func (i *inotify) transform(es []*event) []*event { + var multi []*event + i.RLock() + for idx, e := range es { + if e.sys.Mask&(syscall.IN_IGNORED|syscall.IN_Q_OVERFLOW) != 0 { + es[idx] = nil + continue + } + wd, ok := i.m[e.sys.Wd] + if !ok || e.sys.Mask&encode(Event(wd.mask)) == 0 { + es[idx] = nil + continue + } + if e.path == "" { + e.path = wd.path + } else { + e.path = filepath.Join(wd.path, e.path) + } + multi = append(multi, decode(Event(wd.mask), e)) + if e.event == 0 { + es[idx] = nil + } + } + i.RUnlock() + es = append(es, multi...) + return es +} + +// encode converts notify system-independent events to valid inotify mask +// which can be passed to inotify_add_watch(2) function. +func encode(e Event) uint32 { + if e&Create != 0 { + e = (e ^ Create) | InCreate | InMovedTo + } + if e&Remove != 0 { + e = (e ^ Remove) | InDelete | InDeleteSelf + } + if e&Write != 0 { + e = (e ^ Write) | InModify + } + if e&Rename != 0 { + e = (e ^ Rename) | InMovedFrom | InMoveSelf + } + return uint32(e) +} + +// decode uses internally stored mask to distinguish whether system-independent +// or system-dependent event is requested. The first one is created by modifying +// `e` argument. decode method sets e.event value to 0 when an event should be +// skipped. System-dependent event is set as the function's return value which +// can be nil when the event should not be passed on. +func decode(mask Event, e *event) (syse *event) { + if sysmask := uint32(mask) & e.sys.Mask; sysmask != 0 { + syse = &event{sys: syscall.InotifyEvent{ + Wd: e.sys.Wd, + Mask: e.sys.Mask, + Cookie: e.sys.Cookie, + }, event: Event(sysmask), path: e.path} + } + imask := encode(mask) + switch { + case mask&Create != 0 && imask&uint32(InCreate|InMovedTo)&e.sys.Mask != 0: + e.event = Create + case mask&Remove != 0 && imask&uint32(InDelete|InDeleteSelf)&e.sys.Mask != 0: + e.event = Remove + case mask&Write != 0 && imask&uint32(InModify)&e.sys.Mask != 0: + e.event = Write + case mask&Rename != 0 && imask&uint32(InMovedFrom|InMoveSelf)&e.sys.Mask != 0: + e.event = Rename + default: + e.event = 0 + } + return +} + +// Unwatch implements notify.watcher interface. It looks for watch descriptor +// related to registered path and if found, calls inotify_rm_watch(2) function. +// This method is allowed to return EINVAL error when concurrently requested to +// delete identical path. +func (i *inotify) Unwatch(path string) (err error) { + iwd := int32(invalidDescriptor) + i.RLock() + for iwdkey, wd := range i.m { + if wd.path == path { + iwd = iwdkey + break + } + } + i.RUnlock() + if iwd == invalidDescriptor { + return errors.New("notify: path " + path + " is already watched") + } + fd := atomic.LoadInt32(&i.fd) + if _, err = syscall.InotifyRmWatch(int(fd), uint32(iwd)); err != nil { + return + } + i.Lock() + delete(i.m, iwd) + i.Unlock() + return nil +} + +// Close implements notify.watcher interface. It removes all existing watch +// descriptors and wakes up producer goroutine by sending data to the write end +// of the pipe. The function waits for a signal from producer which means that +// all operations on current monitoring instance are done. +func (i *inotify) Close() (err error) { + i.Lock() + if fd := atomic.LoadInt32(&i.fd); fd == invalidDescriptor { + i.Unlock() + return nil + } + for iwd := range i.m { + if _, e := syscall.InotifyRmWatch(int(i.fd), uint32(iwd)); e != nil && err == nil { + err = e + } + delete(i.m, iwd) + } + switch _, errwrite := syscall.Write(i.pipefd[1], []byte{0x00}); { + case errwrite != nil && err == nil: + err = errwrite + fallthrough + case errwrite != nil: + i.Unlock() + default: + i.Unlock() + i.wg.Wait() + } + return +} -- cgit v1.2.3