Source code for bottle

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Bottle is a fast and simple micro-framework for small web applications. It
offers request dispatching (Routes) with url parameter support, templates,
a built-in HTTP Server and adapters for many third party WSGI/HTTP-server and
template engines - all in a single file and with no dependencies other than the
Python Standard Library.

Homepage and documentation: http://bottlepy.org/

Copyright (c) 2016, Marcel Hellkamp.
License: MIT (see LICENSE for details)
"""

from __future__ import with_statement

__author__ = 'Marcel Hellkamp'
__version__ = '0.12.18'
__license__ = 'MIT'

# The gevent server adapter needs to patch some modules before they are imported
# This is why we parse the commandline parameters here but handle them later
# Script-mode preamble: only executed when this file is run directly.
if __name__ == '__main__':
    from optparse import OptionParser
    _cmd_parser = OptionParser(usage="usage: %prog [options] package.module:app")
    # Short alias so the option table below stays readable.
    _opt = _cmd_parser.add_option
    _opt("--version", action="store_true", help="show version number.")
    _opt("-b", "--bind", metavar="ADDRESS", help="bind socket to ADDRESS.")
    _opt("-s", "--server", default='wsgiref', help="use SERVER as backend.")
    _opt("-p", "--plugin", action="append", help="install additional plugin/s.")
    _opt("--debug", action="store_true", help="start server in debug mode.")
    _opt("--reload", action="store_true", help="auto-reload on file changes.")
    # Parsed here, before the stdlib imports that follow; the results are kept
    # in module-level names (presumably consumed further down the file).
    _cmd_options, _cmd_args = _cmd_parser.parse_args()
    # Any backend whose name starts with 'gevent' gets the monkey patch
    # applied right away, i.e. before the imports below are executed.
    if _cmd_options.server and _cmd_options.server.startswith('gevent'):
        import gevent.monkey; gevent.monkey.patch_all()

import base64, cgi, email.utils, functools, hmac, itertools, mimetypes,\
        os, re, subprocess, sys, tempfile, threading, time, warnings, hashlib

from datetime import date as datedate, datetime, timedelta
from tempfile import TemporaryFile
from traceback import format_exc, print_exc
from inspect import getargspec
from unicodedata import normalize


# Bind json_dumps/json_lds to the first JSON implementation that imports:
# simplejson, then the stdlib json module, then Django's bundled copy.
try:
    from simplejson import dumps as json_dumps, loads as json_lds
except ImportError: # pragma: no cover
    try:
        from json import dumps as json_dumps, loads as json_lds
    except ImportError:
        try:
            from django.utils.simplejson import dumps as json_dumps, loads as json_lds
        except ImportError:
            # No JSON library at all: both names become a stub that
            # raises as soon as it is called.
            def json_dumps(data):
                raise ImportError("JSON support requires Python 2.6 or simplejson.")
            json_lds = json_dumps



# We now try to fix 2.5/2.6/3.1/3.2 incompatibilities.
# It ain't pretty but it works... Sorry for the mess.

# Interpreter version flags used to pick the compatibility shims.
py = sys.version_info
py3k = py[0] >= 3             # any 3.x interpreter
py25 = py[:2] < (2, 6)        # older than 2.6
py31 = py[:2] == (3, 1)       # exactly the 3.1 series

# Workaround for the missing "as" keyword in Python 2.5 ("except X as e").
def _e():
    """ Return the exception instance that is currently being handled
        (the second item of ``sys.exc_info()``). """
    _, current, _ = sys.exc_info()
    return current

# Workaround for the "print is a keyword/function" Python 2/3 dilemma
# and a fallback for mod_wsgi (restricts stdout/err attribute access)
try:
    # Fast path: bind the write methods once.
    _stdout = sys.stdout.write
    _stderr = sys.stderr.write
except IOError:
    # Attribute access failed: look the stream up again on every call.
    def _stdout(x):
        return sys.stdout.write(x)

    def _stderr(x):
        return sys.stderr.write(x)

# Lots of stdlib and builtin differences.
# Give both interpreter families the same set of module-level names
# (httplib, thread, urljoin, urlencode, urlquote, urlunquote, SimpleCookie,
# DictMixin, new_module, pickle, BytesIO, ConfigParser, unicode, json_loads,
# imap, _raise) so the code below does not have to branch on the version.
if py3k:
    import http.client as httplib
    import _thread as thread
    from urllib.parse import urljoin, SplitResult as UrlSplitResult
    from urllib.parse import urlencode, quote as urlquote, unquote as urlunquote
    # Always unquote to latin1 on 3.x.
    urlunquote = functools.partial(urlunquote, encoding='latin1')
    from http.cookies import SimpleCookie
    if py >= (3, 3, 0):
        from collections.abc import MutableMapping as DictMixin
        from types import ModuleType as new_module
    else:
        from collections import MutableMapping as DictMixin
        from imp import new_module
    import pickle
    from io import BytesIO
    from configparser import ConfigParser
    # Python 2 names that no longer exist on 3.x.
    basestring = str
    unicode = str
    # Input is passed through touni() before it reaches the JSON parser.
    json_loads = lambda s: json_lds(touni(s))
    callable = lambda x: hasattr(x, '__call__')
    imap = map
    # _raise(type, value, traceback): re-raise with the given traceback.
    def _raise(*a): raise a[0](a[1]).with_traceback(a[2])
else: # 2.x
    import httplib
    import thread
    from urlparse import urljoin, SplitResult as UrlSplitResult
    from urllib import urlencode, quote as urlquote, unquote as urlunquote
    from Cookie import SimpleCookie
    from itertools import imap
    import cPickle as pickle
    from imp import new_module
    from StringIO import StringIO as BytesIO
    from ConfigParser import SafeConfigParser as ConfigParser
    if py25:
        msg  = "Python 2.5 support may be dropped in future versions of Bottle."
        warnings.warn(msg, DeprecationWarning)
        from UserDict import DictMixin
        # Names that this branch defines itself for 2.5.
        def next(it): return it.next()
        bytes = str
    else: # 2.6, 2.7
        from collections import MutableMapping as DictMixin
    unicode = unicode
    json_loads = json_lds
    # The three-argument raise statement is a syntax error on 3.x, so it is
    # compiled from a string instead of being written inline.
    eval(compile('def _raise(*a): raise a[0], a[1], a[2]', '<py3fix>', 'exec'))

# Some helpers for string/byte handling
def tob(s, enc='utf8'):
    """ Return *s* as a byte string: unicode input is encoded with *enc*,
        anything else is passed to ``bytes()``. """
    if isinstance(s, unicode):
        return s.encode(enc)
    return bytes(s)
def touni(s, enc='utf8', err='strict'):
    """ Return *s* as a unicode string: byte strings are decoded with *enc*
        (error policy *err*), anything else is passed to ``unicode()``. """
    if isinstance(s, bytes):
        return s.decode(enc, err)
    return unicode(s)

# Converter to the interpreter's native ``str`` type.
if py3k:
    tonat = touni
else:
    tonat = tob

# 3.2 fixes cgi.FieldStorage to accept bytes (which makes a lot of sense).
# 3.1 needs a workaround.
if py31:
    from io import TextIOWrapper
    class NCTextIOWrapper(TextIOWrapper):
        ''' TextIOWrapper whose close() is a no-op, so the wrapped buffer
            stays usable. Only defined on Python 3.1. '''
        def close(self): pass # Keep wrapped buffer open.


# A bug in functools causes it to break if the wrapper is an instance method
def update_wrapper(wrapper, wrapped, *a, **ka):
    """ Best-effort variant of :func:`functools.update_wrapper`.

        Copies the metadata of *wrapped* onto *wrapper* and ignores the
        AttributeError raised when *wrapper* does not accept the attributes.
        Always returns ``None``. """
    try:
        functools.update_wrapper(wrapper, wrapped, *a, **ka)
    except AttributeError:
        # Metadata is optional; leave the wrapper as it is.
        pass



# These helpers are used at module level and need to be defined first.
# And yes, I know PEP-8, but sometimes a lower-case classname makes more sense.

def depr(message, hard=False):
    """ Emit *message* as a DeprecationWarning.

        The warning is attributed to the caller of the deprecated API
        (``stacklevel=3``). *hard* is accepted but not used here. """
    warnings.warn(message, category=DeprecationWarning, stacklevel=3)

def makelist(data): # This is just too handy
    """ Coerce *data* into a list.

        Tuples, lists, sets and dicts are copied into a new list (dicts
        yield their keys). Any other truthy value is wrapped in a
        one-element list; falsy values give an empty list. """
    if isinstance(data, (tuple, list, set, dict)):
        return list(data)
    return [data] if data else []


class DictProperty(object):
    ''' Descriptor that keeps its value under a key of a dict-like attribute
        of the owning instance.

        Used as a decorator factory: ``@DictProperty('attr', 'key')``. The
        decorated function computes the value when the key is missing. '''

    def __init__(self, attr, key=None, read_only=False):
        self.attr = attr
        self.key = key
        self.read_only = read_only

    def __call__(self, func):
        # Take over the metadata of the wrapped function, but not its __dict__.
        functools.update_wrapper(self, func, updated=[])
        self.getter = func
        # Without an explicit key, the function name is used.
        self.key = self.key or func.__name__
        return self

    def __get__(self, obj, cls):
        if obj is None:
            return self
        key = self.key
        storage = getattr(obj, self.attr)
        if key not in storage:
            # First access: compute the value and store it.
            storage[key] = self.getter(obj)
        return storage[key]

    def __set__(self, obj, value):
        if self.read_only:
            raise AttributeError("Read-Only property.")
        storage = getattr(obj, self.attr)
        storage[self.key] = value

    def __delete__(self, obj):
        if self.read_only:
            raise AttributeError("Read-Only property.")
        storage = getattr(obj, self.attr)
        del storage[self.key]


class cached_property(object):
    ''' Non-data descriptor that computes a value once per instance.

        The result is written to the instance ``__dict__`` under the name of
        the wrapped function, so later lookups bypass the descriptor.
        Deleting the instance attribute makes the next access compute the
        value again. '''

    def __init__(self, func):
        self.__doc__ = func.__doc__
        self.func = func

    def __get__(self, obj, cls):
        if obj is None:
            # Accessed on the class: hand out the descriptor itself.
            return self
        result = self.func(obj)
        obj.__dict__[self.func.__name__] = result
        return result


class lazy_attribute(object):
    ''' Descriptor that computes a value from the class on first access and
        then replaces itself on that class with the computed value. '''

    def __init__(self, func):
        # Copy the function metadata; __name__ is needed in __get__.
        functools.update_wrapper(self, func, updated=[])
        self.getter = func

    def __get__(self, obj, cls):
        computed = self.getter(cls)
        # Overwrite the descriptor so the getter is not called again.
        setattr(cls, self.__name__, computed)
        return computed






###############################################################################
# Exceptions and Events ########################################################
###############################################################################


class BottleException(Exception):
    """ A base class for exceptions used by bottle. """
###############################################################################
# Routing ######################################################################
###############################################################################

class RouteError(BottleException):
    """ This is a base class for all routing related exceptions """


class RouteReset(BottleException):
    """ If raised by a plugin or request handler, the route is reset and all
        plugins are re-applied. """


class RouterUnknownModeError(RouteError): pass


class RouteSyntaxError(RouteError):
    """ The route parser found something not supported by this router. """


class RouteBuildError(RouteError):
    """ The route could not be built. """


def _re_flatten(p):
    ''' Turn all capturing groups in a regular expression pattern into
        non-capturing groups. '''
    if '(' not in p: return p
    # group(1) holds the backslashes in front of the parenthesis: an odd
    # count means the parenthesis itself is escaped and must be left alone.
    return re.sub(r'(\\*)(\(\?P<[^>]+>|\((?!\?))',
        lambda m: m.group(0) if len(m.group(1)) % 2 else m.group(1) + '(?:', p)


class Router(object):
    ''' A Router is an ordered collection of route->target pairs. It is used to
        efficiently match WSGI requests against a number of routes and return
        the first target that satisfies the request. The target may be anything,
        usually a string, ID or callable object. A route consists of a path-rule
        and a HTTP method.

        The path-rule is either a static path (e.g. `/contact`) or a dynamic
        path that contains wildcards (e.g. `/wiki/<page>`). The wildcard syntax
        and details on the matching order are described in docs:`routing`.
    '''

    default_pattern = '[^/]+'
    default_filter  = 're'

    #: The current CPython regexp implementation does not allow more
    #: than 99 matching groups per regular expression.
    _MAX_GROUPS_PER_PATTERN = 99

    def __init__(self, strict=False):
        self.rules    = [] # All rules in order
        self._groups  = {} # index of regexes to find them in dyna_routes
        self.builder  = {} # Data structure for the url builder
        self.static   = {} # Search structure for static routes
        self.dyna_routes   = {}
        self.dyna_regexes  = {} # Search structure for dynamic routes
        #: If true, static routes are no longer checked first.
        self.strict_order = strict
        # Each filter maps a config string to (regexp, to_python, to_url).
        self.filters = {
            're':    lambda conf:
                (_re_flatten(conf or self.default_pattern), None, None),
            'int':   lambda conf: (r'-?\d+', int, lambda x: str(int(x))),
            'float': lambda conf: (r'-?[\d.]+', float, lambda x: str(float(x))),
            'path':  lambda conf: (r'.+?', None, None)}

    def add_filter(self, name, func):
        ''' Add a filter. The provided function is called with the configuration
        string as parameter and must return a (regexp, to_python, to_url) tuple.
        The first element is a string, the last two are callables or None. '''
        self.filters[name] = func

    rule_syntax = re.compile('(\\\\*)'\
        '(?:(?::([a-zA-Z_][a-zA-Z_0-9]*)?()(?:#(.*?)#)?)'\
          '|(?:<([a-zA-Z_][a-zA-Z_0-9]*)?(?::([a-zA-Z_]*)'\
            '(?::((?:\\\\.|[^\\\\>]+)+)?)?)?>))')

    def _itertokens(self, rule):
        ''' Split *rule* into tokens and yield ``(key, mode, conf)`` tuples.

            Static text is yielded as ``(text, None, None)``. A wildcard is
            yielded as ``(name, filter, conf)``, where the filter defaults to
            ``'default'`` and *name*/*conf* may be empty or ``None``. '''
        offset, prefix = 0, ''
        for match in self.rule_syntax.finditer(rule):
            prefix += rule[offset:match.start()]
            g = match.groups()
            if len(g[0])%2: # Escaped wildcard
                prefix += match.group(0)[len(g[0]):]
                offset = match.end()
                continue
            if prefix:
                yield prefix, None, None
            # g[2] is only set by the old ":name" syntax; g[4:7] belong to
            # the "<name:filter:conf>" syntax.
            name, filtr, conf = g[4:7] if g[2] is None else g[1:4]
            yield name, filtr or 'default', conf or None
            offset, prefix = match.end(), ''
        if offset <= len(rule) or prefix:
            yield prefix+rule[offset:], None, None

    def add(self, rule, method, target, name=None):
        ''' Add a new rule or replace the target for an existing rule.

            :param rule: path rule, static or with wildcards.
            :param method: HTTP method the rule is registered for.
            :param target: object returned by :meth:`match` for this rule.
            :param name: optional extra key for :meth:`build`.
            :raises RouteSyntaxError: if the resulting pattern does not compile.
        '''
        anons     = 0    # Number of anonymous wildcards found
        keys      = []   # Names of keys
        pattern   = ''   # Regular expression pattern with named groups
        filters   = []   # Lists of wildcard input filters
        builder   = []   # Data structure for the URL builder
        is_static = True

        for key, mode, conf in self._itertokens(rule):
            if mode:
                is_static = False
                if mode == 'default': mode = self.default_filter
                mask, in_filter, out_filter = self.filters[mode](conf)
                if not key:
                    pattern += '(?:%s)' % mask
                    key = 'anon%d' % anons
                    anons += 1
                else:
                    pattern += '(?P<%s>%s)' % (key, mask)
                    keys.append(key)
                if in_filter: filters.append((key, in_filter))
                builder.append((key, out_filter or str))
            elif key:
                pattern += re.escape(key)
                builder.append((None, key))

        self.builder[rule] = builder
        if name: self.builder[name] = builder

        # Static rules go into a plain dict lookup and need no regexp.
        if is_static and not self.strict_order:
            self.static.setdefault(method, {})
            self.static[method][self.build(rule)] = (target, None)
            return

        try:
            re_pattern = re.compile('^(%s)$' % pattern)
            re_match = re_pattern.match
        except re.error:
            raise RouteSyntaxError("Could not add Route: %s (%s)" % (rule, _e()))

        if filters:
            def getargs(path):
                url_args = re_match(path).groupdict()
                for name, wildcard_filter in filters:
                    try:
                        url_args[name] = wildcard_filter(url_args[name])
                    except ValueError:
                        raise HTTPError(400, 'Path has wrong format.')
                return url_args
        elif re_pattern.groupindex:
            def getargs(path):
                return re_match(path).groupdict()
        else:
            getargs = None

        flatpat = _re_flatten(pattern)
        whole_rule = (rule, flatpat, target, getargs)

        if (flatpat, method) in self._groups:
            if DEBUG:
                msg = 'Route <%s %s> overwrites a previously defined route'
                warnings.warn(msg % (method, rule), RuntimeWarning)
            self.dyna_routes[method][self._groups[flatpat, method]] = whole_rule
        else:
            self.dyna_routes.setdefault(method, []).append(whole_rule)
            self._groups[flatpat, method] = len(self.dyna_routes[method]) - 1

        self._compile(method)

    def _compile(self, method):
        ''' Rebuild the combined regular expressions for *method*.

            Rules are joined in chunks of at most ``_MAX_GROUPS_PER_PATTERN``;
            every rule is one capturing group, so ``match.lastindex`` in
            :meth:`match` identifies the rule that matched. '''
        all_rules = self.dyna_routes[method]
        comborules = self.dyna_regexes[method] = []
        maxgroups = self._MAX_GROUPS_PER_PATTERN
        for x in range(0, len(all_rules), maxgroups):
            some = all_rules[x:x+maxgroups]
            combined = (flatpat for (_, flatpat, _, _) in some)
            combined = '|'.join('(^%s$)' % flatpat for flatpat in combined)
            combined = re.compile(combined).match
            rules = [(target, getargs) for (_, _, target, getargs) in some]
            comborules.append((combined, rules))

    def build(self, _name, *anons, **query):
        ''' Build an URL by filling the wildcards in a rule.

            :param _name: rule string or route name passed to :meth:`add`.
            :param anons: values for anonymous wildcards, in order.
            :param query: values for named wildcards; leftovers are appended
                          as a query string.
            :raises RouteBuildError: if the name is unknown or a wildcard
                                     value is missing.
        '''
        builder = self.builder.get(_name)
        if not builder: raise RouteBuildError("No route with that name.", _name)
        try:
            for i, value in enumerate(anons): query['anon%d'%i] = value
            url = ''.join([f(query.pop(n)) if n else f for (n,f) in builder])
            return url if not query else url+'?'+urlencode(query)
        except KeyError:
            raise RouteBuildError('Missing URL argument: %r' % _e().args[0])

    def match(self, environ):
        ''' Return a (target, url_args) tuple or raise HTTPError(400/404/405).

            Reads ``REQUEST_METHOD`` and ``PATH_INFO`` from *environ*. If the
            path is only served for other methods, a 405 is raised whose
            ``Allow`` header lists those methods. '''
        verb = environ['REQUEST_METHOD'].upper()
        path = environ['PATH_INFO'] or '/'
        if verb == 'HEAD':
            # HEAD requests fall back to GET routes.
            methods = ['PROXY', verb, 'GET', 'ANY']
        else:
            methods = ['PROXY', verb, 'ANY']

        for method in methods:
            if method in self.static and path in self.static[method]:
                target, getargs = self.static[method][path]
                return target, getargs(path) if getargs else {}
            elif method in self.dyna_regexes:
                for combined, rules in self.dyna_regexes[method]:
                    match = combined(path)
                    if match:
                        target, getargs = rules[match.lastindex - 1]
                        return target, getargs(path) if getargs else {}

        # No matching route found. Collect alternative methods for 405 response
        allowed = set()
        nocheck = set(methods)
        for method in set(self.static) - nocheck:
            if path in self.static[method]:
                # Record the method that serves this path. Adding the request
                # verb here would advertise the rejected method as allowed.
                allowed.add(method)
        for method in set(self.dyna_regexes) - allowed - nocheck:
            for combined, rules in self.dyna_regexes[method]:
                match = combined(path)
                if match:
                    allowed.add(method)
        if allowed:
            allow_header = ",".join(sorted(allowed))
            raise HTTPError(405, "Method not allowed.", Allow=allow_header)

        # No matching route and no alternative method found. We give up
        raise HTTPError(404, "Not found: " + repr(path))
class Route(object):
    ''' This class wraps a route callback along with route specific metadata
        and configuration and applies plugins on demand. It is also
        responsible for turning an URL path rule into a regular expression
        usable by the Router.
    '''

    def __init__(self, app, rule, method, callback, name=None,
                 plugins=None, skiplist=None, **config):
        #: The application this route is installed to.
        self.app = app
        #: The path-rule string (e.g. ``/wiki/:page``).
        self.rule = rule
        #: The HTTP method as a string (e.g. ``GET``).
        self.method = method
        #: The original callback with no plugins applied. Useful for
        #: introspection.
        self.callback = callback
        #: The name of the route (if specified) or ``None``.
        self.name = name or None
        #: A list of route-specific plugins (see :meth:`Bottle.route`).
        self.plugins = plugins or []
        #: A list of plugins to not apply to this route
        #: (see :meth:`Bottle.route`).
        self.skiplist = skiplist or []
        #: Additional keyword arguments passed to the :meth:`Bottle.route`
        #: decorator are stored in this dictionary. Used for route-specific
        #: plugin configuration and meta-data.
        self.config = ConfigDict().load_dict(config, make_namespaces=True)

    def __call__(self, *a, **ka):
        ''' Deprecated: warn, then forward the call to :attr:`call`. '''
        depr("Some APIs changed to return Route() instances instead of"\
             " callables. Make sure to use the Route.call method and not to"\
             " call Route instances directly.") #0.12
        return self.call(*a, **ka)

    @cached_property
    def call(self):
        ''' The route callback with all plugins applied. This property is
            created on demand and then cached to speed up subsequent
            requests.'''
        return self._make_callback()

    def reset(self):
        ''' Forget any cached values. The next time :attr:`call` is accessed,
            all plugins are re-applied. '''
        self.__dict__.pop('call', None)

    def prepare(self):
        ''' Do all on-demand work immediately (useful for debugging).'''
        self.call

    @property
    def _context(self):
        ''' Deprecated dict view of this route, handed to plugins that do
            not declare ``api`` greater than 1. '''
        depr('Switch to Plugin API v2 and access the Route object directly.')  #0.12
        return dict(rule=self.rule, method=self.method, callback=self.callback,
                    name=self.name, app=self.app, config=self.config,
                    apply=self.plugins, skip=self.skiplist)

    def all_plugins(self):
        ''' Yield all plugins affecting this route. '''
        unique = set()
        # Last installed plugin first; route plugins come after app plugins.
        for p in reversed(self.app.plugins + self.plugins):
            if True in self.skiplist: break
            name = getattr(p, 'name', False)
            # Skip named plugins that are excluded or already yielded.
            if name and (name in self.skiplist or name in unique): continue
            if p in self.skiplist or type(p) in self.skiplist: continue
            if name: unique.add(name)
            yield p

    def _make_callback(self):
        ''' Apply every plugin from :meth:`all_plugins` to the callback and
            return the result. Starts over if a plugin raises
            :exc:`RouteReset`. '''
        callback = self.callback
        for plugin in self.all_plugins():
            try:
                if hasattr(plugin, 'apply'):
                    api = getattr(plugin, 'api', 1)
                    # API v2 plugins get the Route itself, older ones a dict.
                    context = self if api > 1 else self._context
                    callback = plugin.apply(callback, context)
                else:
                    callback = plugin(callback)
            except RouteReset: # Try again with changed configuration.
                return self._make_callback()
            if callback is not self.callback:
                update_wrapper(callback, self.callback)
        return callback

    def get_undecorated_callback(self):
        ''' Return the callback. If the callback is a decorated function, try
            to recover the original function by following the first closure
            cell repeatedly. '''
        func = self.callback
        func = getattr(func, '__func__' if py3k else 'im_func', func)
        closure_attr = '__closure__' if py3k else 'func_closure'
        while hasattr(func, closure_attr) and getattr(func, closure_attr):
            func = getattr(func, closure_attr)[0].cell_contents
        return func

    def get_callback_args(self):
        ''' Return a list of argument names the callback (most likely) accepts
            as keyword arguments. If the callback is a decorated function, try
            to recover the original function before inspection. '''
        return getargspec(self.get_undecorated_callback())[0]

    def get_config(self, key, default=None):
        ''' Lookup a config field and return its value, first checking the
            route.config, then route.app.config.

            :param key: config field name.
            :param default: value returned if neither config has *key*.
        '''
        # The second lookup must read app.config; a misspelled attribute name
        # here raises AttributeError for every key missing from route.config.
        for conf in (self.config, self.app.config):
            if key in conf: return conf[key]
        return default

    def __repr__(self):
        cb = self.get_undecorated_callback()
        return '<%s %r %r>' % (self.method, self.rule, cb)
############################################################################### # Application Object ########################################################### ###############################################################################
[docs]class Bottle(object): """ 각 Bottle 객체는 하나의 구별된 웹 응용을 나타내며, 라우트와 콜백, 플러그인, 자원, 설정으로 구성된다. 인스턴스는 호출 가능한 WSGI 응용이다. :param catchall: 참(기본)이면 모든 예외를 처리한다. 디버깅용 미들웨어가 예외를 처리하게 하려면 끄면 된다. """ def __init__(self, catchall=True, autojson=True): #: 앱 설정을 위한 :class:`ConfigDict`. self.config = ConfigDict() self.config._on_change = functools.partial(self.trigger_hook, 'config') self.config.meta_set('autojson', 'validate', bool) self.config.meta_set('catchall', 'validate', bool) self.config['catchall'] = catchall self.config['autojson'] = autojson #: 응용 파일들을 위한 :class:`ResourceManager`. self.resources = ResourceManager() self.routes = [] # List of installed :class:`Route` instances. self.router = Router() # Maps requests to :class:`Route` instances. self.error_handler = {} # Core plugins self.plugins = [] # List of installed plugins. if self.config['autojson']: self.install(JSONPlugin()) self.install(TemplatePlugin()) #: 참이면 대부분의 예외를 잡아서 :exc:`HTTPError`\로 반환한다. catchall = DictProperty('config', 'catchall') __hook_names = 'before_request', 'after_request', 'app_reset', 'config' __hook_reversed = 'after_request' @cached_property def _hooks(self): return dict((name, []) for name in self.__hook_names)
[docs] def add_hook(self, name, func): ''' 훅에 콜백 붙이기. 현재 세 가지 훅이 구현돼 있다. before_request 각 요청 전에 한 번 실행. 요청 문맥은 사용할 수 있지만 아직 라우팅은 이뤄지지 않은 상태. after_request 각 요청 후에 그 결과와 상관없이 한 번 실행. app_reset :meth:`Bottle.reset`\이 호출될 때마다 호출. ''' if name in self.__hook_reversed: self._hooks[name].insert(0, func) else: self._hooks[name].append(func)
[docs] def remove_hook(self, name, func): ''' 훅에서 콜백 제거. ''' if name in self._hooks and func in self._hooks[name]: self._hooks[name].remove(func) return True
[docs] def trigger_hook(self, __name, *args, **kwargs): ''' 훅을 실행하고 결과 리스트 반환. ''' return [hook(*args, **kwargs) for hook in self._hooks[__name][:]]
[docs] def hook(self, name): """ 훅에 콜백을 붙이는 데코레이터 반환. 자세한 내용은 :meth:`add_hook` 참고.""" def decorator(func): self.add_hook(name, func) return func return decorator
[docs] def mount(self, prefix, app, **options): ''' 응용(:class:`Bottle` 또는 단순 WSGI)을 특정 URL 프리픽스에 붙이기. 예:: root_app.mount('/admin/', admin_app) :param prefix: 경로 프리픽스 내지 `마운트 지점`. 슬래시로 끝나면 그 슬래시가 꼭 있어야 한다. :param app: :class:`Bottle` 인스턴스 또는 WSGI 응용. 다른 매개변수들은 모두 하위 :meth:`route` 호출로 전달된다. ''' if isinstance(app, basestring): depr('Parameter order of Bottle.mount() changed.', True) # 0.10 segments = [p for p in prefix.split('/') if p] if not segments: raise ValueError('Empty path prefix.') path_depth = len(segments) def mountpoint_wrapper(): try: request.path_shift(path_depth) rs = HTTPResponse([]) def start_response(status, headerlist, exc_info=None): if exc_info: try: _raise(*exc_info) finally: exc_info = None rs.status = status for name, value in headerlist: rs.add_header(name, value) return rs.body.append body = app(request.environ, start_response) if body and rs.body: body = itertools.chain(rs.body, body) rs.body = body or rs.body return rs finally: request.path_shift(-path_depth) options.setdefault('skip', True) options.setdefault('method', 'PROXY') options.setdefault('mountpoint', {'prefix': prefix, 'target': app}) options['callback'] = mountpoint_wrapper self.route('/%s/<:re:.*>' % '/'.join(segments), **options) if not prefix.endswith('/'): self.route('/' + '/'.join(segments), **options)
[docs] def merge(self, routes): ''' 다른 :class:`Bottle` 응용의 라우트나 :class:`Route` 객체 리스트를 이 응용으로 병합. 라우트의 '소유자'가 그대로 유지된다. 즉 :data:`Route.app` 속성이 바뀌지 않는다. ''' if isinstance(routes, Bottle): routes = routes.routes for route in routes: self.add_route(route)
[docs] def install(self, plugin): ''' plugin을 플러그인 목록에 추가하고 이 응용의 모든 라우트에 적용될 수 있게 준비. plugin은 단순 데코레이터일 수도 있고 :class:`Plugin` API를 구현하는 객체일 수도 있다. ''' if hasattr(plugin, 'setup'): plugin.setup(self) if not callable(plugin) and not hasattr(plugin, 'apply'): raise TypeError("Plugins must be callable or implement .apply()") self.plugins.append(plugin) self.reset() return plugin
[docs] def uninstall(self, plugin): ''' 플러그인 제거. 특정 플러그인을 제거하려면 인스턴스를 주면 되고, 어떤 타입의 모든 플러그인을 제거하려면 그 타입 객체를, ``name`` 속성이 일치하는 모든 플러그인을 제거하려면 문자열을, 모든 플러그인을 제거하려면 ``True``\를 주면 된다. 제거된 플러그인 목록을 반환한다. ''' removed, remove = [], plugin for i, plugin in list(enumerate(self.plugins))[::-1]: if remove is True or remove is plugin or remove is type(plugin) \ or getattr(plugin, 'name', True) == remove: removed.append(plugin) del self.plugins[i] if hasattr(plugin, 'close'): plugin.close() if removed: self.reset() return removed
[docs] def reset(self, route=None): ''' 모든 라우트를 재설정(플러그인들을 강제로 재적용)하고 캐시 모두 비우기. ID나 라우트 객체를 주면 그 특정 라우트만 영향받는다. ''' if route is None: routes = self.routes elif isinstance(route, Route): routes = [route] else: routes = [self.routes[route]] for route in routes: route.reset() if DEBUG: for route in routes: route.prepare() self.trigger_hook('app_reset')
[docs] def close(self): ''' 응용과 설치된 모든 플러그인 닫기. ''' for plugin in self.plugins: if hasattr(plugin, 'close'): plugin.close() self.stopped = True
[docs] def run(self, **kwargs): ''' 같은 매개변수로 :func:`run` 호출. ''' run(self, **kwargs)
[docs] def match(self, environ): """ 걸리는 라우트를 검색해서 (:class:`Route` , urlargs) 튜플을 반환. 두 번째 값은 URL에서 추출한 매개변수들을 담은 딕셔너리다. 불일치 시 :exc:`HTTPError` (404/405)를 던진다.""" return self.router.match(environ)
[docs] def get_url(self, routename, **kargs): """ 지정한 라우트에 걸리는 문자열을 반환. """ scriptname = request.environ.get('SCRIPT_NAME', '').strip('/') + '/' location = self.router.build(routename, **kargs).lstrip('/') return urljoin(urljoin('/', scriptname), location)
[docs] def add_route(self, route): ''' 라우트 객체 추가. 단 :data:`Route.app` 속성은 바꾸지 않음. ''' self.routes.append(route) self.router.add(route.rule, route.method, route, name=route.name) if DEBUG: route.prepare()
[docs] def route(self, path=None, method='GET', callback=None, name=None, apply=None, skip=None, **config): """ 요청 URL에 함수를 결속시키는 데코레이터. 예:: @app.route('/hello/:name') def hello(name): return 'Hello %s' % name ``:name`` 부분은 와일드카드다. 자세한 문법은 :class:`Router` 참고. :param path: 받을 요청 경로 또는 경로 목록. 경로를 지정하지 않으면 함수 시그너처를 가지고 자동으로 생성한다. :param method: 받을 HTTP 메소드(`GET`, `POST`, `PUT`, ...) 또는 메소드들의 목록. (기본값: `GET`) :param callback: 데코레이터 문법을 피하기 위한 선택적인 지정 방법. ``route(..., callback=func)``\가 ``route(...)(func)``\와 같다. :param name: 이 라우트의 이름. (기본값: None) :param apply: 데코레이터, 또는 플러그인, 또는 플러그인 목록. 설치돼 있는 플러그인들에 더해서 라우트 콜백에 적용된다. :param skip: 플러그인 또는 플러그인 클래스 또는 이름의 목록. 걸리는 플러그인은 이 라우트에 설치하지 않는다. ``True``\면 모두 건너뛴다. 추가 키워드 인자가 있으면 라우트별 설정에 저장돼서 플러그인들로 전달된다. (:meth:`Plugin.apply` 참고.) """ if callable(path): path, callback = None, path plugins = makelist(apply) skiplist = makelist(skip) def decorator(callback): # TODO: Documentation and tests if isinstance(callback, basestring): callback = load(callback) for rule in makelist(path) or yieldroutes(callback): for verb in makelist(method): verb = verb.upper() route = Route(self, rule, verb, callback, name=name, plugins=plugins, skiplist=skiplist, **config) self.add_route(route) return callback return decorator(callback) if callback else decorator
[docs] def get(self, path=None, method='GET', **options): """ :meth:`route`\와 같음. """ return self.route(path, method, **options)
[docs] def post(self, path=None, method='POST', **options): """ :meth:`route`\와 같되 method 매개변수가 ``POST``. """ return self.route(path, method, **options)
[docs] def put(self, path=None, method='PUT', **options): """ :meth:`route`\와 같되 method 매개변수가 ``PUT``. """ return self.route(path, method, **options)
[docs] def delete(self, path=None, method='DELETE', **options): """ :meth:`route`\와 같되 method 매개변수가 ``DELETE``. """ return self.route(path, method, **options)
[docs] def error(self, code=500): """ 데코레이터: HTTP 오류 코드에 대한 출력 핸들러 등록""" def wrapper(handler): self.error_handler[int(code)] = handler return handler return wrapper
def default_error_handler(self, res): return tob(template(ERROR_PAGE_TEMPLATE, e=res)) def _handle(self, environ): path = environ['bottle.raw_path'] = environ['PATH_INFO'] if py3k: try: environ['PATH_INFO'] = path.encode('latin1').decode('utf8') except UnicodeError: return HTTPError(400, 'Invalid path string. Expected UTF-8') try: environ['bottle.app'] = self request.bind(environ) response.bind() try: self.trigger_hook('before_request') route, args = self.router.match(environ) environ['route.handle'] = route environ['bottle.route'] = route environ['route.url_args'] = args return route.call(**args) finally: self.trigger_hook('after_request') except HTTPResponse: return _e() except RouteReset: route.reset() return self._handle(environ) except (KeyboardInterrupt, SystemExit, MemoryError): raise except Exception: if not self.catchall: raise stacktrace = format_exc() environ['wsgi.errors'].write(stacktrace) return HTTPError(500, "Internal Server Error", _e(), stacktrace) def _cast(self, out, peek=None): """ Try to convert the parameter into something WSGI compatible and set correct HTTP headers when possible. Support: False, str, unicode, dict, HTTPResponse, HTTPError, file-like, iterable of strings and iterable of unicodes """ # Empty output is done here if not out: if 'Content-Length' not in response: response['Content-Length'] = 0 return [] # Join lists of byte or unicode strings. Mixed lists are NOT supported if isinstance(out, (tuple, list))\ and isinstance(out[0], (bytes, unicode)): out = out[0][0:0].join(out) # b'abc'[0:0] -> b'' # Encode unicode strings if isinstance(out, unicode): out = out.encode(response.charset) # Byte Strings are just returned if isinstance(out, bytes): if 'Content-Length' not in response: response['Content-Length'] = len(out) return [out] # HTTPError or HTTPException (recursive, because they may wrap anything) # TODO: Handle these explicitly in handle() or make them iterable. 
if isinstance(out, HTTPError): out.apply(response) out = self.error_handler.get(out.status_code, self.default_error_handler)(out) return self._cast(out) if isinstance(out, HTTPResponse): out.apply(response) return self._cast(out.body) # File-like objects. if hasattr(out, 'read'): if 'wsgi.file_wrapper' in request.environ: return request.environ['wsgi.file_wrapper'](out) elif hasattr(out, 'close') or not hasattr(out, '__iter__'): return WSGIFileWrapper(out) # Handle Iterables. We peek into them to detect their inner type. try: iout = iter(out) first = next(iout) while not first: first = next(iout) except StopIteration: return self._cast('') except HTTPResponse: first = _e() except (KeyboardInterrupt, SystemExit, MemoryError): raise except Exception: if not self.catchall: raise first = HTTPError(500, 'Unhandled exception', _e(), format_exc()) # These are the inner types allowed in iterator or generator objects. if isinstance(first, HTTPResponse): return self._cast(first) elif isinstance(first, bytes): new_iter = itertools.chain([first], iout) elif isinstance(first, unicode): encoder = lambda x: x.encode(response.charset) new_iter = imap(encoder, itertools.chain([first], iout)) else: msg = 'Unsupported response type: %s' % type(first) return self._cast(HTTPError(500, msg)) if hasattr(out, 'close'): new_iter = _closeiter(new_iter, out.close) return new_iter
[docs] def wsgi(self, environ, start_response): """ 보틀의 WSGI 인터페이스. """ try: out = self._cast(self._handle(environ)) # rfc2616 section 4.3 if response._status_code in (100, 101, 204, 304)\ or environ['REQUEST_METHOD'] == 'HEAD': if hasattr(out, 'close'): out.close() out = [] start_response(response._status_line, response.headerlist) return out except (KeyboardInterrupt, SystemExit, MemoryError): raise except Exception: if not self.catchall: raise err = '<h1>Critical error while processing request: %s</h1>' \ % html_escape(environ.get('PATH_INFO', '/')) if DEBUG: err += '<h2>Error:</h2>\n<pre>\n%s\n</pre>\n' \ '<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n' \ % (html_escape(repr(_e())), html_escape(format_exc())) environ['wsgi.errors'].write(err) headers = [('Content-Type', 'text/html; charset=UTF-8')] start_response('500 INTERNAL SERVER ERROR', headers, sys.exc_info()) return [tob(err)]
def __call__(self, environ, start_response):
    ''' Each instance of :class:'Bottle' is a WSGI application.

        Calling the instance simply delegates to :meth:`wsgi`.
    '''
    wsgi_handler = self.wsgi
    return wsgi_handler(environ, start_response)
############################################################################### # HTTP and WSGI Tools ########################################################## ###############################################################################
class BaseRequest(object):
    """ A wrapper for WSGI environment dictionaries that adds a lot of
        convenient access methods and properties. Most of them are read-only.

        Adding new attributes to a request actually adds them to the environ
        dictionary (as 'bottle.request.ext.<name>'). This is the recommended
        way to store and access request-specific data.
    """

    # A real one-element tuple. ``('environ')`` without the trailing comma is
    # just a parenthesised string.
    __slots__ = ('environ',)

    #: Maximum size of memory buffer for :attr:`body` in bytes.
    MEMFILE_MAX = 102400

    def __init__(self, environ=None):
        """ Wrap a WSGI environ dictionary. """
        #: The wrapped WSGI environ dictionary. This is the only real
        #: attribute. All other attributes actually are read-only properties.
        self.environ = {} if environ is None else environ
        self.environ['bottle.request'] = self

    @DictProperty('environ', 'bottle.app', read_only=True)
    def app(self):
        ''' Bottle application handling this request. '''
        raise RuntimeError('This request is not connected to an application.')

    @DictProperty('environ', 'bottle.route', read_only=True)
    def route(self):
        """ The bottle :class:`Route` object that matches this request. """
        raise RuntimeError('This request is not connected to a route.')

    @DictProperty('environ', 'route.url_args', read_only=True)
    def url_args(self):
        """ The arguments extracted from the URL. """
        raise RuntimeError('This request is not connected to a route.')

    @property
    def path(self):
        ''' The value of ``PATH_INFO`` with exactly one prefixed slash (to fix
            broken clients and avoid the "empty path" edge case). '''
        return '/' + self.environ.get('PATH_INFO','').lstrip('/')

    @property
    def method(self):
        ''' The ``REQUEST_METHOD`` value as an uppercase string. '''
        return self.environ.get('REQUEST_METHOD', 'GET').upper()

    @DictProperty('environ', 'bottle.request.headers', read_only=True)
    def headers(self):
        ''' A :class:`WSGIHeaderDict` that provides case-insensitive access to
            HTTP request headers. '''
        return WSGIHeaderDict(self.environ)

    def get_header(self, name, default=None):
        ''' Return the value of a request header, or a given default value. '''
        return self.headers.get(name, default)

    @DictProperty('environ', 'bottle.request.cookies', read_only=True)
    def cookies(self):
        """ Cookies parsed into a :class:`FormsDict`. Signed cookies are NOT
            decoded. Use :meth:`get_cookie` if you expect signed cookies. """
        cookies = SimpleCookie(self.environ.get('HTTP_COOKIE','')).values()
        return FormsDict((c.key, c.value) for c in cookies)

    @DictProperty('environ', 'bottle.request.query', read_only=True)
    def query(self):
        ''' The :attr:`query_string` parsed into a :class:`FormsDict`. These
            values are sometimes called "URL arguments" or "GET parameters",
            but not to be confused with "URL wildcards" as they are provided
            by the :class:`Router`. '''
        get = self.environ['bottle.get'] = FormsDict()
        pairs = _parse_qsl(self.environ.get('QUERY_STRING', ''))
        for key, value in pairs:
            get[key] = value
        return get

    @DictProperty('environ', 'bottle.request.forms', read_only=True)
    def forms(self):
        """ Form values parsed from an `url-encoded` or `multipart/form-data`
            encoded POST or PUT request body. The result is returned as a
            :class:`FormsDict`. All keys and values are strings. File uploads
            are stored separately in :attr:`files`. """
        forms = FormsDict()
        for name, item in self.POST.allitems():
            if not isinstance(item, FileUpload):
                forms[name] = item
        return forms

    @DictProperty('environ', 'bottle.request.params', read_only=True)
    def params(self):
        """ A :class:`FormsDict` with the combined values of :attr:`query` and
            :attr:`forms`. File uploads are stored in :attr:`files`. """
        params = FormsDict()
        for key, value in self.query.allitems():
            params[key] = value
        for key, value in self.forms.allitems():
            params[key] = value
        return params

    @DictProperty('environ', 'bottle.request.files', read_only=True)
    def files(self):
        """ File uploads parsed from `multipart/form-data` encoded POST or PUT
            request body. The values are instances of :class:`FileUpload`.
        """
        files = FormsDict()
        for name, item in self.POST.allitems():
            if isinstance(item, FileUpload):
                files[name] = item
        return files

    @DictProperty('environ', 'bottle.request.json', read_only=True)
    def json(self):
        ''' If the ``Content-Type`` header is ``application/json``, this
            property holds the parsed content of the request body. Only
            requests smaller than :attr:`MEMFILE_MAX` are processed to avoid
            memory exhaustion. Otherwise (or for an empty body) the value is
            None. '''
        ctype = self.environ.get('CONTENT_TYPE', '').lower().split(';')[0]
        if ctype == 'application/json':
            b = self._get_body_string()
            if not b:
                return None
            return json_loads(b)
        return None

    def _iter_body(self, read, bufsize):
        ''' Yield the body in parts of at most `bufsize` bytes until
            :attr:`content_length` bytes were read or `read` runs dry. '''
        maxread = max(0, self.content_length)
        while maxread:
            part = read(min(maxread, bufsize))
            if not part: break
            yield part
            maxread -= len(part)

    def _iter_chunked(self, read, bufsize):
        ''' Yield the payload of a chunked transfer-encoded body.
            Raises HTTPError(400) on malformed input. '''
        err = HTTPError(400, 'Error while parsing chunked transfer body.')
        rn, sem, bs = tob('\r\n'), tob(';'), tob('')
        while True:
            # Read the chunk header byte by byte up to the terminating CRLF.
            header = read(1)
            while header[-2:] != rn:
                c = read(1)
                header += c
                if not c: raise err
                if len(header) > bufsize: raise err
            # Anything after ';' (chunk extensions) is ignored.
            size, _, _ = header.partition(sem)
            try:
                maxread = int(tonat(size.strip()), 16)
            except ValueError:
                raise err
            # A chunk of size zero terminates the body.
            if maxread == 0: break
            buff = bs
            while maxread > 0:
                if not buff:
                    buff = read(min(maxread, bufsize))
                part, buff = buff[:maxread], buff[maxread:]
                if not part: raise err
                yield part
                maxread -= len(part)
            # Every chunk must be followed by CRLF.
            if read(2) != rn:
                raise err

    @DictProperty('environ', 'bottle.request.body', read_only=True)
    def _body(self):
        ''' Read the whole request body into a seekable file object and
            install it as the new ``wsgi.input``. '''
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        read_func = self.environ['wsgi.input'].read
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                # Too large for memory: move what we have to a temp file.
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body

    def _get_body_string(self):
        ''' Read body until content-length or MEMFILE_MAX into a string.
            Raise HTTPError(413) on requests that are too large. '''
        clen = self.content_length
        if clen > self.MEMFILE_MAX:
            raise HTTPError(413, 'Request too large')
        # Unknown length: read one byte more than allowed to detect overflow.
        if clen < 0: clen = self.MEMFILE_MAX + 1
        data = self.body.read(clen)
        if len(data) > self.MEMFILE_MAX: # Fail fast
            raise HTTPError(413, 'Request too large')
        return data

    @property
    def body(self):
        """ The HTTP request body as a seek-able file-like object. Depending
            on :attr:`MEMFILE_MAX`, this is either a temporary file or a
            :class:`io.BytesIO` instance. Accessing this property for the
            first time reads and replaces the ``wsgi.input`` environ variable.
            Subsequent accesses just do a `seek(0)` on the file object. """
        self._body.seek(0)
        return self._body

    @property
    def chunked(self):
        ''' True if Chunked transfer encoding was used. '''
        return 'chunked' in self.environ.get('HTTP_TRANSFER_ENCODING', '').lower()

    #: An alias for :attr:`query`.
    GET = query

    @DictProperty('environ', 'bottle.request.post', read_only=True)
    def POST(self):
        """ The values of :attr:`forms` and :attr:`files` combined into a
            single :class:`FormsDict`. Values are either strings (form
            values) or instances of :class:`FileUpload` (file uploads).
        """
        post = FormsDict()
        # We default to application/x-www-form-urlencoded for everything that
        # is not multipart and take the fast path (also: 3.1 workaround)
        if not self.content_type.startswith('multipart/'):
            pairs = _parse_qsl(tonat(self._get_body_string(), 'latin1'))
            for key, value in pairs:
                post[key] = value
            return post

        safe_env = {'QUERY_STRING':''} # Build a safe environment for cgi
        for key in ('REQUEST_METHOD', 'CONTENT_TYPE', 'CONTENT_LENGTH'):
            if key in self.environ: safe_env[key] = self.environ[key]
        args = dict(fp=self.body, environ=safe_env, keep_blank_values=True)
        if py31:
            args['fp'] = NCTextIOWrapper(args['fp'], encoding='utf8',
                                         newline='\n')
        elif py3k:
            args['encoding'] = 'utf8'
        data = cgi.FieldStorage(**args)
        # Keep a reference to the FieldStorage in the environ:
        self['_cgi.FieldStorage'] = data #http://bugs.python.org/issue18394#msg207958
        data = data.list or []
        for item in data:
            if item.filename:
                post[item.name] = FileUpload(item.file, item.name,
                                             item.filename, item.headers)
            else:
                post[item.name] = item.value
        return post

    @property
    def url(self):
        """ The full request URI including hostname and scheme. If your app
            lives behind a reverse proxy or load balancer and you get
            confusing results, make sure that the ``X-Forwarded-Host`` header
            is set correctly. """
        return self.urlparts.geturl()

    @DictProperty('environ', 'bottle.request.urlparts', read_only=True)
    def urlparts(self):
        ''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and
            fragment), but the fragment is always empty because it is not
            visible to the server. '''
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            # Only append the port if it is not the default for the scheme.
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')

    @property
    def fullpath(self):
        """ Request path including :attr:`script_name` (if present). """
        return urljoin(self.script_name, self.path.lstrip('/'))

    @property
    def query_string(self):
        """ The raw :attr:`query` part of the URL (everything in between ``?``
            and ``#``) as a string. """
        return self.environ.get('QUERY_STRING', '')

    @property
    def script_name(self):
        ''' The initial portion of the URL's `path` that was removed by a
            higher level (server or routing middleware) before the
            application was called. This script path is returned with leading
            and tailing slashes. '''
        script_name = self.environ.get('SCRIPT_NAME', '').strip('/')
        return '/' + script_name + '/' if script_name else '/'

    def path_shift(self, shift=1):
        ''' Shift path segments from :attr:`path` to :attr:`script_name` and
            vice versa.

           :param shift: The number of path segments to shift. May be negative
                         to change the shift direction. (default: 1)
        '''
        script = self.environ.get('SCRIPT_NAME','/')
        self['SCRIPT_NAME'], self['PATH_INFO'] = path_shift(script, self.path, shift)

    @property
    def content_length(self):
        ''' The request body length as an integer. The client is responsible
            to set this header. Otherwise, the real length of the body is
            unknown and -1 is returned. In this case, :attr:`body` will be
            empty. '''
        return int(self.environ.get('CONTENT_LENGTH') or -1)

    @property
    def content_type(self):
        ''' The Content-Type header as a lowercase-string (default: empty). '''
        return self.environ.get('CONTENT_TYPE', '').lower()

    @property
    def is_xhr(self):
        ''' True if the request was triggered by a XMLHttpRequest. This only
            works with JavaScript libraries that support the
            `X-Requested-With` header (most of the popular libraries do). '''
        requested_with = self.environ.get('HTTP_X_REQUESTED_WITH','')
        return requested_with.lower() == 'xmlhttprequest'

    @property
    def is_ajax(self):
        ''' Alias for :attr:`is_xhr`. "Ajax" is not the right term. '''
        return self.is_xhr

    @property
    def auth(self):
        """ HTTP authentication data as a (user, password) tuple. This
            implementation currently supports basic (not digest)
            authentication only. If the authentication happened at a higher
            level (e.g. in the front web-server or a middleware), the
            password field is None, but the user field is looked up from the
            ``REMOTE_USER`` environ variable. On any errors, None is
            returned. """
        basic = parse_auth(self.environ.get('HTTP_AUTHORIZATION',''))
        if basic: return basic
        ruser = self.environ.get('REMOTE_USER')
        if ruser: return (ruser, None)
        return None

    @property
    def remote_route(self):
        """ A list of all IPs that were involved in this request, starting
            with the client IP and followed by zero or more proxies. This
            does only work if all proxies support the ``X-Forwarded-For``
            header. Note that this information can be forged by malicious
            clients. """
        proxy = self.environ.get('HTTP_X_FORWARDED_FOR')
        if proxy: return [ip.strip() for ip in proxy.split(',')]
        remote = self.environ.get('REMOTE_ADDR')
        return [remote] if remote else []

    @property
    def remote_addr(self):
        """ The client IP as a string. Note that this information can be
            forged by malicious clients. """
        route = self.remote_route
        return route[0] if route else None

    def copy(self):
        """ Return a new :class:`Request` with a shallow :attr:`environ`
            copy. """
        return Request(self.environ.copy())

    def get(self, value, default=None): return self.environ.get(value, default)
    def __getitem__(self, key): return self.environ[key]
    # Assign first so that __setitem__ clears dependent caches, then delete.
    def __delitem__(self, key): self[key] = ""; del(self.environ[key])
    def __iter__(self): return iter(self.environ)
    def __len__(self): return len(self.environ)
    def keys(self): return self.environ.keys()

    def __setitem__(self, key, value):
        """ Change an environ value and clear all caches that depend on it.
            Raises KeyError if the environ is flagged as read-only. """

        if self.environ.get('bottle.request.readonly'):
            raise KeyError('The environ dictionary is read-only.')

        self.environ[key] = value
        todelete = ()

        if key == 'wsgi.input':
            todelete = ('body', 'forms', 'files', 'params', 'post', 'json')
        elif key == 'QUERY_STRING':
            todelete = ('query', 'params')
        elif key.startswith('HTTP_'):
            todelete = ('headers', 'cookies')

        # Separate loop name so the `key` parameter is not rebound.
        for cached in todelete:
            self.environ.pop('bottle.request.'+cached, None)

    def __repr__(self):
        return '<%s: %s %s>' % (self.__class__.__name__, self.method, self.url)

    def __getattr__(self, name):
        ''' Search in self.environ for additional user defined attributes. '''
        try:
            var = self.environ['bottle.request.ext.%s'%name]
            return var.__get__(self) if hasattr(var, '__get__') else var
        except KeyError:
            raise AttributeError('Attribute %r not defined.' % name)

    def __setattr__(self, name, value):
        ''' Store everything except `environ` itself in the environ dict. '''
        if name == 'environ': return object.__setattr__(self, name, value)
        self.environ['bottle.request.ext.%s'%name] = value
def _hkey(key):
    ''' Normalize a header name: title-case it and turn underscores into
        dashes. Raises ValueError if it contains CR, LF or NUL. '''
    for forbidden in ('\n', '\r', '\0'):
        if forbidden in key:
            raise ValueError("Header names must not contain control characters: %r" % key)
    titled = key.title()
    return titled.replace('_', '-')


def _hval(value):
    ''' Convert a header value with ``tonat`` and reject CR, LF and NUL
        characters with a ValueError. '''
    text = tonat(value)
    for forbidden in ('\n', '\r', '\0'):
        if forbidden in text:
            raise ValueError("Header value must not contain control characters: %r" % text)
    return text


class HeaderProperty(object):
    ''' Descriptor that maps an attribute to a single header of its owner.

        Reading calls ``obj.get_header(name, default)``; writing and deleting
        use item access on the owner. Optional `reader` and `writer`
        callables convert the value on the way out and in.
    '''

    def __init__(self, name, reader=None, writer=None, default=''):
        self.name = name
        self.default = default
        self.reader = reader
        self.writer = writer
        self.__doc__ = '%r 헤더의 현재 값.' % name.title()

    def __get__(self, obj, cls):
        # Accessed on the class itself: hand out the descriptor.
        if obj is None:
            return self
        raw = obj.get_header(self.name, self.default)
        if self.reader:
            return self.reader(raw)
        return raw

    def __set__(self, obj, value):
        if self.writer:
            obj[self.name] = self.writer(value)
        else:
            obj[self.name] = value

    def __delete__(self, obj):
        del obj[self.name]
class BaseResponse(object):
    """ Storage class for a response body as well as headers and cookies.

        This class does support dict-like case-insensitive item-access to
        headers, but is NOT a dict. Most notably, iterating over a response
        yields parts of the body and not the headers.

        :param body: The response body as one of the supported types.
        :param status: Either an HTTP status code (e.g. 200) or a status line
                       including the reason phrase (e.g. '200 OK').
        :param headers: A dictionary or a list of name-value pairs.

        Additional keyword arguments are added to the list of headers.
        Underscores in the header name are replaced with dashes.
    """

    default_status = 200
    default_content_type = 'text/html; charset=UTF-8'

    # Header blacklist for specific response codes
    # (rfc2616 section 10.2.3 and 10.3.5)
    bad_headers = {
        204: set(('Content-Type',)),
        304: set(('Allow', 'Content-Encoding', 'Content-Language',
                  'Content-Length', 'Content-Range', 'Content-Type',
                  'Content-Md5', 'Last-Modified'))}

    def __init__(self, body='', status=None, headers=None, **more_headers):
        self._cookies = None
        self._headers = {}
        self.body = body
        self.status = status or self.default_status
        if headers:
            if isinstance(headers, dict):
                headers = headers.items()
            for name, value in headers:
                self.add_header(name, value)
        if more_headers:
            for name, value in more_headers.items():
                self.add_header(name, value)

    def copy(self, cls=None):
        ''' Returns a copy of self. Status, headers and cookies are copied;
            `cls` must be :class:`BaseResponse` or a subclass of it. '''
        cls = cls or BaseResponse
        assert issubclass(cls, BaseResponse)
        copy = cls()
        copy.status = self.status
        # Copy the value lists too so the two responses do not share them.
        copy._headers = dict((k, v[:]) for (k, v) in self._headers.items())
        if self._cookies:
            copy._cookies = SimpleCookie()
            copy._cookies.load(self._cookies.output(header=''))
        return copy

    def __iter__(self):
        return iter(self.body)

    def close(self):
        ''' Close the body if it has a ``close`` method. '''
        if hasattr(self.body, 'close'):
            self.body.close()

    @property
    def status_line(self):
        ''' The HTTP status line as a string (e.g. ``404 Not Found``).'''
        return self._status_line

    @property
    def status_code(self):
        ''' The HTTP status code as an integer (e.g. 404).'''
        return self._status_code

    def _set_status(self, status):
        if isinstance(status, int):
            code, status = status, _HTTP_STATUS_LINES.get(status)
        elif ' ' in status:
            status = status.strip()
            code = int(status.split()[0])
        else:
            raise ValueError('String status line without a reason phrase.')
        if not 100 <= code <= 999: raise ValueError('Status code out of range.')
        self._status_code = code
        self._status_line = str(status or ('%d Unknown' % code))

    def _get_status(self):
        return self._status_line

    # Writeable property: accepts a numeric code (100-999) or a status line
    # with reason phrase; reading always gives the status line string. The
    # doc string below is the property's runtime ``__doc__``.
    status = property(_get_status, _set_status, None,
        ''' HTTP 응답 상태를 바꾸기 위한 쓰기 가능 프로퍼티. 숫자 코드 (100-999) 또는 자체 이유 문구(예: "404 Brain not found")가 있는 문자열을 받는다. 그에 따라 :data:`status_line`\과 :data:`status_code`\가 모두 갱신된다. 반환 값은 항상 상태 행 문자열이다. ''')
    del _get_status, _set_status

    @property
    def headers(self):
        ''' An instance of :class:`HeaderDict`, a case-insensitive dict-like
            view on the response headers. '''
        hdict = HeaderDict()
        # Share the storage so changes through the view affect the response.
        hdict.dict = self._headers
        return hdict

    def __contains__(self, name): return _hkey(name) in self._headers
    def __delitem__(self, name): del self._headers[_hkey(name)]
    def __getitem__(self, name): return self._headers[_hkey(name)][-1]
    def __setitem__(self, name, value): self._headers[_hkey(name)] = [_hval(value)]

    def get_header(self, name, default=None):
        ''' Return the value of a previously defined header. If there is no
            header with that name, return a default value. '''
        return self._headers.get(_hkey(name), [default])[-1]

    def set_header(self, name, value):
        ''' Create a new response header, replacing any previously defined
            headers with the same name. '''
        self._headers[_hkey(name)] = [_hval(value)]

    def add_header(self, name, value):
        ''' Add an additional response header, not removing duplicates. '''
        self._headers.setdefault(_hkey(name), []).append(_hval(value))

    def iter_headers(self):
        ''' Return the (header, value) tuples of :attr:`headerlist`, which
            skips headers that are not allowed with the current response
            status code. '''
        return self.headerlist

    @property
    def headerlist(self):
        """ WSGI conform list of (header, value) tuples. """
        out = []
        headers = list(self._headers.items())
        if 'Content-Type' not in self._headers:
            headers.append(('Content-Type', [self.default_content_type]))
        if self._status_code in self.bad_headers:
            bad_headers = self.bad_headers[self._status_code]
            headers = [h for h in headers if h[0] not in bad_headers]
        out += [(name, val) for (name, vals) in headers for val in vals]
        if self._cookies:
            for c in self._cookies.values():
                out.append(('Set-Cookie', _hval(c.OutputString())))
        if py3k:
            # Re-code values so they consist of latin1 code points only.
            out = [(k, v.encode('utf8').decode('latin1')) for (k, v) in out]
        return out

    content_type = HeaderProperty('Content-Type')
    content_length = HeaderProperty('Content-Length', reader=int)
    expires = HeaderProperty('Expires',
        reader=lambda x: datetime.utcfromtimestamp(parse_date(x)),
        writer=lambda x: http_date(x))

    @property
    def charset(self, default='UTF-8'):
        """ Return the charset specified in the content-type header
            (default: utf8). """
        if 'charset=' in self.content_type:
            return self.content_type.split('charset=')[-1].split(';')[0].strip()
        return default

    def __repr__(self):
        ''' One "Name: value" line per entry of :attr:`headerlist`. '''
        return ''.join(['%s: %s\n' % (name.title(), value.strip())
                        for name, value in self.headerlist])
def local_property(name=None):
    ''' Build a property whose value lives in a fresh ``threading.local``.

        Reading before a value was set in the current thread raises
        RuntimeError. Passing `name` only triggers a deprecation warning.
    '''
    if name:
        depr('local_property() is deprecated and will be removed.') #0.12
    storage = threading.local()

    def _read(self):
        try:
            value = storage.var
        except AttributeError:
            raise RuntimeError("Request context not initialized.")
        return value

    def _write(self, value):
        storage.var = value

    def _drop(self):
        del storage.var

    return property(_read, _write, _drop, '스레드 로컬 프로퍼티')
class LocalRequest(BaseRequest):
    ''' A thread-local subclass of :class:`BaseRequest` with a different
        set of attributes for each thread. There is usually only one global
        instance of this class (:data:`request`). If accessed during a
        request/response cycle, this instance always refers to the *current*
        request (even on a multithreaded server). '''

    # The wrapped environ is kept per thread.
    environ = local_property()

    # bind() is the plain BaseRequest constructor under another name.
    bind = BaseRequest.__init__
class LocalResponse(BaseResponse):
    ''' A thread-local subclass of :class:`BaseResponse` with a different
        set of attributes for each thread. There is usually only one global
        instance of this class (:data:`response`). Its attributes are used
        to build the HTTP response at the end of the request/response cycle.
    '''

    # All response state is kept per thread.
    body = local_property()
    _headers = local_property()
    _cookies = local_property()
    _status_code = local_property()
    _status_line = local_property()

    # bind() is the plain BaseResponse constructor under another name.
    bind = BaseResponse.__init__
# Module-level aliases for the base request and response classes.
Request = BaseRequest
Response = BaseResponse
class HTTPResponse(Response, BottleException):
    ''' A response that is also an exception. '''

    def __init__(self, body='', status=None, headers=None, **more_headers):
        super(HTTPResponse, self).__init__(body, status, headers,
                                           **more_headers)

    def apply(self, response):
        ''' Copy status, headers, cookies and body onto `response`.
            Headers and cookies are assigned by reference, not copied. '''
        transferred = ('_status_code', '_status_line', '_headers',
                       '_cookies', 'body')
        for attribute in transferred:
            setattr(response, attribute, getattr(self, attribute))
class HTTPError(HTTPResponse):
    ''' An :class:`HTTPResponse` that defaults to status 500 and can carry
        the causing exception and a formatted traceback. '''

    default_status = 500

    def __init__(self, status=None, body=None, exception=None, traceback=None,
                 **options):
        self.exception, self.traceback = exception, traceback
        # Note the argument order: the parent expects the body first.
        super(HTTPError, self).__init__(body, status, **options)
###############################################################################
# Plugins ######################################################################
###############################################################################

class PluginError(BottleException): pass


class JSONPlugin(object):
    ''' Plugin that serializes dict return values with `json_dumps` and sets
        the content type to ``application/json``. '''
    name = 'json'
    api = 2

    def __init__(self, json_dumps=json_dumps):
        self.json_dumps = json_dumps

    def apply(self, callback, route):
        ''' Wrap `callback`. Without a dumps function the callback is
            returned unchanged. '''
        dumps = self.json_dumps
        if not dumps: return callback
        def wrapper(*a, **ka):
            try:
                rv = callback(*a, **ka)
            except HTTPError:
                # Raised errors are treated like returned ones.
                rv = _e()

            if isinstance(rv, dict):
                #Attempt to serialize, raises exception on failure
                json_response = dumps(rv)
                #Set content type only if serialization succesful
                response.content_type = 'application/json'
                return json_response
            elif isinstance(rv, HTTPResponse) and isinstance(rv.body, dict):
                rv.body = dumps(rv.body)
                rv.content_type = 'application/json'
            return rv

        return wrapper


class TemplatePlugin(object):
    ''' This plugin applies the :func:`view` decorator to all routes with a
        `template` config parameter. If the parameter is a tuple, the second
        element must be a dict with additional options (e.g. `template_engine`)
        or default variables for the template. '''
    name = 'template'
    api = 2

    def apply(self, callback, route):
        conf = route.config.get('template')
        if isinstance(conf, (tuple, list)) and len(conf) == 2:
            return view(conf[0], **conf[1])(callback)
        elif isinstance(conf, str):
            return view(conf)(callback)
        else:
            return callback


#: Not a plugin, but part of the plugin API. TODO: Find a better place.
class _ImportRedirect(object):
    ''' Meta path hook that makes ``<name>.<module>`` importable as an alias
        of the real module ``impmask % <module>``.

        Both import protocols are implemented: `find_module`/`load_module`
        (PEP 302) and `find_spec`/`create_module`/`exec_module`. Python 3.12
        and later only consult `find_spec` on meta path finders.
    '''

    def __init__(self, name, impmask):
        ''' Create a virtual package that redirects imports (see PEP 302). '''
        self.name = name
        self.impmask = impmask
        self.module = sys.modules.setdefault(name, new_module(name))
        self.module.__dict__.update({'__file__': __file__, '__path__': [],
                                    '__all__': [], '__loader__': self})
        sys.meta_path.append(self)

    def _handles(self, fullname):
        ''' True if `fullname` is a direct child of the virtual package. '''
        if '.' not in fullname: return False
        return fullname.rsplit('.', 1)[0] == self.name

    def find_spec(self, fullname, path, target=None):
        ''' Return a module spec served by this loader, or None for names
            outside of the virtual package. '''
        if not self._handles(fullname): return
        # Local import: importlib.util is only needed (and only guaranteed
        # to exist) on interpreters that call find_spec().
        from importlib.util import spec_from_loader
        return spec_from_loader(fullname, self)

    def find_module(self, fullname, path=None):
        ''' Legacy finder hook: return self for handled names, else None. '''
        if not self._handles(fullname): return
        return self

    def create_module(self, spec):
        ''' Spec-based loading: the aliased real module is the module. '''
        return self.load_module(spec.name)

    def exec_module(self, module):
        ''' Nothing to execute, load_module() already imported the target. '''
        pass

    def load_module(self, fullname):
        ''' Import the real module and register it under `fullname`. '''
        if fullname in sys.modules: return sys.modules[fullname]
        modname = fullname.rsplit('.', 1)[1]
        realname = self.impmask % modname
        __import__(realname)
        module = sys.modules[fullname] = sys.modules[realname]
        setattr(self.module, modname, module)
        module.__loader__ = self
        return module


###############################################################################
# Common Utilities #############################################################
###############################################################################
class MultiDict(DictMixin):
    """ This dict stores multiple values per key, but behaves exactly like a
        normal dict in that it returns only the newest value for any given
        key. There are special methods available to access the full list of
        values.
    """

    def __init__(self, *a, **k):
        initial = dict(*a, **k)
        # Every key maps to a list of values; the last one is the newest.
        self.dict = dict((name, [item]) for (name, item) in initial.items())

    def __len__(self):
        return len(self.dict)

    def __iter__(self):
        return iter(self.dict)

    def __contains__(self, key):
        return key in self.dict

    def __delitem__(self, key):
        del self.dict[key]

    def __getitem__(self, key):
        values = self.dict[key]
        return values[-1]

    def __setitem__(self, key, value):
        # Item assignment appends, it does not replace.
        self.append(key, value)

    def keys(self):
        return self.dict.keys()

    if py3k:
        def values(self):
            return (vals[-1] for vals in self.dict.values())

        def items(self):
            return ((name, vals[-1]) for name, vals in self.dict.items())

        def allitems(self):
            return ((name, val)
                    for name, vals in self.dict.items()
                    for val in vals)

        iterkeys = keys
        itervalues = values
        iteritems = items
        iterallitems = allitems

    else:
        def values(self):
            return [vals[-1] for vals in self.dict.values()]

        def items(self):
            return [(name, vals[-1]) for name, vals in self.dict.items()]

        def iterkeys(self):
            return self.dict.iterkeys()

        def itervalues(self):
            return (vals[-1] for vals in self.dict.itervalues())

        def iteritems(self):
            return ((name, vals[-1])
                    for name, vals in self.dict.iteritems())

        def iterallitems(self):
            return ((name, val)
                    for name, vals in self.dict.iteritems()
                    for val in vals)

        def allitems(self):
            return [(name, val)
                    for name, vals in self.dict.iteritems()
                    for val in vals]

    def get(self, key, default=None, index=-1, type=None):
        ''' Return the most recent value for a key.

            :param default: The default value to be returned if the key is not
                   present or the type conversion fails.
            :param index: An index for the list of available values.
            :param type: If defined, this callable is used to cast the value
                    into a specific type. Exceptions are suppressed and
                    result in the default value being returned.
        '''
        try:
            found = self.dict[key][index]
            if type:
                return type(found)
            return found
        except Exception:
            return default

    def append(self, key, value):
        ''' Add a new value to the list of values for this key. '''
        bucket = self.dict.setdefault(key, [])
        bucket.append(value)

    def replace(self, key, value):
        ''' Replace the list of values with a single value. '''
        self.dict[key] = [value]

    def getall(self, key):
        ''' Return a (possibly empty) list of values for a key. '''
        return self.dict.get(key) or []

    #: Aliases for WTForms to mimic other multi-dict APIs (Django)
    getone = get
    getlist = getall
class FormsDict(MultiDict):
    ''' This :class:`MultiDict` subclass is used to store request form data.
        Additionally to the normal dict-like item access methods (which
        return unmodified data as native strings), this container also
        supports attribute-like access to its values. Attributes are
        automatically de- or recoded to match :attr:`input_encoding`
        (default: 'utf8'). Missing attributes default to an empty string. '''

    #: Encoding used for attribute values.
    input_encoding = 'utf8'
    #: If true (default), unicode strings are first encoded with `latin1`
    #: and then decoded to match :attr:`input_encoding`.
    recode_unicode = True

    def _fix(self, s, encoding=None):
        ''' De- or recode `s` with `encoding` (default:
            :attr:`input_encoding`); other types pass through unchanged. '''
        if isinstance(s, unicode) and self.recode_unicode: # Python 3 WSGI
            target = encoding or self.input_encoding
            return s.encode('latin1').decode(target)
        if isinstance(s, bytes): # Python 2 WSGI
            target = encoding or self.input_encoding
            return s.decode(target)
        return s

    def decode(self, encoding=None):
        ''' Returns a copy with all keys and values de- or recoded to match
            :attr:`input_encoding`. Some libraries (e.g. WTForms) want a
            unicode dictionary. '''
        enc = encoding or self.input_encoding
        decoded = FormsDict()
        decoded.input_encoding = enc
        # The copy already holds decoded text, so it must not recode again.
        decoded.recode_unicode = False
        for key, value in self.allitems():
            decoded.append(self._fix(key, enc), self._fix(value, enc))
        return decoded

    def getunicode(self, name, default=None, encoding=None):
        ''' Return the value as a unicode string, or the default. '''
        try:
            raw = self[name]
            return self._fix(raw, encoding)
        except (UnicodeError, KeyError):
            return default

    def __getattr__(self, name, default=unicode()):
        # Without this guard, pickle generates a cryptic TypeError:
        is_dunder = name.startswith('__') and name.endswith('__')
        if not is_dunder:
            return self.getunicode(name, default=default)
        return super(FormsDict, self).__getattr__(name)
class HeaderDict(MultiDict):
    """ A case-insensitive version of :class:`MultiDict` that defaults to
        replace the old value instead of appending it. """

    def __init__(self, *a, **ka):
        self.dict = {}
        if a or ka:
            self.update(*a, **ka)

    def __contains__(self, key):
        return _hkey(key) in self.dict

    def __delitem__(self, key):
        del self.dict[_hkey(key)]

    def __getitem__(self, key):
        values = self.dict[_hkey(key)]
        return values[-1]

    def __setitem__(self, key, value):
        self.dict[_hkey(key)] = [_hval(value)]

    def append(self, key, value):
        ''' Add a value for the (normalized) header name. '''
        bucket = self.dict.setdefault(_hkey(key), [])
        bucket.append(_hval(value))

    def replace(self, key, value):
        ''' Replace all values of the header with a single one. '''
        self.dict[_hkey(key)] = [_hval(value)]

    def getall(self, key):
        ''' Return a (possibly empty) list of values for the header. '''
        return self.dict.get(_hkey(key)) or []

    def get(self, key, default=None, index=-1):
        ''' Like :meth:`MultiDict.get`, but with a normalized header name. '''
        return MultiDict.get(self, _hkey(key), default, index)

    def filter(self, names):
        ''' Remove all headers listed in `names`, if present. '''
        for raw_name in names:
            self.dict.pop(_hkey(raw_name), None)
[docs]class WSGIHeaderDict(DictMixin): ''' WSGI environ dict를 포장해서 HTTP_* 필드에 편리하게 접근할 수 있게 하는 dict스러운 클래스. 키와 값은 네이티브 문자열(2.x는 bytes, 3.x는 unicode)이며 키는 대소문자 구별이 없다. WSGI 환경에 네이티브 아닌 문자열 값이 들어 있으면 무손실 'latin1' 문자셋을 이용해 디코딩 또는 인코딩한다. 관련 PEP가 바뀌는 경우에도 API가 안정적으로 유지될 것이다. 현재 PEP 333, 444, 3333을 지원한다. (PEP 444는 네이티브 아닌 문자열을 쓰는 유일한 PEP다.) ''' #: ``HTTP_``\로 시작하지 않는 키들의 목록. cgikeys = ('CONTENT_TYPE', 'CONTENT_LENGTH') def __init__(self, environ): self.environ = environ def _ekey(self, key): ''' Translate header field name to CGI/WSGI environ key. ''' key = key.replace('-','_').upper() if key in self.cgikeys: return key return 'HTTP_' + key
[docs] def raw(self, key, default=None): ''' 헤더 값을 그대로 (bytes 또는 unicode로) 반환. ''' return self.environ.get(self._ekey(key), default)
def __getitem__(self, key): return tonat(self.environ[self._ekey(key)], 'latin1') def __setitem__(self, key, value): raise TypeError("%s is read-only." % self.__class__) def __delitem__(self, key): raise TypeError("%s is read-only." % self.__class__) def __iter__(self): for key in self.environ: if key[:5] == 'HTTP_': yield key[5:].replace('_', '-').title() elif key in self.cgikeys: yield key.replace('_', '-').title()
[docs] def keys(self): return [x for x in self]
def __len__(self): return len(self.keys()) def __contains__(self, key): return self._ekey(key) in self.environ
class ConfigDict(dict):
    ''' A dict-like configuration storage with additional support for
        namespaces, validators, meta-data, on_change listeners and more.

        This storage is optimized for fast read access. Retrieving a key
        or using non-altering dict methods (e.g. `dict.get()`) has no overhead
        compared to a native dict.
    '''
    __slots__ = ('_meta', '_on_change')

    class Namespace(DictMixin):
        ''' Deprecated dict-like view on all keys of a :class:`ConfigDict`
            that live directly below a common dotted prefix. '''

        def __init__(self, config, namespace):
            self._config = config
            self._prefix = namespace

        def __getitem__(self, key):
            depr('Accessing namespaces as dicts is discouraged. '
                 'Only use flat item access: '
                 'cfg["names"]["pace"]["key"] -> cfg["name.space.key"]') #0.12
            return self._config[self._prefix + '.' + key]

        def __setitem__(self, key, value):
            self._config[self._prefix + '.' + key] = value

        def __delitem__(self, key):
            del self._config[self._prefix + '.' + key]

        def __iter__(self):
            # Yield only names directly below the prefix: everything before
            # the last dot has to equal the prefix.
            for key in self._config:
                ns, _, name = key.rpartition('.')
                if ns == self._prefix and name:
                    yield name

        def keys(self): return [x for x in self]
        def __len__(self): return len(self.keys())
        def __contains__(self, key): return self._prefix + '.' + key in self._config
        def __repr__(self): return '<Config.Namespace %s.*>' % self._prefix
        def __str__(self): return '<Config.Namespace %s.*>' % self._prefix

        # Deprecated ConfigDict features
        def __getattr__(self, key):
            depr('Attribute access is deprecated.') #0.12
            if key not in self and key[0].isupper():
                self[key] = ConfigDict.Namespace(self._config, self._prefix + '.' + key)
            if key not in self and key.startswith('__'):
                raise AttributeError(key)
            return self.get(key)

        def __setattr__(self, key, value):
            if key in ('_config', '_prefix'):
                self.__dict__[key] = value
                return
            depr('Attribute assignment is deprecated.') #0.12
            if hasattr(DictMixin, key):
                raise AttributeError('Read-only attribute.')
            if key in self and self[key] and isinstance(self[key], self.__class__):
                raise AttributeError('Non-empty namespace attribute.')
            self[key] = value

        def __delattr__(self, key):
            ''' Remove `key`; if it was a nested namespace, also remove
                every config key stored below it. '''
            if key in self:
                val = self.pop(key)
                if isinstance(val, self.__class__):
                    # Nested keys contain dots and never show up when
                    # iterating this namespace, so look them up in the
                    # config itself. Iterate a snapshot while deleting.
                    nested = self._prefix + '.' + key + '.'
                    for full_key in list(self._config):
                        if full_key.startswith(nested):
                            del self._config[full_key]

        def __call__(self, *a, **ka):
            depr('Calling ConfDict is deprecated. Use the update() method.') #0.12
            self.update(*a, **ka)
            return self

    def __init__(self, *a, **ka):
        self._meta = {}
        self._on_change = lambda name, value: None
        if a or ka:
            depr('Constructor does no longer accept parameters.') #0.12
            self.update(*a, **ka)

    def load_config(self, filename):
        ''' Load values from an *.ini style config file.

            If the config file contains sections, their names are used as
            namespaces for the values within. The two special sections
            ``DEFAULT`` and ``bottle`` refer to the root namespace (no prefix).
        '''
        conf = ConfigParser()
        conf.read(filename)
        for section in conf.sections():
            for key, value in conf.items(section):
                if section not in ('DEFAULT', 'bottle'):
                    key = section + '.' + key
                self[key] = value
        return self

    def load_dict(self, source, namespace='', make_namespaces=False):
        ''' Import values from a dictionary structure. Nesting can be used to
            represent namespaces. Raises TypeError if `source` is not a dict
            or a key is not a string.

            >>> ConfigDict().load_dict({'name': {'space': {'key': 'value'}}})
            {'name.space.key': 'value'}
        '''
        stack = [(namespace, source)]
        while stack:
            prefix, source = stack.pop()
            if not isinstance(source, dict):
                raise TypeError('Source is not a dict (%r)' % type(source))
            for key, value in source.items():
                if not isinstance(key, basestring):
                    raise TypeError('Key is not a string (%r)' % type(key))
                full_key = prefix + '.' + key if prefix else key
                if isinstance(value, dict):
                    stack.append((full_key, value))
                    if make_namespaces:
                        self[full_key] = self.Namespace(self, full_key)
                else:
                    self[full_key] = value
        return self

    def update(self, *a, **ka):
        ''' If the first parameter is a string, all keys are prefixed with
            this namespace. Apart from that it works just as the usual
            dict.update(). Example: ``update('some.namespace', key='value')``
        '''
        prefix = ''
        if a and isinstance(a[0], basestring):
            prefix = a[0].strip('.') + '.'
            a = a[1:]
        for key, value in dict(*a, **ka).items():
            self[prefix+key] = value

    def setdefault(self, key, value):
        if key not in self:
            self[key] = value
        return self[key]

    def __setitem__(self, key, value):
        if not isinstance(key, basestring):
            raise TypeError('Key has type %r (not a string)' % type(key))

        # Run the value through the key's 'filter' meta field, if any.
        value = self.meta_get(key, 'filter', lambda x: x)(value)
        if key in self and self[key] is value:
            return
        self._on_change(key, value)
        dict.__setitem__(self, key, value)

    def __delitem__(self, key):
        dict.__delitem__(self, key)

    def clear(self):
        ''' Remove all keys, one by one through ``__delitem__``. '''
        # Iterate a snapshot: deleting while iterating the dict itself
        # raises RuntimeError.
        for key in list(self):
            del self[key]

    def meta_get(self, key, metafield, default=None):
        ''' Return the value of a meta field for a key. '''
        return self._meta.get(key, {}).get(metafield, default)

    def meta_set(self, key, metafield, value):
        ''' Set the meta field for a key to a new value. This triggers the
            on-change handler for existing keys. '''
        self._meta.setdefault(key, {})[metafield] = value
        if key in self:
            self[key] = self[key]

    def meta_list(self, key):
        ''' Return an iterable of meta field names defined for a key. '''
        return self._meta.get(key, {}).keys()

    # Deprecated ConfigDict features
    def __getattr__(self, key):
        depr('Attribute access is deprecated.') #0.12
        if key not in self and key[0].isupper():
            self[key] = self.Namespace(self, key)
        if key not in self and key.startswith('__'):
            raise AttributeError(key)
        return self.get(key)

    def __setattr__(self, key, value):
        if key in self.__slots__:
            return dict.__setattr__(self, key, value)
        depr('Attribute assignment is deprecated.') #0.12
        if hasattr(dict, key):
            raise AttributeError('Read-only attribute.')
        if key in self and self[key] and isinstance(self[key], self.Namespace):
            raise AttributeError('Non-empty namespace attribute.')
        self[key] = value

    def __delattr__(self, key):
        ''' Remove `key`; if it was a namespace, also remove every key that
            starts with ``key + '.'``. '''
        if key in self:
            val = self.pop(key)
            if isinstance(val, self.Namespace):
                prefix = key + '.'
                # Snapshot the keys and delete each match under its own
                # name (it already carries the prefix).
                for name in list(self):
                    if name.startswith(prefix):
                        del self[name]

    def __call__(self, *a, **ka):
        depr('Calling ConfDict is deprecated. Use the update() method.') #0.12
        self.update(*a, **ka)
        return self
class AppStack(list):
    """ A stack-like list. Calling it returns the head of the stack. """

    def __call__(self):
        """ Return the current default application. """
        return self[-1]

    def push(self, value=None):
        """ Add a new :class:`Bottle` instance to the stack.

            A fresh :class:`Bottle` is created unless `value` already is one.
            The pushed application is returned. """
        application = value if isinstance(value, Bottle) else Bottle()
        self.append(application)
        return application
class WSGIFileWrapper(object):
    ''' Wrap a file-like object so that iterating over the wrapper yields
        chunks of at most `buffer_size` units read via ``read()``. The common
        file methods that exist on `fp` are re-exported on the wrapper. '''

    def __init__(self, fp, buffer_size=1024*64):
        self.fp = fp
        self.buffer_size = buffer_size
        missing = object()
        for name in ('fileno', 'close', 'read', 'readlines', 'tell', 'seek'):
            method = getattr(fp, name, missing)
            if method is not missing:
                setattr(self, name, method)

    def __iter__(self):
        chunk_size, read_chunk = self.buffer_size, self.read
        part = read_chunk(chunk_size)
        while part:
            yield part
            part = read_chunk(chunk_size)


class _closeiter(object):
    ''' This only exists to be able to attach a .close method to iterators that
        do not support attribute assignment (most of itertools). '''

    def __init__(self, iterator, close=None):
        self.iterator = iterator
        self.close_callbacks = makelist(close)

    def __iter__(self):
        return iter(self.iterator)

    def close(self):
        # Run every registered callback in registration order.
        for callback in self.close_callbacks:
            callback()
class ResourceManager(object):
    ''' This class manages a list of search paths and helps to find and open
        application-bound resources (files).

        :param base: default value for :meth:`add_path` calls.
        :param opener: callable used to open resources.
        :param cachemode: controls which lookups are cached. One of 'all',
                         'found' or 'none'.
    '''

    def __init__(self, base='./', opener=open, cachemode='all'):
        # Honour the caller-supplied opener instead of always using the
        # built-in open().
        self.opener = opener
        self.base = base
        self.cachemode = cachemode

        #: A list of search paths. See :meth:`add_path` for details.
        self.path = []
        #: A cache for resolved paths. ``res.cache.clear()`` clears the cache.
        self.cache = {}

    def add_path(self, path, base=None, index=None, create=False):
        ''' Add a new path to the list of search paths. Return False if the
            path does not exist.

            :param path: The new search path. Relative paths are turned into
                an absolute and normalized form. If the path looks like a file
                (not ending in `/`), the filename is stripped off.
            :param base: Path used to absolutize relative search paths.
                Defaults to :attr:`base`.
            :param index: Position within the list of search paths. Defaults
                to last index (appends to the list).
            :param create: If true, create the directory when it is missing.

            The `base` parameter makes it easy to reference files installed
            along with a python module or package::

                res.add_path('./resources/', __file__)
        '''
        base = os.path.abspath(os.path.dirname(base or self.base))
        path = os.path.abspath(os.path.join(base, os.path.dirname(path)))
        path += os.sep
        if path in self.path:
            self.path.remove(path)
        if create and not os.path.isdir(path):
            os.makedirs(path)
        if index is None:
            self.path.append(path)
        else:
            self.path.insert(index, path)
        # Previously resolved names may now resolve differently.
        self.cache.clear()
        return os.path.exists(path)

    def __iter__(self):
        ''' Iterate over all existing files in all registered paths. '''
        search = self.path[:]
        while search:
            path = search.pop()
            if not os.path.isdir(path):
                continue
            for name in os.listdir(path):
                full = os.path.join(path, name)
                if os.path.isdir(full):
                    search.append(full)
                else:
                    yield full

    def lookup(self, name):
        ''' Search for a resource and return an absolute file path, or `None`.

            The :attr:`path` list is searched in order. The first match is
            returned. Symlinks are followed. The result is cached to speed up
            future lookups. '''
        if name not in self.cache or DEBUG:
            for path in self.path:
                fpath = os.path.join(path, name)
                if os.path.isfile(fpath):
                    if self.cachemode in ('all', 'found'):
                        self.cache[name] = fpath
                    return fpath
            if self.cachemode == 'all':
                self.cache[name] = None
        # With cachemode 'found' or 'none' a miss is never stored, so a plain
        # item lookup would raise KeyError instead of returning None.
        return self.cache.get(name)

    def open(self, name, mode='r', *args, **kwargs):
        ''' Find a resource and return a file object, or raise IOError. '''
        fname = self.lookup(name)
        if not fname:
            raise IOError("Resource %r not found." % name)
        return self.opener(fname, mode=mode, *args, **kwargs)
class FileUpload(object):

    def __init__(self, fileobj, name, filename, headers=None):
        ''' Wrapper for file uploads. '''
        #: Open file(-like) object (BytesIO buffer or temporary file)
        self.file = fileobj
        #: Name of the upload form field
        self.name = name
        #: Raw filename as sent by the client (may contain unsafe characters)
        self.raw_filename = filename
        #: A :class:`HeaderDict` with additional headers (e.g. content-type)
        if headers:
            self.headers = HeaderDict(headers)
        else:
            self.headers = HeaderDict()

    content_type = HeaderProperty('Content-Type')
    content_length = HeaderProperty('Content-Length', reader=int, default=-1)

    def get_header(self, name, default=None):
        """ Return the value of a header within the multipart part. """
        return self.headers.get(name, default)

    @cached_property
    def filename(self):
        ''' Name of the file on the client file system, but normalized to
            ensure file system compatibility. An empty filename is returned
            as 'empty'.

            Only ASCII letters, digits, dashes, underscores and dots are
            allowed in the final filename. Accents are removed, if possible.
            Whitespace is replaced by a single dash. Leading or trailing dots
            or dashes are removed. The filename is limited to 255 characters.
        '''
        raw = self.raw_filename
        if not isinstance(raw, unicode):
            raw = raw.decode('utf8', 'ignore')
        # Decompose accented characters, then drop everything non-ASCII.
        ascii_only = normalize('NFKD', raw).encode('ASCII', 'ignore')
        ascii_name = ascii_only.decode('ASCII')
        # Treat backslashes as path separators and keep the last component.
        base = os.path.basename(ascii_name.replace('\\', os.path.sep))
        cleaned = re.sub(r'[^a-zA-Z0-9-_.\s]', '', base).strip()
        cleaned = re.sub(r'[-\s]+', '-', cleaned).strip('.-')
        return cleaned[:255] or 'empty'

    def _copy_file(self, fp, chunk_size=2**16):
        # Copy from the current position and restore that position afterwards.
        source = self.file
        start = source.tell()
        chunk = source.read(chunk_size)
        while chunk:
            fp.write(chunk)
            chunk = source.read(chunk_size)
        source.seek(start)

    def save(self, destination, overwrite=False, chunk_size=2**16):
        ''' Save file to disk or copy its content to an open file(-like)
            object. If *destination* is a directory, :attr:`filename` is added
            to the path. Existing files are not overwritten by default
            (IOError).

            :param destination: File path, directory or file(-like) object.
            :param overwrite: If True, replace existing files. (default: False)
            :param chunk_size: Bytes to read at a time. (default: 64kb)
        '''
        if not isinstance(destination, basestring):
            # Anything that is not a path is expected to be file-like.
            self._copy_file(destination, chunk_size)
            return
        if os.path.isdir(destination):
            destination = os.path.join(destination, self.filename)
        if not overwrite and os.path.exists(destination):
            raise IOError('File exists.')
        with open(destination, 'wb') as fp:
            self._copy_file(fp, chunk_size)
############################################################################### # Application Helper ########################################################### ############################################################################### def abort(code=500, text='Unknown Error.'): """ Aborts execution and causes a HTTP error. """ raise HTTPError(code, text) def redirect(url, code=None): """ Aborts execution and causes a 303 or 302 redirect, depending on the HTTP protocol version. """ if not code: code = 303 if request.get('SERVER_PROTOCOL') == "HTTP/1.1" else 302 res = response.copy(cls=HTTPResponse) res.status = code res.body = "" res.set_header('Location', urljoin(request.url, url)) raise res def _file_iter_range(fp, offset, bytes, maxread=1024*1024): ''' Yield chunks from a range in a file. No chunk is bigger than maxread.''' fp.seek(offset) while bytes > 0: part = fp.read(min(bytes, maxread)) if not part: break bytes -= len(part) yield part def static_file(filename, root, mimetype='auto', download=False, charset='UTF-8'): """ Open a file in a safe way and return :exc:`HTTPResponse` with status code 200, 305, 403 or 404. The ``Content-Type``, ``Content-Encoding``, ``Content-Length`` and ``Last-Modified`` headers are set if possible. Special support for ``If-Modified-Since``, ``Range`` and ``HEAD`` requests. :param filename: Name or path of the file to send. :param root: Root path for file lookups. Should be an absolute directory path. :param mimetype: Defines the content-type header (default: guess from file extension) :param download: If True, ask the browser to open a `Save as...` dialog instead of opening the file with the associated program. You can specify a custom filename as a string. If not specified, the original filename is used (default: False). :param charset: The charset to use for files with a ``text/*`` mime-type. 
(default: UTF-8) """ root = os.path.abspath(root) + os.sep filename = os.path.abspath(os.path.join(root, filename.strip('/\\'))) headers = dict() if not filename.startswith(root): return HTTPError(403, "Access denied.") if not os.path.exists(filename) or not os.path.isfile(filename): return HTTPError(404, "File does not exist.") if not os.access(filename, os.R_OK): return HTTPError(403, "You do not have permission to access this file.") if mimetype == 'auto': mimetype, encoding = mimetypes.guess_type(filename) if encoding: headers['Content-Encoding'] = encoding if mimetype: if mimetype[:5] == 'text/' and charset and 'charset' not in mimetype: mimetype += '; charset=%s' % charset headers['Content-Type'] = mimetype if download: download = os.path.basename(filename if download == True else download) headers['Content-Disposition'] = 'attachment; filename="%s"' % download stats = os.stat(filename) headers['Content-Length'] = clen = stats.st_size lm = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(stats.st_mtime)) headers['Last-Modified'] = lm ims = request.environ.get('HTTP_IF_MODIFIED_SINCE') if ims: ims = parse_date(ims.split(";")[0].strip()) if ims is not None and ims >= int(stats.st_mtime): headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) return HTTPResponse(status=304, **headers) body = '' if request.method == 'HEAD' else open(filename, 'rb') headers["Accept-Ranges"] = "bytes" ranges = request.environ.get('HTTP_RANGE') if 'HTTP_RANGE' in request.environ: ranges = list(parse_range_header(request.environ['HTTP_RANGE'], clen)) if not ranges: return HTTPError(416, "Requested Range Not Satisfiable") offset, end = ranges[0] headers["Content-Range"] = "bytes %d-%d/%d" % (offset, end-1, clen) headers["Content-Length"] = str(end-offset) if body: body = _file_iter_range(body, offset, end-offset) return HTTPResponse(body, status=206, **headers) return HTTPResponse(body, **headers) 
############################################################################### # HTTP Utilities and MISC (TODO) ############################################### ###############################################################################
def debug(mode=True):
    """ Change the debug level.
    There is only one debug level supported at the moment."""
    global DEBUG
    enabled = bool(mode)
    if enabled:
        # Switch the warnings filter to 'default' while debugging.
        warnings.simplefilter('default')
    DEBUG = enabled
def http_date(value):
    """ Format `value` as an HTTP date string (``Wdy, DD Mon YYYY HH:MM:SS GMT``).

        :param value: a string (returned unchanged), a number (seconds since
            the epoch), a :class:`datetime.datetime`, a :class:`datetime.date`
            or a time tuple accepted by :func:`time.strftime`.
        :return: the formatted date string.
    """
    if isinstance(value, basestring):
        return value
    # datetime must be tested first: it is a subclass of date.
    if isinstance(value, datetime):
        value = value.utctimetuple()
    elif isinstance(value, datedate):
        # Plain dates have no utctimetuple(); timetuple() yields midnight.
        value = value.timetuple()
    elif isinstance(value, (int, float)):
        value = time.gmtime(value)
    return time.strftime("%a, %d %b %Y %H:%M:%S GMT", value)
def parse_date(ims):
    """ Parse rfc1123, rfc850 and asctime timestamps and return UTC epoch.

        :param ims: the timestamp string.
        :return: seconds since the epoch (float), or None if `ims` cannot
            be parsed.
    """
    # Function-scope import: calendar.timegm() converts a UTC time tuple
    # without consulting the local time zone, unlike time.mktime() combined
    # with time.timezone.
    import calendar
    try:
        ts = email.utils.parsedate_tz(ims)
        # ts[9] is the UTC offset in seconds (None if absent).
        return float(calendar.timegm(ts[:8] + (0,)) - (ts[9] or 0))
    except (TypeError, ValueError, IndexError, OverflowError):
        return None
def parse_auth(header):
    """ Parse rfc2617 HTTP authentication header string (basic) and return
        (user,pass) tuple or None"""
    try:
        scheme, payload = header.split(None, 1)
        if scheme.lower() != 'basic':
            return None
        credentials = touni(base64.b64decode(tob(payload)))
        user, pwd = credentials.split(':', 1)
        return user, pwd
    except (KeyError, ValueError):
        return None
def parse_range_header(header, maxlen=0): ''' Yield (start, end) ranges parsed from a HTTP Range header. Skip unsatisfiable ranges. The end index is non-inclusive.''' if not header or header[:6] != 'bytes=': return ranges = [r.split('-', 1) for r in header[6:].split(',') if '-' in r] for start, end in ranges: try: if not start: # bytes=-100 -> last 100 bytes start, end = max(0, maxlen-int(end)), maxlen elif not end: # bytes=100- -> all but the first 99 bytes start, end = int(start), maxlen else: # bytes=100-200 -> bytes 100-200 (inclusive) start, end = int(start), min(int(end)+1, maxlen) if 0 <= start < end <= maxlen: yield start, end except ValueError: pass def _parse_qsl(qs): r = [] for pair in qs.replace(';','&').split('&'): if not pair: continue nv = pair.split('=', 1) if len(nv) != 2: nv.append('') key = urlunquote(nv[0].replace('+', ' ')) value = urlunquote(nv[1].replace('+', ' ')) r.append((key, value)) return r def _lscmp(a, b): ''' Compares two strings in a cryptographically safe way: Runtime is not affected by length of common prefix. ''' return not sum(0 if x==y else 1 for x, y in zip(a, b)) and len(a) == len(b) def html_escape(string): ''' Escape HTML special characters ``&<>`` and quotes ``'"``. ''' return string.replace('&','&amp;').replace('<','&lt;').replace('>','&gt;')\ .replace('"','&quot;').replace("'",'&#039;') def html_quote(string): ''' Escape and quote a string to be used as an HTTP attribute.''' return '"%s"' % html_escape(string).replace('\n','&#10;')\ .replace('\r','&#13;').replace('\t','&#9;')
def yieldroutes(func):
    """ Return a generator for routes that match the signature (name, args)
    of the func parameter. This may yield more than one route if the function
    takes optional keyword arguments. The output is best described by example::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    names = spec[0]
    defaults = spec[3] or []
    # Arguments without a default value are mandatory path segments.
    required = len(names) - len(defaults)
    route += ''.join('/<%s>' % name for name in names[:required])
    yield route
    # Each optional argument adds one more, longer route.
    for optional in names[required:]:
        route += '/<%s>' % optional
        yield route
def path_shift(script_name, path_info, shift=1):
    ''' Shift path fragments from PATH_INFO to SCRIPT_NAME and vice versa.

        :return: The modified paths.
        :param script_name: The SCRIPT_NAME path.
        :param path_info: The PATH_INFO path.
        :param shift: The number of path fragments to shift. May be negative
          to change the shift direction. (default: 1)
    '''
    if shift == 0:
        return script_name, path_info

    path_parts = path_info.strip('/').split('/')
    script_parts = script_name.strip('/').split('/')
    # Splitting an empty path gives [''], which stands for "no fragments".
    if path_parts[:1] == ['']:
        path_parts = []
    if script_parts[:1] == ['']:
        script_parts = []

    if 0 < shift <= len(path_parts):
        script_parts = script_parts + path_parts[:shift]
        path_parts = path_parts[shift:]
    elif -len(script_parts) <= shift < 0:
        path_parts = script_parts[shift:] + path_parts
        script_parts = script_parts[:shift]
    else:
        empty = 'SCRIPT_NAME' if shift < 0 else 'PATH_INFO'
        raise AssertionError("Cannot shift. Nothing left from %s" % empty)

    new_script_name = '/' + '/'.join(script_parts)
    new_path_info = '/' + '/'.join(path_parts)
    # Keep a trailing slash of the original PATH_INFO if fragments remain.
    if path_info.endswith('/') and path_parts:
        new_path_info += '/'
    return new_script_name, new_path_info
def auth_basic(check, realm="private", text="Access denied"):
    ''' Callback decorator to require HTTP auth (basic).
        TODO: Add route(check_auth=...) parameter.

        :param check: callable ``check(user, password)`` returning a true
            value if the credentials are accepted.
        :param realm: realm name sent in the ``WWW-Authenticate`` header.
        :param text: body text of the 401 error.
    '''
    def decorator(func):
        # Preserve __name__/__doc__ of the decorated callback.
        @functools.wraps(func)
        def wrapper(*a, **ka):
            user, password = request.auth or (None, None)
            if user is None or not check(user, password):
                err = HTTPError(401, text)
                err.add_header('WWW-Authenticate', 'Basic realm="%s"' % realm)
                return err
            return func(*a, **ka)
        return wrapper
    return decorator


# Shortcuts for common Bottle methods.
# They all refer to the current default application.

def make_default_app_wrapper(name):
    ''' Return a callable that relays calls to the current default app. '''
    @functools.wraps(getattr(Bottle, name))
    def wrapper(*a, **ka):
        return getattr(app(), name)(*a, **ka)
    return wrapper

route     = make_default_app_wrapper('route')
get       = make_default_app_wrapper('get')
post      = make_default_app_wrapper('post')
put       = make_default_app_wrapper('put')
delete    = make_default_app_wrapper('delete')
error     = make_default_app_wrapper('error')
mount     = make_default_app_wrapper('mount')
hook      = make_default_app_wrapper('hook')
install   = make_default_app_wrapper('install')
uninstall = make_default_app_wrapper('uninstall')
url       = make_default_app_wrapper('get_url')


###############################################################################
# Server Adapter ###############################################################
###############################################################################


class ServerAdapter(object):
    ''' Base class of all server adapters. Stores host, port and the extra
        keyword options; subclasses implement :meth:`run`. '''
    quiet = False

    def __init__(self, host='127.0.0.1', port=8080, **options):
        self.options = options
        self.host = host
        self.port = int(port)

    def run(self, handler): # pragma: no cover
        pass

    def __repr__(self):
        args = ', '.join(['%s=%s'%(k,repr(v)) for k, v in self.options.items()])
        return "%s(%s)" % (self.__class__.__name__, args)


class CGIServer(ServerAdapter):
    quiet = True

    def run(self, handler): # pragma: no cover
        from wsgiref.handlers import CGIHandler

        def fixed_environ(environ, start_response):
            # Make sure PATH_INFO is always present in the environ.
            environ.setdefault('PATH_INFO', '')
            return handler(environ, start_response)
        CGIHandler().run(fixed_environ)


class FlupFCGIServer(ServerAdapter):
    def run(self, handler): # pragma: no cover
        import flup.server.fcgi
        self.options.setdefault('bindAddress', (self.host, self.port))
        flup.server.fcgi.WSGIServer(handler, **self.options).run()


class WSGIRefServer(ServerAdapter):
    def run(self, app): # pragma: no cover
        from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
        from wsgiref.simple_server import make_server
        import socket

        class FixedHandler(WSGIRequestHandler):
            def address_string(self): # Prevent reverse DNS lookups please.
                return self.client_address[0]

            def log_request(*args, **kw):
                # `self` is the enclosing server adapter (closure), not the
                # request handler, which arrives as args[0].
                if not self.quiet:
                    return WSGIRequestHandler.log_request(*args, **kw)

        handler_cls = self.options.get('handler_class', FixedHandler)
        server_cls  = self.options.get('server_class', WSGIServer)

        if ':' in self.host: # Fix wsgiref for IPv6 addresses.
            if getattr(server_cls, 'address_family') == socket.AF_INET:
                class server_cls(server_cls):
                    address_family = socket.AF_INET6

        srv = make_server(self.host, self.port, app, server_cls, handler_cls)
        srv.serve_forever()


class CherryPyServer(ServerAdapter):
    def run(self, handler): # pragma: no cover
        from cherrypy import wsgiserver
        self.options['bind_addr'] = (self.host, self.port)
        self.options['wsgi_app'] = handler

        # The SSL options are set as attributes on the server object, so
        # they are removed from the constructor options first.
        certfile = self.options.get('certfile')
        if certfile:
            del self.options['certfile']
        keyfile = self.options.get('keyfile')
        if keyfile:
            del self.options['keyfile']

        server = wsgiserver.CherryPyWSGIServer(**self.options)
        if certfile:
            server.ssl_certificate = certfile
        if keyfile:
            server.ssl_private_key = keyfile

        try:
            server.start()
        finally:
            server.stop()


class WaitressServer(ServerAdapter):
    def run(self, handler):
        from waitress import serve
        serve(handler, host=self.host, port=self.port)


class PasteServer(ServerAdapter):
    def run(self, handler): # pragma: no cover
        from paste import httpserver
        from paste.translogger import TransLogger
        handler = TransLogger(handler, setup_console_handler=(not self.quiet))
        httpserver.serve(handler, host=self.host, port=str(self.port),
                         **self.options)


class MeinheldServer(ServerAdapter):
    def run(self, handler):
        from meinheld import server
        server.listen((self.host, self.port))
        server.run(handler)


class FapwsServer(ServerAdapter):
    """ Extremely fast webserver using libev. See http://www.fapws.org/ """
    def run(self, handler): # pragma: no cover
        import fapws._evwsgi as evwsgi
        from fapws import base, config
        port = self.port
        if float(config.SERVER_IDENT[-2:]) > 0.4:
            # fapws3 silently changed its API in 0.5
            port = str(port)
        evwsgi.start(self.host, port)
        # fapws3 never releases the GIL. Complain upstream. I tried. No luck.
        if 'BOTTLE_CHILD' in os.environ and not self.quiet:
            _stderr("WARNING: Auto-reloading does not work with Fapws3.\n")
            _stderr("         (Fapws3 breaks python thread support)\n")
        evwsgi.set_base_module(base)

        def app(environ, start_response):
            environ['wsgi.multiprocess'] = False
            return handler(environ, start_response)
        evwsgi.wsgi_cb(('', app))
        evwsgi.run()


class TornadoServer(ServerAdapter):
    """ The super hyped asynchronous server by facebook. Untested. """
    def run(self, handler): # pragma: no cover
        import tornado.wsgi, tornado.httpserver, tornado.ioloop
        container = tornado.wsgi.WSGIContainer(handler)
        server = tornado.httpserver.HTTPServer(container)
        server.listen(port=self.port,address=self.host)
        tornado.ioloop.IOLoop.instance().start()


class AppEngineServer(ServerAdapter):
    """ Adapter for Google App Engine. """
    quiet = True

    def run(self, handler):
        from google.appengine.ext.webapp import util
        # A main() function in the handler script enables 'App Caching'.
        # Lets makes sure it is there. This _really_ improves performance.
        module = sys.modules.get('__main__')
        if module and not hasattr(module, 'main'):
            module.main = lambda: util.run_wsgi_app(handler)
        util.run_wsgi_app(handler)


class TwistedServer(ServerAdapter):
    """ Untested. """
    def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        reactor.run()


class DieselServer(ServerAdapter):
    """ Untested. """
    def run(self, handler):
        from diesel.protocols.wsgi import WSGIApplication
        app = WSGIApplication(handler, port=self.port)
        app.run()


class GeventServer(ServerAdapter):
    """ Untested. Options:

        * `fast` (default: False) uses libevent's http server, but has some
          issues: No streaming, no pipelining, no SSL.
        * See gevent.wsgi.WSGIServer() documentation for more options.
    """
    def run(self, handler):
        from gevent import pywsgi, local
        if not isinstance(threading.local(), local.local):
            msg = "Bottle requires gevent.monkey.patch_all() (before import)"
            raise RuntimeError(msg)
        if self.options.pop('fast', None):
            depr('The "fast" option has been deprecated and removed by Gevent.')
        if self.quiet:
            self.options['log'] = None
        address = (self.host, self.port)
        server = pywsgi.WSGIServer(address, handler, **self.options)
        if 'BOTTLE_CHILD' in os.environ:
            import signal
            signal.signal(signal.SIGINT, lambda s, f: server.stop())
        server.serve_forever()


class GeventSocketIOServer(ServerAdapter):
    def run(self,handler):
        from socketio import server
        address = (self.host, self.port)
        server.SocketIOServer(address, handler, **self.options).serve_forever()


class GunicornServer(ServerAdapter):
    """ Untested. See http://gunicorn.org/configure.html for options. """
    def run(self, handler):
        from gunicorn.app.base import Application

        config = {'bind': "%s:%d" % (self.host, int(self.port))}
        config.update(self.options)

        class GunicornApplication(Application):
            def init(self, parser, opts, args):
                return config

            def load(self):
                return handler

        GunicornApplication().run()


class EventletServer(ServerAdapter):
    """ Untested """
    def run(self, handler):
        from eventlet import wsgi, listen
        try:
            wsgi.server(listen((self.host, self.port)), handler,
                        log_output=(not self.quiet))
        except TypeError:
            # Fallback, if we have old version of eventlet
            wsgi.server(listen((self.host, self.port)), handler)


class RocketServer(ServerAdapter):
    """ Untested. """
    def run(self, handler):
        from rocket import Rocket
        server = Rocket((self.host, self.port), 'wsgi', { 'wsgi_app' : handler })
        server.start()


class BjoernServer(ServerAdapter):
    """ Fast server written in C: https://github.com/jonashaag/bjoern """
    def run(self, handler):
        from bjoern import run
        run(handler, self.host, self.port)


class AutoServer(ServerAdapter):
    """ Untested. """
    adapters = [WaitressServer, PasteServer, TwistedServer, CherryPyServer, WSGIRefServer]

    def run(self, handler):
        # Use the first adapter whose backend can be imported.
        for sa in self.adapters:
            try:
                return sa(self.host, self.port, **self.options).run(handler)
            except ImportError:
                pass

server_names = {
    'cgi': CGIServer,
    'flup': FlupFCGIServer,
    'wsgiref': WSGIRefServer,
    'waitress': WaitressServer,
    'cherrypy': CherryPyServer,
    'paste': PasteServer,
    'fapws3': FapwsServer,
    'tornado': TornadoServer,
    'gae': AppEngineServer,
    'twisted': TwistedServer,
    'diesel': DieselServer,
    'meinheld': MeinheldServer,
    'gunicorn': GunicornServer,
    'eventlet': EventletServer,
    'gevent': GeventServer,
    'geventSocketIO':GeventSocketIOServer,
    'rocket': RocketServer,
    'bjoern' : BjoernServer,
    'auto': AutoServer,
}


###############################################################################
# Application Control ##########################################################
###############################################################################
def load(target, **namespace):
    """ Import a module or fetch an object from a module.

        * ``package.module`` returns `module` as a module object.
        * ``pack.mod:name`` returns the module variable `name` from `pack.mod`.
        * ``pack.mod:func()`` calls `pack.mod.func()` and returns the result.

        The last form accepts not only function calls, but any type of
        expression. Keyword arguments passed to this function are available as
        local variables. Example: ``import_string('re:compile(x)', x='[a-z]')``
    """
    module_name, _colon, expression = target.partition(":")
    if module_name not in sys.modules:
        __import__(module_name)
    module_obj = sys.modules[module_name]
    if not expression:
        return module_obj
    if expression.isalnum():
        return getattr(module_obj, expression)
    # NOTE(review): everything after the colon is passed to eval(); `target`
    # must never come from an untrusted source.
    package_name = module_name.split('.')[0]
    namespace[package_name] = sys.modules[package_name]
    return eval('%s.%s' % (module_name, expression), namespace)
def load_app(target):
    """ Load a bottle application from a module and make sure that the import
        does not affect the current default application, but returns a separate
        application object. See :func:`load` for the target parameter. """
    global NORUN
    previous_norun = NORUN
    NORUN = True
    try:
        scratch_app = default_app.push() # Create a new "default application"
        loaded = load(target) # Import the target module
        if callable(loaded):
            return loaded
        return scratch_app
    finally:
        default_app.remove(scratch_app) # Remove the temporary added default application
        NORUN = previous_norun
# run() has a parameter named `debug`; keep the function reachable under
# another name.
_debug = debug


def run(app=None, server='wsgiref', host='127.0.0.1', port=8080,
        interval=1, reloader=False, quiet=False, plugins=None,
        debug=None, **kargs):
    """ Start a server instance. This method blocks until the server terminates.

        :param app: WSGI application or target string supported by
               :func:`load_app`. (default: :func:`default_app`)
        :param server: Server adapter to use. See :data:`server_names` keys
               for valid names or pass a :class:`ServerAdapter` subclass.
               (default: `wsgiref`)
        :param host: Server address to bind to. Pass ``0.0.0.0`` to listens on
               all interfaces including the external one. (default: 127.0.0.1)
        :param port: Server port to bind to. Values below 1024 require root
               privileges. (default: 8080)
        :param reloader: Start auto-reloading server? (default: False)
        :param interval: Auto-reloader interval in seconds (default: 1)
        :param quiet: Suppress output to stdout and stderr? (default: False)
        :param plugins: Iterable of plugins that are installed to the
               application before the server starts. (default: None)
        :param debug: If not None, passed on to :func:`debug` before the
               application is loaded. (default: None)
        :param kargs: Options passed to the server adapter.
     """
    if NORUN: return
    if reloader and not os.environ.get('BOTTLE_CHILD'):
        # Parent process of the reloader: keep (re)starting a child process
        # for as long as the lock file exists.
        try:
            lockfile = None
            fd, lockfile = tempfile.mkstemp(prefix='bottle.', suffix='.lock')
            os.close(fd) # We only need this file to exist. We never write to it
            while os.path.exists(lockfile):
                args = [sys.executable] + sys.argv
                environ = os.environ.copy()
                environ['BOTTLE_CHILD'] = 'true'
                environ['BOTTLE_LOCKFILE'] = lockfile
                p = subprocess.Popen(args, env=environ)
                while p.poll() is None: # Busy wait...
                    os.utime(lockfile, None) # I am alive!
                    time.sleep(interval)
                # Exit code 3 is the child's request to be restarted.
                if p.poll() != 3:
                    if os.path.exists(lockfile): os.unlink(lockfile)
                    sys.exit(p.poll())
        except KeyboardInterrupt:
            pass
        finally:
            # lockfile is still None if mkstemp() itself failed; do not let
            # os.path.exists(None) mask the original error.
            if lockfile and os.path.exists(lockfile):
                os.unlink(lockfile)
        return

    try:
        if debug is not None: _debug(debug)
        app = app or default_app()
        if isinstance(app, basestring):
            app = load_app(app)
        if not callable(app):
            raise ValueError("Application is not callable: %r" % app)

        for plugin in plugins or []:
            app.install(plugin)

        if server in server_names:
            server = server_names.get(server)
        if isinstance(server, basestring):
            server = load(server)
        if isinstance(server, type):
            server = server(host=host, port=port, **kargs)
        if not isinstance(server, ServerAdapter):
            raise ValueError("Unknown or unsupported server: %r" % server)

        server.quiet = server.quiet or quiet
        if not server.quiet:
            _stderr("Bottle v%s server starting up (using %s)...\n" % (__version__, repr(server)))
            _stderr("Listening on http://%s:%d/\n" % (server.host, server.port))
            _stderr("Hit Ctrl-C to quit.\n\n")

        if reloader:
            lockfile = os.environ.get('BOTTLE_LOCKFILE')
            bgcheck = FileCheckerThread(lockfile, interval)
            with bgcheck:
                server.run(app)
            if bgcheck.status == 'reload':
                sys.exit(3)
        else:
            server.run(app)
    except KeyboardInterrupt:
        pass
    except (SystemExit, MemoryError):
        raise
    except:
        # Deliberately broad: with the reloader active, any other failure
        # exits with code 3 so that the parent restarts the child.
        if not reloader: raise
        if not getattr(server, 'quiet', quiet):
            print_exc()
        time.sleep(interval)
        sys.exit(3)
class FileCheckerThread(threading.Thread):
    ''' Interrupt main-thread as soon as a changed module file is detected,
        the lockfile gets deleted or gets too old. '''

    def __init__(self, lockfile, interval):
        threading.Thread.__init__(self)
        self.lockfile = lockfile
        self.interval = interval
        #: Is one of 'reload', 'error' or 'exit'
        self.status = None

    def run(self):
        path_exists = os.path.exists

        def modified_at(path):
            return os.stat(path).st_mtime

        # Snapshot the modification time of every loaded module file.
        watched = dict()
        for module in list(sys.modules.values()):
            source = getattr(module, '__file__', '') or ''
            if source[-4:] in ('.pyo', '.pyc'):
                source = source[:-1]
            if source and path_exists(source):
                watched[source] = modified_at(source)

        while not self.status:
            lock_gone = not path_exists(self.lockfile)
            if lock_gone or \
               modified_at(self.lockfile) < time.time() - self.interval - 5:
                self.status = 'error'
                thread.interrupt_main()
            for source, last_seen in list(watched.items()):
                if not path_exists(source) or modified_at(source) > last_seen:
                    self.status = 'reload'
                    thread.interrupt_main()
                    break
            time.sleep(self.interval)

    def __enter__(self):
        self.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.status:
            self.status = 'exit' # silent exit
        self.join()
        # Swallow only the KeyboardInterrupt caused by interrupt_main().
        if exc_type is None:
            return False
        return issubclass(exc_type, KeyboardInterrupt)


###############################################################################
# Template Adapters ############################################################
###############################################################################


class TemplateError(HTTPError):
    def __init__(self, message):
        HTTPError.__init__(self, 500, message)
class BaseTemplate(object):
    """ Base class and minimal API for template adapters """
    extensions = ['tpl','html','thtml','stpl']
    settings = {} #used in prepare()
    defaults = {} #used in render()

    def __init__(self, source=None, name=None, lookup=[], encoding='utf8', **settings):
        """ Create a new template.
        If the source parameter (str or buffer) is missing, the name argument
        is used to guess a template filename. Subclasses can assume that
        self.source and/or self.filename are set. Both are strings.
        The lookup, encoding and settings parameters are stored as instance
        variables.
        The lookup parameter stores a list containing directory paths.
        The encoding parameter should be used to decode byte strings or files.
        The settings parameter contains a dict for engine-specific settings.
        """
        self.name = name
        if hasattr(source, 'read'):
            self.source = source.read()
        else:
            self.source = source
        if hasattr(source, 'filename'):
            self.filename = source.filename
        else:
            self.filename = None
        self.lookup = [os.path.abspath(directory) for directory in lookup]
        self.encoding = encoding
        self.settings = self.settings.copy() # Copy from class variable
        self.settings.update(settings) # Apply
        if not self.source and self.name:
            self.filename = self.search(self.name, self.lookup)
            if not self.filename:
                raise TemplateError('Template %s not found.' % repr(name))
        if not self.source and not self.filename:
            raise TemplateError('No template specified.')
        self.prepare(**self.settings)

    @classmethod
    def search(cls, name, lookup=[]):
        """ Search name in all directories specified in lookup.
        First without, then with common extensions. Return first hit. """
        if not lookup:
            depr('The template lookup path list should not be empty.') #0.12
            lookup = ['.']

        if os.path.isabs(name) and os.path.isfile(name):
            depr('Absolute template path names are deprecated.') #0.12
            return os.path.abspath(name)

        for directory in lookup:
            directory = os.path.abspath(directory) + os.sep
            target = os.path.abspath(os.path.join(directory, name))
            # Skip names that resolve to a location outside the directory.
            if not target.startswith(directory):
                continue
            candidates = [target]
            candidates.extend('%s.%s' % (target, ext) for ext in cls.extensions)
            for candidate in candidates:
                if os.path.isfile(candidate):
                    return candidate

    @classmethod
    def global_config(cls, key, *args):
        ''' This reads or sets the global settings stored in class.settings. '''
        if not args:
            return cls.settings[key]
        cls.settings = cls.settings.copy() # Make settings local to class
        cls.settings[key] = args[0]

    def prepare(self, **options):
        """ Run preparations (parsing, caching, ...).
        It should be possible to call this again to refresh a template or to
        update settings.
        """
        raise NotImplementedError

    def render(self, *args, **kwargs):
        """ Render the template with the specified local variables and return
        a single byte or unicode string. If it is a byte string, the encoding
        must match self.encoding. This method must be thread-safe!
        Local variables may be provided in dictionaries (args)
        or directly, as keywords (kwargs).
        """
        raise NotImplementedError
class MakoTemplate(BaseTemplate):
    """ Template adapter for the Mako engine. """

    def prepare(self, **options):
        """ Build the Mako template from self.source or self.filename. """
        from mako.template import Template
        from mako.lookup import TemplateLookup
        options.update({'input_encoding':self.encoding})
        options.setdefault('format_exceptions', bool(DEBUG))
        lookup = TemplateLookup(directories=self.lookup, **options)
        if self.source:
            self.tpl = Template(self.source, lookup=lookup, **options)
        else:
            self.tpl = Template(uri=self.name, filename=self.filename,
                                lookup=lookup, **options)

    def render(self, *args, **kwargs):
        """ Render with self.defaults overridden by the dict arguments and
        then by the keyword arguments. """
        for dictarg in args: kwargs.update(dictarg)
        _defaults = self.defaults.copy()
        _defaults.update(kwargs)
        return self.tpl.render(**_defaults)


class CheetahTemplate(BaseTemplate):
    """ Template adapter for the Cheetah engine. """

    def prepare(self, **options):
        """ Build the Cheetah template from self.source or self.filename.
        The template's search list is a single dict that render() fills
        before rendering and empties afterwards. """
        from Cheetah.Template import Template
        self.context = threading.local()
        self.context.vars = {}
        options['searchList'] = [self.context.vars]
        if self.source:
            self.tpl = Template(source=self.source, **options)
        else:
            self.tpl = Template(file=self.filename, **options)

    def render(self, *args, **kwargs):
        """ Render with self.defaults overridden by the dict arguments and
        then by the keyword arguments. """
        for dictarg in args: kwargs.update(dictarg)
        try:
            self.context.vars.update(self.defaults)
            self.context.vars.update(kwargs)
            return str(self.tpl)
        finally:
            # Always empty the search list, even if rendering failed, so
            # that variables never carry over into the next render call.
            self.context.vars.clear()


class Jinja2Template(BaseTemplate):
    """ Template adapter for the Jinja2 engine. """

    def prepare(self, filters=None, tests=None, globals={}, **kwargs):
        """ Create the Jinja2 environment and load the template.

        filters, tests and globals are merged into the environment, all
        remaining keyword arguments are passed to the Environment itself.
        Raises RuntimeError if the removed `prefix` keyword is used.
        """
        from jinja2 import Environment, FunctionLoader
        if 'prefix' in kwargs: # TODO: to be removed after a while
            raise RuntimeError('The keyword argument `prefix` has been removed. '
                'Use the full jinja2 environment name line_statement_prefix instead.')
        self.env = Environment(loader=FunctionLoader(self.loader), **kwargs)
        if filters: self.env.filters.update(filters)
        if tests: self.env.tests.update(tests)
        if globals: self.env.globals.update(globals)
        if self.source:
            self.tpl = self.env.from_string(self.source)
        else:
            self.tpl = self.env.get_template(self.filename)

    def render(self, *args, **kwargs):
        """ Render with self.defaults overridden by the dict arguments and
        then by the keyword arguments. """
        for dictarg in args: kwargs.update(dictarg)
        _defaults = self.defaults.copy()
        _defaults.update(kwargs)
        return self.tpl.render(**_defaults)

    def loader(self, name):
        """ Load function for the Jinja2 FunctionLoader: return the decoded
        source of the template called name, or None if it is not found. """
        if name == self.filename:
            # prepare() asks for the already resolved filename of this very
            # template. Open it directly instead of running it through
            # search() again, which would emit a deprecation warning for the
            # absolute path.
            fname = name
        else:
            fname = self.search(name, self.lookup)
        if not fname: return
        with open(fname, "rb") as f:
            return f.read().decode(self.encoding)
class SimpleTemplate(BaseTemplate):
    """ Template adapter that translates the template with StplParser into
    python code and executes that code to render. """

    def prepare(self, escape_func=html_escape, noescape=False, syntax=None, **ka):
        """ Reset the include cache and set up the string conversion helpers.
        With noescape set, the roles of the plain and the escaping helper
        are swapped. """
        self.cache = {}
        encoding = self.encoding
        as_text = lambda x: touni(x, encoding)
        as_escaped = lambda x: escape_func(touni(x, encoding))
        self.syntax = syntax
        if noescape:
            self._str, self._escape = as_escaped, as_text
        else:
            self._str, self._escape = as_text, as_escaped

    @cached_property
    def co(self):
        """ The compiled code object of the translated template. """
        translated = self.code
        return compile(translated, self.filename or '<string>', 'exec')

    @cached_property
    def code(self):
        """ The template translated to python source code. """
        raw = self.source
        if not raw:
            with open(self.filename, 'rb') as handle:
                raw = handle.read()
        try:
            text, charset = touni(raw), 'utf8'
        except UnicodeError:
            depr('Template encodings other than utf8 are no longer supported.') #0.11
            text, charset = touni(raw, 'latin1'), 'latin1'
        parser = StplParser(text, encoding=charset, syntax=self.syntax)
        translated = parser.translate()
        # The parser may have switched to a different encoding.
        self.encoding = parser.encoding
        return translated

    def _rebase(self, _env, _name=None, **kwargs):
        """ Template function `rebase`: remember the base template and its
        arguments in the environment; execute() picks them up later. """
        if _name is None:
            depr('Rebase function called without arguments.'
                 ' You were probably looking for {{base}}?', True) #0.12
        _env['_rebase'] = (_name, kwargs)

    def _include(self, _env, _name=None, **kwargs):
        """ Template function `include`: execute the named sub-template with
        a copy of the environment (updated with kwargs), writing to the
        same output list. """
        if _name is None:
            depr('Rebase function called without arguments.'
                 ' You were probably looking for {{base}}?', True) #0.12
        sub_env = _env.copy()
        sub_env.update(kwargs)
        if _name not in self.cache:
            self.cache[_name] = self.__class__(name=_name, lookup=self.lookup)
        subtemplate = self.cache[_name]
        return subtemplate.execute(sub_env['_stdout'], sub_env)

    def execute(self, _stdout, kwargs):
        """ Run the compiled template, appending its output to _stdout.
        Returns the environment the template was executed in, or the
        result of the base template if the template requested a rebase. """
        env = self.defaults.copy()
        env.update(kwargs)
        helpers = {
            '_stdout': _stdout,
            '_printlist': _stdout.extend,
            'include': functools.partial(self._include, env),
            'rebase': functools.partial(self._rebase, env),
            '_rebase': None,
            '_str': self._str,
            '_escape': self._escape,
            'get': env.get,
            'setdefault': env.setdefault,
            'defined': env.__contains__,
        }
        env.update(helpers)
        eval(self.co, env)
        if not env.get('_rebase'):
            return env
        subtpl, rargs = env.pop('_rebase')
        rargs['base'] = ''.join(_stdout) #copy stdout
        del _stdout[:] # clear stdout
        return self._include(env, subtpl, **rargs)

    def render(self, *args, **kwargs):
        """ Render the template using keyword arguments as local variables. """
        env, stdout = {}, []
        for dictarg in args:
            env.update(dictarg)
        env.update(kwargs)
        self.execute(stdout, env)
        return ''.join(stdout)
class StplSyntaxError(TemplateError): pass


class StplParser(object):
    ''' Parser for stpl templates. '''
    _re_cache = {} #: Cache for compiled re patterns
    # This huge pile of voodoo magic splits python code into 8 different tokens.
    # 1: All kinds of python strings (trust me, it works)
    _re_tok = '([urbURB]?(?:\'\'(?!\')|""(?!")|\'{6}|"{6}' \
               '|\'(?:[^\\\\\']|\\\\.)+?\'|"(?:[^\\\\"]|\\\\.)+?"' \
               '|\'{3}(?:[^\\\\]|\\\\.|\\n)+?\'{3}' \
               '|"{3}(?:[^\\\\]|\\\\.|\\n)+?"{3}))'
    _re_inl = _re_tok.replace('|\\n','') # We re-use this string pattern later
    # 2: Comments (until end of line, but not the newline itself)
    _re_tok += '|(#.*)'
    # 3,4: Open and close grouping tokens
    _re_tok += '|([\\[\\{\\(])'
    _re_tok += '|([\\]\\}\\)])'
    # 5,6: Keywords that start or continue a python block (only start of line)
    _re_tok += '|^([ \\t]*(?:if|for|while|with|try|def|class)\\b)' \
               '|^([ \\t]*(?:elif|else|except|finally)\\b)'
    # 7: Our special 'end' keyword (but only if it stands alone)
    _re_tok += '|((?:^|;)[ \\t]*end[ \\t]*(?=(?:%(block_close)s[ \\t]*)?\\r?$|;|#))'
    # 8: A customizable end-of-code-block template token (only end of line)
    _re_tok += '|(%(block_close)s[ \\t]*(?=\\r?$))'
    # 9: And finally, a single newline. The 10th token is 'everything else'
    _re_tok += '|(\\r?\\n)'

    # Match the start tokens of code areas in a template
    _re_split = '(?m)^[ \t]*(\\\\?)((%(line_start)s)|(%(block_start)s))(%%?)'
    # Match inline statements (may contain python strings)
    _re_inl = '(?m)%%(inline_start)s((?:%s|[^\'"\n]*?)+)%%(inline_end)s' % _re_inl
    _re_tok = '(?m)' + _re_tok

    default_syntax = '<% %> % {{ }}'

    def __init__(self, source, syntax=None, encoding='utf8'):
        ''' Set up a parser for source, decoded with encoding. The syntax
            string falls back to default_syntax if it is empty or None. '''
        self.source, self.encoding = touni(source, encoding), encoding
        self.set_syntax(syntax or self.default_syntax)
        self.code_buffer, self.text_buffer = [], []
        self.lineno, self.offset = 1, 0
        self.indent, self.indent_mod = 0, 0
        self.paren_depth = 0

    def get_syntax(self):
        ''' Tokens as a space separated string (default: <% %> % {{ }}) '''
        return self._syntax

    def set_syntax(self, syntax):
        ''' Store the syntax tokens and compile the split, token and inline
            patterns for them. Compiled patterns are cached per syntax
            string in the class-level _re_cache. '''
        self._syntax = syntax
        self._tokens = syntax.split()
        if not syntax in self._re_cache:
            names = 'block_start block_close line_start inline_start inline_end'
            etokens = map(re.escape, self._tokens)
            pattern_vars = dict(zip(names.split(), etokens))
            patterns = (self._re_split, self._re_tok, self._re_inl)
            patterns = [re.compile(p%pattern_vars) for p in patterns]
            self._re_cache[syntax] = patterns
        self.re_split, self.re_tok, self.re_inl = self._re_cache[syntax]

    syntax = property(get_syntax, set_syntax)

    def translate(self):
        ''' Translate the whole template source into python source code and
            return it as a string. A parser instance can only be used once;
            a second call raises RuntimeError. '''
        if self.offset: raise RuntimeError('Parser is a one time instance.')
        while True:
            m = self.re_split.search(self.source[self.offset:])
            if m:
                text = self.source[self.offset:self.offset+m.start()]
                self.text_buffer.append(text)
                self.offset += m.end()
                if m.group(1): # New escape syntax
                    line, sep, _ = self.source[self.offset:].partition('\n')
                    self.text_buffer.append(m.group(2)+m.group(5)+line+sep)
                    # Continue right after the escaped line. Advancing one
                    # character further would swallow the first character
                    # of the following line.
                    self.offset += len(line+sep)
                    continue
                elif m.group(5): # Old escape syntax
                    depr('Escape code lines with a backslash.') #0.12
                    line, sep, _ = self.source[self.offset:].partition('\n')
                    self.text_buffer.append(m.group(2)+line+sep)
                    self.offset += len(line+sep)
                    continue
                self.flush_text()
                self.read_code(multiline=bool(m.group(4)))
            else: break
        self.text_buffer.append(self.source[self.offset:])
        self.flush_text()
        return ''.join(self.code_buffer)

    def read_code(self, multiline):
        ''' Tokenize python code starting at the current offset and write it
            to the code buffer line by line. Stops after the first newline,
            or, if multiline is true, after the first newline that follows
            the block-close token. '''
        code_line, comment = '', ''
        while True:
            m = self.re_tok.search(self.source[self.offset:])
            if not m:
                # No more tokens: the rest of the source is code.
                code_line += self.source[self.offset:]
                self.offset = len(self.source)
                self.write_code(code_line.strip(), comment)
                return
            code_line += self.source[self.offset:self.offset+m.start()]
            self.offset += m.end()
            _str, _com, _po, _pc, _blk1, _blk2, _end, _cend, _nl = m.groups()
            if (code_line or self.paren_depth > 0) and (_blk1 or _blk2): # a if b else c
                code_line += _blk1 or _blk2
                continue
            if _str:    # Python string
                code_line += _str
            elif _com:  # Python comment (up to EOL)
                comment = _com
                if multiline and _com.strip().endswith(self._tokens[1]):
                    multiline = False # Allow end-of-block in comments
            elif _po:  # open parenthesis
                self.paren_depth += 1
                code_line += _po
            elif _pc:  # close parenthesis
                if self.paren_depth > 0:
                    # we could check for matching parentheses here, but it's
                    # easier to leave that to python - just check counts
                    self.paren_depth -= 1
                code_line += _pc
            elif _blk1: # Start-block keyword (if/for/while/def/try/...)
                code_line, self.indent_mod = _blk1, -1
                self.indent += 1
            elif _blk2: # Continue-block keyword (else/elif/except/...)
                code_line, self.indent_mod = _blk2, -1
            elif _end:  # The non-standard 'end'-keyword (ends a block)
                self.indent -= 1
            elif _cend: # The end-code-block template token (usually '%>')
                if multiline: multiline = False
                else: code_line += _cend
            else: # \n
                self.write_code(code_line.strip(), comment)
                self.lineno += 1
                code_line, comment, self.indent_mod = '', '', 0
                if not multiline:
                    break

    def flush_text(self):
        ''' Turn the buffered template text (including inline expressions)
            into a single _printlist() call and write it to the code
            buffer. Does nothing if the buffer is empty. '''
        text = ''.join(self.text_buffer)
        del self.text_buffer[:]
        if not text: return
        parts, pos, nl = [], 0, '\\\n'+' '*self.indent
        for m in self.re_inl.finditer(text):
            prefix, pos = text[pos:m.start()], m.end()
            if prefix:
                parts.append(nl.join(map(repr, prefix.splitlines(True))))
            if prefix.endswith('\n'): parts[-1] += nl
            parts.append(self.process_inline(m.group(1).strip()))
        if pos < len(text):
            prefix = text[pos:]
            lines = prefix.splitlines(True)
            # Two backslashes at the very end of the text suppress the
            # final line break.
            if lines[-1].endswith('\\\\\n'): lines[-1] = lines[-1][:-3]
            elif lines[-1].endswith('\\\\\r\n'): lines[-1] = lines[-1][:-4]
            parts.append(nl.join(map(repr, lines)))
        code = '_printlist((%s,))' % ', '.join(parts)
        self.lineno += code.count('\n')+1
        self.write_code(code)

    def process_inline(self, chunk):
        ''' Wrap an inline expression in _escape(), or in _str() if it
            starts with an exclamation mark. '''
        if chunk[0] == '!': return '_str(%s)' % chunk[1:]
        return '_escape(%s)' % chunk

    def write_code(self, line, comment=''):
        ''' Append one line of code, indented to the current block level,
            to the code buffer. '''
        line, comment = self.fix_backward_compatibility(line, comment)
        code = ' ' * (self.indent+self.indent_mod)
        code += line.lstrip() + comment + '\n'
        self.code_buffer.append(code)

    def fix_backward_compatibility(self, line, comment):
        ''' Rewrite the old include/rebase keyword statements into function
            calls and handle PEP263 coding comments in the first lines of a
            template. Returns the (possibly changed) line and comment. '''
        parts = line.strip().split(None, 2)
        if parts and parts[0] in ('include', 'rebase'):
            depr('The include and rebase keywords are functions now.') #0.12
            if len(parts) == 1:   return "_printlist([base])", comment
            elif len(parts) == 2: return "_=%s(%r)" % tuple(parts), comment
            else:                 return "_=%s(%r, %s)" % tuple(parts), comment
        if self.lineno <= 2 and not line.strip() and 'coding' in comment:
            m = re.match(r"#.*coding[:=]\s*([-\w.]+)", comment)
            if m:
                depr('PEP263 encoding strings in templates are deprecated.') #0.12
                enc = m.group(1)
                self.source = self.source.encode(self.encoding).decode(enc)
                self.encoding = enc
                return line, comment.replace('coding','coding*')
        return line, comment
def template(*args, **kwargs):
    '''
    Get a rendered template as a string iterator.
    You can use a name, a filename or a template string as first parameter.
    Template rendering arguments can be passed as dictionaries
    or directly (as keyword arguments).

    The keyword arguments template_adapter, template_lookup and
    template_settings configure the template itself and are not passed on
    to the template as variables.
    '''
    tpl = args[0] if args else None
    adapter = kwargs.pop('template_adapter', SimpleTemplate)
    lookup = kwargs.pop('template_lookup', TEMPLATE_PATH)
    # Consume the settings unconditionally. If they were only removed on a
    # cache miss, they would show up as a template variable on every
    # subsequent (cached) call.
    settings = kwargs.pop('template_settings', {})
    tplid = (id(lookup), tpl)
    if tplid not in TEMPLATES or DEBUG:
        if isinstance(tpl, adapter):
            TEMPLATES[tplid] = tpl
            if settings: TEMPLATES[tplid].prepare(**settings)
        elif "\n" in tpl or "{" in tpl or "%" in tpl or '$' in tpl:
            # Looks like template source rather than a template name.
            TEMPLATES[tplid] = adapter(source=tpl, lookup=lookup, **settings)
        else:
            TEMPLATES[tplid] = adapter(name=tpl, lookup=lookup, **settings)
    if not TEMPLATES[tplid]:
        abort(500, 'Template (%s) not found' % tpl)
    for dictarg in args[1:]: kwargs.update(dictarg)
    return TEMPLATES[tplid].render(kwargs)
# Variants of template() with the template_adapter keyword preset to one of
# the third-party engine adapters.
mako_template = functools.partial(template, template_adapter=MakoTemplate)
cheetah_template = functools.partial(template, template_adapter=CheetahTemplate)
jinja2_template = functools.partial(template, template_adapter=Jinja2Template)
def view(tpl_name, **defaults):
    ''' Decorator: renders a template for a handler.
        The handler can control its behavior like that:

          - return a dict of template vars to fill out the template
          - return None to render the template with the defaults only
          - return something other than a dict and the view decorator will not
            process the template, but return the handler result as is.
            This includes returning a HTTPResponse(dict) to get,
            for instance, JSON with autojson or other castfilters.
    '''
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            if isinstance(result, (dict, DictMixin)):
                tplvars = defaults.copy()
                tplvars.update(result)
                return template(tpl_name, **tplvars)
            elif result is None:
                # Pass the defaults as keywords (not as a positional dict),
                # so that template_adapter, template_lookup and
                # template_settings are honoured here as well.
                return template(tpl_name, **defaults)
            return result
        return wrapper
    return decorator
mako_view = functools.partial(view, template_adapter=MakoTemplate)
cheetah_view = functools.partial(view, template_adapter=CheetahTemplate)
jinja2_view = functools.partial(view, template_adapter=Jinja2Template)






###############################################################################
# Constants and Globals ########################################################
###############################################################################


TEMPLATE_PATH = ['./', './views/']
TEMPLATES = {}
DEBUG = False
NORUN = False # If set, run() does nothing. Used by load_app()

#: A dict to map HTTP status codes (e.g. 404) to phrases (e.g. 'Not Found')
# Work on a copy: adding entries to httplib.responses itself would change the
# standard library's table for every other user of that module.
HTTP_CODES = httplib.responses.copy()
HTTP_CODES[418] = "I'm a teapot" # RFC 2324
HTTP_CODES[422] = "Unprocessable Entity" # RFC 4918
HTTP_CODES[428] = "Precondition Required"
HTTP_CODES[429] = "Too Many Requests"
HTTP_CODES[431] = "Request Header Fields Too Large"
HTTP_CODES[511] = "Network Authentication Required"
_HTTP_STATUS_LINES = dict((k, '%d %s'%(k,v)) for (k,v) in HTTP_CODES.items())

#: The default template used for error pages. Override with @error()
# Every '%%' turns into a single '%' when the module name is interpolated
# below. The resulting template code lines must start on a line of their own.
ERROR_PAGE_TEMPLATE = """
%%try:
    %%from %s import DEBUG, HTTP_CODES, request, touni
    <!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
    <html>
        <head>
            <title>Error: {{e.status}}</title>
            <style type="text/css">
              html {background-color: #eee; font-family: sans;}
              body {background-color: #fff; border: 1px solid #ddd; padding: 15px; margin: 15px;}
              pre {background-color: #eee; border: 1px solid #ddd; padding: 5px;}
            </style>
        </head>
        <body>
            <h1>Error: {{e.status}}</h1>
            <p>Sorry, the requested URL <tt>{{repr(request.url)}}</tt> caused an error:</p>
            <pre>{{e.body}}</pre>
            %%if DEBUG and e.exception:
              <h2>Exception:</h2>
              <pre>{{repr(e.exception)}}</pre>
            %%end
            %%if DEBUG and e.traceback:
              <h2>Traceback:</h2>
              <pre>{{e.traceback}}</pre>
            %%end
        </body>
    </html>
%%except ImportError:
    <b>ImportError:</b> Could not generate the error page. Please add bottle to the import path.
%%end
""" % __name__

#: A thread-safe instance of :class:`LocalRequest`. If accessed from within a
#: request callback, this instance always refers to the *current* request
#: (even on a multithreaded server).
request = LocalRequest()

#: A thread-safe instance of :class:`LocalResponse`. It is used to change the
#: HTTP response for the *current* request.
response = LocalResponse()

#: A thread-safe namespace. Not used by Bottle.
local = threading.local()

# Initialize app stack (create first empty Bottle app)
# BC: 0.6.4 and needed for run()
app = default_app = AppStack()
app.push()

#: A virtual package that redirects import statements.
#: Example: ``import bottle.ext.sqlite`` actually imports `bottle_sqlite`.
ext = _ImportRedirect('bottle.ext' if __name__ == '__main__' else __name__+".ext", 'bottle_%s').module

if __name__ == '__main__':
    opt, args, parser = _cmd_options, _cmd_args, _cmd_parser
    if opt.version:
        _stdout('Bottle %s\n'%__version__)
        sys.exit(0)
    if not args:
        parser.print_help()
        _stderr('\nError: No application specified.\n')
        sys.exit(1)

    sys.path.insert(0, '.')
    sys.modules.setdefault('bottle', sys.modules['__main__'])

    host, port = (opt.bind or 'localhost'), 8080
    # Split off a trailing ":port", but leave the colons inside a bracketed
    # IPv6 address alone.
    if ':' in host and host.rfind(']') < host.rfind(':'):
        host, port = host.rsplit(':', 1)
    host = host.strip('[]')

    run(args[0], host=host, port=int(port), server=opt.server,
        reloader=opt.reload, plugins=opt.plugin, debug=opt.debug)




# THE END