From 8de75c80275d68894682a38fa8c77df181f4a8fd Mon Sep 17 00:00:00 2001 From: Ondrej Mejzlik Date: Oct 12 2020 11:55:09 +0000 Subject: Adding basic sanity test to upstream --- diff --git a/sanity/Makefile b/sanity/Makefile new file mode 100644 index 0000000..d0f89cd --- /dev/null +++ b/sanity/Makefile @@ -0,0 +1,79 @@ +# Copyright (c) 2006 Red Hat, Inc. All rights reserved. This copyrighted material +# is made available to anyone wishing to use, modify, copy, or +# redistribute it subject to the terms and conditions of the GNU General +# Public License v.2. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +# PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# Author: Jakub Hrozek + +# The toplevel namespace within which the test lives. +TOPLEVEL_NAMESPACE=CoreOS + +# The name of the package under test: +PACKAGE_NAME=lftp + +# The path of the test below the package: +RELATIVE_PATH=sanity + +# Version of the Test. Used with make tag. +export TESTVERSION=1.1 + +# The combined namespace of the test. +export TEST=/$(TOPLEVEL_NAMESPACE)/$(PACKAGE_NAME)/$(RELATIVE_PATH) + + +# A phony target is one that is not really the name of a file. +# It is just a name for some commands to be executed when you +# make an explicit request. There are two reasons to use a +# phony target: to avoid a conflict with a file of the same +# name, and to improve performance. +.PHONY: all install download clean + +# executables to be built should be added here, they will be generated on the system under test. +BUILT_FILES= + +# data files, .c files, scripts anything needed to either compile the test and/or run it. +FILES=$(METADATA) runtest.sh Makefile PURPOSE sanity.py vsftpd.conf main.fmf + +run: $(FILES) build + ./runtest.sh + +build: $(BUILT_FILES) + test -x runtest.sh || chmod a+x runtest.sh + +clean: + rm -f *~ *.rpm $(BUILT_FILES) + +# You may need to add other targets e.g. to build executables from source code +# Add them here: + + +# Include Common Makefile +include /usr/share/rhts/lib/rhts-make.include + +# Generate the testinfo.desc here: +$(METADATA): Makefile + @touch $(METADATA) +# Change to the test owner's name + @echo "Owner: David Kutalek " > $(METADATA) + @echo "Name: $(TEST)" >> $(METADATA) + @echo "Path: $(TEST_DIR)" >> $(METADATA) + @echo "License: GNU GPL" >> $(METADATA) + @echo "TestVersion: $(TESTVERSION)" >> $(METADATA) + @echo "Description: Basic Sanity test for lftp">> $(METADATA) + @echo "Type: Sanity" >> $(METADATA) + @echo "TestTime: 40m" >> $(METADATA) + @echo "Confidential: no" >> $(METADATA) + @echo "RunFor: lftp" >> $(METADATA) + @echo "Requires: lftp python2 vsftpd expect lftp" >> $(METADATA) + @echo "RhtsRequires: library(selinux-policy/common)" >> $(METADATA) + @echo "Destructive: no" >> $(METADATA) + + rhts-lint $(METADATA) diff --git a/sanity/PURPOSE b/sanity/PURPOSE new file mode 100644 index 0000000..8fd9bb9 --- /dev/null +++ b/sanity/PURPOSE @@ -0,0 +1,10 @@ +PURPOSE of /CoreOS/lftp/sanity +Description: Basic sanity test for lftp +Author: Jakub Hrozek + +This is a basic sanity test for the lftp package. It is implemented +in python on top of the unittesting.py module. + +Its purpose is to ensure that the basic functionality of lftp is preserved +as expected. + diff --git a/sanity/main.fmf b/sanity/main.fmf new file mode 100644 index 0000000..6331d21 --- /dev/null +++ b/sanity/main.fmf @@ -0,0 +1,34 @@ +summary: Basic Sanity test for lftp +description: |+ + This is a basic sanity test for the lftp package. It is implemented + in python on top of the unittesting.py module. + + Its purpose is to ensure that the basic functionality of lftp is preserved + as expected. + +contact: Ondrej Mejzlik +component: +- lftp +test: ./runtest.sh +framework: beakerlib +require: +- library(selinux-policy/common) +recommend: +- lftp +- python2 +- vsftpd +- expect +- lftp +duration: 40m +enabled: true +tag: +- TIPpass_Apps +- TIPpass_infra +- Tier1 +- tip_destructive_el8 +tier: '1' +relevancy: | + distro < rhel-6: False +extra-nitrate: TC#0054211 +extra-summary: /CoreOS/lftp/sanity +extra-task: /CoreOS/lftp/sanity diff --git a/sanity/runtest.sh b/sanity/runtest.sh new file mode 100755 index 0000000..ea6f9d7 --- /dev/null +++ b/sanity/runtest.sh @@ -0,0 +1,79 @@ +#!/bin/bash +# vim: dict=/usr/share/beakerlib/dictionary.vim cpt=.,w,b,u,t,i,k +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# runtest.sh of /CoreOS/lftp/sanity +# Description: Basic lftp sanity test +# Author: Jakub Hrozek +# Modified by: David Kutalek +# +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# Copyright (c) 2006, 2012 Red Hat, Inc. All rights reserved. +# +# This copyrighted material is made available to anyone wishing +# to use, modify, copy, or redistribute it subject to the terms +# and conditions of the GNU General Public License version 2. +# +# This program is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +# PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public +# License along with this program; if not, write to the Free +# Software Foundation, Inc., 51 Franklin Street, Fifth Floor, +# Boston, MA 02110-1301, USA. +# +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +# Include Beaker environment +. /usr/share/beakerlib/beakerlib.sh + +PACKAGE="lftp" + +rlJournalStart + rlPhaseStartSetup + rlRun "rlImport selinux-policy/common" 0 "Importing SELinux library" + rlAssertRpm $PACKAGE + rlAssertRpm "expect" + rlAssertRpm "vsftpd" + rlFileBackup "/etc/vsftpd/vsftpd.conf" + if ! rlIsRHEL '<8'; then + # On rhel8 tcp_wrappers are not compiled in. + rlRun "sed -i 's/tcp_wrappers=YES/tcp_wrappers=NO/' vsftpd.conf" 0 "Disable tcp_wrappers" + fi + # User is created in the pyhon script + rlRun "cp -f ./vsftpd.conf /etc/vsftpd/vsftpd.conf" + rlServiceStart "vsftpd" + rlRun "TmpDir=\$(mktemp -d)" 0 "Creating tmp directory" + rlRun "cp ./sanity.py $TmpDir/" + rlRun "pushd $TmpDir" + if rlIsRHEL '<=6'; then + rlRun "rlSEBooleanBackup allow_ftpd_anon_write" + rlRun "rlSEBooleanBackup allow_ftpd_full_access" + rlRun "rlSEBooleanOn allow_ftpd_anon_write" + rlRun "rlSEBooleanOn allow_ftpd_full_access" + else + rlRun "rlSEBooleanBackup ftpd_anon_write" + rlRun "rlSEBooleanBackup ftpd_full_access" + rlRun "rlSEBooleanOn ftpd_anon_write" + rlRun "rlSEBooleanOn ftpd_full_access" + fi + rlPhaseEnd + + rlPhaseStartTest + rlRun "python2 sanity.py -v" + rlPhaseEnd + + rlPhaseStartCleanup + rlFileRestore + rlServiceRestore "vsftpd" + rlRun "popd" + rlRun "rm -r $TmpDir" 0 "Removing tmp directory" + rlRun "rlSEBooleanRestore" 0 "Restoring SELinux booleans" + # If the python script does not finish this deletes the user from the system + rlRun "userdel -rf lftp-tester" 0,6 "Delete lftp-tester user" + rlPhaseEnd +rlJournalPrintText +rlJournalEnd diff --git a/sanity/sanity.py b/sanity/sanity.py new file mode 100755 index 0000000..e34c88d --- /dev/null +++ b/sanity/sanity.py @@ -0,0 +1,966 @@ +#!/usr/bin/python +#coding=utf-8 + +import unittest +import tempfile +import popen2 +import os +import stat +import sys +import commands +import pwd +import grp +import shutil +import time +#import rpm + +ftp_user = 'lftp-tester' +ftp_pass = 'lftp-tester' +ftp_home = '/home/lftp-tester' +ftp_root = '/var/ftp' + +class LftpRunner(object): + def __init__(self): + self.reset() + self.cmds = [] + self.script_file = tempfile.NamedTemporaryFile(mode='wb+') + + def reset(self): + self.rc = -1 + + self.out = [] + self.err = [] + + self.server = [] + self.client = [] + self.regular = [] + self.info = [] + + def add(self, command): + self.cmds.append(command + '\n') + + def save(self): + foo = open('/tmp/lftp', 'wb+') + foo.writelines(self.cmds) + self.script_file.writelines(self.cmds) + self.script_file.flush() + + def filter(self, content=None): + filter_hash = { '----' : self.info, + '--->' : self.client, + '<---' : self.server } + if content == None: + content = self.out + + for line in content: + first = line.split()[0] + try: + filter_hash[first].append(line[5:]) + except KeyError: + self.regular.append(line) + + return self.server, self.client, self.info, self.regular + + def run(self, options="-df", sleep=0): + self.save() + + if not "f" in options: + options += " -f" + + # print os.path.abspath(self.script_file.name) + # with open(os.path.abspath(self.script_file.name)) as f: + # for l in f: + # print l + + p = popen2.Popen4("/usr/bin/lftp %s %s" % (options, os.path.abspath(self.script_file.name))) + + if sleep: + time.sleep(sleep) + + self.rc = p.wait() + self.out = p.fromchild.readlines() + + # Debug output: + # print self.out + + return self.rc, self.out + + def connect(self, host='localhost', user='anonymous', password='anonymous@localhost.localdomain'): + self.add("open %s" % host) + if user == 'anonymous': + self.add('anon') + else: + self.add('user %s %s' % (user, password)) + +class TestBase(unittest.TestCase): + def setUp(self): + unittest.TestCase.setUp(self) + self.runner = LftpRunner() + + def assertContains(self, haystack, needle, msg): + self.assert_(needle in haystack, msg) + + def assertDoesNotContain(self, haystack, needle, msg): + self.assert_(not needle in haystack, msg) + + def assertContainsString(self, haystack, needle, msg): + found = False + + for item in haystack: + if needle in item: + found = True + return + + if found == False: + self.fail(msg) + + def assertDoesNotContainString(self, haystack, needle, msg): + found = False + + for item in haystack: + if needle in item: + found = True + + if found == True: + self.fail(msg) + + def assertFilePermissions(self, file_path, permissions): + real_perm = stat.S_IMODE(os.stat(file_path)[stat.ST_MODE]) + self.assertEquals(real_perm, permissions, "%o != %o" % (real_perm, permissions)) + + def assertSameContent(self, file1, file2): + # there's probably a pure-python way to do that, but this would be more portable + # even across RHEL4 + p = popen2.Popen3("/usr/bin/diff -q %s %s" % (file1, file2)) + self.assertEquals(p.wait(), 0) + + def _ftp_root_put_file(self, filename, path="", contents=""): + f = open(os.path.join(ftp_root, path, filename), 'w') + f.write(contents) + f.flush() + f.close() + + return f + + def _ftp_root_remove_file(self, filename): + os.remove(filename) + + def _create_chowned_file(self, filename, path="", mode="0644"): + newf = self._ftp_root_put_file(filename, path) + os.chown(newf.name, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid) + os.chmod(newf.name, mode) + + return newf + + +class TestLogging(TestBase): + def setUp(self): + TestBase.setUp(self) + + def testLogAnon(self): + " Logging in and out using anonymous login " + self.runner.connect() + self.runner.add("ls -l /") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, "230 ", "Could not log in") + self.assertDoesNotContainString(self.runner.server, "530 ", "Could not log in") + + def testLogUser(self): + " Logging in and out using a local user " + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("ls -l /") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, "230 ", "Could not log in") + # Only 530 is not good enough here, on rhel8 vsftpd receives 530 because of TLS but you can + # still log in with name and password successfully. + self.assertDoesNotContainString(self.runner.server, "530 Login incorrect", "Login incorrect") + + def testLogUserBadPassword(self): + " Logging in and out using a local user with bad password - must not succeed " + self.runner.connect(user=ftp_user, password="blablableble") + self.runner.add("ls -l /") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, "530 ", "User was able to log in with bad password") + + +class TestBasicCommands(TestBase): + def setUp(self): + TestBase.setUp(self) + self.file_name = 'test-basic-commands' + self.file_full_path = os.path.join(ftp_root, self.file_name) + f = self._ftp_root_put_file(self.file_name, contents="blabla") + self.assert_(os.path.exists(os.path.join(ftp_root, self.file_name))) + + def tearDown(self): + f = self._ftp_root_remove_file(os.path.join(ftp_root, self.file_name)) + self.assert_(not os.path.exists(os.path.join(ftp_root, self.file_name))) + + def testLS(self): + "LS: Tests listing a directory " + self.runner.connect() + self.runner.add("ls -l /") + self.runner.run("-df") + self.runner.filter() + + self.assertContainsString(self.runner.server, '150 ', 'LS failed') + self.assertContainsString(self.runner.regular, 'test-basic-commands', 'LS failed') + + def testPWD(self): + "PWD: Tests printing current directory " + self.runner.connect() + self.runner.add("cd pub") + self.runner.add("pwd") + self.runner.run("-df") + self.runner.filter() + + self.assertContainsString(self.runner.client, 'PWD', 'PWD failed') + self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'PWD failed') + + def testCD(self): + "CD: Tests changing directory " + self.runner.connect() + self.runner.add("cd pub") + self.runner.add("pwd") + self.runner.run("-df") + self.runner.filter() + + self.assertContainsString(self.runner.client, 'CWD /pub', 'CD failed') + self.assertContainsString(self.runner.regular, 'ftp://localhost/pub', 'CD failed') + + def testGET(self): + "GET: Test downloading a file " + # get a file from ftp + self.runner.connect() + self.runner.add("get %s" % self.file_name) + self.runner.run("-df") + self.runner.filter() + + # checks its presence in the CWD, clean up + self.assert_(os.path.exists(self.file_name), "GET failed") + os.remove(self.file_name) + + def testPUT(self): + "PUT: Test uploading a file " + # create a dummy file + fname = 'upload-file' + f = open(fname, 'w') + f.write('content') + f.flush() + f.close() + + # store the file + self.runner.connect() + self.runner.add("cd /pub") + self.runner.add("put %s" % fname) + self.runner.add("ls /pub/%s" % fname) + self.runner.run("-df") + self.runner.filter() + + # check if the store op was successfull + fullpath = os.path.join(ftp_root, 'pub', fname) + self.assertContainsString(self.runner.server, '226 ', 'PUT failed') + self.assert_(os.path.exists(fullpath), 'PUT failed') + + # remove the file from the ftp root + os.remove(fullpath) + self.assert_(not os.path.exists(fullpath)) + + def testExternalCommand(self): + " Running external command " + self.runner.connect() + self.runner.add("!ls sanity.py") + self.runner.run() + self.runner.filter() + + self.assertContains(self.runner.regular, 'sanity.py\n', "Running external command failed") + + def testALIAS(self): + "ALIAS: Tests defining an alias " + self.runner.connect() + self.runner.add("alias dir ls -l") + self.runner.add("dir") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.client, 'LIST -l', 'Setting alias failed') + + def testCAT(self): + "CAT: Printing a remote file to std. output " + # print file + self.runner.connect() + self.runner.add("cat %s" % self.file_name) + self.runner.run() + self.runner.filter() + + # assert its contents on std. output + self.assertContainsString(self.runner.regular, 'blabla', 'CAT failed') + + def testCHMOD(self): + "CHMOD: Changing permissions on a remote file " + os.chown(self.file_full_path, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid) + os.chmod(self.file_full_path, 0777) + old_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE]) + + # change the permission + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("cd %s" % ftp_root) + self.runner.add("ls -l") + self.runner.add("ls -l %s" % self.file_name) + self.runner.add("chmod 0644 %s" % self.file_name) + self.runner.add("ls -l %s" % self.file_name) + self.runner.run() + self.runner.filter() + + # check against the old mode + self.assertContainsString(self.runner.server, '200 ', 'RMDIR failed') + new_mode = stat.S_IMODE(os.stat(self.file_full_path)[stat.ST_MODE]) + self.assertNotEqual(old_mode, new_mode, 'CHMOD failed') + self.assertEquals(new_mode, 0644) + + def testCLS(self): + "CLS" + self.runner.connect() + self.runner.add("cls") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.client, 'LIST', "CLS failed") + self.assertContains(self.runner.regular, "%s\n" % self.file_name , "CLS failed") + + def testCOMMAND(self): + "COMMAND: Run a command, ignoring aliases " + # less is a common defined alias..we try running it as a command, should fail + self.runner.connect() + self.runner.add("command less") + self.runner.run() + self.runner.filter() + + self.assertContains(self.runner.regular, "Unknown command `less'.\n", "COMMAND failed (or less nor defined as alias)") + + def testECHO(self): + "ECHO: echo a string " + # less is a common defined alias..we try running it as a command, should fail + self.runner.connect() + self.runner.add("echo foo") + self.runner.run() + self.runner.filter() + + self.assertContains(self.runner.regular, "foo\n", "ECHO failed") + + def testFIND(self): + " FIND: List files in the directory recursively." + self.runner.connect() + self.runner.add("find .") + self.runner.run() + self.runner.filter() + + self.assertContains(self.runner.regular, "./\n", "FIND failed") + self.assertContains(self.runner.regular, "./pub/\n", "FIND failed") + self.assertContains(self.runner.regular, "./%s\n" % self.file_name, "FIND failed") + + def testMKDIR(self): + "MKDIR: Create remote directories." + self.runner.connect() + self.runner.add("mkdir -p /pub/foo/bar") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '257 ', 'MKDIR failed') + self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', 'foo', 'bar'))) + shutil.rmtree(os.path.join(ftp_root, 'pub', 'foo')) + + def testRMDIR(self): + "RMDIR: Remove remote directories." + try: + os.mkdir(os.path.join(ftp_root, 'pub', 'foo')) + os.chmod(os.path.join(ftp_root, 'pub', 'foo'), 0777) + os.chown(os.path.join(ftp_root, 'pub', 'foo'), + pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid) + except OSError: + pass + self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', 'foo'))) + + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("cd %s" % ftp_root) + self.runner.add("ls -l pub") + self.runner.add("rmdir pub/foo") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '250 ', 'RMDIR failed') + self.assert_(not os.path.exists(os.path.join(ftp_root, 'pub', 'foo'))) + + def testMV(self): + "MV: rename/move files" + fname = 'newfile' + fname2 = 'newfile-newname' + f = self._create_chowned_file(fname, 'pub', mode=0777) + + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("mv %s/pub/%s %s/pub/%s" % (ftp_root, fname, ftp_root, fname2)) + self.runner.add("ls %s/pub/%s" % (ftp_root, fname2)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '250 ', 'MV failed') + self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', fname2))) + # FIXME: verify that the file keeps permissions and content + + def testRM(self): + "RM: Remove remote files." + fname = 'newfile' + f = self._create_chowned_file(fname, 'pub', mode=0777) + + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("rm %s/pub/%s" % (ftp_root, fname)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '250 ', 'RM failed') + self.assert_(not os.path.exists(os.path.abspath(f.name))) + + def testNLIST(self): + "NLIST: List remote file names" + self.runner.connect() + self.runner.add("nlist /") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.regular, '/test-basic-commands', 'NLIST failed') + self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed') + + def testRENLIST(self): + "RENLIST: Same as `nlist', but ignores the cache." + self.runner.connect() + self.runner.add("renlist /") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.regular, '/test-basic-commands', 'RENLIST failed') + self.assertContainsString(self.runner.regular, '/pub', 'NLIST failed') + + def testRELS(self): + "RELS: Same as ls, but ignores the cache." + self.runner.connect() + self.runner.add("rels -l /") + self.runner.run("-df") + self.runner.filter() + + self.assertContainsString(self.runner.server, '150 ', 'RELS failed') + self.assertContainsString(self.runner.regular, 'test-basic-commands', 'RELS failed') + + def testLCD(self): + "LCD: Change current local directory" + dirname = 'foodir' + os.mkdir(dirname) + + self.runner.connect() + self.runner.add("lcd %s" % dirname) + self.runner.add("lpwd") + self.runner.add("lcd -") + self.runner.run() + self.runner.filter() + + os.rmdir(dirname) + self.assertContainsString(self.runner.regular, + os.path.join(os.getcwd(), dirname), + 'LCD failed') + + def testLPWD(self): + "LPWD: Print current working directory on local machine" + self.runner.connect() + self.runner.add("lpwd") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.regular, os.getcwd(), 'LPWD failed') + + def testMRM(self): + "MRM: Remove remote files using wildcards" + fname = 'mrmfile' + self._create_chowned_file(fname + '1', 'pub', mode=0777) + self._create_chowned_file(fname + '2', 'pub', mode=0777) + + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("ls") + self.runner.add("mrm %s/pub/%s*" % (ftp_root, fname)) + self.runner.run() + self.runner.filter() + + # assert that we deleted something + self.assertContainsString(self.runner.server, '250 ', 'MRM failed') + # assert that we deleted exactly 2 files + self.assertEquals(len([ str for str in self.runner.server if str == '250 Delete operation successful.\n']), 2) + +class TestBOOKMARK(TestBase): + def testADD(self): + "ADD: add current place or given location" + self.runner.connect() + self.runner.add("bookmark add pb /pub") + self.runner.add("bookmark list") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.regular, 'pb\t/pub\n', 'Bookmark add failed') + + def testDEL(self): + "DEL: remove bookmark with name" + self.runner.connect() + self.runner.add("bookmark add pb /pub") + self.runner.add("bookmark del pb /pub") + self.runner.add("bookmark list") + self.runner.run() + self.runner.filter() + + self.assertDoesNotContainString(self.runner.regular, 'pb\t/pub\n', 'Bookmark del failed') + +class TestGET(TestBase): + def setUp(self): + TestBase.setUp(self) + self.fname = 'newfile' + self.full_path = os.path.join(ftp_root, 'pub', self.fname) + self._create_chowned_file(self.fname, 'pub', 0777) + + def testDELETE(self): + "GET -E: delete source files after successful transfer" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("get -E %s/pub/%s" % (ftp_root, self.fname)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '226 ', 'RELS failed') + self.assertContainsString(self.runner.server, '250 ', 'GET -E failed') + os.remove(self.fname) + + def testASCII(self): + "GET -a: transfer using ASCII mode" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("get -a %s/pub/%s" % (ftp_root, self.fname)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed') + self.assertContainsString(self.runner.server, '226 ', 'GET -a failed') + os.remove(self.full_path) + os.remove(self.fname) + + def testLocalName(self): + "GET -o: destination file name" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("get %s/pub/%s -o newfile.local" % (ftp_root, self.fname)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '226 ', 'GET -o failed') + os.remove('newfile.local') + os.remove(self.full_path) + + def testLocalPath(self): + "GET -O: specifies base directory or URL where files should be placed" + # FIXME: does not work + print "****** testLocalPath Currently does not work ********" + #dest = '/tmp' + + #self.runner.connect(user=ftp_user, password=ftp_pass) + #self.runner.add("get %s/pub/%s -O %s" % (ftp_root, self.fname, dest)) + #self.runner.run() + #self.runner.filter() + + #self.assertContainsString(self.runner.server, '226 ', 'GET -O failed') + #os.remove(os.path.join(dest, self.fname)) + #os.remove(self.full_path) + + def testContents(self): + "GET: tests if get preservers contents" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("get %s/pub/%s" % (ftp_root, self.fname)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '226 ', 'Contents test failed') + self.assertSameContent(self.full_path, 'newfile') + os.remove('newfile') + os.remove(self.full_path) + +class TestPUT(TestBase): + def setUp(self): + TestBase.setUp(self) + self.fname = 'putfile' + self.full_path = os.path.join(os.getcwd(), self.fname) + self.ftp_full_path = os.path.join(ftp_root, 'pub', self.fname) + + f = open(self.fname, 'w') + f.write('putfile') + f.flush() + f.close() + os.chown(self.fname, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid) + + def testRemoteName(self): + "PUT -o: specifies remote file name" + remote_fname = 'someotherfilename' + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("put %s -o %s/pub/%s" % (self.fname, ftp_root, remote_fname)) + self.runner.run() + self.runner.filter() + + self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', remote_fname))) + self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed') + + os.remove(self.fname) + os.remove(os.path.join(ftp_root, 'pub', remote_fname)) + + def testDeleteSource(self): + "PUT -E: delete source files after successful transfer" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("cd %s/pub" % ftp_root) + self.runner.add("put -E %s" % (self.fname)) + self.runner.run() + self.runner.filter() + + self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname))) + self.assert_(not os.path.exists(self.full_path)) + self.assertContainsString(self.runner.server, '226 ', 'PUT -E failed') + os.remove(self.ftp_full_path) + + def testASCII(self): + "PUT -a: transfer using ASCII mode" + print "****** testASCII Not yet implemented ********" + #self.runner.connect(user=ftp_user, password=ftp_pass) + #self.runner.add("put -a %s" % (self.fname)) + #print self.runner.run() + #self.runner.filter() + + #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname))) + #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed') + #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed') + + #os.remove(self.fname) + #os.remove(self.ftp_full_path) + + def testBaseDirectory(self): + "PUT -O: specifies base directory or URL where files should be placed" + # FIXME: add some code here + print "****** testBaseDirectory Not yet implemented ********" + + def testContents(self): + "PUT: tests if get preservers contents" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("cd %s/pub" % ftp_root) + self.runner.add("put %s" % (self.fname)) + self.runner.run() + self.runner.filter() + + self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname))) + self.assertContainsString(self.runner.server, '226 ', 'Contents test failed') + self.assertSameContent(self.ftp_full_path, self.fname) + + os.remove(self.fname) + os.remove(self.ftp_full_path) + +class TestMGET(TestBase): + def setUp(self): + TestBase.setUp(self) + self.basename = 'mgetfile' + self.fname1 = 'mgetfile1' + self.fname2 = 'mgetfile2' + self.full_path1 = os.path.join(ftp_root, 'pub', self.fname1) + self.full_path2 = os.path.join(ftp_root, 'pub', self.fname2) + self._create_chowned_file(self.fname1, 'pub', 0777) + self._create_chowned_file(self.fname2, 'pub', 0777) + + def _clean_up(self): + os.remove(self.fname1) + os.remove(self.fname2) + os.remove(self.full_path1) + os.remove(self.full_path2) + + def testDELETE(self): + "MGET -E: delete source files after successful transfer" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("mget -E %s/pub/%s*" % (ftp_root, self.basename)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '226 ', 'MGET failed') + self.assertContainsString(self.runner.server, '250 ', 'MGET failed') + self.assert_(os.path.exists(self.fname1)) + self.assert_(os.path.exists(self.fname2)) + + os.remove(self.fname1) + os.remove(self.fname2) + + def testASCII(self): + "MGET -a: transfer using ASCII mode" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("mget -a %s/pub/%s*" % (ftp_root, self.basename)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed') + self.assertContainsString(self.runner.server, '226 ', 'MGET failed') + self._clean_up() + + def testLocalPath(self): + "MGET -O: specifies base directory or URL where files should be placed" + # FIXME: does not work + print "****** testLocalPath Currently does not work ********" + #dest = '/tmp' + + #self.runner.connect(user=ftp_user, password=ftp_pass) + #self.runner.add("mget %s/pub/%s* -O %s" % (ftp_root, self.basename, dest)) + #self.runner.run() + #self.runner.filter() + + #self.assertContainsString(self.runner.server, '226 ', 'MGET failed') + #self._clean_up() + + def testContents(self): + "MGET: tests if get preservers contents" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("mget %s/pub/%s*" % (ftp_root, self.basename)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '226 ', 'Contents test failed') + self.assertSameContent(self.full_path1, self.fname1) + self.assertSameContent(self.full_path2, self.fname2) + self._clean_up() + +class TestMPUT(TestBase): + def setUp(self): + TestBase.setUp(self) + self.basename = 'putfile' + self.fname1 = 'putfile1' + self.fname2 = 'putfile2' + + self.ftp_full_path1 = os.path.join(ftp_root, 'pub', self.fname1) + self.ftp_full_path2 = os.path.join(ftp_root, 'pub', self.fname2) + + for fn in [ self.fname1, self.fname2 ]: + f = open(fn, 'w') + f.write(str(fn)) + f.flush() + f.close() + os.chown(fn, pwd.getpwnam('lftp-tester').pw_uid, grp.getgrnam('lftp-tester').gr_gid) + + def _clean_up(self): + os.remove(self.fname1) + os.remove(self.fname2) + os.remove(self.ftp_full_path1) + os.remove(self.ftp_full_path2) + + def testDeleteSource(self): + "MPUT -E: delete source files after successful transfer" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("cd %s/pub" % ftp_root) + self.runner.add("mput -E %s*" % (self.basename)) + self.runner.run() + self.runner.filter() + + self.assert_(os.path.exists(self.ftp_full_path1)) + self.assert_(os.path.exists(self.ftp_full_path2)) + self.assert_(not os.path.exists(self.fname1)) + self.assert_(not os.path.exists(self.fname2)) + self.assertContains(self.runner.server, '226 Transfer complete.\n', 'MPUT -E failed') + os.remove(self.ftp_full_path1) + os.remove(self.ftp_full_path2) + + def testASCII(self): + "PUT -a: transfer using ASCII mode" + # FIXME: add some code here + print "****** testASCII Not yet implemented ********" + #self.runner.connect(user=ftp_user, password=ftp_pass) + #self.runner.add("put -a %s" % (self.fname)) + #print self.runner.run() + #self.runner.filter() + + #self.assert_(os.path.exists(os.path.join(ftp_root, 'pub', self.fname))) + #self.assertContainsString(self.runner.server, 'Opening ASCII mode data connection', 'GET -a failed') + #self.assertContainsString(self.runner.server, '226 ', 'PUT -o failed') + + #os.remove(self.fname) + #os.remove(self.ftp_full_path) + + def testBaseDirectory(self): + "MPUT -O: specifies base directory or URL where files should be placed" + # FIXME: add some code here + print "****** testBaseDirectory Not yet implemented ********" + + def testContents(self): + "MPUT: tests if mput preservers contents" + self.runner.connect(user=ftp_user, password=ftp_pass) + self.runner.add("cd %s/pub" % ftp_root) + self.runner.add("mput %s*" % (self.basename)) + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.server, '226 ', 'Contents test failed') + self.assert_(os.path.exists(self.ftp_full_path1)) + self.assert_(os.path.exists(self.ftp_full_path2)) + self.assertSameContent(self.ftp_full_path1, self.fname1) + self.assertSameContent(self.ftp_full_path2, self.fname2) + + self._clean_up() + +class TestGLOB(TestBase): + def setUp(self): + TestBase.setUp(self) + for f in os.listdir(os.path.join(ftp_root, 'pub')): + try: + os.remove(os.path.join(ftp_root, 'pub', f)) + except OSError: + os.rmdir(os.path.join(ftp_root, 'pub', f)) + + self.dirpath = os.path.join(ftp_root, 'pub', 'pubdir') + os.mkdir(self.dirpath) + self._create_chowned_file('pubfile', 'pub', 0777) + + def testFiles(self): + " GLOB -f: globbing files only " + self.runner.connect() + self.runner.add("glob -f ls pub/*") + self.runner.run() + self.runner.filter() + + self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -f failed') + self.assertDoesNotContainString(self.runner.regular, 'pubdir', 'GLOB -f failed') + + def testDirectories(self): + " GLOB -d: globbing directories only " + # FIXME - does not seem to work + pass + #self.runner.connect() + #self.runner.add("glob -d ls pub/*") + #self.runner.run() + #self.runner.filter() + + #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -d failed') + #self.assertDoesNotContainString(self.runner.regular, 'pubfile', 'GLOB -d failed') + + def testAll(self): + " GLOB -a: globbing everything " + # FIXME - does not seem to work + pass + #self.runner.connect() + #self.runner.add("glob -d ls pub/*") + #self.runner.run() + #self.runner.filter() + + #self.assertContainsString(self.runner.regular, 'pubdir', 'GLOB -a failed') + #self.assertContainsString(self.runner.regular, 'pubfile', 'GLOB -a failed') + +class TestRegression(TestBase): + def testBundledPerl(self): + "176175: lftp used to ship its copy of perl-String-CRC32 - see if fixed " + p = popen2.Popen4("rpm -q --provides lftp") + p.wait() + self.assertDoesNotContainString(p.fromchild.readlines(), + "perl-String-CRC32", + "lftp bundles its own copy of perl-String-CRC32") + def testHTTP_LS(self): + "171884: first ls returns nothing when using http protocol" + self.runner.connect(host="http://mirror.centos.org/centos/") + self.runner.add("ls") + self.runner.run() + self.runner.filter() + + self.assert_(len(self.runner.regular) > 0) + +class TestQueue(TestBase): + def setUp(self): + TestBase.setUp(self) + self.script_file = tempfile.NamedTemporaryFile(mode='wb+') + self.runner.connect() + self.runner.add("queue stop") + self.runner.add("debug -o %s" % self.script_file.name) + + def _runQueue(self): + self.runner.add("queue start") + self.runner.run(sleep=3) + content = self.script_file.readlines() + self.runner.filter(content) + + def _listQueue(self): + self.runner.add("queue") + self.runner.run() + self.runner.filter() + + def testInsertBefore(self): + "QUEUE: insert a command before another one" + self.runner.add("queue ls") + self.runner.add("cd /pub") + self.runner.add("queue -n 1 get somefile") + self._listQueue() + + self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: insert before failed') + + def testRememberLocation(self): + "QUEUE: tests of queue remembers changing location with cd" + # This test makes sense for RHEL 5+ + + self.runner.add("queue ls") + self.runner.add("cd /pub") + self.runner.add("queue ls") + self._listQueue() + + self.assertContainsString(self.runner.regular, 'cd /pub', 'QUEUE: remember location failed') + + def testDeleteFromQueue(self): + "QUEUE: delete a command from queue" + self.runner.add("cd /pub") + self.runner.add("queue ls") + self.runner.add("queue -d 1") + self._runQueue() + + self.assertDoesNotContainString(self.runner.regular, 'LIST\r\n', 'QUEUE: remember location failed') + + def testMoveInQueue(self): + "QUEUE: move a command in queue from one position to another" + self.runner.add("queue ls") + self.runner.add("queue get somefile") + self.runner.add("queue -m 2 1") + self._listQueue() + + self.assertContains(self.runner.regular, ' 1. get somefile\n', 'QUEUE: move in queue failed') + +def die(code, code_req, msg): + if code != code_req: + print >>sys.stderr, msg + sys.exit(1) + +if __name__ == "__main__": + # get vsftpd root dir + ftp_root = pwd.getpwnam('ftp').pw_dir + + if os.getuid() != 0: + print "This test must be run as root" + sys.exit(1) + + # add user for testing automatically + (status, output) = commands.getstatusoutput('useradd %s' % (ftp_user)) + die(status, 0, output) + print "User %s added.." % ftp_user + (status, output) = commands.getstatusoutput('echo %s | passwd --stdin %s' % (ftp_pass, ftp_user)) + die(status, 0, output) + ftp_home = pwd.getpwnam(ftp_user).pw_dir + os.chmod(os.path.join(ftp_root, 'pub'), 0777) + + try: + # start vsftpd with basic config + (status, output) = commands.getstatusoutput('service vsftpd restart') + # run the tests + unittest.main() + die(status, 0, output) + finally: + # delete the test user + (status, output) = commands.getstatusoutput('userdel -r %s' % (ftp_user)) + die(status, 0, output) + + + diff --git a/sanity/vsftpd.conf b/sanity/vsftpd.conf new file mode 100644 index 0000000..4d45812 --- /dev/null +++ b/sanity/vsftpd.conf @@ -0,0 +1,117 @@ +# Example config file /etc/vsftpd/vsftpd.conf +# +# The default compiled in settings are fairly paranoid. This sample file +# loosens things up a bit, to make the ftp daemon more usable. +# Please see vsftpd.conf.5 for all compiled in defaults. +# +# READ THIS: This example file is NOT an exhaustive list of vsftpd options. +# Please read the vsftpd.conf.5 manual page to get a full idea of vsftpd's +# capabilities. +# +# Allow anonymous FTP? (Beware - allowed by default if you comment this out). +anonymous_enable=YES +# +# Uncomment this to allow local users to log in. +local_enable=YES +# +# Uncomment this to enable any form of FTP write command. +write_enable=YES +# +# Default umask for local users is 077. You may wish to change this to 022, +# if your users expect that (022 is used by most other ftpd's) +local_umask=022 +# +# Uncomment this to allow the anonymous FTP user to upload files. This only +# has an effect if the above global write enable is activated. Also, you will +# obviously need to create a directory writable by the FTP user. +anon_upload_enable=YES +# +# Uncomment this if you want the anonymous FTP user to be able to create +# new directories. +anon_mkdir_write_enable=YES +# +# Activate directory messages - messages given to remote users when they +# go into a certain directory. +dirmessage_enable=YES +# +# Activate logging of uploads/downloads. +xferlog_enable=YES +# +# Make sure PORT transfer connections originate from port 20 (ftp-data). +connect_from_port_20=YES +# +# If you want, you can arrange for uploaded anonymous files to be owned by +# a different user. Note! Using "root" for uploaded files is not +# recommended! +#chown_uploads=YES +#chown_username=whoever +# +# You may override where the log file goes if you like. The default is shown +# below. +#xferlog_file=/var/log/vsftpd.log +# +# If you want, you can have your log file in standard ftpd xferlog format. +# Note that the default log file location is /var/log/xferlog in this case. +xferlog_std_format=YES +# +# You may change the default value for timing out an idle session. +#idle_session_timeout=600 +# +# You may change the default value for timing out a data connection. +#data_connection_timeout=120 +# +# It is recommended that you define on your system a unique user which the +# ftp server can use as a totally isolated and unprivileged user. +#nopriv_user=ftpsecure +# +# Enable this and the server will recognise asynchronous ABOR requests. Not +# recommended for security (the code is non-trivial). Not enabling it, +# however, may confuse older FTP clients. +#async_abor_enable=YES +# +# By default the server will pretend to allow ASCII mode but in fact ignore +# the request. Turn on the below options to have the server actually do ASCII +# mangling on files when in ASCII mode. +# Beware that on some FTP servers, ASCII support allows a denial of service +# attack (DoS) via the command "SIZE /big/file" in ASCII mode. vsftpd +# predicted this attack and has always been safe, reporting the size of the +# raw file. +# ASCII mangling is a horrible feature of the protocol. +ascii_upload_enable=YES +ascii_download_enable=YES +# +# You may fully customise the login banner string: +#ftpd_banner=Welcome to blah FTP service. +# +# You may specify a file of disallowed anonymous e-mail addresses. Apparently +# useful for combatting certain DoS attacks. +#deny_email_enable=YES +# (default follows) +#banned_email_file=/etc/vsftpd/banned_emails +# +# You may specify an explicit list of local users to chroot() to their home +# directory. If chroot_local_user is YES, then this list becomes a list of +# users to NOT chroot(). +#chroot_list_enable=YES +# (default follows) +#chroot_list_file=/etc/vsftpd/chroot_list +# +# You may activate the "-R" option to the builtin ls. This is disabled by +# default to avoid remote users being able to cause excessive I/O on large +# sites. However, some broken FTP clients such as "ncftp" and "mirror" assume +# the presence of the "-R" option, so there is a strong case for enabling it. +#ls_recurse_enable=YES +# +# When "listen" directive is enabled, vsftpd runs in standalone mode and +# listens on IPv4 sockets. This directive cannot be used in conjunction +# with the listen_ipv6 directive. +listen=YES +# +# This directive enables listening on IPv6 sockets. To listen on IPv4 and IPv6 +# sockets, you must run two copies of vsftpd whith two configuration files. +# Make sure, that one of the listen options is commented !! +#listen_ipv6=YES + +pam_service_name=vsftpd +userlist_enable=YES +tcp_wrappers=YES