#!/usr/bin/python
#coding=utf-8

import unittest
import tempfile
import popen2
import os
import stat
import sys
import commands
import pwd
import grp
import shutil
import time
#import rpm

# Credentials of the local account used by the authenticated-login tests.
# The __main__ block creates this user (useradd/passwd) and deletes it again.
ftp_user = 'lftp-tester'
ftp_pass = 'lftp-tester'
# Default home directory of the test user; re-read from the passwd database
# when this file is run as a script.
ftp_home = '/home/lftp-tester'
# Default FTP server root; replaced by the home directory of the 'ftp' user
# when this file is run as a script.
ftp_root = '/var/ftp'

class LftpRunner(object):
    """Build an lftp command script, run it and classify its output.

    Commands are collected with add()/connect(), written to a temporary
    script file by save() and executed by run() through
    ``/usr/bin/lftp ... -f <script>``.  filter() then sorts the captured
    lines into server replies, client commands, informational messages and
    regular output, based on the ``<---``, ``--->`` and ``----`` prefixes.
    """

    def __init__(self):
        self.reset()
        self.cmds = []
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')

    def reset(self):
        """Forget the exit status and all captured/classified output."""
        self.rc = -1

        self.out = []
        self.err = []

        self.server = []
        self.client = []
        self.regular = []
        self.info = []

    def add(self, command):
        """Append one lftp command (given without trailing newline)."""
        self.cmds.append(command + '\n')

    def save(self):
        """Write the collected commands to the temporary script file.

        A copy is also written to /tmp/lftp as a debugging aid.
        """
        # Close the debugging copy explicitly instead of leaking the handle.
        debug_copy = open('/tmp/lftp', 'wb+')
        try:
            debug_copy.writelines(self.cmds)
        finally:
            debug_copy.close()

        # Start from an empty script so that saving more than once does not
        # duplicate the commands written by an earlier call.
        self.script_file.seek(0)
        self.script_file.truncate()
        self.script_file.writelines(self.cmds)
        self.script_file.flush()

    def filter(self, content=None):
        """Classify output lines by their leading marker.

        content -- list of lines to classify; defaults to the output
                   captured by the last run().

        Lines starting with '----', '--->' or '<---' are appended (without
        the five-character prefix) to self.info, self.client and
        self.server respectively; everything else, including blank lines,
        goes to self.regular unchanged.

        Returns the tuple (server, client, info, regular).
        """
        filter_hash = { '----' : self.info,
                        '--->' : self.client,
                        '<---' : self.server }
        if content is None:
            content = self.out

        for line in content:
            fields = line.split()
            bucket = None
            # A blank line has no first field; treat it as regular output
            # instead of failing with IndexError.
            if fields:
                bucket = filter_hash.get(fields[0])

            if bucket is None:
                self.regular.append(line)
            else:
                bucket.append(line[5:])

        return self.server, self.client, self.info, self.regular

    def run(self, options="-df", sleep=0):
        """Save the script and execute it with lftp.

        options -- command line options for lftp; ' -f' is appended when
                   the string contains no 'f' at all.
        sleep   -- seconds to wait after starting lftp before its output
                   is collected.

        Returns the tuple (wait status, list of output lines); stdout and
        stderr of lftp are captured together.
        """
        self.save()

        if "f" not in options:
            options += " -f"

        p = popen2.Popen4("/usr/bin/lftp %s %s" % (options, os.path.abspath(self.script_file.name)))

        if sleep:
            time.sleep(sleep)

        # Drain the pipe before waiting: a child that produces more output
        # than the pipe buffer holds would otherwise block forever while we
        # block in wait().
        self.out = p.fromchild.readlines()
        self.rc = p.wait()

        return self.rc, self.out

    def connect(self, host='localhost', user='anonymous', password='anonymous@localhost.localdomain'):
        """Queue the commands that open *host* and log in.

        The 'anonymous' user logs in with lftp's 'anon' command; any other
        user logs in with 'user <user> <password>'.
        """
        self.add("open %s" % host)
        if user == 'anonymous':
            self.add('anon')
        else:
            self.add('user %s %s' % (user, password))

class TestBase(unittest.TestCase):
    """Common fixture and helper assertions shared by all lftp test cases."""

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.runner = LftpRunner()

    def assertContains(self, haystack, needle, msg):
        """Fail with *msg* unless *needle* is an element of *haystack*."""
        if needle not in haystack:
            self.fail(msg)

    def assertDoesNotContain(self, haystack, needle, msg):
        """Fail with *msg* if *needle* is an element of *haystack*."""
        if needle in haystack:
            self.fail(msg)

    def assertContainsString(self, haystack, needle, msg):
        """Fail with *msg* unless some item of *haystack* contains *needle*."""
        for item in haystack:
            if needle in item:
                return

        self.fail(msg)

    def assertDoesNotContainString(self, haystack, needle, msg):
        """Fail with *msg* if any item of *haystack* contains *needle*."""
        for item in haystack:
            if needle in item:
                self.fail(msg)

    def assertFilePermissions(self, file_path, permissions):
        """Fail unless the permission bits of *file_path* equal *permissions*."""
        real_perm = stat.S_IMODE(os.stat(file_path)[stat.ST_MODE])
        self.assertEqual(real_perm, permissions, "%o != %o" % (real_perm, permissions))

    def assertSameContent(self, file1, file2):
        """Fail unless ``diff -q`` reports the two files as identical."""
        # there's probably a pure-python way to do that, but this would be more portable
        # even across RHEL4
        p = popen2.Popen3("/usr/bin/diff -q %s %s" % (file1, file2))
        self.assertEqual(p.wait(), 0, "%s and %s differ" % (file1, file2))

    def _ftp_root_put_file(self, filename, path="", contents=""):
        """Create *filename* with *contents* under ftp_root/path.

        Returns the (already closed) file object; callers use its ``name``.
        """
        f = open(os.path.join(ftp_root, path, filename), 'w')
        try:
            f.write(contents)
        finally:
            # close (and thereby flush) even when the write fails
            f.close()

        return f

    def _ftp_root_remove_file(self, filename):
        """Remove the file at the given (full) path."""
        os.remove(filename)

    def _mode_to_int(self, mode):
        """Return *mode* as an integer, parsing octal strings like "0644"."""
        try:
            return int(mode, 8)
        except TypeError:
            # already a number: int() rejects non-strings with a base
            return mode

    def _create_chowned_file(self, filename, path="", mode="0644"):
        """Create an empty file under ftp_root/path owned by the test user.

        mode -- permission bits, either as an integer or as an octal
                string; the string default used to be handed to os.chmod()
                unchanged, which raises TypeError.

        Returns the (closed) file object of the new file.
        """
        newf = self._ftp_root_put_file(filename, path)
        os.chown(newf.name, pwd.getpwnam(ftp_user).pw_uid, grp.getgrnam(ftp_user).gr_gid)
        os.chmod(newf.name, self._mode_to_int(mode))

        return newf


class TestLogging(TestBase):
    """Login scenarios: anonymous, valid local user, wrong password."""

    def _replies_after_listing(self, **credentials):
        """Log in with *credentials*, list "/" and return the server replies."""
        runner = self.runner
        runner.connect(**credentials)
        runner.add("ls -l /")
        runner.run()
        runner.filter()
        return runner.server

    def testLogAnon(self):
        " Logging in and out using anonymous login "
        replies = self._replies_after_listing()

        self.assertContainsString(replies, "230 ", "Could not log in")
        self.assertDoesNotContainString(replies, "530 ", "Could not log in")

    def testLogUser(self):
        " Logging in and out using a local user "
        replies = self._replies_after_listing(user=ftp_user, password=ftp_pass)

        self.assertContainsString(replies, "230 ", "Could not log in")
        # A bare "530 " is too broad here: on rhel8 vsftpd answers 530 during
        # the TLS negotiation, yet the name/password login still succeeds.
        self.assertDoesNotContainString(replies, "530 Login incorrect", "Login incorrect")

    def testLogUserBadPassword(self):
        " Logging in and out using a local user with bad password - must not succeed "
        replies = self._replies_after_listing(user=ftp_user, password="blablableble")

        self.assertContainsString(replies, "530 ", "User was able to log in with bad password")


class TestBasicCommands(TestBase):
    def setUp(self):
        TestBase.setUp(self)
        self.file_name = 'test-basic-commands'
        self.file_full_path = os.path.join(ftp_root, self.file_name)
        f = self._ftp_root_put_file(self.file_name, contents="blabla")
        self.assert_(os.path.exists(os.path.join(ftp_root, self.file_name)))

    def tearDown(self):
        f = self._ftp_root_remove_file(os.path.join(ftp_root, self.file_name))
        self.assert_(not os.path.exists(os.path.join(ftp_root, self.file_name)))

    def testLS(self):
        "LS: Tests listing a directory "
        self.runner.connect()
        self.runner.add("ls -l /")
        self.runner.run("-df")
        self.runner.filter()

        self.assertContainsString(self.runner.server, '150 ', 'LS failed')
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'LS failed')

    def testPWD(self):
        "PWD: Tests printing current directory "
        self.runner.connect()
        self.runner.add("cd pub")
        self.runner.add("pwd")
        self.runner.run("-df")
        self.runner.filter()

        self.assertContainsString(self.runner.client, 'PWD', 'PWD failed')
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'PWD failed')

    def testCD(self):
        "CD: Tests changing directory "
        self.runner.connect()
        self.runner.add("cd pub")
        self.runner.add("pwd")
        self.runner.run("-df")
        self.runner.filter()

        self.assertContainsString(self.runner.client, 'CWD /pub', 'CD failed')
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'CD failed')

    def testGET(self):
        "GET: Test downloading a file "
        # get a file from ftp
        self.runner.connect()
        self.runner.add("get %s" % self.file_name)
        self.runner.run("-df")
        self.runner.filter()

        # checks its presence in the CWD, clean up
        self.assert_(os.path.exists(self.file_name), "GET failed")
        os.remove(self.file_name)

    def testPUT(self):
        "PUT: Test uploading a file "
        # create a dummy file
        fname = 'upload-file'
        f = open(fname, 'w')
        f.write('content')
        f.flush()
        f.close()

        # store the file
        self.runner.connect()
        self.runner.add("cd /pub")
        self.runner.add("put %s" % fname)
        self.runner.add("ls /pub/%s" % fname)
        self.runner.run("-df")
        self.runner.filter()

        # check if the store op was successfull
        fullpath = os.path.join(ftp_root, 'pub', fname)
        self.assertContainsString(self.runner.server, '226 ', 'PUT failed')
        self.assert_(os.path.exists(fullpath), 'PUT failed')

        # remove the file from the ftp root
        os.remove(fullpath)
        self.assert_(not os.path.exists(fullpath))

    def testExternalCommand(self):
        " Running external command "
        self.runner.connect()
        self.runner.add("!ls sanity.py")
        self.runner.run()
        self.runner.filter()

        self.assertContains(self.runner.regular, 'sanity.py\n', "Running external command failed")

    def testALIAS(self):
        "ALIAS: Tests defining an alias "
        self.runner.connect()
        self.runner.add("alias dir ls -l")
        self.runner.add("dir")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.client, 'LIST -l', 'Setting alias failed')

    def testCAT(self):
        "CAT: Printing a remote file to std. output "
        # print file
        self.runner.connect()
        self.runner.add("cat %s" % self.file_name)
        self.runner.run()
        self.runner.filter()

        # assert its contents on std. output
        self.assertContainsString(self.runner.regular, 'blabla', 'CAT failed')

    def testCHMOD(self):
        "CHMOD: Changing permissions on a remote file "
        os.chown(self.file_full_path, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        os.chmod(self.file_full_path, 0777)
        old_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])

        # change the permission
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s" % ftp_root)
        self.runner.add("ls -l")
        self.runner.add("ls -l %s" % self.file_name)
        self.runner.add("chmod 0644 %s" % self.file_name)
        self.runner.add("ls -l %s" % self.file_name)
        self.runner.run()
        self.runner.filter()

        # check against the old mode
        self.assertContainsString(self.runner.server, '200 ', 'RMDIR failed')
        new_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])
        self.assertNotEqual(old_mode, new_mode, 'CHMOD failed')
        self.assertEquals(new_mode, 0644)

    def testCLS(self):
        "CLS"
        self.runner.connect()
        self.runner.add("cls")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.client, 'LIST', "CLS failed")
        self.assertContains(self.runner.regular, "%s\n" % self.file_name , "CLS failed")

    def testCOMMAND(self):
        "COMMAND: Run a command, ignoring aliases "
        # less is a common defined alias..we try running it as a command, should fail
        self.runner.connect()
        self.runner.add("command less")
        self.runner.run()
        self.runner.filter()

        self.assertContains(self.runner.regular, "Unknown command `less'.\n", "COMMAND failed (or less nor defined as alias)")

    def testECHO(self):
        "ECHO: echo a string "
        # less is a common defined alias..we try running it as a command, should fail
        self.runner.connect()
        self.runner.add("echo foo")
        self.runner.run()
        self.runner.filter()

        self.assertContains(self.runner.regular, "foo\n", "ECHO failed")

    def testFIND(self):
        " FIND: List files in the directory recursively."
        self.runner.connect()
        self.runner.add("find .")
        self.runner.run()
        self.runner.filter()

        self.assertContains(self.runner.regular, "./\n", "FIND failed")
        self.assertContains(self.runner.regular, "./pub/\n", "FIND failed")
        self.assertContains(self.runner.regular, "./%s\n" % self.file_name, "FIND failed")

    def testMKDIR(self):
        "MKDIR: Create remote directories."
        self.runner.connect()
        self.runner.add("mkdir -p /pub/foo/bar")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '257 ', 'MKDIR failed')
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', 'foo', 'bar')))
        shutil.rmtree(os.path.join(ftp_root, 'pub', 'foo'))

    def testRMDIR(self):
        "RMDIR: Remove remote directories."
        try:
           os.mkdir(os.path.join(ftp_root, 'pub', 'foo'))
           os.chmod(os.path.join(ftp_root, 'pub', 'foo'), 0777)
           os.chown(os.path.join(ftp_root, 'pub', 'foo'),
                    pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        except OSError:
           pass
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', 'foo')))

        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s" % ftp_root)
        self.runner.add("ls -l pub")
        self.runner.add("rmdir pub/foo")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '250 ', 'RMDIR failed')
        self.assert_(not os.path.exists(os.path.join(ftp_root, 'pub', 'foo')))

    def testMV(self):
        "MV: rename/move files"
        fname  = 'newfile'
        fname2 = 'newfile-newname'
        f = self._create_chowned_file(fname, 'pub', mode=0777)

        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("mv %s/pub/%s %s/pub/%s" % (ftp_root, fname, ftp_root, fname2))
        self.runner.add("ls %s/pub/%s" % (ftp_root, fname2))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '250 ', 'MV failed')
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', fname2)))
        # FIXME: verify that the file keeps permissions and content

    def testRM(self):
        "RM: Remove remote files."
        fname  = 'newfile'
        f = self._create_chowned_file(fname, 'pub', mode=0777)

        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("rm %s/pub/%s" % (ftp_root, fname))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '250 ', 'RM failed')
        self.assert_(not os.path.exists(os.path.abspath(f.name)))

    def testNLIST(self):
        "NLIST: List remote file names"
        self.runner.connect()
        self.runner.add("nlist /")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'NLIST failed')
        self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed')

    def testRENLIST(self):
        "RENLIST: Same as `nlist', but ignores the cache."
        self.runner.connect()
        self.runner.add("renlist /")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'RENLIST failed')
        self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed')

    def testRELS(self):
        "RELS: Same as ls, but ignores the cache."
        self.runner.connect()
        self.runner.add("rels -l /")
        self.runner.run("-df")
        self.runner.filter()

        self.assertContainsString(self.runner.server, '150 ', 'RELS failed')
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'RELS failed')

    def testLCD(self):
        "LCD: Change current local directory"
        dirname = 'foodir'
        os.mkdir(dirname)

        self.runner.connect()
        self.runner.add("lcd %s" % dirname)
        self.runner.add("lpwd")
        self.runner.add("lcd -")
        self.runner.run()
        self.runner.filter()

        os.rmdir(dirname)
        self.assertContainsString(self.runner.regular,
                                  os.path.join(os.getcwd(), dirname),
                                  'LCD failed')

    def testLPWD(self):
        "LPWD: Print current working directory on local machine"
        self.runner.connect()
        self.runner.add("lpwd")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.regular, os.getcwd(), 'LPWD failed')

    def testMRM(self):
        "MRM: Remove remote files using wildcards"
        fname  = 'mrmfile'
        self._create_chowned_file(fname + '1', 'pub', mode=0777)
        self._create_chowned_file(fname + '2', 'pub', mode=0777)

        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("ls")
        self.runner.add("mrm %s/pub/%s*" % (ftp_root, fname))
        self.runner.run()
        self.runner.filter()

        # assert that we deleted something
        self.assertContainsString(self.runner.server, '250 ', 'MRM failed')
        # assert that we deleted exactly 2 files
        self.assertEquals(len([ str for str in self.runner.server if str == '250 Delete operation successful.\n']), 2)

class TestBOOKMARK(TestBase):
    """Tests for the lftp 'bookmark' command."""

    def testADD(self):
        "ADD:  add current place or given location"
        self.runner.connect()
        self.runner.add("bookmark add pb /pub")
        self.runner.add("bookmark list")
        # Clean up after the listing has been printed so the test does not
        # leave a 'pb' entry in the invoking user's lftp bookmarks.
        self.runner.add("bookmark del pb")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.regular, 'pb\t/pub\n', 'Bookmark add failed')

    def testDEL(self):
        "DEL: remove bookmark with name"
        self.runner.connect()
        self.runner.add("bookmark add pb /pub")
        self.runner.add("bookmark del pb /pub")
        self.runner.add("bookmark list")
        self.runner.run()
        self.runner.filter()

        self.assertDoesNotContainString(self.runner.regular, 'pb\t/pub\n', 'Bookmark del failed')

class TestGET(TestBase):
    def setUp(self):
        TestBase.setUp(self)
        self.fname  = 'newfile'
        self.full_path = os.path.join(ftp_root, 'pub', self.fname)
        self._create_chowned_file(self.fname, 'pub', 0777)

    def testDELETE(self):
        "GET -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get -E %s/pub/%s" % (ftp_root, self.fname))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '226 ', 'RELS failed')
        self.assertContainsString(self.runner.server, '250 ', 'GET -E failed')
        os.remove(self.fname)

    def testASCII(self):
        "GET -a: transfer using ASCII mode"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get -a %s/pub/%s" % (ftp_root, self.fname))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        self.assertContainsString(self.runner.server, '226 ', 'GET -a failed')
        os.remove(self.full_path)
        os.remove(self.fname)

    def testLocalName(self):
        "GET -o: destination file name"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get %s/pub/%s -o newfile.local" % (ftp_root, self.fname))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '226 ', 'GET -o failed')
        os.remove('newfile.local')
        os.remove(self.full_path)

    def testLocalPath(self):
        "GET -O: specifies base directory or URL where files should be placed"
        # FIXME: does not work
        print "****** testLocalPath Currently does not work ********"
        #dest = '/tmp'

        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("get %s/pub/%s -O %s" % (ftp_root, self.fname, dest))
        #self.runner.run()
        #self.runner.filter()

        #self.assertContainsString(self.runner.server, '226 ', 'GET -O failed')
        #os.remove(os.path.join(dest, self.fname))
        #os.remove(self.full_path)

    def testContents(self):
        "GET: tests if get preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get %s/pub/%s" % (ftp_root, self.fname))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.full_path, 'newfile')
        os.remove('newfile')
        os.remove(self.full_path)

class TestPUT(TestBase):
    def setUp(self):
        TestBase.setUp(self)
        self.fname  = 'putfile'
        self.full_path = os.path.join(os.getcwd(), self.fname)
        self.ftp_full_path = os.path.join(ftp_root, 'pub', self.fname)

        f = open(self.fname, 'w')
        f.write('putfile')
        f.flush()
        f.close()
        os.chown(self.fname, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)

    def testRemoteName(self):
        "PUT -o: specifies remote file name"
        remote_fname = 'someotherfilename'
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("put %s -o %s/pub/%s" % (self.fname, ftp_root, remote_fname))
        self.runner.run()
        self.runner.filter()

        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', remote_fname)))
        self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

        os.remove(self.fname)
        os.remove(os.path.join(ftp_root, 'pub', remote_fname))

    def testDeleteSource(self):
        "PUT -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("put -E %s" % (self.fname))
        self.runner.run()
        self.runner.filter()

        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        self.assert_(not os.path.exists(self.full_path))
        self.assertContainsString(self.runner.server, '226 ', 'PUT -E failed')
        os.remove(self.ftp_full_path)

    def testASCII(self):
        "PUT -a: transfer using ASCII mode"
        print "****** testASCII Not yet implemented ********"
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("put -a %s" % (self.fname))
        #print self.runner.run()
        #self.runner.filter()

        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

        #os.remove(self.fname)
        #os.remove(self.ftp_full_path)

    def testBaseDirectory(self):
        "PUT -O: specifies base directory or URL where files should be placed"
        # FIXME: add some code here
        print "****** testBaseDirectory Not yet implemented ********"

    def testContents(self):
        "PUT: tests if get preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("put %s" % (self.fname))
        self.runner.run()
        self.runner.filter()

        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.ftp_full_path, self.fname)

        os.remove(self.fname)
        os.remove(self.ftp_full_path)

class TestMGET(TestBase):
    def setUp(self):
        TestBase.setUp(self)
        self.basename = 'mgetfile'
        self.fname1 = 'mgetfile1'
        self.fname2 = 'mgetfile2'
        self.full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
        self.full_path2 = os.path.join(ftp_root, 'pub', self.fname2)
        self._create_chowned_file(self.fname1, 'pub', 0777)
        self._create_chowned_file(self.fname2, 'pub', 0777)

    def _clean_up(self):
        os.remove(self.fname1)
        os.remove(self.fname2)
        os.remove(self.full_path1)
        os.remove(self.full_path2)

    def testDELETE(self):
        "MGET -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("mget -E %s/pub/%s*" % (ftp_root, self.basename))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
        self.assertContainsString(self.runner.server, '250 ', 'MGET failed')
        self.assert_(os.path.exists(self.fname1))
        self.assert_(os.path.exists(self.fname2))

        os.remove(self.fname1)
        os.remove(self.fname2)

    def testASCII(self):
        "MGET -a: transfer using ASCII mode"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("mget -a %s/pub/%s*" % (ftp_root, self.basename))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
        self._clean_up()

    def testLocalPath(self):
        "MGET -O: specifies base directory or URL where files should be placed"
        # FIXME: does not work
        print "****** testLocalPath Currently does not work ********"
        #dest = '/tmp'

        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("mget %s/pub/%s* -O %s" % (ftp_root, self.basename, dest))
        #self.runner.run()
        #self.runner.filter()

        #self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
        #self._clean_up()

    def testContents(self):
        "MGET: tests if get preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("mget %s/pub/%s*" % (ftp_root, self.basename))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.full_path1, self.fname1)
        self.assertSameContent(self.full_path2, self.fname2)
        self._clean_up()

class TestMPUT(TestBase):
    def setUp(self):
        TestBase.setUp(self)
        self.basename = 'putfile'
        self.fname1  = 'putfile1'
        self.fname2  = 'putfile2'

        self.ftp_full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
        self.ftp_full_path2 = os.path.join(ftp_root, 'pub', self.fname2)

        for fn in [ self.fname1, self.fname2 ]:
            f = open(fn, 'w')
            f.write(str(fn))
            f.flush()
            f.close()
            os.chown(fn, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)

    def _clean_up(self):
        os.remove(self.fname1)
        os.remove(self.fname2)
        os.remove(self.ftp_full_path1)
        os.remove(self.ftp_full_path2)

    def testDeleteSource(self):
        "MPUT -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("mput -E %s*" % (self.basename))
        self.runner.run()
        self.runner.filter()

        self.assert_(os.path.exists(self.ftp_full_path1))
        self.assert_(os.path.exists(self.ftp_full_path2))
        self.assert_(not os.path.exists(self.fname1))
        self.assert_(not os.path.exists(self.fname2))
        self.assertContains(self.runner.server, '226 Transfer complete.\n', 'MPUT -E failed')
        os.remove(self.ftp_full_path1)
        os.remove(self.ftp_full_path2)

    def testASCII(self):
        "PUT -a: transfer using ASCII mode"
        # FIXME: add some code here
        print "****** testASCII Not yet implemented ********"
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("put -a %s" % (self.fname))
        #print self.runner.run()
        #self.runner.filter()

        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

        #os.remove(self.fname)
        #os.remove(self.ftp_full_path)

    def testBaseDirectory(self):
        "MPUT -O: specifies base directory or URL where files should be placed"
        # FIXME: add some code here
        print "****** testBaseDirectory Not yet implemented ********"

    def testContents(self):
        "MPUT: tests if mput preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("mput %s*" % (self.basename))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assert_(os.path.exists(self.ftp_full_path1))
        self.assert_(os.path.exists(self.ftp_full_path2))
        self.assertSameContent(self.ftp_full_path1, self.fname1)
        self.assertSameContent(self.ftp_full_path2, self.fname2)

        self._clean_up()

class TestGLOB(TestBase):
    def setUp(self):
        TestBase.setUp(self)
        for f in os.listdir(os.path.join(ftp_root, 'pub')):
            try:
                os.remove(os.path.join(ftp_root, 'pub', f)) 
            except OSError:
                os.rmdir(os.path.join(ftp_root, 'pub', f))

        self.dirpath = os.path.join(ftp_root, 'pub', 'pubdir')
        os.mkdir(self.dirpath)
        self._create_chowned_file('pubfile', 'pub', 0777)

    def testFiles(self):
        " GLOB -f: globbing files only "
        self.runner.connect()
        self.runner.add("glob -f ls pub/*")
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -f failed')
        self.assertDoesNotContainString(self.runner.regular, 'pubdir', 'GLOB -f failed')

    def testDirectories(self):
        " GLOB -d: globbing directories only "
        # FIXME - does not seem to work
        pass
        #self.runner.connect()
        #self.runner.add("glob -d ls pub/*")
        #self.runner.run()
        #self.runner.filter()

        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -d failed')
        #self.assertDoesNotContainString(self.runner.regular, 'pubfile', 'GLOB -d failed')

    def testAll(self):
        " GLOB -a: globbing everything "
        # FIXME - does not seem to work
        pass
        #self.runner.connect()
        #self.runner.add("glob -d ls pub/*")
        #self.runner.run()
        #self.runner.filter()

        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -a failed')
        #self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -a failed')

class TestRegression(TestBase):
    """Regression tests tied to individual bug reports (numbers in docstrings)."""

    def testBundledPerl(self):
        "176175: lftp used to ship its copy of perl-String-CRC32 - see if fixed "
        rpm_query = popen2.Popen4("rpm -q --provides lftp")
        rpm_query.wait()
        provides = rpm_query.fromchild.readlines()

        # the package must not list the bundled perl module among its provides
        self.assertDoesNotContainString(provides,
                                        "perl-String-CRC32",
                                        "lftp bundles its own copy of perl-String-CRC32")

    def testHTTP_LS(self):
        "171884: first ls returns nothing when using http protocol"
        runner = self.runner
        runner.connect(host="http://mirror.centos.org/centos/")
        runner.add("ls")
        runner.run()
        runner.filter()

        # the very first listing has to produce some regular output
        listing_length = len(runner.regular)
        self.assert_(listing_length > 0)

class TestQueue(TestBase):
    """Tests for the lftp 'queue' command.

    setUp() connects anonymously, stops the queue and redirects lftp's
    debug output to a temporary file, which _runQueue() reads back.
    """

    def setUp(self):
        TestBase.setUp(self)
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')
        self.runner.connect()
        self.runner.add("queue stop")
        self.runner.add("debug -o %s" % self.script_file.name)

    def tearDown(self):
        # closing a NamedTemporaryFile also deletes it
        self.script_file.close()

    def _runQueue(self):
        """Start the queue, run the script and classify the debug file."""
        self.runner.add("queue start")
        self.runner.run(sleep=3)
        content = self.script_file.readlines()
        self.runner.filter(content)

    def _listQueue(self):
        """Print the queue, run the script and classify lftp's output."""
        self.runner.add("queue")
        self.runner.run()
        self.runner.filter()

    def testInsertBefore(self):
        "QUEUE: insert a command before another one"
        self.runner.add("queue ls")
        self.runner.add("cd /pub")
        self.runner.add("queue -n 1 get somefile")
        self._listQueue()

        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: insert before failed')

    def testRememberLocation(self):
        "QUEUE: tests if queue remembers changing location with cd"
        # This test makes sense for RHEL 5+

        self.runner.add("queue ls")
        self.runner.add("cd /pub")
        self.runner.add("queue ls")
        self._listQueue()

        self.assertContainsString(self.runner.regular, 'cd /pub', 'QUEUE: remember location failed')

    def testDeleteFromQueue(self):
        "QUEUE: delete a command from queue"
        self.runner.add("cd /pub")
        self.runner.add("queue ls")
        self.runner.add("queue -d 1")
        self._runQueue()

        # NOTE(review): lines prefixed with '--->' end up in runner.client,
        # not runner.regular - confirm this check looks at the right list.
        self.assertDoesNotContainString(self.runner.regular, 'LIST\r\n', 'QUEUE: delete from queue failed')

    def testMoveInQueue(self):
        "QUEUE: move a command in queue from one position to another"
        self.runner.add("queue ls")
        self.runner.add("queue get somefile")
        self.runner.add("queue -m 2 1")
        self._listQueue()

        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: move in queue failed')

def die(code, code_req, msg):
    """Exit with status 1 after writing msg to stderr unless code equals code_req."""
    if code == code_req:
        return

    sys.stderr.write("%s\n" % (msg,))
    sys.exit(1)

if __name__ == "__main__":
    # get vsftpd root dir
    ftp_root = pwd.getpwnam('ftp').pw_dir

    if os.getuid() != 0:
       print "This test must be run as root"
       sys.exit(1)

    # add user for testing automatically
    (status, output) = commands.getstatusoutput('useradd %s' % (ftp_user))
    die(status, 0, output)
    print "User %s added.." % ftp_user
    (status, output) = commands.getstatusoutput('echo %s | passwd --stdin %s' % (ftp_pass, ftp_user))
    die(status, 0, output)
    ftp_home = pwd.getpwnam(ftp_user).pw_dir
    os.chmod(os.path.join(ftp_root, 'pub'), 0777)

    try:
        # start vsftpd with basic config
        (status, output) = commands.getstatusoutput('service vsftpd restart')
        # run the tests
        unittest.main()
        die(status, 0, output)
    finally:
        # delete the test user
        (status, output) = commands.getstatusoutput('userdel -r %s' % (ftp_user))
        die(status, 0, output)