Soji est un nettoyeur de disque dur.
#!/usr/bin/env python
# My hard drive is full, it's time to clean up!
"""Soji, a curses based hard drive cleaner.

A background thread walks a directory tree and records the largest files
and directories.  The curses front end lists them while the scan is running
and lets the user mark entries and delete them.
"""

import copy
import curses
import os
import shutil
import socket
import stat
import sys
import threading
import time
import traceback
from optparse import OptionParser


def main():
    """Entry point: parse the command line, start the scan, run the UI."""
    data = Data(100)

    # parse user options
    parser = OptionParser()
    parser.add_option('-d', '--debug', action='store_true', dest='debug',
                      help='debug script')
    parser.add_option('-p', '--port', action='store', type='int',
                      dest='port', help='port number used for debugging')
    (options, args) = parser.parse_args()

    # pick up a starting directory on the command line
    if args and os.path.isdir(args[0]):
        start_dir = args[0]
    else:
        start_dir = os.getcwd()

    ui = UICurses(data, options, args)
    try:
        # daemon thread: it must not keep the process alive after the UI quits
        worker = threading.Thread(target=scan, args=(start_dir, data, ui))
        worker.daemon = True
        worker.start()
        ui.list()
        ui.end()
    except BaseException:
        # Deliberately broad (Ctrl-C included): the terminal has to be put
        # back into a sane state whatever went wrong.
        ui.exception()


def scan(dir, data, ui):
    """Scan thread body: walk 'dir' into 'data', then clear data.scan.

    Errors are reported through ui.exception().
    """
    try:
        scandir(dir, data, set())
        data.scan = False
    except Exception:
        ui.exception()


def scandir(dir, data, hard_links):
    """Recursively record the files and directories found below 'dir'.

    dir        -- directory to walk
    data       -- Data instance receiving add_file() / add_dir() calls
    hard_links -- set of (device, inode) pairs of multiply linked files
                  already counted; updated in place

    Returns the cumulated size in bytes of everything counted below 'dir'.
    Symbolic links and special files (pipes, sockets, devices) are skipped,
    unreadable directories count as empty.
    """
    total = 0
    try:
        names = os.listdir(dir)
    except OSError:
        names = []
    for name in names:
        path = os.path.join(dir, name)
        # lstat() needs no open(): it cannot block on a named pipe and it
        # does not follow symbolic links
        try:
            info = os.lstat(path)
        except OSError:
            continue
        mode = info.st_mode
        if stat.S_ISDIR(mode):
            size = scandir(path, data, hard_links)
            data.add_dir(path, size)
        elif stat.S_ISREG(mode):
            # count a file reachable through several hard links only once
            if info.st_nlink > 1:
                identity = (info.st_dev, info.st_ino)
                if identity in hard_links:
                    continue
                hard_links.add(identity)
            size = info.st_size
            data.add_file(path, size)
        else:
            # symlink or special file
            continue
        total += size
    return total


class Data(object):
    """Largest files and directories found so far.

    Two lists, 'file' and 'dir', are kept sorted by decreasing size and
    capped to 'cap' rows.  Each row is [path, size, marked].  'data' is
    the list currently being viewed (one of the two).
    """

    # scan thread status
    scan = True

    def __init__(self, cap):
        self.file = []
        self.dir = []
        self.data = self.dir
        # maximum number of rows kept in each list
        self.cap = cap

    def add_file(self, file, size):
        """Add files, largest go to the top"""
        self.add(self.file, file, size)

    def add_dir(self, file, size):
        """Add directories, largest go to the top"""
        self.add(self.dir, file, size)

    def add(self, data, file, size):
        """Helper function, don't call outside of class"""
        for position, row in enumerate(data):
            if size > row[1]:
                data.insert(position, [file, size, False])
                # drop the smallest row when the list overflows
                if len(data) > self.cap:
                    del data[self.cap]
                return
        # smaller than everything stored: keep it only if there is room
        if len(data) < self.cap:
            data.append([file, size, False])

    def ismark(self, index):
        """Return True when the row at 'index' of the view is marked"""
        return bool(self.data[index][2])

    def get(self, index):
        """Return the display line for 'index', '' when out of range"""
        if index < len(self.data):
            row = self.data[index]
            return human(row[1]) + row[0]
        return ''

    def len(self):
        """Return the number of rows in the current view"""
        return len(self.data)

    def mark(self, index):
        """Toggle the mark of the row at 'index' of the current view.

        Marking a directory also marks every recorded file and directory
        located inside it.  Unmarking does not propagate.
        """
        row = self.data[index]
        row[2] = not row[2]
        # walk the lists to mark files inside directories
        if self.data is self.dir and row[2]:
            parent = row[0]
            for other in self.dir:
                if is_file_in_dir(other[0], parent):
                    other[2] = True
            for other in self.file:
                if is_file_in_dir(other[0], parent):
                    other[2] = True

    def switch(self):
        """Switch between directories and files view"""
        if self.data is self.file:
            self.data = self.dir
        else:
            self.data = self.file
        return self.describe()

    def describe(self):
        """Describe what is being viewed"""
        if self.data is self.file:
            return "files"
        return "directories"

    def delete(self):
        """Delete marked items from the lists and from the disk"""
        # start with files, finish with directories
        passes = ((self.file, os.path.isfile, os.remove),
                  (self.dir, os.path.isdir, shutil.rmtree))
        for rows, still_there, remove in passes:
            # walk backward so deleting a row does not shift the ones left
            for i in reversed(range(len(rows))):
                if rows[i][2]:
                    path = rows[i][0]
                    del rows[i]
                    # may already be gone with a deleted parent directory
                    if still_there(path):
                        remove(path)


class UICurses(object):
    """Curses user interface: a list box above a one line status bar."""

    def __init__(self, data, options, args):
        """Initialise curses and the debugging channel.

        data    -- Data instance shared with the scan thread
        options -- parsed options; 'debug' and 'port' are read
        args    -- positional command line arguments
        """
        self.data = data
        # where tracebacks and debug messages go
        self.err = sys.stderr
        self.stdscr = curses.initscr()

        # Turn off echoing of keys, and enter cbreak mode,
        # where no buffering is performed on keyboard input
        curses.noecho()
        curses.cbreak()

        # create ui windows
        self.screen()
        self.status = Status(self.wstatus, data, True)

        # debugging helper
        if options.debug:
            self._relay_debug_output(args)
        elif options.port:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(('localhost', options.port))
            self.err = s.makefile('w')

    def _relay_debug_output(self, args):
        """Run the real UI in an xterm and print what it reports, then exit."""
        # this process only relays text: give the terminal back first
        self.end()

        # start listening socket for debugging; port 0 lets the system
        # pick a free port, and listening before spawning means the child
        # can never connect too early
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(('localhost', 0))
        server.listen(1)
        port = server.getsockname()[1]

        if args:
            start_dir = args[0]
        else:
            start_dir = ''

        use_xterm = True
        if use_xterm:
            os.spawnlp(os.P_NOWAIT, 'nice', 'nice', 'xterm', '-e',
                       sys.argv[0], '--port', str(port), start_dir)
        else:
            # use this if you get a traceback in xterm
            os.spawnlp(os.P_NOWAIT, 'bash', 'bash', '-c',
                       sys.argv[0], '--port', str(port), start_dir)

        conn, addr = server.accept()
        stream = conn.makefile('r')
        try:
            for line in stream:
                sys.stdout.write(line)
        finally:
            stream.close()
            conn.close()
            server.close()
        sys.exit()

    def exception(self):
        """Report the exception being handled on the error stream."""
        (etype, value, tb) = sys.exc_info()
        # In the event of an error, restore the terminal
        # to a sane state.
        self.end()
        try:
            # optional module, not part of the standard library
            import rtraceback
            rtraceback.print_exception(etype, value, tb, self.err)
        except Exception:
            # 'file' must be passed by keyword, the fourth positional
            # argument of print_exception() is 'limit'
            traceback.print_exception(etype, value, tb, file=self.err)
        self.err.flush()
        # only close a stream we opened ourselves
        if self.err is not sys.stderr:
            self.err.close()

    def debug(self, msg):
        """Write 'msg' on the error stream"""
        self.err.write(msg + '\n')
        self.err.flush()

    def end(self):
        """Leave curses mode and restore the terminal settings"""
        self.stdscr.keypad(0)
        curses.echo()
        curses.nocbreak()
        curses.endwin()

    def screen(self):
        """Create the list window and the status window below it"""
        (height, width) = self.stdscr.getmaxyx()
        self.wstatus = curses.newwin(1, width, height - 1, 0)
        self.wlb = curses.newwin(height - 1, width, 0, 0)

    def resize(self):
        """Recreate the windows to fit the new terminal size"""
        del self.wlb
        del self.wstatus
        self.screen()

    def list_mode(self):
        """Set up the status bar for list mode"""
        self.status.set_mode('List')
        self.status.set_help('(q)uit (s)elect (v)iew')

    def list(self):
        """Show the results while the scan runs, then enter select mode."""
        self.list_mode()
        lb = ViewListbox(self.wlb, self.data)
        # prevent getch from blocking or scan won't update
        self.wlb.nodelay(True)
        while True:
            curses.doupdate()
            key = self.wlb.getch()
            if lb.key(key):
                pass
            elif key == curses.KEY_RESIZE:
                self.resize()
                # the new window starts out in blocking mode
                self.wlb.nodelay(True)
                lb.setw(self.wlb)
                self.status.setw(self.wstatus)
            elif key == ord('q'):
                return
            elif key == ord('s'):
                self.select()
                self.list_mode()
            elif key == ord('v'):
                self.data.switch()
                lb.draw()
                self.status.draw()
            if self.data.scan:
                lb.draw()
                time.sleep(.1)
            else:
                self.wlb.nodelay(False)
                self.status.set_scan(False)
                break
        self.select()

    def select(self):
        """Let the user mark entries and delete them, until 'q'."""
        if self.data.scan:
            # work on a snapshot, the scan thread keeps changing the lists
            data = copy.deepcopy(self.data)
        else:
            data = self.data
        self.status.set_mode('Select')
        self.status.set_help('(d)elete (m)ark (v)iew (q)uit')
        lb = MarkListbox(self.wlb, data)
        while True:
            curses.doupdate()
            key = self.wlb.getch()
            if lb.key(key):
                pass
            elif key == ord('q'):
                return
            elif key == ord('d'):
                if self.status.yesno(
                        'Are you sure you want to delete marked items ?'):
                    data.delete()
                    lb.draw()
            elif key == ord('v'):
                data.switch()
                lb.draw()
            if self.data.scan:
                time.sleep(.1)
            else:
                self.status.set_scan(False)
                self.wlb.nodelay(False)


class Window(object):
    """Base class of the widgets, each one draws in a curses window."""

    def setw(self, w):
        """Attach the widget to window 'w' and redraw it"""
        self.w = w
        self.height, self.width = w.getmaxyx()
        # enable cursors keys
        w.keypad(1)
        self.draw()


class Status(Window):
    """One line status bar: mode, view, scan state and key help."""

    def __init__(self, w, data, scan):
        self.data = data
        self.help = ''
        self.mode = ''
        self.prompt = False
        self.scan = scan
        self.setw(w)

    def setw(self, w):
        """Attach to 'w', shown in reverse video"""
        # done here rather than in __init__ so that the window created
        # by a resize gets the background too
        w.bkgdset(' ', curses.A_REVERSE)
        Window.setw(self, w)

    def set_help(self, help):
        """Set the key help text and redraw"""
        self.help = help
        self.draw()

    def set_mode(self, mode):
        """Set the mode name and redraw"""
        self.mode = mode
        self.draw()

    def set_scan(self, scan):
        """Set the scanning indicator and redraw"""
        self.scan = scan
        self.draw()

    def draw(self):
        """Draw the prompt when there is one, the status line otherwise"""
        if self.prompt:
            line = self.prompt
        else:
            words = [self.data.describe()]
            if self.scan:
                words.append('scanning')
            line = self.mode + ' (' + ', '.join(words) + ') ' + self.help
        self.w.erase()
        # never write in the last column: curses raises an error when
        # the cursor is pushed past the end of the window
        line = line[:max(self.width - 1, 0)]
        self.w.addstr(0, 0, line)
        self.w.noutrefresh()

    def yesno(self, prompt):
        """Ask a question, return True on 'y' and False on 'n'"""
        self.prompt = prompt + " (y or n) "
        self.draw()
        while True:
            curses.doupdate()
            key = self.w.getch()
            if key == ord('n') or key == ord('y'):
                break
        self.prompt = False
        self.draw()
        return key == ord('y')


class Listbox(Window):
    """Base class of the list boxes: maps keys to movements.

    Subclasses implement draw, key_down, key_up, page_down, page_up,
    home and end.
    """

    def __init__(self, w, data):
        self.data = data
        self.setw(w)

    def key(self, key):
        "Handle keypress, return True when the key was used"
        # NOTE(review): 'p' moving down and 'n' moving up looks reversed
        # compared to the Emacs convention; binding kept as it was.
        if key in (curses.KEY_DOWN, ord('j'), ord('p')):
            self.key_down()
        elif key in (curses.KEY_UP, ord('k'), ord('n')):
            self.key_up()
        elif key == curses.KEY_NPAGE:
            self.page_down()
        elif key == curses.KEY_PPAGE:
            self.page_up()
        elif key == curses.KEY_HOME:
            self.home()
        elif key == curses.KEY_END:
            self.end()
        else:
            return self.other_key(key)
        return True

    def other_key(self, key):
        """Hook for subclasses with extra keys"""
        return False


class ViewListbox(Listbox):
    "List box without selection"

    # index of item on top
    top = 0

    def draw(self):
        """Draw the rows starting at 'top'"""
        self.w.erase()
        # the list may be shorter than when 'top' was set (view switch)
        self.top = max(0, min(self.top, self.data.len() - self.height))
        for i in range(self.height):
            line = self.data.get(self.top + i)
            # stay clear of the last column, see Status.draw
            self.w.addstr(i, 0, line[:max(self.width - 1, 0)])
        self.w.noutrefresh()

    def key_down(self):
        count = self.data.len()
        if count > self.height and self.top + self.height < count:
            self.top += 1
            self.draw()

    def page_down(self):
        if self.top + 2 * self.height <= self.data.len():
            self.top += self.height
            self.draw()
        else:
            self.end()

    def key_up(self):
        if self.top > 0:
            self.top -= 1
            self.draw()

    def page_up(self):
        if self.top - self.height >= 0:
            self.top -= self.height
            self.draw()
        else:
            self.home()

    def home(self):
        self.top = 0
        self.draw()

    def end(self):
        count = self.data.len()
        if count > self.height:
            self.top = count - self.height
            self.draw()


class MarkListbox(Listbox):
    "List box with a selection and a mark"

    # data index
    select = 0
    # cursor position on screen
    cursor = 0

    def draw(self):
        """Draw the rows around the selection, selected row highlighted"""
        self.w.erase()
        count = self.data.len()
        # the list may have shrunk (delete, view switch): keep the
        # selection on an existing row and the cursor on screen
        if self.select >= count:
            self.select = max(count - 1, 0)
        self.cursor = max(0, min(self.cursor, self.select, self.height - 1))
        first = self.select - self.cursor
        for i in range(self.height):
            index = first + i
            if index >= count:
                break
            if self.data.ismark(index):
                line = '* '
            else:
                line = '  '
            line += self.data.get(index)
            # stay clear of the last column, see Status.draw
            line = line[:max(self.width - 1, 0)]
            if i == self.cursor:
                self.w.addstr(i, 0, line, curses.A_REVERSE)
            else:
                self.w.addstr(i, 0, line)
        self.w.noutrefresh()

    def key_down(self):
        if self.select + 1 < self.data.len():
            self.select += 1
            if self.cursor < self.height - 1:
                self.cursor += 1
            self.draw()

    def page_down(self):
        if self.select + self.height < self.data.len():
            self.select += self.height
            self.cursor = 0
        # less than a page left: show the end of the list
        if self.data.len() - self.select < self.height:
            self.end()
        self.draw()

    def key_up(self):
        if self.select > 0:
            self.select -= 1
            if self.cursor > 0:
                self.cursor -= 1
            self.draw()

    def page_up(self):
        if self.select - self.height >= 0:
            self.select -= self.height
            self.cursor = 0
            self.draw()
        else:
            self.home()

    def home(self):
        self.select = 0
        self.cursor = 0
        self.draw()

    def end(self):
        self.select = max(self.data.len() - 1, 0)
        # last line of the window, or higher when the list is short
        self.cursor = min(self.height - 1, self.select)
        self.draw()

    def other_key(self, key):
        """'m' or space toggles the mark of the selected row"""
        if key == ord('m') or key == ord(' '):
            # nothing to mark in an empty list
            if self.select < self.data.len():
                self.data.mark(self.select)
                self.draw()
            return True
        return False


# (exclusive upper bound, divisor, suffix)
_UNITS = ((1e6, 1e3, 'K'), (1e9, 1e6, 'M'), (1e12, 1e9, 'G'),
          (1e15, 1e12, 'T'))


def human(number):
    """Turn number of bytes into a human readable string

    The result is padded with spaces to 5 characters so that sizes line
    up in a column.
    """
    if number < 1e3:
        text = str(number)
    else:
        text = 'BIG!'
        for ceiling, divisor, suffix in _UNITS:
            if number < ceiling:
                text = str(int(number / divisor)) + suffix
                break
    # pad with spaces
    return text.ljust(5)


def is_file_in_dir(file, dir):
    """Returns True when 'file' is inside 'dir'

    The comparison is done on whole path components: '/a/bc' is not
    inside '/a/b'.  A path is not inside itself.
    """
    prefix = dir.rstrip(os.sep) + os.sep
    return len(file) > len(prefix) and file.startswith(prefix)


if __name__ == '__main__':
    main()

# Copyright (C) 2006 Ivan Kanis
#
#
# This program is free software ; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation ; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY ; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program ; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

# Local Variables:
# compile-command: "./soji.py --debug"
# End: