|
|
8de75c8 |
#!/usr/bin/python
|
|
|
8de75c8 |
#coding=utf-8
|
|
|
8de75c8 |
|
|
|
8de75c8 |
import unittest
|
|
|
8de75c8 |
import tempfile
|
|
|
8de75c8 |
import popen2
|
|
|
8de75c8 |
import os
|
|
|
8de75c8 |
import stat
|
|
|
8de75c8 |
import sys
|
|
|
8de75c8 |
import commands
|
|
|
8de75c8 |
import pwd
|
|
|
8de75c8 |
import grp
|
|
|
8de75c8 |
import shutil
|
|
|
8de75c8 |
import time
|
|
|
8de75c8 |
#import rpm
|
|
|
8de75c8 |
|
|
|
8de75c8 |
# Account the authenticated tests log in with (see connect(user=..., password=...)).
ftp_user = 'lftp-tester'
ftp_pass = 'lftp-tester'
# Presumably the home directory of that account -- TODO confirm; it is not
# referenced by the test classes visible in this part of the file.
ftp_home = '/home/lftp-tester'
# Local directory the tests create their fixture files in; the anonymous
# tests expect the same files to show up under "/" on the FTP server.
ftp_root = '/var/ftp'
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class LftpRunner(object):
    """Queue lftp commands, run them with /usr/bin/lftp and sort the output.

    Typical use: connect(), add() further commands, run(), then filter().
    After filter() the output lines are available in the ``server``,
    ``client``, ``info`` and ``regular`` lists.
    """

    def __init__(self):
        self.reset()
        # Commands queued by add(); every entry already ends with a newline.
        self.cmds = []
        # Script file handed to lftp on the command line by run().
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')

    def reset(self):
        """Forget the result of a previous run (queued commands are kept)."""
        self.rc = -1

        self.out = []
        self.err = []

        self.server = []
        self.client = []
        self.regular = []
        self.info = []

    def add(self, command):
        """Queue one lftp command line; the trailing newline is added here."""
        self.cmds.append(command + '\n')

    def save(self):
        """Write the queued commands to the script file.

        A second copy is written to /tmp/lftp (presumably so the last script
        can be inspected by hand after a failure).
        """
        debug_copy = open('/tmp/lftp', 'wb+')
        try:
            debug_copy.writelines(self.cmds)
        finally:
            # The original never closed this handle.
            debug_copy.close()

        # Start from an empty file so that calling save() again rewrites the
        # script instead of appending a second copy of every command.
        self.script_file.seek(0)
        self.script_file.truncate()
        self.script_file.writelines(self.cmds)
        self.script_file.flush()

    def filter(self, content=None):
        """Sort output lines into server/client/info/regular lists.

        A line whose first whitespace-separated token is ``<---``, ``--->``
        or ``----`` is appended, minus its first five characters, to
        ``self.server``, ``self.client`` or ``self.info`` respectively.
        Every other line -- including blank ones -- is appended unchanged to
        ``self.regular``.

        content -- iterable of lines; defaults to the output of the last run.

        Returns the tuple (server, client, info, regular).
        """
        buckets = {'----': self.info,
                   '--->': self.client,
                   '<---': self.server}
        if content is None:
            content = self.out

        for line in content:
            tokens = line.split()
            # Blank lines have no first token; treat them as regular output
            # instead of failing with IndexError.
            if tokens and tokens[0] in buckets:
                # Drop the four-character marker and the character after it.
                buckets[tokens[0]].append(line[5:])
            else:
                self.regular.append(line)

        return self.server, self.client, self.info, self.regular

    def run(self, options="-df", sleep=0):
        """Save the script and execute it with /usr/bin/lftp.

        options -- command-line options placed before the script path; if the
                   string contains no "f" at all, " -f" is appended so that
                   the path that follows is read as a script file.
        sleep   -- seconds to pause after starting lftp, before collecting
                   its output.

        Returns (rc, out): the value of Popen4.wait() and the list of output
        lines read from the child (stdout and stderr combined).
        """
        self.save()

        if "f" not in options:
            options += " -f"

        script_path = os.path.abspath(self.script_file.name)
        child = popen2.Popen4("/usr/bin/lftp %s %s" % (options, script_path))

        if sleep:
            time.sleep(sleep)

        # Drain the pipe before waiting: a child that produces more output
        # than the pipe can buffer blocks on write and would never exit if we
        # waited for it first.
        self.out = child.fromchild.readlines()
        self.rc = child.wait()

        return self.rc, self.out

    def connect(self, host='localhost', user='anonymous', password='anonymous@localhost.localdomain'):
        """Queue the commands that open *host* and log in.

        The user name 'anonymous' queues lftp's "anon" command (the password
        is then ignored); any other user queues "user <user> <password>".
        """
        self.add("open %s" % host)
        if user == 'anonymous':
            self.add('anon')
        else:
            self.add('user %s %s' % (user, password))
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestBase(unittest.TestCase):
    """Shared fixture and helper assertions for the lftp test cases.

    setUp gives every test a fresh LftpRunner in ``self.runner``.
    """

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.runner = LftpRunner()

    def assertContains(self, haystack, needle, msg):
        """Fail with *msg* unless ``needle in haystack``."""
        if needle not in haystack:
            self.fail(msg)

    def assertDoesNotContain(self, haystack, needle, msg):
        """Fail with *msg* if ``needle in haystack``."""
        if needle in haystack:
            self.fail(msg)

    def assertContainsString(self, haystack, needle, msg):
        """Fail with *msg* unless *needle* occurs inside some item of *haystack*."""
        for item in haystack:
            if needle in item:
                return
        self.fail(msg)

    def assertDoesNotContainString(self, haystack, needle, msg):
        """Fail with *msg* if *needle* occurs inside any item of *haystack*."""
        for item in haystack:
            if needle in item:
                self.fail(msg)

    def assertFilePermissions(self, file_path, permissions):
        """Fail unless the permission bits of *file_path* equal *permissions*."""
        real_perm = stat.S_IMODE(os.stat(file_path)[stat.ST_MODE])
        self.assertEqual(real_perm, permissions, "%o != %o" % (real_perm, permissions))

    def assertSameContent(self, file1, file2):
        """Fail unless ``diff -q file1 file2`` finishes with status 0."""
        # there's probably a pure-python way to do that, but this would be
        # more portable even across RHEL4
        # NOTE: the names are pasted into a shell command unquoted, so they
        # must not contain spaces or shell metacharacters.
        p = popen2.Popen3("/usr/bin/diff -q %s %s" % (file1, file2))
        self.assertEqual(p.wait(), 0)

    def _ftp_root_put_file(self, filename, path="", contents=""):
        """Create ftp_root/path/filename holding *contents*.

        Returns the (already closed) file object; callers use its ``name``.
        """
        f = open(os.path.join(ftp_root, path, filename), 'w')
        try:
            f.write(contents)
        finally:
            f.close()

        return f

    def _ftp_root_remove_file(self, filename):
        """Remove *filename*; the path is used as given, not joined to ftp_root."""
        os.remove(filename)

    def _mode_bits(self, mode):
        """Return *mode* as an int; strings such as "0644" are read as octal."""
        if isinstance(mode, str):
            return int(mode, 8)
        return mode

    def _create_chowned_file(self, filename, path="", mode="0644"):
        """Create an empty file under ftp_root owned by lftp-tester.

        mode -- permission bits, either an int or an octal string.  The
                string default used to be passed straight to os.chmod(),
                which only accepts integers.

        Returns the closed file object from _ftp_root_put_file().
        """
        newf = self._ftp_root_put_file(filename, path)
        os.chown(newf.name, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        os.chmod(newf.name, self._mode_bits(mode))

        return newf
|
|
|
8de75c8 |
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestLogging(TestBase):
    """Login checks: anonymous, local user, and local user with a bad password."""

    def setUp(self):
        TestBase.setUp(self)

    def _server_replies(self, **credentials):
        """Log in with *credentials*, list "/", and return the server replies.

        The keyword arguments are passed to LftpRunner.connect(); with none
        given the session is anonymous.
        """
        session = self.runner
        session.connect(**credentials)
        session.add("ls -l /")
        session.run()
        session.filter()
        return session.server

    def testLogAnon(self):
        """Logging in and out using anonymous login"""
        replies = self._server_replies()

        self.assertContainsString(replies, "230 ", "Could not log in")
        self.assertDoesNotContainString(replies, "530 ", "Could not log in")

    def testLogUser(self):
        """Logging in and out using a local user"""
        replies = self._server_replies(user=ftp_user, password=ftp_pass)

        self.assertContainsString(replies, "230 ", "Could not log in")
        # A bare "530 " is too broad a check here: on rhel8 vsftpd answers
        # 530 because of TLS, yet the login with name and password still
        # succeeds afterwards.
        self.assertDoesNotContainString(replies, "530 Login incorrect", "Login incorrect")

    def testLogUserBadPassword(self):
        """Logging in and out using a local user with bad password - must not succeed"""
        replies = self._server_replies(user=ftp_user, password="blablableble")

        self.assertContainsString(replies, "530 ", "User was able to log in with bad password")
|
|
|
8de75c8 |
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestBasicCommands(TestBase):
    """Run the basic lftp commands against the local FTP server.

    setUp creates ``test-basic-commands`` (contents ``blabla``) directly in
    ftp_root and tearDown removes it again.
    """

    # Permission bits built from stat constants; the values are the octal
    # modes 777 and 644.
    _MODE_ALL = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    _MODE_FILE = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH

    def setUp(self):
        TestBase.setUp(self)
        self.file_name = 'test-basic-commands'
        self.file_full_path = os.path.join(ftp_root, self.file_name)
        self._ftp_root_put_file(self.file_name, contents="blabla")
        self.assert_(os.path.exists(self.file_full_path))

    def tearDown(self):
        self._ftp_root_remove_file(self.file_full_path)
        self.assert_(not os.path.exists(self.file_full_path))

    def _run_session(self, commands, **credentials):
        """Connect, queue *commands*, run lftp and classify its output.

        The keyword arguments go to LftpRunner.connect(); without them the
        session is anonymous.
        """
        self.runner.connect(**credentials)
        for command in commands:
            self.runner.add(command)
        self.runner.run()
        self.runner.filter()

    def testLS(self):
        """LS: Tests listing a directory"""
        self._run_session(["ls -l /"])

        self.assertContainsString(self.runner.server, '150 ', 'LS failed')
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'LS failed')

    def testPWD(self):
        """PWD: Tests printing current directory"""
        self._run_session(["cd pub", "pwd"])

        self.assertContainsString(self.runner.client, 'PWD', 'PWD failed')
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'PWD failed')

    def testCD(self):
        """CD: Tests changing directory"""
        self._run_session(["cd pub", "pwd"])

        self.assertContainsString(self.runner.client, 'CWD /pub', 'CD failed')
        self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'CD failed')

    def testGET(self):
        """GET: Test downloading a file"""
        # get a file from ftp
        self._run_session(["get %s" % self.file_name])

        # check its presence in the CWD, clean up
        self.assert_(os.path.exists(self.file_name), "GET failed")
        os.remove(self.file_name)

    def testPUT(self):
        """PUT: Test uploading a file"""
        # create a dummy file
        fname = 'upload-file'
        f = open(fname, 'w')
        try:
            f.write('content')
        finally:
            f.close()

        # store the file
        self._run_session(["cd /pub",
                           "put %s" % fname,
                           "ls /pub/%s" % fname])

        # The local copy has served its purpose; the original left it behind
        # in the working directory.
        os.remove(fname)

        # check if the store op was successful
        fullpath = os.path.join(ftp_root, 'pub', fname)
        self.assertContainsString(self.runner.server, '226 ', 'PUT failed')
        self.assert_(os.path.exists(fullpath), 'PUT failed')

        # remove the file from the ftp root
        os.remove(fullpath)
        self.assert_(not os.path.exists(fullpath))

    def testExternalCommand(self):
        """Running external command"""
        self._run_session(["!ls sanity.py"])

        self.assertContains(self.runner.regular, 'sanity.py\n', "Running external command failed")

    def testALIAS(self):
        """ALIAS: Tests defining an alias"""
        self._run_session(["alias dir ls -l", "dir"])

        self.assertContainsString(self.runner.client, 'LIST -l', 'Setting alias failed')

    def testCAT(self):
        """CAT: Printing a remote file to std. output"""
        # print file
        self._run_session(["cat %s" % self.file_name])

        # assert its contents on std. output
        self.assertContainsString(self.runner.regular, 'blabla', 'CAT failed')

    def testCHMOD(self):
        """CHMOD: Changing permissions on a remote file"""
        os.chown(self.file_full_path, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        os.chmod(self.file_full_path, self._MODE_ALL)
        old_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])

        # change the permission
        self._run_session(["cd %s" % ftp_root,
                           "ls -l",
                           "ls -l %s" % self.file_name,
                           "chmod 0644 %s" % self.file_name,
                           "ls -l %s" % self.file_name],
                          user=ftp_user, password=ftp_pass)

        # check against the old mode
        # (the failure message used to say 'RMDIR failed')
        self.assertContainsString(self.runner.server, '200 ', 'CHMOD failed')
        new_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE])
        self.assertNotEqual(old_mode, new_mode, 'CHMOD failed')
        self.assertEqual(new_mode, self._MODE_FILE)

    def testCLS(self):
        """CLS"""
        self._run_session(["cls"])

        self.assertContainsString(self.runner.client, 'LIST', "CLS failed")
        self.assertContains(self.runner.regular, "%s\n" % self.file_name, "CLS failed")

    def testCOMMAND(self):
        """COMMAND: Run a command, ignoring aliases"""
        # less is a commonly defined alias; running it through "command"
        # bypasses the alias, so lftp should report it as unknown
        self._run_session(["command less"])

        self.assertContains(self.runner.regular, "Unknown command `less'.\n", "COMMAND failed (or less not defined as alias)")

    def testECHO(self):
        """ECHO: echo a string"""
        self._run_session(["echo foo"])

        self.assertContains(self.runner.regular, "foo\n", "ECHO failed")

    def testFIND(self):
        """FIND: List files in the directory recursively."""
        self._run_session(["find ."])

        self.assertContains(self.runner.regular, "./\n", "FIND failed")
        self.assertContains(self.runner.regular, "./pub/\n", "FIND failed")
        self.assertContains(self.runner.regular, "./%s\n" % self.file_name, "FIND failed")

    def testMKDIR(self):
        """MKDIR: Create remote directories."""
        self._run_session(["mkdir -p /pub/foo/bar"])

        self.assertContainsString(self.runner.server, '257 ', 'MKDIR failed')
        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', 'foo', 'bar')))
        shutil.rmtree(os.path.join(ftp_root, 'pub', 'foo'))

    def testRMDIR(self):
        """RMDIR: Remove remote directories."""
        victim = os.path.join(ftp_root, 'pub', 'foo')
        try:
            os.mkdir(victim)
            os.chmod(victim, self._MODE_ALL)
            os.chown(victim,
                     pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid)
        except OSError:
            # Deliberately best effort (e.g. the directory may already
            # exist); the assertion below decides whether we can go on.
            pass
        self.assert_(os.path.exists(victim))

        self._run_session(["cd %s" % ftp_root,
                           "ls -l pub",
                           "rmdir pub/foo"],
                          user=ftp_user, password=ftp_pass)

        self.assertContainsString(self.runner.server, '250 ', 'RMDIR failed')
        self.assert_(not os.path.exists(victim))

    def testMV(self):
        """MV: rename/move files"""
        fname = 'newfile'
        fname2 = 'newfile-newname'
        self._create_chowned_file(fname, 'pub', mode=self._MODE_ALL)

        self._run_session(["mv %s/pub/%s %s/pub/%s" % (ftp_root, fname, ftp_root, fname2),
                           "ls %s/pub/%s" % (ftp_root, fname2)],
                          user=ftp_user, password=ftp_pass)

        moved = os.path.join(ftp_root, 'pub', fname2)
        self.assertContainsString(self.runner.server, '250 ', 'MV failed')
        self.assert_(os.path.exists(moved))
        # FIXME: verify that the file keeps permissions and content

        # Do not leave the renamed file behind for later tests or runs.
        os.remove(moved)

    def testRM(self):
        """RM: Remove remote files."""
        fname = 'newfile'
        f = self._create_chowned_file(fname, 'pub', mode=self._MODE_ALL)

        self._run_session(["rm %s/pub/%s" % (ftp_root, fname)],
                          user=ftp_user, password=ftp_pass)

        self.assertContainsString(self.runner.server, '250 ', 'RM failed')
        self.assert_(not os.path.exists(os.path.abspath(f.name)))

    def testNLIST(self):
        """NLIST: List remote file names"""
        self._run_session(["nlist /"])

        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'NLIST failed')
        self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed')

    def testRENLIST(self):
        """RENLIST: Same as `nlist', but ignores the cache."""
        self._run_session(["renlist /"])

        self.assertContainsString(self.runner.regular, '/test-basic-commands', 'RENLIST failed')
        # (the failure message used to say 'NLIST failed')
        self.assertContainsString(self.runner.regular, '/pub', 'RENLIST failed')

    def testRELS(self):
        """RELS: Same as ls, but ignores the cache."""
        self._run_session(["rels -l /"])

        self.assertContainsString(self.runner.server, '150 ', 'RELS failed')
        self.assertContainsString(self.runner.regular, 'test-basic-commands', 'RELS failed')

    def testLCD(self):
        """LCD: Change current local directory"""
        dirname = 'foodir'
        os.mkdir(dirname)

        self._run_session(["lcd %s" % dirname,
                           "lpwd",
                           "lcd -"])

        os.rmdir(dirname)
        self.assertContainsString(self.runner.regular,
                                  os.path.join(os.getcwd(), dirname),
                                  'LCD failed')

    def testLPWD(self):
        """LPWD: Print current working directory on local machine"""
        self._run_session(["lpwd"])

        self.assertContainsString(self.runner.regular, os.getcwd(), 'LPWD failed')

    def testMRM(self):
        """MRM: Remove remote files using wildcards"""
        fname = 'mrmfile'
        self._create_chowned_file(fname + '1', 'pub', mode=self._MODE_ALL)
        self._create_chowned_file(fname + '2', 'pub', mode=self._MODE_ALL)

        self._run_session(["ls",
                           "mrm %s/pub/%s*" % (ftp_root, fname)],
                          user=ftp_user, password=ftp_pass)

        # assert that we deleted something
        self.assertContainsString(self.runner.server, '250 ', 'MRM failed')
        # assert that we deleted exactly 2 files (list.count avoids the old
        # loop variable that shadowed the builtin str)
        self.assertEqual(self.runner.server.count('250 Delete operation successful.\n'), 2)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestBOOKMARK(TestBase):
    """Checks for lftp's ``bookmark add`` and ``bookmark del`` sub-commands."""

    def _listing_after(self, *bookmark_commands):
        """Run *bookmark_commands* anonymously, then ``bookmark list``.

        Returns the unprefixed ("regular") output lines of the session.
        """
        session = self.runner
        session.connect()
        for command in bookmark_commands:
            session.add(command)
        session.add("bookmark list")
        session.run()
        session.filter()
        return session.regular

    def testADD(self):
        """ADD: add current place or given location"""
        listing = self._listing_after("bookmark add pb /pub")

        self.assertContainsString(listing, 'pb\t/pub\n', 'Bookmark add failed')

    def testDEL(self):
        """DEL: remove bookmark with name"""
        listing = self._listing_after("bookmark add pb /pub",
                                      "bookmark del pb /pub")

        self.assertDoesNotContainString(listing, 'pb\t/pub\n', 'Bookmark del failed')
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestGET(TestBase):
    """Tests for the options of lftp's ``get`` command.

    setUp creates ``newfile`` in ftp_root/pub owned by lftp-tester; each test
    removes the files it leaves behind itself.
    """

    # rwx for user, group and other (octal 777).
    _MODE_ALL = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    def setUp(self):
        TestBase.setUp(self)
        self.fname = 'newfile'
        self.full_path = os.path.join(ftp_root, 'pub', self.fname)
        self._create_chowned_file(self.fname, 'pub', self._MODE_ALL)

    def _run_as_ftp_user(self, command):
        """Log in as the test user, run the single *command*, classify output."""
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add(command)
        self.runner.run()
        self.runner.filter()

    def testDELETE(self):
        """GET -E: delete source files after successful transfer"""
        self._run_as_ftp_user("get -E %s/pub/%s" % (ftp_root, self.fname))

        # (the first failure message used to say 'RELS failed')
        self.assertContainsString(self.runner.server, '226 ', 'GET -E failed')
        self.assertContainsString(self.runner.server, '250 ', 'GET -E failed')
        os.remove(self.fname)

    def testASCII(self):
        """GET -a: transfer using ASCII mode"""
        self._run_as_ftp_user("get -a %s/pub/%s" % (ftp_root, self.fname))

        self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        self.assertContainsString(self.runner.server, '226 ', 'GET -a failed')
        os.remove(self.full_path)
        os.remove(self.fname)

    def testLocalName(self):
        """GET -o: destination file name"""
        self._run_as_ftp_user("get %s/pub/%s -o newfile.local" % (ftp_root, self.fname))

        self.assertContainsString(self.runner.server, '226 ', 'GET -o failed')
        os.remove('newfile.local')
        os.remove(self.full_path)

    def testLocalPath(self):
        """GET -O: specifies base directory or URL where files should be placed"""
        # FIXME: does not work
        print("****** testLocalPath Currently does not work ********")
        # Intended test, disabled until "get -O" works:
        #dest = '/tmp'

        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("get %s/pub/%s -O %s" % (ftp_root, self.fname, dest))
        #self.runner.run()
        #self.runner.filter()

        #self.assertContainsString(self.runner.server, '226 ', 'GET -O failed')
        #os.remove(os.path.join(dest, self.fname))
        #os.remove(self.full_path)

    def testContents(self):
        """GET: tests if get preserves contents"""
        self._run_as_ftp_user("get %s/pub/%s" % (ftp_root, self.fname))

        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.full_path, 'newfile')
        os.remove('newfile')
        os.remove(self.full_path)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestPUT(TestBase):
    """Tests for the options of lftp's ``put`` command.

    setUp writes a local file ``putfile`` (contents ``putfile``) into the
    working directory and hands it to lftp-tester.
    """

    def setUp(self):
        TestBase.setUp(self)
        self.fname = 'putfile'
        self.full_path = os.path.join(os.getcwd(), self.fname)
        self.ftp_full_path = os.path.join(ftp_root, 'pub', self.fname)

        local_file = open(self.fname, 'w')
        try:
            local_file.write('putfile')
        finally:
            local_file.close()
        owner = pwd.getpwnam('lftp-tester').pw_uid
        group = grp.getgrnam('lftp-tester').gr_gid
        os.chown(self.fname, owner, group)

    def _session(self, *commands):
        """Log in as the test user, run *commands* in order, classify output."""
        lftp = self.runner
        lftp.connect(user=ftp_user, password=ftp_pass)
        for command in commands:
            lftp.add(command)
        lftp.run()
        lftp.filter()

    def testRemoteName(self):
        """PUT -o: specifies remote file name"""
        remote_fname = 'someotherfilename'
        remote_path = os.path.join(ftp_root, 'pub', remote_fname)
        self._session("put %s -o %s/pub/%s" % (self.fname, ftp_root, remote_fname))

        self.assert_(os.path.exists(remote_path))
        self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

        os.remove(self.fname)
        os.remove(remote_path)

    def testDeleteSource(self):
        """PUT -E: delete source files after successful transfer"""
        self._session("cd %s/pub" % ftp_root,
                      "put -E %s" % (self.fname))

        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        self.assert_(not os.path.exists(self.full_path))
        self.assertContainsString(self.runner.server, '226 ', 'PUT -E failed')
        os.remove(self.ftp_full_path)

    def testASCII(self):
        """PUT -a: transfer using ASCII mode"""
        print("****** testASCII Not yet implemented ********")
        # Sketch of the intended test, kept disabled:
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("put -a %s" % (self.fname))
        #print self.runner.run()
        #self.runner.filter()

        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

        #os.remove(self.fname)
        #os.remove(self.ftp_full_path)

    def testBaseDirectory(self):
        """PUT -O: specifies base directory or URL where files should be placed"""
        # FIXME: add some code here
        print("****** testBaseDirectory Not yet implemented ********")

    def testContents(self):
        """PUT: tests if get preservers contents"""
        self._session("cd %s/pub" % ftp_root,
                      "put %s" % (self.fname))

        self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assertSameContent(self.ftp_full_path, self.fname)

        os.remove(self.fname)
        os.remove(self.ftp_full_path)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestMGET(TestBase):
|
|
|
8de75c8 |
def setUp(self):
|
|
|
8de75c8 |
TestBase.setUp(self)
|
|
|
8de75c8 |
self.basename = 'mgetfile'
|
|
|
8de75c8 |
self.fname1 = 'mgetfile1'
|
|
|
8de75c8 |
self.fname2 = 'mgetfile2'
|
|
|
8de75c8 |
self.full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
|
|
|
8de75c8 |
self.full_path2 = os.path.join(ftp_root, 'pub', self.fname2)
|
|
|
8de75c8 |
self._create_chowned_file(self.fname1, 'pub', 0777)
|
|
|
8de75c8 |
self._create_chowned_file(self.fname2, 'pub', 0777)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
def _clean_up(self):
|
|
|
8de75c8 |
os.remove(self.fname1)
|
|
|
8de75c8 |
os.remove(self.fname2)
|
|
|
8de75c8 |
os.remove(self.full_path1)
|
|
|
8de75c8 |
os.remove(self.full_path2)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
def testDELETE(self):
|
|
|
8de75c8 |
"MGET -E: delete source files after successful transfer"
|
|
|
8de75c8 |
self.runner.connect(user=ftp_user, password=ftp_pass)
|
|
|
8de75c8 |
self.runner.add("mget -E %s/pub/%s*" % (ftp_root, self.basename))
|
|
|
8de75c8 |
self.runner.run()
|
|
|
8de75c8 |
self.runner.filter()
|
|
|
8de75c8 |
|
|
|
8de75c8 |
self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
|
|
|
8de75c8 |
self.assertContainsString(self.runner.server, '250 ', 'MGET failed')
|
|
|
8de75c8 |
self.assert_(os.path.exists(self.fname1))
|
|
|
8de75c8 |
self.assert_(os.path.exists(self.fname2))
|
|
|
8de75c8 |
|
|
|
8de75c8 |
os.remove(self.fname1)
|
|
|
8de75c8 |
os.remove(self.fname2)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
def testASCII(self):
|
|
|
8de75c8 |
"MGET -a: transfer using ASCII mode"
|
|
|
8de75c8 |
self.runner.connect(user=ftp_user, password=ftp_pass)
|
|
|
8de75c8 |
self.runner.add("mget -a %s/pub/%s*" % (ftp_root, self.basename))
|
|
|
8de75c8 |
self.runner.run()
|
|
|
8de75c8 |
self.runner.filter()
|
|
|
8de75c8 |
|
|
|
8de75c8 |
self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
|
|
|
8de75c8 |
self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
|
|
|
8de75c8 |
self._clean_up()
|
|
|
8de75c8 |
|
|
|
8de75c8 |
def testLocalPath(self):
    "MGET -O: specifies base directory or URL where files should be placed"
    # FIXME: does not work
    sys.stdout.write("****** testLocalPath Currently does not work ********\n")
    #dest = '/tmp'

    #self.runner.connect(user=ftp_user, password=ftp_pass)
    #self.runner.add("mget %s/pub/%s* -O %s" % (ftp_root, self.basename, dest))
    #self.runner.run()
    #self.runner.filter()

    #self.assertContainsString(self.runner.server, '226 ', 'MGET failed')
    #self._clean_up()

    # Nothing is downloaded while the test is disabled, so _clean_up() (which
    # also removes the local copies) cannot be used.  Still drop the
    # server-side fixture files that setUp appears to create under pub/, so
    # the placeholder does not leave them behind.
    for path in (self.full_path1, self.full_path2):
        if os.path.exists(path):
            os.remove(path)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
def testContents(self):
    "MGET: tests if mget preserves contents"
    runner = self.runner
    runner.connect(user=ftp_user, password=ftp_pass)
    remote_glob = "%s/pub/%s*" % (ftp_root, self.basename)
    runner.add("mget " + remote_glob)
    runner.run()
    runner.filter()

    # Transfer must be reported complete by the server ...
    self.assertContainsString(runner.server, '226 ', 'Contents test failed')
    # ... and each downloaded copy must match its server-side original.
    pairs = ((self.full_path1, self.fname1),
             (self.full_path2, self.fname2))
    for remote_path, local_name in pairs:
        self.assertSameContent(remote_path, local_name)
    self._clean_up()
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestMPUT(TestBase):
    "Tests of the lftp 'mput' command: upload of several local files at once."

    def setUp(self):
        "Create two small local files owned by the test user, ready to upload."
        TestBase.setUp(self)
        self.basename = 'putfile'
        self.fname1 = 'putfile1'
        self.fname2 = 'putfile2'

        self.ftp_full_path1 = os.path.join(ftp_root, 'pub', self.fname1)
        self.ftp_full_path2 = os.path.join(ftp_root, 'pub', self.fname2)

        # Look the owner up once, through the shared ftp_user setting rather
        # than a second hard-coded copy of the account name.
        uid = pwd.getpwnam(ftp_user).pw_uid
        gid = grp.getgrnam(ftp_user).gr_gid
        for fn in (self.fname1, self.fname2):
            f = open(fn, 'w')
            try:
                # Each file just contains its own name.
                f.write(str(fn))
            finally:
                f.close()
            os.chown(fn, uid, gid)

    def tearDown(self):
        "Remove leftover fixture files, then run the base class tearDown."
        # Runs after every test, including failed ones and the placeholders
        # below, so the local putfile* files and any uploaded copies never
        # outlive the test that created them.
        self._clean_up()
        TestBase.tearDown(self)

    def _clean_up(self):
        "Best-effort removal of the local files and their uploaded copies."
        # Which files exist depends on the test (mput -E removes the local
        # ones itself), so only delete what is actually there.
        for path in (self.fname1, self.fname2,
                     self.ftp_full_path1, self.ftp_full_path2):
            if os.path.exists(path):
                os.remove(path)

    def testDeleteSource(self):
        "MPUT -E: delete source files after successful transfer"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("mput -E %s*" % (self.basename))
        self.runner.run()
        self.runner.filter()

        # Uploaded copies must exist on the server side ...
        self.assert_(os.path.exists(self.ftp_full_path1))
        self.assert_(os.path.exists(self.ftp_full_path2))
        # ... and -E must have removed the local sources.
        self.assert_(not os.path.exists(self.fname1))
        self.assert_(not os.path.exists(self.fname2))
        self.assertContains(self.runner.server, '226 Transfer complete.\n', 'MPUT -E failed')
        # Uploaded files are removed by tearDown.

    def testASCII(self):
        "MPUT -a: transfer using ASCII mode"
        # FIXME: add some code here
        sys.stdout.write("****** testASCII Not yet implemented ********\n")
        # Sketch left over from the single-file PUT test (note it refers to
        # self.fname / self.ftp_full_path, which this class does not define):
        #self.runner.connect(user=ftp_user, password=ftp_pass)
        #self.runner.add("put -a %s" % (self.fname))
        #print self.runner.run()
        #self.runner.filter()

        #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname)))
        #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed')
        #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed')

        #os.remove(self.fname)
        #os.remove(self.ftp_full_path)

    def testBaseDirectory(self):
        "MPUT -O: specifies base directory or URL where files should be placed"
        # FIXME: add some code here
        sys.stdout.write("****** testBaseDirectory Not yet implemented ********\n")

    def testContents(self):
        "MPUT: tests if mput preserves contents"
        self.runner.connect(user=ftp_user, password=ftp_pass)
        self.runner.add("cd %s/pub" % ftp_root)
        self.runner.add("mput %s*" % (self.basename))
        self.runner.run()
        self.runner.filter()

        self.assertContainsString(self.runner.server, '226 ', 'Contents test failed')
        self.assert_(os.path.exists(self.ftp_full_path1))
        self.assert_(os.path.exists(self.ftp_full_path2))
        # Uploaded copy must match the local source.
        self.assertSameContent(self.ftp_full_path1, self.fname1)
        self.assertSameContent(self.ftp_full_path2, self.fname2)
        # Local and uploaded files are removed by tearDown.
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestGLOB(TestBase):
    "Tests of the lftp 'glob' command against one file and one directory in pub/."

    def setUp(self):
        "Empty the pub/ directory, then create one directory and one file in it."
        TestBase.setUp(self)
        pub_dir = os.path.join(ftp_root, 'pub')
        for entry in os.listdir(pub_dir):
            path = os.path.join(pub_dir, entry)
            # Real directories are removed recursively: the previous
            # os.remove()/os.rmdir() fallback failed on a non-empty directory
            # left behind by an earlier test.  Symlinks are unlinked rather
            # than followed.
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

        self.dirpath = os.path.join(pub_dir, 'pubdir')
        os.mkdir(self.dirpath)
        # rwx for user, group and others (octal 777).
        all_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
        self._create_chowned_file('pubfile', 'pub', all_rwx)

    def testFiles(self):
        " GLOB -f: globbing files only "
        self.runner.connect()
        self.runner.add("glob -f ls pub/*")
        self.runner.run()
        self.runner.filter()

        # Only the plain file may be listed, not the directory.
        self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -f failed')
        self.assertDoesNotContainString(self.runner.regular, 'pubdir', 'GLOB -f failed')

    def testDirectories(self):
        " GLOB -d: globbing directories only "
        # FIXME - does not seem to work
        pass
        #self.runner.connect()
        #self.runner.add("glob -d ls pub/*")
        #self.runner.run()
        #self.runner.filter()

        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -d failed')
        #self.assertDoesNotContainString(self.runner.regular, 'pubfile', 'GLOB -d failed')

    def testAll(self):
        " GLOB -a: globbing everything "
        # FIXME - does not seem to work
        pass
        # (the sketch below used "glob -d"; -a is the option under test)
        #self.runner.connect()
        #self.runner.add("glob -a ls pub/*")
        #self.runner.run()
        #self.runner.filter()

        #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -a failed')
        #self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -a failed')
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestRegression(TestBase):
    "Regression tests for previously reported lftp bugs (numbers are in the docstrings)."

    def testBundledPerl(self):
        "176175: lftp used to ship its copy of perl-String-CRC32 - see if fixed "
        p = popen2.Popen4("rpm -q --provides lftp")
        # Drain the pipe before waiting: waiting first can deadlock if the
        # child blocks on a full pipe while we block on its exit.
        provides = p.fromchild.readlines()
        p.wait()
        self.assertDoesNotContainString(provides,
                                        "perl-String-CRC32",
                                        "lftp bundles its own copy of perl-String-CRC32")

    def testHTTP_LS(self):
        "171884: first ls returns nothing when using http protocol"
        # NOTE: needs network access to the public mirror.
        self.runner.connect(host="http://mirror.centos.org/centos/")
        self.runner.add("ls")
        self.runner.run()
        self.runner.filter()

        # The very first listing must already produce some output.
        self.assert_(len(self.runner.regular) > 0)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
class TestQueue(TestBase):
    "Tests of the lftp 'queue' command (insert, move, delete, remembered cwd)."

    def setUp(self):
        "Connect with the queue stopped and lftp debug output sent to a temp file."
        TestBase.setUp(self)
        # Despite its name this file receives lftp's debug log ("debug -o").
        self.script_file = tempfile.NamedTemporaryFile(mode='wb+')
        self.runner.connect()
        # Stop the queue so queued commands are held until a test starts it.
        self.runner.add("queue stop")
        self.runner.add("debug -o %s" % self.script_file.name)

    def _runQueue(self):
        "Start the queue, run the script and filter the debug log lftp wrote."
        self.runner.add("queue start")
        self.runner.run(sleep=3)
        content = self.script_file.readlines()
        self.runner.filter(content)

    def _listQueue(self):
        "List the queue without starting it and filter lftp's regular output."
        self.runner.add("queue")
        self.runner.run()
        self.runner.filter()

    def testInsertBefore(self):
        "QUEUE: insert a command before another one"
        self.runner.add("queue ls")
        self.runner.add("cd /pub")
        self.runner.add("queue -n 1 get somefile")
        self._listQueue()

        # The inserted command must now be first in the listing.
        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: insert before failed')

    def testRememberLocation(self):
        "QUEUE: tests if queue remembers changing location with cd"
        # This test makes sense for RHEL 5+

        self.runner.add("queue ls")
        self.runner.add("cd /pub")
        self.runner.add("queue ls")
        self._listQueue()

        self.assertContainsString(self.runner.regular, 'cd /pub', 'QUEUE: remember location failed')

    def testDeleteFromQueue(self):
        "QUEUE: delete a command from queue"
        self.runner.add("cd /pub")
        self.runner.add("queue ls")
        self.runner.add("queue -d 1")
        self._runQueue()

        # Commands sent to the server are logged with a '--->' prefix, which
        # LftpRunner.filter() stores in runner.client.  The assertion used to
        # inspect runner.regular, which never receives those lines, so it
        # could not fail.  Match on 'LIST' alone so the check does not depend
        # on the log's line ending (assumed to be a substring match - confirm
        # against assertDoesNotContainString).
        self.assertDoesNotContainString(self.runner.client, 'LIST', 'QUEUE: delete from queue failed')

    def testMoveInQueue(self):
        "QUEUE: move a command in queue from one position to another"
        self.runner.add("queue ls")
        self.runner.add("queue get somefile")
        self.runner.add("queue -m 2 1")
        self._listQueue()

        # After the move the second command must be listed first.
        self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: move in queue failed')
|
|
|
8de75c8 |
|
|
|
8de75c8 |
def die(code, code_req, msg):
    "Write msg to stderr and exit with status 1 unless code equals code_req."
    if code == code_req:
        # Expected status: nothing to report.
        return
    sys.stderr.write("%s\n" % (msg,))
    sys.exit(1)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
if __name__ == "__main__":
    # Script entry point: create a throw-away system user, (re)start vsftpd,
    # run every test in this module, and always delete the user afterwards.

    # get vsftpd root dir
    ftp_root = pwd.getpwnam('ftp').pw_dir

    if os.getuid() != 0:
        sys.stdout.write("This test must be run as root\n")
        sys.exit(1)

    # add user for testing automatically
    (status, output) = commands.getstatusoutput('useradd %s' % (ftp_user))
    die(status, 0, output)
    sys.stdout.write("User %s added..\n" % ftp_user)

    # From here on the user exists, so everything runs under try/finally:
    # a failure while setting the password or restarting the server must not
    # leave the account behind.
    try:
        (status, output) = commands.getstatusoutput('echo %s | passwd --stdin %s' % (ftp_pass, ftp_user))
        die(status, 0, output)
        ftp_home = pwd.getpwnam(ftp_user).pw_dir
        # pub/ must be writable by the test user: rwx for everyone (octal 777).
        os.chmod(os.path.join(ftp_root, 'pub'),
                 stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

        # start vsftpd with basic config
        (status, output) = commands.getstatusoutput('service vsftpd restart')
        # Check the restart before running the tests.  unittest.main() ends
        # with sys.exit(), so a check placed after it never runs.
        die(status, 0, output)
        # run the tests
        unittest.main()
    finally:
        # delete the test user
        (status, output) = commands.getstatusoutput('userdel -r %s' % (ftp_user))
        die(status, 0, output)
|
|
|
8de75c8 |
|
|
|
8de75c8 |
|
|
|
8de75c8 |
|