Blame sanity/sanity.py

8de75c8
#!/usr/bin/python
8de75c8
#coding=utf-8
8de75c8
8de75c8
import unittest
8de75c8
import tempfile
8de75c8
import popen2
8de75c8
import os
8de75c8
import stat
8de75c8
import sys
8de75c8
import commands
8de75c8
import pwd
8de75c8
import grp
8de75c8
import shutil
8de75c8
import time
8de75c8
#import rpm
8de75c8
8de75c8
ftp_user = 'lftp-tester'
8de75c8
ftp_pass = 'lftp-tester'
8de75c8
ftp_home = '/home/lftp-tester'
8de75c8
ftp_root = '/var/ftp'
8de75c8
8de75c8
class LftpRunner(object):
8de75c8
    def __init__(self):
8de75c8
        self.reset()
8de75c8
        self.cmds = []
8de75c8
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')
8de75c8
8de75c8
    def reset(self):
8de75c8
        self.rc = -1
8de75c8
8de75c8
        self.out = []
8de75c8
        self.err = []
8de75c8
8de75c8
        self.server = []
8de75c8
        self.client = []
8de75c8
        self.regular = []
8de75c8
        self.info = []
8de75c8
8de75c8
    def add(self, command):
8de75c8
        self.cmds.append(command + '\n')
8de75c8
8de75c8
    def save(self):
8de75c8
        foo = open('/tmp/lftp', 'wb+')
8de75c8
        foo.writelines(self.cmds)
8de75c8
        self.script_file.writelines(self.cmds)
8de75c8
        self.script_file.flush()
8de75c8
8de75c8
    def filter(self, content=None):
8de75c8
        filter_hash = { '----' : self.info,
8de75c8
                        '--->' : self.client,
8de75c8
                        '<---' : self.server }
8de75c8
        if content == None:
8de75c8
            content = self.out
8de75c8
8de75c8
        for line in content:
8de75c8
            first = line.split()[0]
8de75c8
            try:
8de75c8
                filter_hash[first].append(line[5:])
8de75c8
            except KeyError:
8de75c8
                self.regular.append(line)
8de75c8
8de75c8
        return self.server, self.client, self.info, self.regular
8de75c8
8de75c8
    def run(self, options="-df", sleep=0):
8de75c8
        self.save()
8de75c8
8de75c8
        if not "f" in options:
8de75c8
            options += " -f"
8de75c8
8de75c8
        # print os.path.abspath(self.script_file.name)
8de75c8
        # with open(os.path.abspath(self.script_file.name)) as f:
8de75c8
        #    for l in f:
8de75c8
        #        print l
8de75c8
8de75c8
        p = popen2.Popen4("/usr/bin/lftp %s %s" % (options, os.path.abspath(self.script_file.name)))
8de75c8
8de75c8
        if sleep:
8de75c8
            time.sleep(sleep)
8de75c8
8de75c8
        self.rc = p.wait()
8de75c8
        self.out = p.fromchild.readlines()
8de75c8
8de75c8
        # Debug output:
8de75c8
        # print self.out
8de75c8
8de75c8
        return self.rc, self.out
8de75c8
8de75c8
    def connect(self, host='localhost', user='anonymous', password='anonymous@localhost.localdomain'):
8de75c8
        self.add("open %s" % host)
8de75c8
        if user == 'anonymous':
8de75c8
            self.add('anon')
8de75c8
        else:
8de75c8
            self.add('user %s %s' % (user, password))
8de75c8
8de75c8
class TestBase(unittest.TestCase):
8de75c8
    def setUp(self):
8de75c8
        unittest.TestCase.setUp(self)
8de75c8
        self.runner = LftpRunner()
8de75c8
8de75c8
    def assertContains(self, haystack, needle, msg):
8de75c8
        self.assert_(needle in haystack, msg)
8de75c8
8de75c8
    def assertDoesNotContain(self, haystack, needle, msg):
8de75c8
        self.assert_(not needle in haystack, msg)
8de75c8
8de75c8
    def assertContainsString(self, haystack, needle, msg):
8de75c8
        found = False
8de75c8
8de75c8
        for item in haystack:
8de75c8
            if needle in item:
8de75c8
                found = True
8de75c8
                return
8de75c8
8de75c8
        if found == False:
8de75c8
            self.fail(msg)
8de75c8
8de75c8
    def assertDoesNotContainString(self, haystack, needle, msg):
8de75c8
        found = False
8de75c8
8de75c8
        for item in haystack:
8de75c8
            if needle in item:
8de75c8
                found = True
8de75c8
8de75c8
        if found == True:
8de75c8
            self.fail(msg)
8de75c8
8de75c8
    def assertFilePermissions(self, file_path, permissions):
8de75c8
        real_perm = stat.S_IMODE(os.stat(file_path)[stat.ST_MODE])
8de75c8
        self.assertEquals(real_perm, permissions, "%o != %o" % (real_perm, permissions))
8de75c8
8de75c8
    def assertSameContent(self, file1, file2):
8de75c8
        # there's probably a pure-python way to do that, but this would be more portable 
8de75c8
        # even across RHEL4
8de75c8
        p = popen2.Popen3("/usr/bin/diff -q %s %s" % (file1, file2))
8de75c8
        self.assertEquals(p.wait(), 0)
8de75c8
8de75c8
    def _ftp_root_put_file(self, filename, path="", contents=""):
8de75c8
        f = open(os.path.join(ftp_root, path, filename), 'w')
8de75c8
        f.write(contents)
8de75c8
        f.flush()
8de75c8
        f.close()
8de75c8
8de75c8
        return f
8de75c8
8de75c8
    def _ftp_root_remove_file(self, filename):
8de75c8
        os.remove(filename)
8de75c8
8de75c8
    def _create_chowned_file(self, filename, path="", mode="0644"):
8de75c8
        newf = self._ftp_root_put_file(filename, path)
8de75c8
        os.chown(newf.name, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
8de75c8
        os.chmod(newf.name, mode)
8de75c8
8de75c8
        return newf
8de75c8
8de75c8
8de75c8
class TestLogging(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
8de75c8
    def testLogAnon(self):
8de75c8
        " Logging in and out using anonymous login "
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("ls -l /")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, "230 ", "Could not log in")
8de75c8
        self.assertDoesNotContainString(self.runner.server, "530 ", "Could not log in")
8de75c8
8de75c8
    def testLogUser(self):
8de75c8
        " Logging in and out using a local user "
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("ls -l /")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, "230 ", "Could not log in")
8de75c8
        # Only 530 is not good enough here, on rhel8 vsftpd receives 530 because of TLS but you can
8de75c8
        # still log in with name and password successfully.
8de75c8
        self.assertDoesNotContainString(self.runner.server, "530 Login incorrect", "Login incorrect")
8de75c8
8de75c8
    def testLogUserBadPassword(self):
8de75c8
        " Logging in and out using a local user with bad password - must not succeed "
8de75c8
        self.runner.connect(user=ftp_user, password="blablableble")
8de75c8
        self.runner.add("ls -l /")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, "530 ", "User was able to log in with bad password")
8de75c8
8de75c8
8de75c8
class TestBasicCommands(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
        self.file_name = 'test-basic-commands'
8de75c8
        self.file_full_path = os.path.join(ftp_root, self.file_name)
8de75c8
        f = self._ftp_root_put_file(self.file_name, contents="blabla")
8de75c8
        self.assert_(os.path.exists(os.path.join(ftp_root, self.file_name)))
8de75c8
8de75c8
    def tearDown(self):
8de75c8
        f = self._ftp_root_remove_file(os.path.join(ftp_root, self.file_name))
8de75c8
        self.assert_(not os.path.exists(os.path.join(ftp_root, self.file_name)))
8de75c8
8de75c8
    def testLS(self):
8de75c8
        "LS: Tests listing a directory "
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("ls -l /")
8de75c8
        self.runner.run("-df")
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '150 ', 'LS failed')
8de75c8
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'LS failed')
8de75c8
8de75c8
    def testPWD(self):
8de75c8
        "PWD: Tests printing current directory "
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("cd pub")
8de75c8
        self.runner.add("pwd")
8de75c8
        self.runner.run("-df")
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.client, 'PWD', 'PWD failed')
8de75c8
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'PWD failed')
8de75c8
8de75c8
    def testCD(self):
8de75c8
        "CD: Tests changing directory "
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("cd pub")
8de75c8
        self.runner.add("pwd")
8de75c8
        self.runner.run("-df")
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.client, 'CWD /pub', 'CD failed')
8de75c8
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'CD failed')
8de75c8
8de75c8
    def testGET(self):
8de75c8
        "GET: Test downloading a file "
8de75c8
        # get a file from ftp
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("get %s" % self.file_name)
8de75c8
        self.runner.run("-df")
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        # checks its presence in the CWD, clean up
8de75c8
        self.assert_(os.path.exists(self.file_name), "GET failed")
8de75c8
        os.remove(self.file_name)
8de75c8
8de75c8
    def testPUT(self):
8de75c8
        "PUT: Test uploading a file "
8de75c8
        # create a dummy file
8de75c8
        fname = 'upload-file'
8de75c8
        f = open(fname, 'w')
8de75c8
        f.write('content')
8de75c8
        f.flush()
8de75c8
        f.close()
8de75c8
8de75c8
        # store the file
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("cd /pub")
8de75c8
        self.runner.add("put %s" % fname)
8de75c8
        self.runner.add("ls /pub/%s" % fname)
8de75c8
        self.runner.run("-df")
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        # check if the store op was successfull
8de75c8
        fullpath = os.path.join(ftp_root, 'pub', fname)
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'PUT failed')
8de75c8
        self.assert_(os.path.exists(fullpath), 'PUT failed')
8de75c8
8de75c8
        # remove the file from the ftp root
8de75c8
        os.remove(fullpath)
8de75c8
        self.assert_(not os.path.exists(fullpath))
8de75c8
8de75c8
    def testExternalCommand(self):
8de75c8
        " Running external command "
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("!ls sanity.py")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContains(self.runner.regular, 'sanity.py\n', "Running external command failed")
8de75c8
8de75c8
    def testALIAS(self):
8de75c8
        "ALIAS: Tests defining an alias "
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("alias dir ls -l")
8de75c8
        self.runner.add("dir")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.client, 'LIST -l', 'Setting alias failed')
8de75c8
8de75c8
    def testCAT(self):
8de75c8
        "CAT: Printing a remote file to std. output "
8de75c8
        # print file
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("cat %s" % self.file_name)
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        # assert its contents on std. output
8de75c8
        self.assertContainsString(self.runner.regular, 'blabla', 'CAT failed')
8de75c8
8de75c8
    def testCHMOD(self):
8de75c8
        "CHMOD: Changing permissions on a remote file "
8de75c8
        os.chown(self.file_full_path, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
8de75c8
        os.chmod(self.file_full_path, 0777)
8de75c8
        old_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])
8de75c8
8de75c8
        # change the permission
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("cd %s" % ftp_root)
8de75c8
        self.runner.add("ls -l")
8de75c8
        self.runner.add("ls -l %s" % self.file_name)
8de75c8
        self.runner.add("chmod 0644 %s" % self.file_name)
8de75c8
        self.runner.add("ls -l %s" % self.file_name)
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        # check against the old mode
8de75c8
        self.assertContainsString(self.runner.server, '200 ', 'RMDIR failed')
8de75c8
        new_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])
8de75c8
        self.assertNotEqual(old_mode, new_mode, 'CHMOD failed')
8de75c8
        self.assertEquals(new_mode, 0644)
8de75c8
8de75c8
    def testCLS(self):
8de75c8
        "CLS"
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("cls")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.client, 'LIST', "CLS failed")
8de75c8
        self.assertContains(self.runner.regular, "%s\n" % self.file_name , "CLS failed")
8de75c8
8de75c8
    def testCOMMAND(self):
8de75c8
        "COMMAND: Run a command, ignoring aliases "
8de75c8
        # less is a common defined alias..we try running it as a command, should fail
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("command less")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContains(self.runner.regular, "Unknown command `less'.\n", "COMMAND failed (or less nor defined as alias)")
8de75c8
8de75c8
    def testECHO(self):
8de75c8
        "ECHO: echo a string "
8de75c8
        # less is a common defined alias..we try running it as a command, should fail
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("echo foo")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContains(self.runner.regular, "foo\n", "ECHO failed")
8de75c8
8de75c8
    def testFIND(self):
8de75c8
        " FIND: List files in the directory recursively."
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("find .")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContains(self.runner.regular, "./\n", "FIND failed")
8de75c8
        self.assertContains(self.runner.regular, "./pub/\n", "FIND failed")
8de75c8
        self.assertContains(self.runner.regular, "./%s\n" % self.file_name, "FIND failed")
8de75c8
8de75c8
    def testMKDIR(self):
8de75c8
        "MKDIR: Create remote directories."
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("mkdir -p /pub/foo/bar")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '257 ', 'MKDIR failed')
8de75c8
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', 'foo', 'bar')))
8de75c8
        shutil.rmtree(os.path.join(ftp_root, 'pub', 'foo'))
8de75c8
8de75c8
    def testRMDIR(self):
8de75c8
        "RMDIR: Remove remote directories."
8de75c8
        try:
8de75c8
           os.mkdir(os.path.join(ftp_root, 'pub', 'foo'))
8de75c8
           os.chmod(os.path.join(ftp_root, 'pub', 'foo'), 0777)
8de75c8
           os.chown(os.path.join(ftp_root, 'pub', 'foo'),
8de75c8
                    pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
8de75c8
        except OSError:
8de75c8
           pass
8de75c8
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', 'foo')))
8de75c8
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("cd %s" % ftp_root)
8de75c8
        self.runner.add("ls -l pub")
8de75c8
        self.runner.add("rmdir pub/foo")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '250 ', 'RMDIR failed')
8de75c8
        self.assert_(not os.path.exists(os.path.join(ftp_root, 'pub', 'foo')))
8de75c8
8de75c8
    def testMV(self):
8de75c8
        "MV: rename/move files"
8de75c8
        fname  = 'newfile'
8de75c8
        fname2 = 'newfile-newname'
8de75c8
        f = self._create_chowned_file(fname, 'pub', mode=0777)
8de75c8
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("mv %s/pub/%s %s/pub/%s" % (ftp_root, fname, ftp_root, fname2))
8de75c8
        self.runner.add("ls %s/pub/%s" % (ftp_root, fname2))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '250 ', 'MV failed')
8de75c8
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', fname2)))
8de75c8
        # FIXME: verify that the file keeps permissions and content
8de75c8
8de75c8
    def testRM(self):
8de75c8
        "RM: Remove remote files."
8de75c8
        fname  = 'newfile'
8de75c8
        f = self._create_chowned_file(fname, 'pub', mode=0777)
8de75c8
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("rm %s/pub/%s" % (ftp_root, fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '250 ', 'RM failed')
8de75c8
        self.assert_(not os.path.exists(os.path.abspath(f.name)))
8de75c8
8de75c8
    def testNLIST(self):
8de75c8
        "NLIST: List remote file names"
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("nlist /")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'NLIST failed')
8de75c8
        self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed')
8de75c8
8de75c8
    def testRENLIST(self):
8de75c8
        "RENLIST: Same as `nlist', but ignores the cache."
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("renlist /")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'RENLIST failed')
8de75c8
        self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed')
8de75c8
8de75c8
    def testRELS(self):
8de75c8
        "RELS: Same as ls, but ignores the cache."
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("rels -l /")
8de75c8
        self.runner.run("-df")
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '150 ', 'RELS failed')
8de75c8
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'RELS failed')
8de75c8
8de75c8
    def testLCD(self):
8de75c8
        "LCD: Change current local directory"
8de75c8
        dirname = 'foodir'
8de75c8
        os.mkdir(dirname)
8de75c8
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("lcd %s" % dirname)
8de75c8
        self.runner.add("lpwd")
8de75c8
        self.runner.add("lcd -")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        os.rmdir(dirname)
8de75c8
        self.assertContainsString(self.runner.regular,
8de75c8
                                  os.path.join(os.getcwd(), dirname),
8de75c8
                                  'LCD failed')
8de75c8
8de75c8
    def testLPWD(self):
8de75c8
        "LPWD: Print current working directory on local machine"
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("lpwd")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.regular, os.getcwd(), 'LPWD failed')
8de75c8
8de75c8
    def testMRM(self):
8de75c8
        "MRM: Remove remote files using wildcards"
8de75c8
        fname  = 'mrmfile'
8de75c8
        self._create_chowned_file(fname + '1', 'pub', mode=0777)
8de75c8
        self._create_chowned_file(fname + '2', 'pub', mode=0777)
8de75c8
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("ls")
8de75c8
        self.runner.add("mrm %s/pub/%s*" % (ftp_root, fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        # assert that we deleted something
8de75c8
        self.assertContainsString(self.runner.server, '250 ', 'MRM failed')
8de75c8
        # assert that we deleted exactly 2 files
8de75c8
        self.assertEquals(len([ str for str in self.runner.server if str == '250 Delete operation successful.\n']), 2)
8de75c8
8de75c8
class TestBOOKMARK(TestBase):
8de75c8
    def testADD(self):
8de75c8
        "ADD:  add current place or given location"
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("bookmark add pb /pub")
8de75c8
        self.runner.add("bookmark list")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.regular, 'pb\t/pub\n', 'Bookmark add failed')
8de75c8
8de75c8
    def testDEL(self):
8de75c8
        "DEL: remove bookmark with name"
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("bookmark add pb /pub")
8de75c8
        self.runner.add("bookmark del pb /pub")
8de75c8
        self.runner.add("bookmark list")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertDoesNotContainString(self.runner.regular, 'pb\t/pub\n', 'Bookmark del failed')
8de75c8
8de75c8
class TestGET(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
        self.fname  = 'newfile'
8de75c8
        self.full_path = os.path.join(ftp_root, 'pub', self.fname)
8de75c8
        self._create_chowned_file(self.fname, 'pub', 0777)
8de75c8
8de75c8
    def testDELETE(self):
8de75c8
        "GET -E: delete source files after successful transfer"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("get -E %s/pub/%s" % (ftp_root, self.fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'RELS failed')
8de75c8
        self.assertContainsString(self.runner.server, '250 ', 'GET -E failed')
8de75c8
        os.remove(self.fname)
8de75c8
8de75c8
    def testASCII(self):
8de75c8
        "GET -a: transfer using ASCII mode"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("get -a %s/pub/%s" % (ftp_root, self.fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'GET -a failed')
8de75c8
        os.remove(self.full_path)
8de75c8
        os.remove(self.fname)
8de75c8
8de75c8
    def testLocalName(self):
8de75c8
        "GET -o: destination file name"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("get %s/pub/%s -o newfile.local" % (ftp_root, self.fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'GET -o failed')
8de75c8
        os.remove('newfile.local')
8de75c8
        os.remove(self.full_path)
8de75c8
8de75c8
    def testLocalPath(self):
8de75c8
        "GET -O: specifies base directory or URL where files should be placed"
8de75c8
        # FIXME: does not work
8de75c8
        print "****** testLocalPath Currently does not work ********"
8de75c8
        #dest = '/tmp'
8de75c8
8de75c8
        #self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        #self.runner.add("get %s/pub/%s -O %s" % (ftp_root, self.fname, dest))
8de75c8
        #self.runner.run()
8de75c8
        #self.runner.filter()
8de75c8
8de75c8
        #self.assertContainsString(self.runner.server, '226 ', 'GET -O failed')
8de75c8
        #os.remove(os.path.join(dest, self.fname))
8de75c8
        #os.remove(self.full_path)
8de75c8
8de75c8
    def testContents(self):
8de75c8
        "GET: tests if get preservers contents"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("get %s/pub/%s" % (ftp_root, self.fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
8de75c8
        self.assertSameContent(self.full_path, 'newfile')
8de75c8
        os.remove('newfile')
8de75c8
        os.remove(self.full_path)
8de75c8
8de75c8
class TestPUT(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
        self.fname  = 'putfile'
8de75c8
        self.full_path = os.path.join(os.getcwd(), self.fname)
8de75c8
        self.ftp_full_path = os.path.join(ftp_root, 'pub', self.fname)
8de75c8
8de75c8
        f = open(self.fname, 'w')
8de75c8
        f.write('putfile')
8de75c8
        f.flush()
8de75c8
        f.close()
8de75c8
        os.chown(self.fname, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
8de75c8
8de75c8
    def testRemoteName(self):
8de75c8
        "PUT -o: specifies remote file name"
8de75c8
        remote_fname = 'someotherfilename'
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("put %s -o %s/pub/%s" % (self.fname, ftp_root, remote_fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', remote_fname)))
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')
8de75c8
8de75c8
        os.remove(self.fname)
8de75c8
        os.remove(os.path.join(ftp_root, 'pub', remote_fname))
8de75c8
8de75c8
    def testDeleteSource(self):
8de75c8
        "PUT -E: delete source files after successful transfer"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("cd %s/pub" % ftp_root)
8de75c8
        self.runner.add("put -E %s" % (self.fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
8de75c8
        self.assert_(not os.path.exists(self.full_path))
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'PUT -E failed')
8de75c8
        os.remove(self.ftp_full_path)
8de75c8
8de75c8
    def testASCII(self):
8de75c8
        "PUT -a: transfer using ASCII mode"
8de75c8
        print "****** testASCII Not yet implemented ********"
8de75c8
        #self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        #self.runner.add("put -a %s" % (self.fname))
8de75c8
        #print self.runner.run()
8de75c8
        #self.runner.filter()
8de75c8
8de75c8
        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
8de75c8
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
8de75c8
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')
8de75c8
8de75c8
        #os.remove(self.fname)
8de75c8
        #os.remove(self.ftp_full_path)
8de75c8
8de75c8
    def testBaseDirectory(self):
8de75c8
        "PUT -O: specifies base directory or URL where files should be placed"
8de75c8
        # FIXME: add some code here
8de75c8
        print "****** testBaseDirectory Not yet implemented ********"
8de75c8
8de75c8
    def testContents(self):
8de75c8
        "PUT: tests if get preservers contents"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("cd %s/pub" % ftp_root)
8de75c8
        self.runner.add("put %s" % (self.fname))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
8de75c8
        self.assertSameContent(self.ftp_full_path, self.fname)
8de75c8
8de75c8
        os.remove(self.fname)
8de75c8
        os.remove(self.ftp_full_path)
8de75c8
8de75c8
class TestMGET(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
        self.basename = 'mgetfile'
8de75c8
        self.fname1 = 'mgetfile1'
8de75c8
        self.fname2 = 'mgetfile2'
8de75c8
        self.full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
8de75c8
        self.full_path2 = os.path.join(ftp_root, 'pub', self.fname2)
8de75c8
        self._create_chowned_file(self.fname1, 'pub', 0777)
8de75c8
        self._create_chowned_file(self.fname2, 'pub', 0777)
8de75c8
8de75c8
    def _clean_up(self):
8de75c8
        os.remove(self.fname1)
8de75c8
        os.remove(self.fname2)
8de75c8
        os.remove(self.full_path1)
8de75c8
        os.remove(self.full_path2)
8de75c8
8de75c8
    def testDELETE(self):
8de75c8
        "MGET -E: delete source files after successful transfer"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("mget -E %s/pub/%s*" % (ftp_root, self.basename))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
8de75c8
        self.assertContainsString(self.runner.server, '250 ', 'MGET failed')
8de75c8
        self.assert_(os.path.exists(self.fname1))
8de75c8
        self.assert_(os.path.exists(self.fname2))
8de75c8
8de75c8
        os.remove(self.fname1)
8de75c8
        os.remove(self.fname2)
8de75c8
8de75c8
    def testASCII(self):
8de75c8
        "MGET -a: transfer using ASCII mode"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("mget -a %s/pub/%s*" % (ftp_root, self.basename))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
8de75c8
        self._clean_up()
8de75c8
8de75c8
    def testLocalPath(self):
8de75c8
        "MGET -O: specifies base directory or URL where files should be placed"
8de75c8
        # FIXME: does not work
8de75c8
        print "****** testLocalPath Currently does not work ********"
8de75c8
        #dest = '/tmp'
8de75c8
8de75c8
        #self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        #self.runner.add("mget %s/pub/%s* -O %s" % (ftp_root, self.basename, dest))
8de75c8
        #self.runner.run()
8de75c8
        #self.runner.filter()
8de75c8
8de75c8
        #self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
8de75c8
        #self._clean_up()
8de75c8
8de75c8
    def testContents(self):
8de75c8
        "MGET: tests if get preservers contents"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("mget %s/pub/%s*" % (ftp_root, self.basename))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
8de75c8
        self.assertSameContent(self.full_path1, self.fname1)
8de75c8
        self.assertSameContent(self.full_path2, self.fname2)
8de75c8
        self._clean_up()
8de75c8
8de75c8
class TestMPUT(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
        self.basename = 'putfile'
8de75c8
        self.fname1  = 'putfile1'
8de75c8
        self.fname2  = 'putfile2'
8de75c8
8de75c8
        self.ftp_full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
8de75c8
        self.ftp_full_path2 = os.path.join(ftp_root, 'pub', self.fname2)
8de75c8
8de75c8
        for fn in [ self.fname1, self.fname2 ]:
8de75c8
            f = open(fn, 'w')
8de75c8
            f.write(str(fn))
8de75c8
            f.flush()
8de75c8
            f.close()
8de75c8
            os.chown(fn, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
8de75c8
8de75c8
    def _clean_up(self):
8de75c8
        os.remove(self.fname1)
8de75c8
        os.remove(self.fname2)
8de75c8
        os.remove(self.ftp_full_path1)
8de75c8
        os.remove(self.ftp_full_path2)
8de75c8
8de75c8
    def testDeleteSource(self):
8de75c8
        "MPUT -E: delete source files after successful transfer"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("cd %s/pub" % ftp_root)
8de75c8
        self.runner.add("mput -E %s*" % (self.basename))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assert_(os.path.exists(self.ftp_full_path1))
8de75c8
        self.assert_(os.path.exists(self.ftp_full_path2))
8de75c8
        self.assert_(not os.path.exists(self.fname1))
8de75c8
        self.assert_(not os.path.exists(self.fname2))
8de75c8
        self.assertContains(self.runner.server, '226 Transfer complete.\n', 'MPUT -E failed')
8de75c8
        os.remove(self.ftp_full_path1)
8de75c8
        os.remove(self.ftp_full_path2)
8de75c8
8de75c8
    def testASCII(self):
8de75c8
        "PUT -a: transfer using ASCII mode"
8de75c8
        # FIXME: add some code here
8de75c8
        print "****** testASCII Not yet implemented ********"
8de75c8
        #self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        #self.runner.add("put -a %s" % (self.fname))
8de75c8
        #print self.runner.run()
8de75c8
        #self.runner.filter()
8de75c8
8de75c8
        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
8de75c8
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
8de75c8
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')
8de75c8
8de75c8
        #os.remove(self.fname)
8de75c8
        #os.remove(self.ftp_full_path)
8de75c8
8de75c8
    def testBaseDirectory(self):
8de75c8
        "MPUT -O: specifies base directory or URL where files should be placed"
8de75c8
        # FIXME: add some code here
8de75c8
        print "****** testBaseDirectory Not yet implemented ********"
8de75c8
8de75c8
    def testContents(self):
8de75c8
        "MPUT: tests if mput preservers contents"
8de75c8
        self.runner.connect(user=ftp_user, password=ftp_pass)
8de75c8
        self.runner.add("cd %s/pub" % ftp_root)
8de75c8
        self.runner.add("mput %s*" % (self.basename))
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
8de75c8
        self.assert_(os.path.exists(self.ftp_full_path1))
8de75c8
        self.assert_(os.path.exists(self.ftp_full_path2))
8de75c8
        self.assertSameContent(self.ftp_full_path1, self.fname1)
8de75c8
        self.assertSameContent(self.ftp_full_path2, self.fname2)
8de75c8
8de75c8
        self._clean_up()
8de75c8
8de75c8
class TestGLOB(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
        for f in os.listdir(os.path.join(ftp_root, 'pub')):
8de75c8
            try:
8de75c8
                os.remove(os.path.join(ftp_root, 'pub', f)) 
8de75c8
            except OSError:
8de75c8
                os.rmdir(os.path.join(ftp_root, 'pub', f))
8de75c8
8de75c8
        self.dirpath = os.path.join(ftp_root, 'pub', 'pubdir')
8de75c8
        os.mkdir(self.dirpath)
8de75c8
        self._create_chowned_file('pubfile', 'pub', 0777)
8de75c8
8de75c8
    def testFiles(self):
8de75c8
        " GLOB -f: globbing files only "
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("glob -f ls pub/*")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -f failed')
8de75c8
        self.assertDoesNotContainString(self.runner.regular, 'pubdir', 'GLOB -f failed')
8de75c8
8de75c8
    def testDirectories(self):
8de75c8
        " GLOB -d: globbing directories only "
8de75c8
        # FIXME - does not seem to work
8de75c8
        pass
8de75c8
        #self.runner.connect()
8de75c8
        #self.runner.add("glob -d ls pub/*")
8de75c8
        #self.runner.run()
8de75c8
        #self.runner.filter()
8de75c8
8de75c8
        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -d failed')
8de75c8
        #self.assertDoesNotContainString(self.runner.regular, 'pubfile', 'GLOB -d failed')
8de75c8
8de75c8
    def testAll(self):
8de75c8
        " GLOB -a: globbing everything "
8de75c8
        # FIXME - does not seem to work
8de75c8
        pass
8de75c8
        #self.runner.connect()
8de75c8
        #self.runner.add("glob -d ls pub/*")
8de75c8
        #self.runner.run()
8de75c8
        #self.runner.filter()
8de75c8
8de75c8
        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -a failed')
8de75c8
        #self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -a failed')
8de75c8
8de75c8
class TestRegression(TestBase):
8de75c8
    def testBundledPerl(self):
8de75c8
        "176175: lftp used to ship its copy of perl-String-CRC32 - see if fixed "
8de75c8
        p = popen2.Popen4("rpm -q --provides lftp")
8de75c8
        p.wait()
8de75c8
        self.assertDoesNotContainString(p.fromchild.readlines(),
8de75c8
                                        "perl-String-CRC32",
8de75c8
                                        "lftp bundles its own copy of perl-String-CRC32")
8de75c8
    def testHTTP_LS(self):
8de75c8
        "171884: first ls returns nothing when using http protocol"
8de75c8
        self.runner.connect(host="http://mirror.centos.org/centos/")
8de75c8
        self.runner.add("ls")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
        self.assert_(len(self.runner.regular) > 0)
8de75c8
8de75c8
class TestQueue(TestBase):
8de75c8
    def setUp(self):
8de75c8
        TestBase.setUp(self)
8de75c8
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')
8de75c8
        self.runner.connect()
8de75c8
        self.runner.add("queue stop")
8de75c8
        self.runner.add("debug -o %s" % self.script_file.name)
8de75c8
8de75c8
    def _runQueue(self):
8de75c8
        self.runner.add("queue start")
8de75c8
        self.runner.run(sleep=3)
8de75c8
        content = self.script_file.readlines()
8de75c8
        self.runner.filter(content)
8de75c8
8de75c8
    def _listQueue(self):
8de75c8
        self.runner.add("queue")
8de75c8
        self.runner.run()
8de75c8
        self.runner.filter()
8de75c8
8de75c8
    def testInsertBefore(self):
8de75c8
        "QUEUE: insert a command before another one"
8de75c8
        self.runner.add("queue ls")
8de75c8
        self.runner.add("cd /pub")
8de75c8
        self.runner.add("queue -n 1 get somefile")
8de75c8
        self._listQueue()
8de75c8
8de75c8
        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: insert before failed')
8de75c8
8de75c8
    def testRememberLocation(self):
8de75c8
        "QUEUE: tests of queue remembers changing location with cd"
8de75c8
        # This test makes sense for RHEL 5+
8de75c8
8de75c8
        self.runner.add("queue ls")
8de75c8
        self.runner.add("cd /pub")
8de75c8
        self.runner.add("queue ls")
8de75c8
        self._listQueue()
8de75c8
8de75c8
        self.assertContainsString(self.runner.regular, 'cd /pub', 'QUEUE: remember location failed')
8de75c8
8de75c8
    def testDeleteFromQueue(self):
8de75c8
        "QUEUE: delete a command from queue"
8de75c8
        self.runner.add("cd /pub")
8de75c8
        self.runner.add("queue ls")
8de75c8
        self.runner.add("queue -d 1")
8de75c8
        self._runQueue()
8de75c8
8de75c8
        self.assertDoesNotContainString(self.runner.regular, 'LIST\r\n', 'QUEUE: remember location failed')
8de75c8
8de75c8
    def testMoveInQueue(self):
8de75c8
        "QUEUE: move a command in queue from one position to another"
8de75c8
        self.runner.add("queue ls")
8de75c8
        self.runner.add("queue get somefile")
8de75c8
        self.runner.add("queue -m 2 1")
8de75c8
        self._listQueue()
8de75c8
8de75c8
        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: move in queue failed')
8de75c8
8de75c8
def die(code, code_req, msg):
8de75c8
    if code != code_req:
8de75c8
        print >>sys.stderr, msg
8de75c8
        sys.exit(1)
8de75c8
8de75c8
if __name__ == "__main__":
8de75c8
    # get vsftpd root dir
8de75c8
    ftp_root = pwd.getpwnam('ftp').pw_dir
8de75c8
8de75c8
    if os.getuid() != 0:
8de75c8
       print "This test must be run as root"
8de75c8
       sys.exit(1)
8de75c8
8de75c8
    # add user for testing automatically
8de75c8
    (status, output) = commands.getstatusoutput('useradd %s' % (ftp_user))
8de75c8
    die(status, 0, output)
8de75c8
    print "User %s added.." % ftp_user
8de75c8
    (status, output) = commands.getstatusoutput('echo %s | passwd --stdin %s' % (ftp_pass, ftp_user))
8de75c8
    die(status, 0, output)
8de75c8
    ftp_home = pwd.getpwnam(ftp_user).pw_dir
8de75c8
    os.chmod(os.path.join(ftp_root, 'pub'), 0777)
8de75c8
8de75c8
    try:
8de75c8
        # start vsftpd with basic config
8de75c8
        (status, output) = commands.getstatusoutput('service vsftpd restart')
8de75c8
        # run the tests
8de75c8
        unittest.main()
8de75c8
        die(status, 0, output)
8de75c8
    finally:
8de75c8
        # delete the test user
8de75c8
        (status, output) = commands.getstatusoutput('userdel -r %s' % (ftp_user))
8de75c8
        die(status, 0, output)
8de75c8
8de75c8
8de75c8