#!/usr/bin/python
#coding=utf-8
import unittest
import tempfile
import popen2
import os
import stat
import sys
import commands
import pwd
import grp
import shutil
import time
#import rpm
# Name and password of the local account used by the authenticated-login
# tests; the account is created and removed by the script entry point.
ftp_user = 'lftp-tester'
ftp_pass = 'lftp-tester'
# Home directory of the test account. Reassigned from the passwd database
# when the file is run as a script.
ftp_home = '/home/lftp-tester'
# Directory the tests treat as the root of the FTP tree. Reassigned from the
# home directory of the 'ftp' account when the file is run as a script.
ftp_root = '/var/ftp'
class LftpRunner(object):
    """Build an lftp command script, run it and sort the captured output.

    Commands are queued with add()/connect(), written to a temporary script
    file by save() and executed with /usr/bin/lftp by run().  filter() then
    splits the captured lines into server replies, client commands, info
    lines and regular output.
    """

    def __init__(self):
        self.reset()
        self.cmds = []
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')

    def reset(self):
        """Clear the results of a previous run; queued commands are kept."""
        self.rc = -1
        self.out = []
        self.err = []
        self.server = []
        self.client = []
        self.regular = []
        self.info = []

    def add(self, command):
        """Queue one lftp command (a newline is appended)."""
        self.cmds.append(command + '\n')

    def save(self):
        """Write the queued commands to the script file and to /tmp/lftp."""
        # Copy at a fixed path, handy for inspecting the last script by hand.
        # Closed explicitly so the handle is not leaked on every run.
        debug_copy = open('/tmp/lftp', 'wb+')
        try:
            debug_copy.writelines(self.cmds)
        finally:
            debug_copy.close()
        # Rewrite from the start so that saving twice does not append a
        # second copy of the commands to the script.
        self.script_file.seek(0)
        self.script_file.truncate()
        self.script_file.writelines(self.cmds)
        self.script_file.flush()

    def filter(self, content=None):
        """Sort output lines into the server/client/info/regular lists.

        Lines whose first token is '<---', '--->' or '----' are appended
        (without their first five characters) to self.server, self.client
        or self.info.  Every other line, including blank ones, is appended
        unchanged to self.regular.

        content -- lines to sort; defaults to the output of the last run().
        Returns the tuple (server, client, info, regular).
        """
        filter_hash = {'----': self.info,
                       '--->': self.client,
                       '<---': self.server}
        if content is None:
            content = self.out
        for line in content:
            fields = line.split()
            bucket = None
            # A blank line has no first token; treat it as regular output
            # instead of failing with IndexError.
            if fields:
                bucket = filter_hash.get(fields[0])
            if bucket is None:
                self.regular.append(line)
            else:
                bucket.append(line[5:])
        return self.server, self.client, self.info, self.regular

    def run(self, options="-df", sleep=0):
        """Save the script, run lftp on it and capture its output.

        options -- command line options for lftp; " -f" is appended when
                   the string contains no "f".
        sleep   -- seconds to sleep after starting lftp, before collecting
                   its output.
        Returns (rc, out): the value of Popen4.wait() and the list of
        output lines (stdout and stderr combined).
        """
        self.save()
        if "f" not in options:
            options += " -f"
        p = popen2.Popen4("/usr/bin/lftp %s %s" % (options, os.path.abspath(self.script_file.name)))
        if sleep:
            time.sleep(sleep)
        # Drain the pipe before waiting: waiting first deadlocks as soon as
        # the child writes more than the pipe buffer can hold.
        self.out = p.fromchild.readlines()
        self.rc = p.wait()
        return self.rc, self.out

    def connect(self, host='localhost', user='anonymous', password='anonymous@localhost.localdomain'):
        """Queue the commands that open host and log in.

        The 'anonymous' user is logged in with lftp's "anon" command; any
        other user with "user <name> <password>".
        """
        self.add("open %s" % host)
        if user == 'anonymous':
            self.add('anon')
        else:
            self.add('user %s %s' % (user, password))
class TestBase(unittest.TestCase):
    """Common fixture and assertion helpers for the lftp test cases."""

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.runner = LftpRunner()

    def assertContains(self, haystack, needle, msg):
        """Fail with msg unless needle is an element of haystack."""
        self.assertTrue(needle in haystack, msg)

    def assertDoesNotContain(self, haystack, needle, msg):
        """Fail with msg if needle is an element of haystack."""
        self.assertTrue(needle not in haystack, msg)

    def assertContainsString(self, haystack, needle, msg):
        """Fail with msg unless needle is a substring of some item of haystack."""
        for item in haystack:
            if needle in item:
                return
        self.fail(msg)

    def assertDoesNotContainString(self, haystack, needle, msg):
        """Fail with msg if needle is a substring of any item of haystack."""
        for item in haystack:
            if needle in item:
                self.fail(msg)

    def assertFilePermissions(self, file_path, permissions):
        """Fail unless the permission bits of file_path equal permissions."""
        real_perm = stat.S_IMODE(os.stat(file_path)[stat.ST_MODE])
        self.assertEqual(real_perm, permissions, "%o != %o" % (real_perm, permissions))

    def assertSameContent(self, file1, file2):
        """Fail unless both files have byte-identical contents."""
        # Compared in-process: no dependency on an external diff binary and
        # no file names interpolated into a shell command line.
        import filecmp
        self.assertTrue(filecmp.cmp(file1, file2, shallow=False),
                        "%s and %s differ" % (file1, file2))

    def _ftp_root_put_file(self, filename, path="", contents=""):
        """Create filename under ftp_root/path with the given contents.

        Returns the (already closed) file object; callers use its .name.
        """
        f = open(os.path.join(ftp_root, path, filename), 'w')
        try:
            f.write(contents)
        finally:
            f.close()
        return f

    def _ftp_root_remove_file(self, filename):
        """Remove filename; the path is used as given, not joined to ftp_root."""
        os.remove(filename)

    @staticmethod
    def _coerce_mode(mode):
        """Return mode as an integer, parsing strings such as "0644" as octal."""
        if isinstance(mode, str):
            return int(mode, 8)
        return mode

    def _create_chowned_file(self, filename, path="", mode="0644"):
        """Create an empty file owned by lftp-tester with the given mode.

        mode may be an integer or an octal string.  os.chmod() only accepts
        integers, so the string default is converted first.
        Returns the (closed) file object of the new file.
        """
        newf = self._ftp_root_put_file(filename, path)
        os.chown(newf.name, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        os.chmod(newf.name, self._coerce_mode(mode))
        return newf
class TestLogging(TestBase):
    """Login checks: anonymous, valid local user and wrong password."""

    def setUp(self):
        TestBase.setUp(self)

    def _list_root_as(self, **credentials):
        """Log in with the given connect() arguments, list "/" and return
        the filtered server replies."""
        session = self.runner
        session.connect(**credentials)
        session.add("ls -l /")
        session.run()
        session.filter()
        return session.server

    def testLogAnon(self):
        " Logging in and out using anonymous login "
        replies = self._list_root_as()
        self.assertContainsString(replies, "230 ", "Could not log in")
        self.assertDoesNotContainString(replies, "530 ", "Could not log in")

    def testLogUser(self):
        " Logging in and out using a local user "
        replies = self._list_root_as(user=ftp_user, password=ftp_pass)
        self.assertContainsString(replies, "230 ", "Could not log in")
        # A bare "530" is too broad a pattern: on rhel8 vsftpd answers 530
        # because of TLS, yet the name/password login still succeeds.
        self.assertDoesNotContainString(replies, "530 Login incorrect", "Login incorrect")

    def testLogUserBadPassword(self):
        " Logging in and out using a local user with bad password - must not succeed "
        replies = self._list_root_as(user=ftp_user, password="blablableble")
        self.assertContainsString(replies, "530 ", "User was able to log in with bad password")
class TestBasicCommands(TestBase):
    """Exercise the basic lftp commands against the local FTP server.

    Every test gets a small file 'test-basic-commands' in the FTP root,
    created in setUp() and removed in tearDown().
    """

    # Permission bits spelled with stat flags so that they are valid on every
    # Python version (bare literals such as 0777 are Python 2 only).
    _MODE_777 = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    _MODE_644 = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH

    def setUp(self):
        TestBase.setUp(self)
        self.file_name = 'test-basic-commands'
        self.file_full_path = os.path.join(ftp_root, self.file_name)
        self._ftp_root_put_file(self.file_name, contents="blabla")
        self.assertTrue(os.path.exists(self.file_full_path))

    def tearDown(self):
        self._ftp_root_remove_file(self.file_full_path)
        self.assertTrue(not os.path.exists(self.file_full_path))

    def testLS(self):
        "LS: Tests listing a directory "
        self.runner.connect()
        self.runner.add("ls -l /")
        self.runner.run("-df")
        self.runner.filter()
        self.assertContainsString(self.runner.server, '150 ', 'LS failed')
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'LS failed')

    def testPWD(self):
        "PWD: Tests printing current directory "
        self.runner.connect()
        self.runner.add("cd pub")
        self.runner.add("pwd")
        self.runner.run("-df")
        self.runner.filter()
        self.assertContainsString(self.runner.client, 'PWD', 'PWD failed')
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'PWD failed')

    def testCD(self):
        "CD: Tests changing directory "
        self.runner.connect()
        self.runner.add("cd pub")
        self.runner.add("pwd")
        self.runner.run("-df")
        self.runner.filter()
        self.assertContainsString(self.runner.client, 'CWD /pub', 'CD failed')
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'CD failed')

    def testGET(self):
        "GET: Test downloading a file "
        # get a file from ftp
        self.runner.connect()
        self.runner.add("get %s" % self.file_name)
        self.runner.run("-df")
        self.runner.filter()
        # check its presence in the CWD, clean up
        self.assertTrue(os.path.exists(self.file_name), "GET failed")
        os.remove(self.file_name)

    def testPUT(self):
        "PUT: Test uploading a file "
        # create a dummy file
        fname = 'upload-file'
        f = open(fname, 'w')
        try:
            f.write('content')
        finally:
            f.close()
        try:
            # store the file
            self.runner.connect()
            self.runner.add("cd /pub")
            self.runner.add("put %s" % fname)
            self.runner.add("ls /pub/%s" % fname)
            self.runner.run("-df")
            self.runner.filter()
        finally:
            # the local source is not needed any more; do not leave it behind
            os.remove(fname)
        # check if the store op was successful
        fullpath = os.path.join(ftp_root, 'pub', fname)
        self.assertContainsString(self.runner.server, '226 ', 'PUT failed')
        self.assertTrue(os.path.exists(fullpath), 'PUT failed')
        # remove the file from the ftp root
        os.remove(fullpath)
        self.assertTrue(not os.path.exists(fullpath))

    def testExternalCommand(self):
        " Running external command "
        # relies on a file named sanity.py in the current directory
        self.runner.connect()
        self.runner.add("!ls sanity.py")
        self.runner.run()
        self.runner.filter()
        self.assertContains(self.runner.regular, 'sanity.py\n', "Running external command failed")

    def testALIAS(self):
        "ALIAS: Tests defining an alias "
        self.runner.connect()
        self.runner.add("alias dir ls -l")
        self.runner.add("dir")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.client, 'LIST -l', 'Setting alias failed')

    def testCAT(self):
        "CAT: Printing a remote file to std. output "
        # print file
        self.runner.connect()
        self.runner.add("cat %s" % self.file_name)
        self.runner.run()
        self.runner.filter()
        # assert its contents on std. output
        self.assertContainsString(self.runner.regular, 'blabla', 'CAT failed')

    def testCHMOD(self):
        "CHMOD: Changing permissions on a remote file "
        os.chown(self.file_full_path, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        os.chmod(self.file_full_path, self._MODE_777)
        old_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])
        # change the permission
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s" % ftp_root)
        self.runner.add("ls -l")
        self.runner.add("ls -l %s" % self.file_name)
        self.runner.add("chmod 0644 %s" % self.file_name)
        self.runner.add("ls -l %s" % self.file_name)
        self.runner.run()
        self.runner.filter()
        # check against the old mode
        self.assertContainsString(self.runner.server, '200 ', 'CHMOD failed')
        new_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])
        self.assertNotEqual(old_mode, new_mode, 'CHMOD failed')
        self.assertEqual(new_mode, self._MODE_644)

    def testCLS(self):
        "CLS"
        self.runner.connect()
        self.runner.add("cls")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.client, 'LIST', "CLS failed")
        self.assertContains(self.runner.regular, "%s\n" % self.file_name, "CLS failed")

    def testCOMMAND(self):
        "COMMAND: Run a command, ignoring aliases "
        # less is a commonly defined alias; running it as a command should fail
        self.runner.connect()
        self.runner.add("command less")
        self.runner.run()
        self.runner.filter()
        self.assertContains(self.runner.regular, "Unknown command `less'.\n", "COMMAND failed (or less nor defined as alias)")

    def testECHO(self):
        "ECHO: echo a string "
        # the echoed word must show up as a regular output line
        self.runner.connect()
        self.runner.add("echo foo")
        self.runner.run()
        self.runner.filter()
        self.assertContains(self.runner.regular, "foo\n", "ECHO failed")

    def testFIND(self):
        " FIND: List files in the directory recursively."
        self.runner.connect()
        self.runner.add("find .")
        self.runner.run()
        self.runner.filter()
        self.assertContains(self.runner.regular, "./\n", "FIND failed")
        self.assertContains(self.runner.regular, "./pub/\n", "FIND failed")
        self.assertContains(self.runner.regular, "./%s\n" % self.file_name, "FIND failed")

    def testMKDIR(self):
        "MKDIR: Create remote directories."
        self.runner.connect()
        self.runner.add("mkdir -p /pub/foo/bar")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '257 ', 'MKDIR failed')
        self.assertTrue(os.path.exists(os.path.join(ftp_root, 'pub', 'foo', 'bar')))
        shutil.rmtree(os.path.join(ftp_root, 'pub', 'foo'))

    def testRMDIR(self):
        "RMDIR: Remove remote directories."
        target = os.path.join(ftp_root, 'pub', 'foo')
        try:
            os.mkdir(target)
            os.chmod(target, self._MODE_777)
            os.chown(target,
                     pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        except OSError:
            # deliberate best effort: the assertion below decides whether
            # the fixture is usable
            pass
        self.assertTrue(os.path.exists(target))
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s" % ftp_root)
        self.runner.add("ls -l pub")
        self.runner.add("rmdir pub/foo")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '250 ', 'RMDIR failed')
        self.assertTrue(not os.path.exists(target))

    def testMV(self):
        "MV: rename/move files"
        fname = 'newfile'
        fname2 = 'newfile-newname'
        old_path = os.path.join(ftp_root, 'pub', fname)
        new_path = os.path.join(ftp_root, 'pub', fname2)
        self._create_chowned_file(fname, 'pub', mode=self._MODE_777)
        try:
            self.runner.connect(user=ftp_user, password=ftp_pass)
            self.runner.add("mv %s/pub/%s %s/pub/%s" % (ftp_root, fname, ftp_root, fname2))
            self.runner.add("ls %s/pub/%s" % (ftp_root, fname2))
            self.runner.run()
            self.runner.filter()
            self.assertContainsString(self.runner.server, '250 ', 'MV failed')
            self.assertTrue(os.path.exists(new_path))
            # FIXME: verify that the file keeps permissions and content
        finally:
            # whichever name the file ended up with, do not leave it in pub
            for leftover in (old_path, new_path):
                if os.path.exists(leftover):
                    os.remove(leftover)

    def testRM(self):
        "RM: Remove remote files."
        fname = 'newfile'
        f = self._create_chowned_file(fname, 'pub', mode=self._MODE_777)
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("rm %s/pub/%s" % (ftp_root, fname))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '250 ', 'RM failed')
        self.assertTrue(not os.path.exists(os.path.abspath(f.name)))

    def testNLIST(self):
        "NLIST: List remote file names"
        self.runner.connect()
        self.runner.add("nlist /")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'NLIST failed')
        self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed')

    def testRENLIST(self):
        "RENLIST: Same as `nlist', but ignores the cache."
        self.runner.connect()
        self.runner.add("renlist /")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'RENLIST failed')
        self.assertContainsString(self.runner.regular, '/pub', 'RENLIST failed')

    def testRELS(self):
        "RELS: Same as ls, but ignores the cache."
        self.runner.connect()
        self.runner.add("rels -l /")
        self.runner.run("-df")
        self.runner.filter()
        self.assertContainsString(self.runner.server, '150 ', 'RELS failed')
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'RELS failed')

    def testLCD(self):
        "LCD: Change current local directory"
        dirname = 'foodir'
        os.mkdir(dirname)
        try:
            self.runner.connect()
            self.runner.add("lcd %s" % dirname)
            self.runner.add("lpwd")
            self.runner.add("lcd -")
            self.runner.run()
            self.runner.filter()
        finally:
            # remove the scratch directory even when running lftp failed
            os.rmdir(dirname)
        self.assertContainsString(self.runner.regular,
                                  os.path.join(os.getcwd(), dirname),
                                  'LCD failed')

    def testLPWD(self):
        "LPWD: Print current working directory on local machine"
        self.runner.connect()
        self.runner.add("lpwd")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.regular, os.getcwd(), 'LPWD failed')

    def testMRM(self):
        "MRM: Remove remote files using wildcards"
        fname = 'mrmfile'
        self._create_chowned_file(fname + '1', 'pub', mode=self._MODE_777)
        self._create_chowned_file(fname + '2', 'pub', mode=self._MODE_777)
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("ls")
        self.runner.add("mrm %s/pub/%s*" % (ftp_root, fname))
        self.runner.run()
        self.runner.filter()
        # assert that we deleted something
        self.assertContainsString(self.runner.server, '250 ', 'MRM failed')
        # assert that we deleted exactly 2 files
        self.assertEqual(self.runner.server.count('250 Delete operation successful.\n'), 2)
class TestBOOKMARK(TestBase):
    """Tests of the lftp "bookmark" sub-commands."""

    def _bookmark_listing(self, *bookmark_commands):
        """Connect anonymously, run the given commands followed by
        "bookmark list" and return the filtered regular output."""
        session = self.runner
        session.connect()
        for command in bookmark_commands + ("bookmark list",):
            session.add(command)
        session.run()
        session.filter()
        return session.regular

    def testADD(self):
        "ADD: add current place or given location"
        listing = self._bookmark_listing("bookmark add pb /pub")
        self.assertContainsString(listing, 'pb\t/pub\n', 'Bookmark add failed')

    def testDEL(self):
        "DEL: remove bookmark with name"
        listing = self._bookmark_listing("bookmark add pb /pub",
                                         "bookmark del pb /pub")
        self.assertDoesNotContainString(listing, 'pb\t/pub\n', 'Bookmark del failed')
class TestGET(TestBase):
    """Tests of the options of the lftp "get" command.

    setUp() creates pub/newfile on the server side; tearDown() removes it
    together with any local download, also when a test fails half way.
    """

    # local name used by the "get -o" test
    _LOCAL_COPY = 'newfile.local'

    def setUp(self):
        TestBase.setUp(self)
        self.fname = 'newfile'
        self.full_path = os.path.join(ftp_root, 'pub', self.fname)
        # mode 0777, spelled with stat flags to be valid on any Python version
        self._create_chowned_file(self.fname, 'pub',
                                  stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

    def tearDown(self):
        for leftover in (self.full_path, self.fname, self._LOCAL_COPY):
            if os.path.exists(leftover):
                os.remove(leftover)

    def testDELETE(self):
        "GET -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get -E %s/pub/%s" % (ftp_root, self.fname))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '226 ', 'GET -E failed')
        self.assertContainsString(self.runner.server, '250 ', 'GET -E failed')
        self.assertTrue(os.path.exists(self.fname), 'GET -E failed')

    def testASCII(self):
        "GET -a: transfer using ASCII mode"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get -a %s/pub/%s" % (ftp_root, self.fname))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        self.assertContainsString(self.runner.server, '226 ', 'GET -a failed')
        self.assertTrue(os.path.exists(self.fname), 'GET -a failed')

    def testLocalName(self):
        "GET -o: destination file name"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get %s/pub/%s -o %s" % (ftp_root, self.fname, self._LOCAL_COPY))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '226 ', 'GET -o failed')
        self.assertTrue(os.path.exists(self._LOCAL_COPY), 'GET -o failed')

    def testLocalPath(self):
        "GET -O: specifies base directory or URL where files should be placed"
        # FIXME: does not work
        print("****** testLocalPath Currently does not work ********")
        #dest = '/tmp'
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("get %s/pub/%s -O %s" % (ftp_root, self.fname, dest))
        #self.runner.run()
        #self.runner.filter()
        #self.assertContainsString(self.runner.server, '226 ', 'GET -O failed')
        #os.remove(os.path.join(dest, self.fname))

    def testContents(self):
        "GET: tests if get preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("get %s/pub/%s" % (ftp_root, self.fname))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.full_path, self.fname)
class TestPUT(TestBase):
    """Tests of the options of the lftp "put" command.

    setUp() creates the local file 'putfile'; tearDown() removes it and
    the uploaded copy, so nothing is left behind by failing or
    not-yet-implemented tests.
    """

    def setUp(self):
        TestBase.setUp(self)
        self.fname = 'putfile'
        self.full_path = os.path.join(os.getcwd(), self.fname)
        self.ftp_full_path = os.path.join(ftp_root, 'pub', self.fname)
        f = open(self.fname, 'w')
        try:
            f.write('putfile')
        finally:
            f.close()
        os.chown(self.fname, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)

    def tearDown(self):
        for leftover in (self.full_path, self.ftp_full_path):
            if os.path.exists(leftover):
                os.remove(leftover)

    def testRemoteName(self):
        "PUT -o: specifies remote file name"
        remote_fname = 'someotherfilename'
        remote_path = os.path.join(ftp_root, 'pub', remote_fname)
        try:
            self.runner.connect(user=ftp_user, password=ftp_pass)
            self.runner.add("put %s -o %s/pub/%s" % (self.fname, ftp_root, remote_fname))
            self.runner.run()
            self.runner.filter()
            self.assertTrue(os.path.exists(remote_path))
            self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')
        finally:
            # this name is private to the test, so it is cleaned up here
            if os.path.exists(remote_path):
                os.remove(remote_path)

    def testDeleteSource(self):
        "PUT -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("put -E %s" % (self.fname))
        self.runner.run()
        self.runner.filter()
        self.assertTrue(os.path.exists(self.ftp_full_path))
        self.assertTrue(not os.path.exists(self.full_path))
        self.assertContainsString(self.runner.server, '226 ', 'PUT -E failed')

    def testASCII(self):
        "PUT -a: transfer using ASCII mode"
        print("****** testASCII Not yet implemented ********")
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("put -a %s" % (self.fname))
        #print self.runner.run()
        #self.runner.filter()
        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

    def testBaseDirectory(self):
        "PUT -O: specifies base directory or URL where files should be placed"
        # FIXME: add some code here
        print("****** testBaseDirectory Not yet implemented ********")

    def testContents(self):
        "PUT: tests if get preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("put %s" % (self.fname))
        self.runner.run()
        self.runner.filter()
        self.assertTrue(os.path.exists(self.ftp_full_path))
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.ftp_full_path, self.fname)
class TestMGET(TestBase):
    """Tests of the options of the lftp "mget" command.

    setUp() creates pub/mgetfile1 and pub/mgetfile2 on the server side;
    tearDown() removes them and any local downloads.
    """

    def setUp(self):
        TestBase.setUp(self)
        self.basename = 'mgetfile'
        self.fname1 = 'mgetfile1'
        self.fname2 = 'mgetfile2'
        self.full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
        self.full_path2 = os.path.join(ftp_root, 'pub', self.fname2)
        # mode 0777, spelled with stat flags to be valid on any Python version
        world_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
        self._create_chowned_file(self.fname1, 'pub', world_rwx)
        self._create_chowned_file(self.fname2, 'pub', world_rwx)

    def tearDown(self):
        self._clean_up()

    def _clean_up(self):
        """Remove the fixtures and the downloaded copies that still exist."""
        for leftover in (self.fname1, self.fname2, self.full_path1, self.full_path2):
            if os.path.exists(leftover):
                os.remove(leftover)

    def testDELETE(self):
        "MGET -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("mget -E %s/pub/%s*" % (ftp_root, self.basename))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
        self.assertContainsString(self.runner.server, '250 ', 'MGET failed')
        self.assertTrue(os.path.exists(self.fname1))
        self.assertTrue(os.path.exists(self.fname2))

    def testASCII(self):
        "MGET -a: transfer using ASCII mode"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("mget -a %s/pub/%s*" % (ftp_root, self.basename))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'MGET -a failed')
        self.assertContainsString(self.runner.server, '226 ', 'MGET failed')

    def testLocalPath(self):
        "MGET -O: specifies base directory or URL where files should be placed"
        # FIXME: does not work
        print("****** testLocalPath Currently does not work ********")
        #dest = '/tmp'
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("mget %s/pub/%s* -O %s" % (ftp_root, self.basename, dest))
        #self.runner.run()
        #self.runner.filter()
        #self.assertContainsString(self.runner.server, '226 ', 'MGET failed')

    def testContents(self):
        "MGET: tests if get preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("mget %s/pub/%s*" % (ftp_root, self.basename))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.full_path1, self.fname1)
        self.assertSameContent(self.full_path2, self.fname2)
class TestMPUT(TestBase):
    """Tests of the options of the lftp "mput" command.

    setUp() creates the local files 'putfile1' and 'putfile2', each holding
    its own name; tearDown() removes them and the uploaded copies.
    """

    def setUp(self):
        TestBase.setUp(self)
        self.basename = 'putfile'
        self.fname1 = 'putfile1'
        self.fname2 = 'putfile2'
        self.ftp_full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
        self.ftp_full_path2 = os.path.join(ftp_root, 'pub', self.fname2)
        for fn in [self.fname1, self.fname2]:
            f = open(fn, 'w')
            try:
                f.write(str(fn))
            finally:
                f.close()
            os.chown(fn, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)

    def tearDown(self):
        self._clean_up()

    def _clean_up(self):
        """Remove the local sources and the uploaded copies that still exist."""
        for leftover in (self.fname1, self.fname2,
                         self.ftp_full_path1, self.ftp_full_path2):
            if os.path.exists(leftover):
                os.remove(leftover)

    def testDeleteSource(self):
        "MPUT -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("mput -E %s*" % (self.basename))
        self.runner.run()
        self.runner.filter()
        self.assertTrue(os.path.exists(self.ftp_full_path1))
        self.assertTrue(os.path.exists(self.ftp_full_path2))
        self.assertTrue(not os.path.exists(self.fname1))
        self.assertTrue(not os.path.exists(self.fname2))
        self.assertContains(self.runner.server, '226 Transfer complete.\n', 'MPUT -E failed')

    def testASCII(self):
        "PUT -a: transfer using ASCII mode"
        # FIXME: add some code here
        print("****** testASCII Not yet implemented ********")
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("put -a %s" % (self.fname))
        #print self.runner.run()
        #self.runner.filter()
        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

    def testBaseDirectory(self):
        "MPUT -O: specifies base directory or URL where files should be placed"
        # FIXME: add some code here
        print("****** testBaseDirectory Not yet implemented ********")

    def testContents(self):
        "MPUT: tests if mput preservers contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("mput %s*" % (self.basename))
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertTrue(os.path.exists(self.ftp_full_path1))
        self.assertTrue(os.path.exists(self.ftp_full_path2))
        self.assertSameContent(self.ftp_full_path1, self.fname1)
        self.assertSameContent(self.ftp_full_path2, self.fname2)
class TestGLOB(TestBase):
    """Tests of the lftp "glob" command.

    setUp() empties the pub directory and then creates one directory
    ('pubdir') and one file ('pubfile') in it.
    """

    def setUp(self):
        TestBase.setUp(self)
        pub = os.path.join(ftp_root, 'pub')
        for entry in os.listdir(pub):
            entry_path = os.path.join(pub, entry)
            # rmtree also copes with non-empty directories left behind by
            # other tests; os.rmdir would fail on them
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                shutil.rmtree(entry_path)
            else:
                os.remove(entry_path)
        self.dirpath = os.path.join(pub, 'pubdir')
        os.mkdir(self.dirpath)
        # mode 0777, spelled with stat flags to be valid on any Python version
        self._create_chowned_file('pubfile', 'pub',
                                  stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

    def testFiles(self):
        " GLOB -f: globbing files only "
        self.runner.connect()
        self.runner.add("glob -f ls pub/*")
        self.runner.run()
        self.runner.filter()
        self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -f failed')
        self.assertDoesNotContainString(self.runner.regular, 'pubdir', 'GLOB -f failed')

    def testDirectories(self):
        " GLOB -d: globbing directories only "
        # FIXME - does not seem to work
        pass
        #self.runner.connect()
        #self.runner.add("glob -d ls pub/*")
        #self.runner.run()
        #self.runner.filter()
        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -d failed')
        #self.assertDoesNotContainString(self.runner.regular, 'pubfile', 'GLOB -d failed')

    def testAll(self):
        " GLOB -a: globbing everything "
        # FIXME - does not seem to work
        pass
        #self.runner.connect()
        #self.runner.add("glob -a ls pub/*")
        #self.runner.run()
        #self.runner.filter()
        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -a failed')
        #self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -a failed')
class TestRegression(TestBase):
    """Regression tests for previously reported lftp bugs."""

    def testBundledPerl(self):
        "176175: lftp used to ship its copy of perl-String-CRC32 - see if fixed "
        p = popen2.Popen4("rpm -q --provides lftp")
        # Read the output before waiting: waiting first can deadlock once
        # the child has filled the pipe buffer.
        provides = p.fromchild.readlines()
        p.wait()
        self.assertDoesNotContainString(provides,
                                        "perl-String-CRC32",
                                        "lftp bundles its own copy of perl-String-CRC32")

    def testHTTP_LS(self):
        "171884: first ls returns nothing when using http protocol"
        # needs network access to the mirror below
        self.runner.connect(host="http://mirror.centos.org/centos/")
        self.runner.add("ls")
        self.runner.run()
        self.runner.filter()
        self.assertTrue(len(self.runner.regular) > 0)
class TestQueue(TestBase):
    """Tests of the lftp "queue" command.

    setUp() connects anonymously, stops the queue and redirects lftp's
    debug output to a temporary file (self.script_file), which
    _runQueue() reads back.
    """

    def setUp(self):
        TestBase.setUp(self)
        # despite its name this file receives the debug log ("debug -o")
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')
        self.runner.connect()
        self.runner.add("queue stop")
        self.runner.add("debug -o %s" % self.script_file.name)

    def _runQueue(self):
        """Start the queue, run the script and filter the debug log."""
        self.runner.add("queue start")
        self.runner.run(sleep=3)
        content = self.script_file.readlines()
        self.runner.filter(content)

    def _listQueue(self):
        """Print the queue, run the script and filter lftp's output."""
        self.runner.add("queue")
        self.runner.run()
        self.runner.filter()

    def testInsertBefore(self):
        "QUEUE: insert a command before another one"
        self.runner.add("queue ls")
        self.runner.add("cd /pub")
        self.runner.add("queue -n 1 get somefile")
        self._listQueue()
        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: insert before failed')

    def testRememberLocation(self):
        "QUEUE: tests of queue remembers changing location with cd"
        # This test makes sense for RHEL 5+
        self.runner.add("queue ls")
        self.runner.add("cd /pub")
        self.runner.add("queue ls")
        self._listQueue()
        self.assertContainsString(self.runner.regular, 'cd /pub', 'QUEUE: remember location failed')

    def testDeleteFromQueue(self):
        "QUEUE: delete a command from queue"
        self.runner.add("cd /pub")
        self.runner.add("queue ls")
        self.runner.add("queue -d 1")
        self._runQueue()
        # NOTE(review): filter() moves "---> ..." lines to runner.client, so
        # a LIST command would presumably never show up in runner.regular -
        # confirm whether runner.client was meant here.
        self.assertDoesNotContainString(self.runner.regular, 'LIST\r\n', 'QUEUE: delete from queue failed')

    def testMoveInQueue(self):
        "QUEUE: move a command in queue from one position to another"
        self.runner.add("queue ls")
        self.runner.add("queue get somefile")
        self.runner.add("queue -m 2 1")
        self._listQueue()
        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: move in queue failed')
def die(code, code_req, msg):
    """Exit with status 1 after printing msg to stderr, unless code
    equals code_req (in which case nothing happens)."""
    if code == code_req:
        return
    sys.stderr.write("%s\n" % (msg,))
    sys.exit(1)
if __name__ == "__main__":
    # get vsftpd root dir
    ftp_root = pwd.getpwnam('ftp').pw_dir
    if os.getuid() != 0:
        print("This test must be run as root")
        sys.exit(1)
    # add user for testing automatically
    (status, output) = commands.getstatusoutput('useradd %s' % (ftp_user))
    die(status, 0, output)
    print("User %s added.." % ftp_user)
    # From here on the test user exists, so everything runs under
    # try/finally to make sure the account is removed again.
    try:
        (status, output) = commands.getstatusoutput('echo %s | passwd --stdin %s' % (ftp_pass, ftp_user))
        die(status, 0, output)
        ftp_home = pwd.getpwnam(ftp_user).pw_dir
        # mode 0777, spelled with stat flags to be valid on any Python version
        os.chmod(os.path.join(ftp_root, 'pub'),
                 stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        # start vsftpd with basic config
        (status, output) = commands.getstatusoutput('service vsftpd restart')
        # Check the restart before running the tests: unittest.main()
        # exits the interpreter, so nothing placed after it is executed.
        die(status, 0, output)
        # run the tests
        unittest.main()
    finally:
        # delete the test user
        (status, output) = commands.getstatusoutput('userdel -r %s' % (ftp_user))
        die(status, 0, output)