Soji

Soji est un nettoyeur de disque dur.

#!/usr/bin/env python

# My hard drive is full, it's time to clean up!

import copy, curses, os, shutil, sys, time, thread
from optparse import OptionParser

def main():
    """Entry point"""
    data = Data(100)

    # parse user options
    parser = OptionParser()
    parser.add_option('-d', '--debug', action ='store_true', dest='debug',
                      help='debug script')
    parser.add_option('-p', '--port', action='store', type='int', dest='port',
                      help='port number used for debugging')
    (options, args) = parser.parse_args()


    # pick up a starting directory on the command line
    if len(args) and os.path.isdir(args[0]):
        start_dir = args[0]
    else:
        start_dir = os.getcwd()

    ui= UICurses(data, options, args)
    try:
        thread.start_new_thread(scan, (start_dir, data, ui))
        ui.list()
        ui.end()
    except:
        ui.exception()

def scan(dir, data, ui):
    try:
        hard_links = []
        scandir(dir, data, hard_links)
        data.scan = False
    except:
        ui.exception()

def scandir(dir, data, hard_links):
    dirsize = 0
    try:
        list = os.listdir(dir)
    except:
        list = []
    for file in list:
        absfile = dir + '/' + file

        # skip symlink
        if os.path.islink(absfile):
            continue

        # handle hard link
        try:
            fd = os.open(absfile, os.O_RDONLY)
        except OSError:
            # Named pipe causes and error
            # TBD check to see if it's fixed with Python 2.4
            continue
        else:
            ret = os.fstat(fd)
            os.close(fd)
            inode = ret[1]
            nlink = ret[3]

        if inode in hard_links:
            continue

        if os.path.isdir(absfile):
            ret_size = scandir(absfile, data, hard_links)
            data.add_dir(absfile, ret_size)
            dirsize += ret_size
        elif os.path.exists(absfile):
            size = os.path.getsize(absfile)
            dirsize += size
            data.add_file(absfile, size)

        if nlink > 1:
            hard_links.append(inode)

    return dirsize

class Data:
    # scan thread status
    scan = True

    def __init__(self, cap):
        self.file = []
        self.dir = []
        self.data = self.dir
        # len of data array
        self.cap = cap

    def add_file(self, file, size):
        """Add files, largest go to the top"""
        self.add(self.file, file, size)

    def add_dir(self, file, size):
        """Add directories, largest go to the top"""
        self.add(self.dir, file, size)

    def add(self, data, file, size):
        """Helper function, don't call outside of class"""
        for i in range(len(data)):
            if size > data[i][1]:
                data.insert(i, [file, size, False])
                if len(data) > self.cap:
                    del data[self.cap]
                break
        else:
            if len(data) < self.cap:
                data += [[file, size, False]]

    def ismark(self,index):
        if self.data[index][2]:
            return True
        return False

    def get(self, index):
        """"Return data at index"""
        if len(self.data) > index:
            row = self.data[index]
            line = human(row[1]) + row[0]
        else:
            line = ''
        return line

    def len(self):
        return len(self.data)

    def mark(self, index):
        self.data[index][2] = not self.data[index][2]

        # walk the list to mark files inside directories
        if self.data is self.dir and self.data[index][2]:
            dir = self.data[index][0]
            for row in self.data:
                if is_file_in_dir(row[0], dir):
                    row[2] = True
            for row in self.file:
                if is_file_in_dir(row[0], dir):
                    row[2] = True


    def switch(self):
        """Switch between directories and files view"""
        if self.data is self.file:
            self.data = self.dir
        else:
            self.data = self.file
        return self.describe()

    def describe(self):
        """Describe what is being viewed"""
        if self.data is self.file:
            return "files"
        return "directories"
    def delete(self):
        """Delete marked items"""
        # start with file
        r = range(len(self.file))
        r.reverse()
        for i in r:
            if self.file[i][2]:
                file = self.file[i][0]
                del self.file[i]
                if os.path.isfile(file):
                    os.remove(file)

        # finish with directories
        r = range(len(self.dir))
        r.reverse()
        for i in r:
            if self.dir[i][2]:
                dir = self.dir[i][0]
                del self.dir[i]
                if os.path.isdir(dir):
                    shutil.rmtree(dir)

class UICurses:
    def  __init__(self, data, options, args):
        self.data = data
        self.stdscr=curses.initscr()
        # Turn off echoing of keys, and enter cbreak mode,
        # where no buffering is performed on keyboard input
        curses.noecho()
        curses.cbreak()
        # create ui windows
        self.screen()
        self.status = Status(self.wstatus, data, True)

        # debugging helper
        if options.debug:
            # start listening socket for debugging
            import socket, random
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            port = random.randint(1025,10000)
            s.bind(('localhost', port))
            if len(args):
                start_dir = args[0]
            else:
                start_dir = ''
            if True:
                os.spawnlp(os.P_NOWAIT, 'nice', 'nice', 'xterm', '-e',
                           sys.argv[0], '--port', str(port), start_dir)
            else:
                # use this if you get a traceback in xterm
                os.spawnlp(os.P_NOWAIT, 'bash', 'bash', '-c',
                           sys.argv[0], '--port', str(port), start_dir)

            s.listen(1)
            conn, addr = s.accept()
            f = conn.makefile('r')
            for line in f:
                print line,
            sys.exit()
        elif options.port:
            import socket
            port = options.port
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(('localhost', port))
            self.err = s.makefile('w')
        else:
            self.err = sys.stderr

    def exception(self):
        # In the event of an error, restore the terminal
        # to a sane state.
        self.end()
        (etype, value, tb) = sys.exc_info()
        try:
            import rtraceback
            rtraceback.print_exception(etype, value, tb, self.err)
        except:
            import traceback
            traceback.print_exception(etype, value, tb, self.err)
        self.err.flush()
        self.err.close()

    def debug(self, msg):
        self.err.write(msg + '\n')
        self.err.flush()

    def end(self):
        self.stdscr.keypad(0)
        curses.echo()
        curses.nocbreak()
        curses.endwin()

    def screen(self):
        (height, width) = self.stdscr.getmaxyx()
        self.wstatus = curses.newwin(1, width, height-1 , 0)
        self.wlb = curses.newwin(height-1, width, 0, 0)
        return

    def resize(self):
        del self.wlb
        del self.wstatus
        self.screen()

    def list_mode(self):
        self.status.set_mode('List')
        self.status.set_help('(q)uit (s)elect (v)iew')

    def list(self):
        self.list_mode()
        # prevent getch from blocking or scan won't update
        lb =ViewListbox(self.wlb, self.data)
        self.wlb.nodelay(True)
        while True:
            curses.doupdate()
            key = self.wlb.getch()
            if lb.key(key):
                pass
            elif key == curses.KEY_RESIZE:
                self.resize()
                lb.setw(self.wlb)
                self.status.setw(self.wstatus)
            elif key == ord('q'):
                return
            elif key == ord('s'):
                self.select()
                self.list_mode()
            elif key == ord('v'):
                self.data.switch()
                lb.draw()
                self.status.draw()
            if self.data.scan:
                lb.draw()
                time.sleep(.1)
            else:
                self.wlb.nodelay(False)
                self.status.set_scan(False)
                break
        self.select()

    def select(self):
        if self.data.scan:
            data = copy.deepcopy(self.data)
        else:
            data = self.data
        self.status.set_mode('Select')
        self.status.set_help('(d)elete (m)ark (v)iew (q)uit')
        lb = MarkListbox(self.wlb, data)
        while True:
            curses.doupdate()
            key = self.wlb.getch()
            if lb.key(key):
                pass
            elif key == ord('q'):
                return
            elif key == ord('d'):
                if self.status.yesno \
                        ('Are you sure you want to delete marked items ?'):
                    data.delete()
                    lb.draw()
            elif key == ord('v'):
                data.switch()
                lb.draw()
            if self.data.scan:
                time.sleep(.1)
            else:
                self.status.set_scan(False)
                self.wlb.nodelay(False)

class Window:
    def setw(self, w):
        self.w = w
        self.height, self.width = self.w.getmaxyx()
        # enable cursors keys
        w.keypad(1)
        self.draw()

class Status(Window):
    def __init__(self, w, data, scan):
        self.data = data
        self.help = ''
        self.mode = ''
        self.prompt = False
        self.scan = scan
        w.bkgdset(' ', curses.A_REVERSE)
        self.setw(w)

    def set_help(self, help):
        self.help = help
        self.draw()

    def set_mode(self, mode):
        self.mode = mode
        self.draw()

    def set_scan(self, scan):
        self.scan = scan
        self.draw()

    def draw(self):
        if self.prompt:
            line = self.prompt
        else:
            line = self.mode + ' '
            words = [self.data.describe()]
            if self.scan:
                words.append('scanning')
            line += '(' + (', ').join(words) + ') '
            line += self.help
        self.w.erase()
        if len(line) > self.width:
            line = line[:self.width]
        self.w.addstr(line)
        self.w.noutrefresh()

    def yesno(self, prompt):
        ret = False
        prompt += " (y or n) "
        self.prompt = prompt
        self.draw()
        while True:
            curses.doupdate()
            key = self.w.getch()
            if key == ord('n'):
                break
            if key == ord('y'):
                ret = True
                break
        self.prompt = False
        self.draw()
        return ret

class Listbox(Window):
    def __init__(self, w, data):
        self.data = data
        self.setw(w)

    def key(self,key):
        "Handle keypress"
        handled = False
        if key == curses.KEY_DOWN or key == ord('j') or key == ord('p'):
            handled = self.key_down()
            handled = True
        if key == curses.KEY_UP or key == ord('k') or key == ord('n'):
            self.key_up()
            handled = True
        if key == curses.KEY_NPAGE:
            self.page_down()
            handled = True
        if key == curses.KEY_PPAGE:
            self.page_up()
            handled = True
        if key == curses.KEY_HOME:
            self.home()
            handled = True
        if key == curses.KEY_END:
            self.end()
            handled = True
        if not handled:
            handled = self.other_key(key)
        return handled

    def other_key(self, key):
        return False

class ViewListbox(Listbox):
    "List box without selection"
    # index of item on top
    top = 0

    def draw(self):
        self.w.erase()
        for i in range(self.height):
            index = self.top + i
            line = self.data.get(index)
            if line > self.width:
                line = line[:self.width-1]
            self.w.addstr(i, 0, line)
        self.w.noutrefresh()

    def key_down(self):
        len = self.data.len()
        if len > self.height and (self.top + self.height) < len:
            self.top += 1
        self.draw()

    def page_down(self):
        len = self.data.len()
        if (self.top + (2 * self.height)) <= len:
            self.top += self.height
            self.draw()
        else:
            self.end()

    def key_up(self):
        if self.top > 0:
            self.top -= 1
        self.draw()

    def page_up(self):
        if self.top - self.height >= 0:
            self.top -= self.height
            self.draw()
        else:
            self.home()

    def home(self):
        self.top = 0
        self.draw()

    def end(self):
        len = self.data.len()
        if len > self.height:
            self.top = len - self.height
            self.draw()

class MarkListbox(Listbox):
    "List box with a selection and a mark"
    # data index
    select = 0
    # cursor position on screen
    cursor = 0

    def draw(self):
        self.w.erase()
        start = self.select - self.cursor
        for i in range(self.height):
            index = start + i
            if self.data.len() > index:
                if self.data.ismark(index):
                    line = '* '
                else:
                    line = '  '
                line += self.data.get(index)
                if len(line) > self.width:
                    line = line[:self.width-1]

                if i == self.cursor:
                    self.w.addstr(i, 0, line, curses.A_REVERSE)
                else:
                    self.w.addstr(i, 0, line)


    def key_down(self):
        if self.select + 1 < self.data.len():
            self.select += 1
            if self.cursor < self.height -1:
                self.cursor += 1
        self.draw()

    def page_down(self):
        if self.select + self.height < self.data.len():
            self.select += self.height
            self.cursor = 0
            if self.data.len() - self.select < self.height:
                self.end()
        self.draw()

    def key_up(self):
        if self.select > 0:
            self.select -= 1
        if self.cursor > 0:
            self.cursor -= 1
        self.draw()

    def page_up(self):
        if self.select - self.height >= 0:
            self.select -= self.height
            self.cursor = 0
            self.draw()
        else:
            self.home()

    def home(self):
        self.select = 0
        self.cursor = 0
        self.draw()

    def end(self):
        self.select = self.data.len() - 1
        self.cursor = curses.LINES - 2
        self.draw()

    def other_key(self, key):
        if key == ord('m') or key == ord(' '):
            self.data.mark(self.select)
            self.draw()
            return True
        else:
            return False

def human(number):
    """Turn number of bytes into a human readable string"""
    if number < 1e3:
        ret = str(number)
    elif number < 1e6:
        ret = str(int(number/1e3)) + 'K'
    elif number < 1e9:
        ret = str(int(number/1e6)) + 'M'
    elif number < 1e12:
        ret = str(int(number/1e9)) + 'G'
    else:
        ret = 'BIG!'

    # pad with spaces
    return ret + (' ' * (5 - len(ret)))

def is_file_in_dir(file, dir):
    """Returns True when 'file' is inside 'dir'"""
    len_dir = len(dir)
    if len(file) > len_dir:
        if file[:len_dir] == dir:
            return True
    return False

main()

# Copyright (C) 2006 Ivan Kanis
# 
#
# This program is free software ; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation ; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY ; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program ; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

# Local Variables:
# compile-command: "./soji.py --debug"
# End:

retour