Blob Blame History Raw
import urlparse
import json

import httplib2

from keystoneclient.v2_0 import endpoints
from tests import utils


class EndpointTests(utils.TestCase):
    def setUp(self):
        super(EndpointTests, self).setUp()
        self.TEST_REQUEST_HEADERS = {'X-Auth-Token': 'aToken',
                                     'User-Agent': 'python-keystoneclient'}
        self.TEST_POST_HEADERS = {'Content-Type': 'application/json',
                                  'X-Auth-Token': 'aToken',
                                  'User-Agent': 'python-keystoneclient'}
        self.TEST_ENDPOINTS = {'endpoints': [
                                {
                                  'adminurl':
                                    'http://host-1:8774/v1.1/$(tenant_id)s',
                                  'id':
                                    '8f9531231e044e218824b0e58688d262',
                                  'internalurl':
                                    'http://host-1:8774/v1.1/$(tenant_id)s',
                                  'publicurl':
                                    'http://host-1:8774/v1.1/$(tenant_id)s',
                                  'region':
                                    'RegionOne'
                                },
                                {
                                  'adminurl':
                                    'http://host-1:8774/v1.1/$(tenant_id)s',
                                  'id':
                                    '8f9531231e044e218824b0e58688d263',
                                  'internalurl':
                                    'http://host-1:8774/v1.1/$(tenant_id)s',
                                  'publicurl':
                                    'http://host-1:8774/v1.1/$(tenant_id)s',
                                  'region':
                                    'RegionOne'
                                }
                              ]
                            }

    def test_create(self):
        req_body = {"endpoint": {"region": "RegionOne",
                                 "publicurl":
                                     "http://host-3:8774/v1.1/$(tenant_id)s",
                                 "internalurl":
                                     "http://host-3:8774/v1.1/$(tenant_id)s",
                                 "adminurl":
                                     "http://host-3:8774/v1.1/$(tenant_id)s",
                                 "service_id": "e044e21"}}

        resp_body = {"endpoint": {
                        "adminurl":
                        "http://host-3:8774/v1.1/$(tenant_id)s",
                        "region": "RegionOne",
                        "id": "1fd485b2ffd54f409a5ecd42cba11401",
                        "internalurl":
                        "http://host-3:8774/v1.1/$(tenant_id)s",
                        "publicurl":
                        "http://host-3:8774/v1.1/$(tenant_id)s"}}

        resp = httplib2.Response({
            "status": 200,
            "body": json.dumps(resp_body),
            })

        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
                              'v2.0/endpoints'),
                              'POST',
                              body=json.dumps(req_body),
                              headers=self.TEST_POST_HEADERS) \
                              .AndReturn((resp, resp['body']))
        self.mox.ReplayAll()

        endpoint = self.client.endpoints.create(
            region=req_body['endpoint']['region'],
            publicurl=req_body['endpoint']['publicurl'],
            adminurl=req_body['endpoint']['adminurl'],
            internalurl=req_body['endpoint']['internalurl'],
            service_id=req_body['endpoint']['service_id']
        )
        self.assertTrue(isinstance(endpoint, endpoints.Endpoint))

    def test_delete(self):
        resp = httplib2.Response({
            "status": 200,
            "body": ""
            })
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
                              'v2.0/endpoints/8f953'),
                              'DELETE',
                              headers=self.TEST_REQUEST_HEADERS) \
                              .AndReturn((resp, resp['body']))
        self.mox.ReplayAll()

        self.client.endpoints.delete('8f953')

    def test_list(self):
        resp = httplib2.Response({
            "status": 200,
            "body": json.dumps(self.TEST_ENDPOINTS),
            })

        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
                              'v2.0/endpoints?fresh=1234'),
                              'GET',
                              headers=self.TEST_REQUEST_HEADERS) \
                              .AndReturn((resp, resp['body']))
        self.mox.ReplayAll()

        endpoint_list = self.client.endpoints.list()
        [self.assertTrue(isinstance(r, endpoints.Endpoint)) \
                for r in endpoint_list]