#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Bottle is a fast and simple micro-framework for small web applications. It
offers request dispatching (Routes) with url parameter support, templates,
a built-in HTTP Server and adapters for many third party WSGI/HTTP-server and
template engines - all in a single file and with no dependencies other than the
Python Standard Library.
Homepage and documentation: http://bottlepy.org/
Copyright (c) 2016, Marcel Hellkamp.
License: MIT (see LICENSE for details)
"""
from __future__ import with_statement
__author__ = 'Marcel Hellkamp'
__version__ = '0.12.18'
__license__ = 'MIT'
# The gevent server adapter needs to patch some modules before they are imported
# This is why we parse the commandline parameters here but handle them later
if __name__ == '__main__':
from optparse import OptionParser
_cmd_parser = OptionParser(usage="usage: %prog [options] package.module:app")
_opt = _cmd_parser.add_option
_opt("--version", action="store_true", help="show version number.")
_opt("-b", "--bind", metavar="ADDRESS", help="bind socket to ADDRESS.")
_opt("-s", "--server", default='wsgiref', help="use SERVER as backend.")
_opt("-p", "--plugin", action="append", help="install additional plugin/s.")
_opt("--debug", action="store_true", help="start server in debug mode.")
_opt("--reload", action="store_true", help="auto-reload on file changes.")
_cmd_options, _cmd_args = _cmd_parser.parse_args()
if _cmd_options.server and _cmd_options.server.startswith('gevent'):
import gevent.monkey; gevent.monkey.patch_all()
import base64, cgi, email.utils, functools, hmac, itertools, mimetypes,\
os, re, subprocess, sys, tempfile, threading, time, warnings, hashlib
from datetime import date as datedate, datetime, timedelta
from tempfile import TemporaryFile
from traceback import format_exc, print_exc
from inspect import getargspec
from unicodedata import normalize
try: from simplejson import dumps as json_dumps, loads as json_lds
except ImportError: # pragma: no cover
try: from json import dumps as json_dumps, loads as json_lds
except ImportError:
try: from django.utils.simplejson import dumps as json_dumps, loads as json_lds
except ImportError:
def json_dumps(data):
raise ImportError("JSON support requires Python 2.6 or simplejson.")
json_lds = json_dumps
# We now try to fix 2.5/2.6/3.1/3.2 incompatibilities.
# It ain't pretty but it works... Sorry for the mess.
py = sys.version_info
py3k = py >= (3, 0, 0)
py25 = py < (2, 6, 0)
py31 = (3, 1, 0) <= py < (3, 2, 0)
# Workaround for the missing "as" keyword in py3k.
def _e(): return sys.exc_info()[1]
# Workaround for the "print is a keyword/function" Python 2/3 dilemma
# and a fallback for mod_wsgi (resticts stdout/err attribute access)
try:
_stdout, _stderr = sys.stdout.write, sys.stderr.write
except IOError:
_stdout = lambda x: sys.stdout.write(x)
_stderr = lambda x: sys.stderr.write(x)
# Lots of stdlib and builtin differences.
if py3k:
import http.client as httplib
import _thread as thread
from urllib.parse import urljoin, SplitResult as UrlSplitResult
from urllib.parse import urlencode, quote as urlquote, unquote as urlunquote
urlunquote = functools.partial(urlunquote, encoding='latin1')
from http.cookies import SimpleCookie
if py >= (3, 3, 0):
from collections.abc import MutableMapping as DictMixin
from types import ModuleType as new_module
else:
from collections import MutableMapping as DictMixin
from imp import new_module
import pickle
from io import BytesIO
from configparser import ConfigParser
basestring = str
unicode = str
json_loads = lambda s: json_lds(touni(s))
callable = lambda x: hasattr(x, '__call__')
imap = map
def _raise(*a): raise a[0](a[1]).with_traceback(a[2])
else: # 2.x
import httplib
import thread
from urlparse import urljoin, SplitResult as UrlSplitResult
from urllib import urlencode, quote as urlquote, unquote as urlunquote
from Cookie import SimpleCookie
from itertools import imap
import cPickle as pickle
from imp import new_module
from StringIO import StringIO as BytesIO
from ConfigParser import SafeConfigParser as ConfigParser
if py25:
msg = "Python 2.5 support may be dropped in future versions of Bottle."
warnings.warn(msg, DeprecationWarning)
from UserDict import DictMixin
def next(it): return it.next()
bytes = str
else: # 2.6, 2.7
from collections import MutableMapping as DictMixin
unicode = unicode
json_loads = json_lds
eval(compile('def _raise(*a): raise a[0], a[1], a[2]', '<py3fix>', 'exec'))
# Some helpers for string/byte handling
def tob(s, enc='utf8'):
return s.encode(enc) if isinstance(s, unicode) else bytes(s)
def touni(s, enc='utf8', err='strict'):
return s.decode(enc, err) if isinstance(s, bytes) else unicode(s)
tonat = touni if py3k else tob
# 3.2 fixes cgi.FieldStorage to accept bytes (which makes a lot of sense).
# 3.1 needs a workaround.
if py31:
from io import TextIOWrapper
class NCTextIOWrapper(TextIOWrapper):
def close(self): pass # Keep wrapped buffer open.
# A bug in functools causes it to break if the wrapper is an instance method
def update_wrapper(wrapper, wrapped, *a, **ka):
try: functools.update_wrapper(wrapper, wrapped, *a, **ka)
except AttributeError: pass
# These helpers are used at module level and need to be defined first.
# And yes, I know PEP-8, but sometimes a lower-case classname makes more sense.
def depr(message, hard=False):
warnings.warn(message, DeprecationWarning, stacklevel=3)
def makelist(data): # This is just to handy
if isinstance(data, (tuple, list, set, dict)): return list(data)
elif data: return [data]
else: return []
class DictProperty(object):
''' Property that maps to a key in a local dict-like attribute. '''
def __init__(self, attr, key=None, read_only=False):
self.attr, self.key, self.read_only = attr, key, read_only
def __call__(self, func):
functools.update_wrapper(self, func, updated=[])
self.getter, self.key = func, self.key or func.__name__
return self
def __get__(self, obj, cls):
if obj is None: return self
key, storage = self.key, getattr(obj, self.attr)
if key not in storage: storage[key] = self.getter(obj)
return storage[key]
def __set__(self, obj, value):
if self.read_only: raise AttributeError("Read-Only property.")
getattr(obj, self.attr)[self.key] = value
def __delete__(self, obj):
if self.read_only: raise AttributeError("Read-Only property.")
del getattr(obj, self.attr)[self.key]
class cached_property(object):
''' A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the
property. '''
def __init__(self, func):
self.__doc__ = getattr(func, '__doc__')
self.func = func
def __get__(self, obj, cls):
if obj is None: return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
class lazy_attribute(object):
''' A property that caches itself to the class object. '''
def __init__(self, func):
functools.update_wrapper(self, func, updated=[])
self.getter = func
def __get__(self, obj, cls):
value = self.getter(cls)
setattr(cls, self.__name__, value)
return value
###############################################################################
# Exceptions and Events ########################################################
###############################################################################
[docs]class BottleException(Exception):
""" 보틀에서 쓰는 예외들의 베이스 클래스. """
pass
###############################################################################
# Routing ######################################################################
###############################################################################
class RouteError(BottleException):
""" This is a base class for all routing related exceptions """
class RouteReset(BottleException):
""" If raised by a plugin or request handler, the route is reset and all
plugins are re-applied. """
class RouterUnknownModeError(RouteError): pass
class RouteSyntaxError(RouteError):
""" The route parser found something not supported by this router. """
class RouteBuildError(RouteError):
""" The route could not be built. """
def _re_flatten(p):
''' Turn all capturing groups in a regular expression pattern into
non-capturing groups. '''
if '(' not in p: return p
return re.sub(r'(\\*)(\(\?P<[^>]+>|\((?!\?))',
lambda m: m.group(0) if len(m.group(1)) % 2 else m.group(1) + '(?:', p)
class Router(object):
''' A Router is an ordered collection of route->target pairs. It is used to
efficiently match WSGI requests against a number of routes and return
the first target that satisfies the request. The target may be anything,
usually a string, ID or callable object. A route consists of a path-rule
and a HTTP method.
The path-rule is either a static path (e.g. `/contact`) or a dynamic
path that contains wildcards (e.g. `/wiki/<page>`). The wildcard syntax
and details on the matching order are described in docs:`routing`.
'''
default_pattern = '[^/]+'
default_filter = 're'
#: The current CPython regexp implementation does not allow more
#: than 99 matching groups per regular expression.
_MAX_GROUPS_PER_PATTERN = 99
def __init__(self, strict=False):
self.rules = [] # All rules in order
self._groups = {} # index of regexes to find them in dyna_routes
self.builder = {} # Data structure for the url builder
self.static = {} # Search structure for static routes
self.dyna_routes = {}
self.dyna_regexes = {} # Search structure for dynamic routes
#: If true, static routes are no longer checked first.
self.strict_order = strict
self.filters = {
're': lambda conf:
(_re_flatten(conf or self.default_pattern), None, None),
'int': lambda conf: (r'-?\d+', int, lambda x: str(int(x))),
'float': lambda conf: (r'-?[\d.]+', float, lambda x: str(float(x))),
'path': lambda conf: (r'.+?', None, None)}
def add_filter(self, name, func):
''' Add a filter. The provided function is called with the configuration
string as parameter and must return a (regexp, to_python, to_url) tuple.
The first element is a string, the last two are callables or None. '''
self.filters[name] = func
rule_syntax = re.compile('(\\\\*)'\
'(?:(?::([a-zA-Z_][a-zA-Z_0-9]*)?()(?:#(.*?)#)?)'\
'|(?:<([a-zA-Z_][a-zA-Z_0-9]*)?(?::([a-zA-Z_]*)'\
'(?::((?:\\\\.|[^\\\\>]+)+)?)?)?>))')
def _itertokens(self, rule):
offset, prefix = 0, ''
for match in self.rule_syntax.finditer(rule):
prefix += rule[offset:match.start()]
g = match.groups()
if len(g[0])%2: # Escaped wildcard
prefix += match.group(0)[len(g[0]):]
offset = match.end()
continue
if prefix:
yield prefix, None, None
name, filtr, conf = g[4:7] if g[2] is None else g[1:4]
yield name, filtr or 'default', conf or None
offset, prefix = match.end(), ''
if offset <= len(rule) or prefix:
yield prefix+rule[offset:], None, None
def add(self, rule, method, target, name=None):
''' Add a new rule or replace the target for an existing rule. '''
anons = 0 # Number of anonymous wildcards found
keys = [] # Names of keys
pattern = '' # Regular expression pattern with named groups
filters = [] # Lists of wildcard input filters
builder = [] # Data structure for the URL builder
is_static = True
for key, mode, conf in self._itertokens(rule):
if mode:
is_static = False
if mode == 'default': mode = self.default_filter
mask, in_filter, out_filter = self.filters[mode](conf)
if not key:
pattern += '(?:%s)' % mask
key = 'anon%d' % anons
anons += 1
else:
pattern += '(?P<%s>%s)' % (key, mask)
keys.append(key)
if in_filter: filters.append((key, in_filter))
builder.append((key, out_filter or str))
elif key:
pattern += re.escape(key)
builder.append((None, key))
self.builder[rule] = builder
if name: self.builder[name] = builder
if is_static and not self.strict_order:
self.static.setdefault(method, {})
self.static[method][self.build(rule)] = (target, None)
return
try:
re_pattern = re.compile('^(%s)$' % pattern)
re_match = re_pattern.match
except re.error:
raise RouteSyntaxError("Could not add Route: %s (%s)" % (rule, _e()))
if filters:
def getargs(path):
url_args = re_match(path).groupdict()
for name, wildcard_filter in filters:
try:
url_args[name] = wildcard_filter(url_args[name])
except ValueError:
raise HTTPError(400, 'Path has wrong format.')
return url_args
elif re_pattern.groupindex:
def getargs(path):
return re_match(path).groupdict()
else:
getargs = None
flatpat = _re_flatten(pattern)
whole_rule = (rule, flatpat, target, getargs)
if (flatpat, method) in self._groups:
if DEBUG:
msg = 'Route <%s %s> overwrites a previously defined route'
warnings.warn(msg % (method, rule), RuntimeWarning)
self.dyna_routes[method][self._groups[flatpat, method]] = whole_rule
else:
self.dyna_routes.setdefault(method, []).append(whole_rule)
self._groups[flatpat, method] = len(self.dyna_routes[method]) - 1
self._compile(method)
def _compile(self, method):
all_rules = self.dyna_routes[method]
comborules = self.dyna_regexes[method] = []
maxgroups = self._MAX_GROUPS_PER_PATTERN
for x in range(0, len(all_rules), maxgroups):
some = all_rules[x:x+maxgroups]
combined = (flatpat for (_, flatpat, _, _) in some)
combined = '|'.join('(^%s$)' % flatpat for flatpat in combined)
combined = re.compile(combined).match
rules = [(target, getargs) for (_, _, target, getargs) in some]
comborules.append((combined, rules))
def build(self, _name, *anons, **query):
''' Build an URL by filling the wildcards in a rule. '''
builder = self.builder.get(_name)
if not builder: raise RouteBuildError("No route with that name.", _name)
try:
for i, value in enumerate(anons): query['anon%d'%i] = value
url = ''.join([f(query.pop(n)) if n else f for (n,f) in builder])
return url if not query else url+'?'+urlencode(query)
except KeyError:
raise RouteBuildError('Missing URL argument: %r' % _e().args[0])
def match(self, environ):
''' Return a (target, url_agrs) tuple or raise HTTPError(400/404/405). '''
verb = environ['REQUEST_METHOD'].upper()
path = environ['PATH_INFO'] or '/'
target = None
if verb == 'HEAD':
methods = ['PROXY', verb, 'GET', 'ANY']
else:
methods = ['PROXY', verb, 'ANY']
for method in methods:
if method in self.static and path in self.static[method]:
target, getargs = self.static[method][path]
return target, getargs(path) if getargs else {}
elif method in self.dyna_regexes:
for combined, rules in self.dyna_regexes[method]:
match = combined(path)
if match:
target, getargs = rules[match.lastindex - 1]
return target, getargs(path) if getargs else {}
# No matching route found. Collect alternative methods for 405 response
allowed = set([])
nocheck = set(methods)
for method in set(self.static) - nocheck:
if path in self.static[method]:
allowed.add(verb)
for method in set(self.dyna_regexes) - allowed - nocheck:
for combined, rules in self.dyna_regexes[method]:
match = combined(path)
if match:
allowed.add(method)
if allowed:
allow_header = ",".join(sorted(allowed))
raise HTTPError(405, "Method not allowed.", Allow=allow_header)
# No matching route and no alternative method found. We give up
raise HTTPError(404, "Not found: " + repr(path))
[docs]class Route(object):
''' 이 클래스는 라우트 콜백과 라우트별 메타데이터 및 설정을 감싸며
필요시 플러그인들을 적용한다. 또한 URL 경로 규칙을 Router에서
쓸 수 있는 정규 표현식으로 바꾸는 일도 맡는다.
'''
def __init__(self, app, rule, method, callback, name=None,
plugins=None, skiplist=None, **config):
#: 이 라우트가 설치된 응용.
self.app = app
#: 경로 규칙 문자열. (예: ``/wiki/:page``)
self.rule = rule
#: HTTP 메소드 문자열. (예: ``GET``)
self.method = method
#: 어떤 플러그인도 적용되지 않은 원래 콜백. 인트로스펙션에 유용함.
self.callback = callback
#: 라우트의 이름. 지정돼 있지 않으면 ``None``.
self.name = name or None
#: 라우트별 플러그인 목록. (:meth:`Bottle.route` 참고.)
self.plugins = plugins or []
#: 이 라우트에 적용하지 않을 플러그인 목록. (:meth:`Bottle.route` 참고.)
self.skiplist = skiplist or []
#: :meth:`Bottle.route` 데코레이터에 추가로 준 키워드 인자들이
#: 이 딕셔너리에 저장된다. 라우트별 플러그인 설정 및 메타데이터에
#: 쓰인다.
self.config = ConfigDict().load_dict(config, make_namespaces=True)
def __call__(self, *a, **ka):
depr("Some APIs changed to return Route() instances instead of"\
" callables. Make sure to use the Route.call method and not to"\
" call Route instances directly.") #0.12
return self.call(*a, **ka)
[docs] @cached_property
def call(self):
''' 모든 플러그인이 적용된 라우트 콜백. 이 프로퍼티는 필요시
생성되며 이후 요청 처리 속도를 높이기 위해 캐싱된다.'''
return self._make_callback()
[docs] def reset(self):
''' 캐싱된 값이 있으면 잊어 버리기. 다음 :attr:`call` 접근 때 모든
플러그인이 재적용된다. '''
self.__dict__.pop('call', None)
[docs] def prepare(self):
''' 필요시 이뤄지는 작업 모두를 즉시 하기. (디버깅에 유용)'''
self.call
@property
def _context(self):
depr('Switch to Plugin API v2 and access the Route object directly.') #0.12
return dict(rule=self.rule, method=self.method, callback=self.callback,
name=self.name, app=self.app, config=self.config,
apply=self.plugins, skip=self.skiplist)
[docs] def all_plugins(self):
''' 이 라우트에 영향 주는 모든 플러그인 내놓기. '''
unique = set()
for p in reversed(self.app.plugins + self.plugins):
if True in self.skiplist: break
name = getattr(p, 'name', False)
if name and (name in self.skiplist or name in unique): continue
if p in self.skiplist or type(p) in self.skiplist: continue
if name: unique.add(name)
yield p
def _make_callback(self):
callback = self.callback
for plugin in self.all_plugins():
try:
if hasattr(plugin, 'apply'):
api = getattr(plugin, 'api', 1)
context = self if api > 1 else self._context
callback = plugin.apply(callback, context)
else:
callback = plugin(callback)
except RouteReset: # Try again with changed configuration.
return self._make_callback()
if not callback is self.callback:
update_wrapper(callback, self.callback)
return callback
[docs] def get_undecorated_callback(self):
''' 콜백 반환. 콜백이 데코레이터로 꾸며진 함수면 원래 함수를
복원하려고 시도한다. '''
func = self.callback
func = getattr(func, '__func__' if py3k else 'im_func', func)
closure_attr = '__closure__' if py3k else 'func_closure'
while hasattr(func, closure_attr) and getattr(func, closure_attr):
func = getattr(func, closure_attr)[0].cell_contents
return func
[docs] def get_callback_args(self):
''' (아마도) 콜백이 키워드 인자로 받는 인자 이름 목록 반환.
콜백이 데코레이터로 꾸며진 함수면 인트로스펙션 전에 원래 함수
복원을 시도한다. '''
return getargspec(self.get_undecorated_callback())[0]
[docs] def get_config(self, key, default=None):
''' 설정 필드를 검색해서 그 값을 반환. 먼저 route.config를
확인하고 다음으로 route.app.config 확인.'''
for conf in (self.config, self.app.conifg):
if key in conf: return conf[key]
return default
def __repr__(self):
cb = self.get_undecorated_callback()
return '<%s %r %r>' % (self.method, self.rule, cb)
###############################################################################
# Application Object ###########################################################
###############################################################################
[docs]class Bottle(object):
""" 각 Bottle 객체는 하나의 구별된 웹 응용을 나타내며, 라우트와 콜백,
플러그인, 자원, 설정으로 구성된다. 인스턴스는 호출 가능한 WSGI
응용이다.
:param catchall: 참(기본)이면 모든 예외를 처리한다. 디버깅용
미들웨어가 예외를 처리하게 하려면 끄면 된다.
"""
def __init__(self, catchall=True, autojson=True):
#: 앱 설정을 위한 :class:`ConfigDict`.
self.config = ConfigDict()
self.config._on_change = functools.partial(self.trigger_hook, 'config')
self.config.meta_set('autojson', 'validate', bool)
self.config.meta_set('catchall', 'validate', bool)
self.config['catchall'] = catchall
self.config['autojson'] = autojson
#: 응용 파일들을 위한 :class:`ResourceManager`.
self.resources = ResourceManager()
self.routes = [] # List of installed :class:`Route` instances.
self.router = Router() # Maps requests to :class:`Route` instances.
self.error_handler = {}
# Core plugins
self.plugins = [] # List of installed plugins.
if self.config['autojson']:
self.install(JSONPlugin())
self.install(TemplatePlugin())
#: 참이면 대부분의 예외를 잡아서 :exc:`HTTPError`\로 반환한다.
catchall = DictProperty('config', 'catchall')
__hook_names = 'before_request', 'after_request', 'app_reset', 'config'
__hook_reversed = 'after_request'
@cached_property
def _hooks(self):
return dict((name, []) for name in self.__hook_names)
[docs] def add_hook(self, name, func):
''' 훅에 콜백 붙이기. 현재 세 가지 훅이 구현돼 있다.
before_request
각 요청 전에 한 번 실행. 요청 문맥은 사용할 수 있지만
아직 라우팅은 이뤄지지 않은 상태.
after_request
각 요청 후에 그 결과와 상관없이 한 번 실행.
app_reset
:meth:`Bottle.reset`\이 호출될 때마다 호출.
'''
if name in self.__hook_reversed:
self._hooks[name].insert(0, func)
else:
self._hooks[name].append(func)
[docs] def remove_hook(self, name, func):
''' 훅에서 콜백 제거. '''
if name in self._hooks and func in self._hooks[name]:
self._hooks[name].remove(func)
return True
[docs] def trigger_hook(self, __name, *args, **kwargs):
''' 훅을 실행하고 결과 리스트 반환. '''
return [hook(*args, **kwargs) for hook in self._hooks[__name][:]]
[docs] def hook(self, name):
""" 훅에 콜백을 붙이는 데코레이터 반환. 자세한 내용은
:meth:`add_hook` 참고."""
def decorator(func):
self.add_hook(name, func)
return func
return decorator
[docs] def mount(self, prefix, app, **options):
''' 응용(:class:`Bottle` 또는 단순 WSGI)을 특정 URL 프리픽스에
붙이기. 예::
root_app.mount('/admin/', admin_app)
:param prefix: 경로 프리픽스 내지 `마운트 지점`. 슬래시로
끝나면 그 슬래시가 꼭 있어야 한다.
:param app: :class:`Bottle` 인스턴스 또는 WSGI 응용.
다른 매개변수들은 모두 하위 :meth:`route` 호출로 전달된다.
'''
if isinstance(app, basestring):
depr('Parameter order of Bottle.mount() changed.', True) # 0.10
segments = [p for p in prefix.split('/') if p]
if not segments: raise ValueError('Empty path prefix.')
path_depth = len(segments)
def mountpoint_wrapper():
try:
request.path_shift(path_depth)
rs = HTTPResponse([])
def start_response(status, headerlist, exc_info=None):
if exc_info:
try:
_raise(*exc_info)
finally:
exc_info = None
rs.status = status
for name, value in headerlist: rs.add_header(name, value)
return rs.body.append
body = app(request.environ, start_response)
if body and rs.body: body = itertools.chain(rs.body, body)
rs.body = body or rs.body
return rs
finally:
request.path_shift(-path_depth)
options.setdefault('skip', True)
options.setdefault('method', 'PROXY')
options.setdefault('mountpoint', {'prefix': prefix, 'target': app})
options['callback'] = mountpoint_wrapper
self.route('/%s/<:re:.*>' % '/'.join(segments), **options)
if not prefix.endswith('/'):
self.route('/' + '/'.join(segments), **options)
[docs] def merge(self, routes):
''' 다른 :class:`Bottle` 응용의 라우트나 :class:`Route` 객체 리스트를
이 응용으로 병합. 라우트의 '소유자'가 그대로 유지된다. 즉
:data:`Route.app` 속성이 바뀌지 않는다. '''
if isinstance(routes, Bottle):
routes = routes.routes
for route in routes:
self.add_route(route)
[docs] def install(self, plugin):
''' plugin을 플러그인 목록에 추가하고 이 응용의 모든 라우트에
적용될 수 있게 준비. plugin은 단순 데코레이터일 수도 있고
:class:`Plugin` API를 구현하는 객체일 수도 있다.
'''
if hasattr(plugin, 'setup'): plugin.setup(self)
if not callable(plugin) and not hasattr(plugin, 'apply'):
raise TypeError("Plugins must be callable or implement .apply()")
self.plugins.append(plugin)
self.reset()
return plugin
[docs] def uninstall(self, plugin):
''' 플러그인 제거. 특정 플러그인을 제거하려면 인스턴스를 주면 되고,
어떤 타입의 모든 플러그인을 제거하려면 그 타입 객체를, ``name``
속성이 일치하는 모든 플러그인을 제거하려면 문자열을, 모든
플러그인을 제거하려면 ``True``\를 주면 된다. 제거된 플러그인
목록을 반환한다. '''
removed, remove = [], plugin
for i, plugin in list(enumerate(self.plugins))[::-1]:
if remove is True or remove is plugin or remove is type(plugin) \
or getattr(plugin, 'name', True) == remove:
removed.append(plugin)
del self.plugins[i]
if hasattr(plugin, 'close'): plugin.close()
if removed: self.reset()
return removed
[docs] def reset(self, route=None):
''' 모든 라우트를 재설정(플러그인들을 강제로 재적용)하고 캐시
모두 비우기. ID나 라우트 객체를 주면 그 특정 라우트만
영향받는다. '''
if route is None: routes = self.routes
elif isinstance(route, Route): routes = [route]
else: routes = [self.routes[route]]
for route in routes: route.reset()
if DEBUG:
for route in routes: route.prepare()
self.trigger_hook('app_reset')
[docs] def close(self):
''' 응용과 설치된 모든 플러그인 닫기. '''
for plugin in self.plugins:
if hasattr(plugin, 'close'): plugin.close()
self.stopped = True
[docs] def run(self, **kwargs):
''' 같은 매개변수로 :func:`run` 호출. '''
run(self, **kwargs)
[docs] def match(self, environ):
""" 걸리는 라우트를 검색해서 (:class:`Route` , urlargs) 튜플을
반환. 두 번째 값은 URL에서 추출한 매개변수들을 담은 딕셔너리다.
불일치 시 :exc:`HTTPError` (404/405)를 던진다."""
return self.router.match(environ)
[docs] def get_url(self, routename, **kargs):
""" 지정한 라우트에 걸리는 문자열을 반환. """
scriptname = request.environ.get('SCRIPT_NAME', '').strip('/') + '/'
location = self.router.build(routename, **kargs).lstrip('/')
return urljoin(urljoin('/', scriptname), location)
[docs] def add_route(self, route):
''' 라우트 객체 추가. 단 :data:`Route.app` 속성은 바꾸지 않음. '''
self.routes.append(route)
self.router.add(route.rule, route.method, route, name=route.name)
if DEBUG: route.prepare()
[docs] def route(self, path=None, method='GET', callback=None, name=None,
apply=None, skip=None, **config):
""" 요청 URL에 함수를 결속시키는 데코레이터. 예::
@app.route('/hello/:name')
def hello(name):
return 'Hello %s' % name
``:name`` 부분은 와일드카드다. 자세한 문법은 :class:`Router`
참고.
:param path: 받을 요청 경로 또는 경로 목록. 경로를 지정하지
않으면 함수 시그너처를 가지고 자동으로 생성한다.
:param method: 받을 HTTP 메소드(`GET`, `POST`, `PUT`, ...) 또는
메소드들의 목록. (기본값: `GET`)
:param callback: 데코레이터 문법을 피하기 위한 선택적인 지정 방법.
``route(..., callback=func)``\가 ``route(...)(func)``\와 같다.
:param name: 이 라우트의 이름. (기본값: None)
:param apply: 데코레이터, 또는 플러그인, 또는 플러그인 목록.
설치돼 있는 플러그인들에 더해서 라우트 콜백에 적용된다.
:param skip: 플러그인 또는 플러그인 클래스 또는 이름의 목록.
걸리는 플러그인은 이 라우트에 설치하지 않는다. ``True``\면
모두 건너뛴다.
추가 키워드 인자가 있으면 라우트별 설정에 저장돼서 플러그인들로
전달된다. (:meth:`Plugin.apply` 참고.)
"""
if callable(path): path, callback = None, path
plugins = makelist(apply)
skiplist = makelist(skip)
def decorator(callback):
# TODO: Documentation and tests
if isinstance(callback, basestring): callback = load(callback)
for rule in makelist(path) or yieldroutes(callback):
for verb in makelist(method):
verb = verb.upper()
route = Route(self, rule, verb, callback, name=name,
plugins=plugins, skiplist=skiplist, **config)
self.add_route(route)
return callback
return decorator(callback) if callback else decorator
[docs] def get(self, path=None, method='GET', **options):
""" :meth:`route`\와 같음. """
return self.route(path, method, **options)
[docs] def post(self, path=None, method='POST', **options):
""" :meth:`route`\와 같되 method 매개변수가 ``POST``. """
return self.route(path, method, **options)
[docs] def put(self, path=None, method='PUT', **options):
""" :meth:`route`\와 같되 method 매개변수가 ``PUT``. """
return self.route(path, method, **options)
[docs] def delete(self, path=None, method='DELETE', **options):
""" :meth:`route`\와 같되 method 매개변수가 ``DELETE``. """
return self.route(path, method, **options)
[docs] def error(self, code=500):
""" 데코레이터: HTTP 오류 코드에 대한 출력 핸들러 등록"""
def wrapper(handler):
self.error_handler[int(code)] = handler
return handler
return wrapper
def default_error_handler(self, res):
return tob(template(ERROR_PAGE_TEMPLATE, e=res))
def _handle(self, environ):
path = environ['bottle.raw_path'] = environ['PATH_INFO']
if py3k:
try:
environ['PATH_INFO'] = path.encode('latin1').decode('utf8')
except UnicodeError:
return HTTPError(400, 'Invalid path string. Expected UTF-8')
try:
environ['bottle.app'] = self
request.bind(environ)
response.bind()
try:
self.trigger_hook('before_request')
route, args = self.router.match(environ)
environ['route.handle'] = route
environ['bottle.route'] = route
environ['route.url_args'] = args
return route.call(**args)
finally:
self.trigger_hook('after_request')
except HTTPResponse:
return _e()
except RouteReset:
route.reset()
return self._handle(environ)
except (KeyboardInterrupt, SystemExit, MemoryError):
raise
except Exception:
if not self.catchall: raise
stacktrace = format_exc()
environ['wsgi.errors'].write(stacktrace)
return HTTPError(500, "Internal Server Error", _e(), stacktrace)
def _cast(self, out, peek=None):
""" Try to convert the parameter into something WSGI compatible and set
correct HTTP headers when possible.
Support: False, str, unicode, dict, HTTPResponse, HTTPError, file-like,
iterable of strings and iterable of unicodes
"""
# Empty output is done here
if not out:
if 'Content-Length' not in response:
response['Content-Length'] = 0
return []
# Join lists of byte or unicode strings. Mixed lists are NOT supported
if isinstance(out, (tuple, list))\
and isinstance(out[0], (bytes, unicode)):
out = out[0][0:0].join(out) # b'abc'[0:0] -> b''
# Encode unicode strings
if isinstance(out, unicode):
out = out.encode(response.charset)
# Byte Strings are just returned
if isinstance(out, bytes):
if 'Content-Length' not in response:
response['Content-Length'] = len(out)
return [out]
# HTTPError or HTTPException (recursive, because they may wrap anything)
# TODO: Handle these explicitly in handle() or make them iterable.
if isinstance(out, HTTPError):
out.apply(response)
out = self.error_handler.get(out.status_code, self.default_error_handler)(out)
return self._cast(out)
if isinstance(out, HTTPResponse):
out.apply(response)
return self._cast(out.body)
# File-like objects.
if hasattr(out, 'read'):
if 'wsgi.file_wrapper' in request.environ:
return request.environ['wsgi.file_wrapper'](out)
elif hasattr(out, 'close') or not hasattr(out, '__iter__'):
return WSGIFileWrapper(out)
# Handle Iterables. We peek into them to detect their inner type.
try:
iout = iter(out)
first = next(iout)
while not first:
first = next(iout)
except StopIteration:
return self._cast('')
except HTTPResponse:
first = _e()
except (KeyboardInterrupt, SystemExit, MemoryError):
raise
except Exception:
if not self.catchall: raise
first = HTTPError(500, 'Unhandled exception', _e(), format_exc())
# These are the inner types allowed in iterator or generator objects.
if isinstance(first, HTTPResponse):
return self._cast(first)
elif isinstance(first, bytes):
new_iter = itertools.chain([first], iout)
elif isinstance(first, unicode):
encoder = lambda x: x.encode(response.charset)
new_iter = imap(encoder, itertools.chain([first], iout))
else:
msg = 'Unsupported response type: %s' % type(first)
return self._cast(HTTPError(500, msg))
if hasattr(out, 'close'):
new_iter = _closeiter(new_iter, out.close)
return new_iter
[docs] def wsgi(self, environ, start_response):
""" 보틀의 WSGI 인터페이스. """
try:
out = self._cast(self._handle(environ))
# rfc2616 section 4.3
if response._status_code in (100, 101, 204, 304)\
or environ['REQUEST_METHOD'] == 'HEAD':
if hasattr(out, 'close'): out.close()
out = []
start_response(response._status_line, response.headerlist)
return out
except (KeyboardInterrupt, SystemExit, MemoryError):
raise
except Exception:
if not self.catchall: raise
err = '<h1>Critical error while processing request: %s</h1>' \
% html_escape(environ.get('PATH_INFO', '/'))
if DEBUG:
err += '<h2>Error:</h2>\n<pre>\n%s\n</pre>\n' \
'<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n' \
% (html_escape(repr(_e())), html_escape(format_exc()))
environ['wsgi.errors'].write(err)
headers = [('Content-Type', 'text/html; charset=UTF-8')]
start_response('500 INTERNAL SERVER ERROR', headers, sys.exc_info())
return [tob(err)]
def __call__(self, environ, start_response):
''' Each instance of :class:'Bottle' is a WSGI application. '''
return self.wsgi(environ, start_response)
###############################################################################
# HTTP and WSGI Tools ##########################################################
###############################################################################
[docs]class BaseRequest(object):
""" WSGI 환경 딕셔너리들의 래퍼에 여러 편리한 접근 메소드와 프로퍼티를
더한 것. 대부분 읽기 전용이다.
요청에 새 속성을 추가하면 실제로는 환경 딕셔너리에
('bottle.request.ext.<name>'으로) 추가된다. 요청별 데이터를
저장하고 접근하려 할 때 권장하는 방식이다.
"""
__slots__ = ('environ')
#: :attr:`body`\를 위한 메모리 버퍼의 바이트 단위 최대 크기.
MEMFILE_MAX = 102400
def __init__(self, environ=None):
""" WSGI environ 딕셔너리 포장. """
#: 포장된 WSGI environ 딕셔너리. 유일한 진짜 속성이다.
#: 다른 속성들은 사실 모두 읽기 전용 프로퍼티다.
self.environ = {} if environ is None else environ
self.environ['bottle.request'] = self
[docs] @DictProperty('environ', 'bottle.app', read_only=True)
def app(self):
''' 이 요청을 처리 중인 보틀 응용. '''
raise RuntimeError('This request is not connected to an application.')
[docs] @DictProperty('environ', 'bottle.route', read_only=True)
def route(self):
""" 이 요청에 걸린 보틀 :class:`Route` 객체. """
raise RuntimeError('This request is not connected to a route.')
[docs] @DictProperty('environ', 'route.url_args', read_only=True)
def url_args(self):
""" URL에서 추출한 인자들. """
raise RuntimeError('This request is not connected to a route.')
@property
def path(self):
''' ``PATH_INFO`` 값 앞에 슬래시를 딱 한 개 붙인 것. (이상
동작하는 클라이언트에 대처하고 "빈 경로" 경우를 피하기 위해.) '''
return '/' + self.environ.get('PATH_INFO','').lstrip('/')
@property
def method(self):
''' ``REQUEST_METHOD`` 값을 대문자 문자열로. '''
return self.environ.get('REQUEST_METHOD', 'GET').upper()
[docs] @DictProperty('environ', 'bottle.request.cookies', read_only=True)
def cookies(self):
""" 쿠키들을 파싱해서 담은 :class:`FormsDict`. 서명된 쿠키가
디코딩되어 있지 않다. 서명된 쿠키에는 :meth:`get_cookie` 사용. """
cookies = SimpleCookie(self.environ.get('HTTP_COOKIE','')).values()
return FormsDict((c.key, c.value) for c in cookies)
[docs] def get_cookie(self, key, default=None, secret=None):
""" 쿠키 내용물 반환. `서명된 쿠키`\를 읽으려면 `secret`\이 쿠키
생성 시 사용한 값과 일치해야 한다.
(:meth:`BaseResponse.set_cookie` 참고.) 뭔가 잘못되면 (쿠키가
없거나 서명이 틀리면) 기본값을 반환한다. """
value = self.cookies.get(key)
if secret and value:
dec = cookie_decode(value, secret) # (key, value) tuple or None
return dec[1] if dec and dec[0] == key else default
return value or default
[docs] @DictProperty('environ', 'bottle.request.query', read_only=True)
def query(self):
''' :attr:`query_string`\을 파싱해서 담은 :class:`FormsDict`.
이 값들을 "URL 인자"나 "GET 매개변수"라고도 하는데,
:class:`Router`\에서 제공하는 "URL 와일드카드"와 혼동하지
말아야 한다. '''
get = self.environ['bottle.get'] = FormsDict()
pairs = _parse_qsl(self.environ.get('QUERY_STRING', ''))
for key, value in pairs:
get[key] = value
return get
[docs] @DictProperty('environ', 'bottle.request.params', read_only=True)
def params(self):
""" :attr:`query`\와 :attr:`forms`\의 값을 합친 :class:`FormsDict`.
파일 업로드는 :attr:`files`\에 저장된다. """
params = FormsDict()
for key, value in self.query.allitems():
params[key] = value
for key, value in self.forms.allitems():
params[key] = value
return params
[docs] @DictProperty('environ', 'bottle.request.files', read_only=True)
def files(self):
""" `multipart/form-data`\로 인코딩된 POST 또는 PUT 요청 바디에서
파싱한 파일 업로드. 값들이 :class:`FileUpload`\의 인스턴스다.
"""
files = FormsDict()
for name, item in self.POST.allitems():
if isinstance(item, FileUpload):
files[name] = item
return files
[docs] @DictProperty('environ', 'bottle.request.json', read_only=True)
def json(self):
''' ``Content-Type`` 헤더가 ``application/json``\인 경우 이
프로퍼티는 파싱된 요청 바디 내용물을 담는다. 메모리 고갈을
피하기 위해 :attr:`MEMFILE_MAX`\보다 작은 요청만 처리한다. '''
ctype = self.environ.get('CONTENT_TYPE', '').lower().split(';')[0]
if ctype == 'application/json':
b = self._get_body_string()
if not b:
return None
return json_loads(b)
return None
def _iter_body(self, read, bufsize):
maxread = max(0, self.content_length)
while maxread:
part = read(min(maxread, bufsize))
if not part: break
yield part
maxread -= len(part)
def _iter_chunked(self, read, bufsize):
err = HTTPError(400, 'Error while parsing chunked transfer body.')
rn, sem, bs = tob('\r\n'), tob(';'), tob('')
while True:
header = read(1)
while header[-2:] != rn:
c = read(1)
header += c
if not c: raise err
if len(header) > bufsize: raise err
size, _, _ = header.partition(sem)
try:
maxread = int(tonat(size.strip()), 16)
except ValueError:
raise err
if maxread == 0: break
buff = bs
while maxread > 0:
if not buff:
buff = read(min(maxread, bufsize))
part, buff = buff[:maxread], buff[maxread:]
if not part: raise err
yield part
maxread -= len(part)
if read(2) != rn:
raise err
@DictProperty('environ', 'bottle.request.body', read_only=True)
def _body(self):
body_iter = self._iter_chunked if self.chunked else self._iter_body
read_func = self.environ['wsgi.input'].read
body, body_size, is_temp_file = BytesIO(), 0, False
for part in body_iter(read_func, self.MEMFILE_MAX):
body.write(part)
body_size += len(part)
if not is_temp_file and body_size > self.MEMFILE_MAX:
body, tmp = TemporaryFile(mode='w+b'), body
body.write(tmp.getvalue())
del tmp
is_temp_file = True
self.environ['wsgi.input'] = body
body.seek(0)
return body
def _get_body_string(self):
''' read body until content-length or MEMFILE_MAX into a string. Raise
HTTPError(413) on requests that are to large. '''
clen = self.content_length
if clen > self.MEMFILE_MAX:
raise HTTPError(413, 'Request to large')
if clen < 0: clen = self.MEMFILE_MAX + 1
data = self.body.read(clen)
if len(data) > self.MEMFILE_MAX: # Fail fast
raise HTTPError(413, 'Request to large')
return data
@property
def body(self):
""" seek 가능한 파일스러운 객체로 된 HTTP 요청 바디.
:attr:`MEMFILE_MAX`\에 따라 임시 파일이거나 :class:`io.BytesIO`
인스턴스다. 이 프로퍼티에 처음 접근할 때 환경 변수
``wsgi.input``\을 읽어서 교체한다. 이후 접근 때는 그 파일
객체에 `seek(0)`\만 한다. """
self._body.seek(0)
return self._body
@property
def chunked(self):
''' chunked 전송 인코딩이었으면 True. '''
return 'chunked' in self.environ.get('HTTP_TRANSFER_ENCODING', '').lower()
#: :attr:`query`\의 별칭.
GET = query
[docs] @DictProperty('environ', 'bottle.request.post', read_only=True)
def POST(self):
""" :attr:`forms`\와 :attr:`files`\의 값을 하나로 합친
:class:`FormsDict`. 값이 문자열(양식 값) 또는
:class:`cgi.FieldStorage` 인스턴스(파일 업로드)다.
"""
post = FormsDict()
# We default to application/x-www-form-urlencoded for everything that
# is not multipart and take the fast path (also: 3.1 workaround)
if not self.content_type.startswith('multipart/'):
pairs = _parse_qsl(tonat(self._get_body_string(), 'latin1'))
for key, value in pairs:
post[key] = value
return post
safe_env = {'QUERY_STRING':''} # Build a safe environment for cgi
for key in ('REQUEST_METHOD', 'CONTENT_TYPE', 'CONTENT_LENGTH'):
if key in self.environ: safe_env[key] = self.environ[key]
args = dict(fp=self.body, environ=safe_env, keep_blank_values=True)
if py31:
args['fp'] = NCTextIOWrapper(args['fp'], encoding='utf8',
newline='\n')
elif py3k:
args['encoding'] = 'utf8'
data = cgi.FieldStorage(**args)
self['_cgi.FieldStorage'] = data #http://bugs.python.org/issue18394#msg207958
data = data.list or []
for item in data:
if item.filename:
post[item.name] = FileUpload(item.file, item.name,
item.filename, item.headers)
else:
post[item.name] = item.value
return post
@property
def url(self):
""" 호스트 이름과 스킴을 포함한 전체 요청 URI. 앱이 역방향 프록시나
로드 밸런서 뒤에서 도는데 이상한 결과가 나온다면
``X-Forwarded-Host`` 헤더가 올바로 설정되는지 확인해 보자. """
return self.urlparts.geturl()
[docs] @DictProperty('environ', 'bottle.request.urlparts', read_only=True)
def urlparts(self):
''' :attr:`url` 문자열을 :class:`urlparse.SplitResult` 튜플로 만든 것.
그 튜플에는 (scheme, host, path, query_string, fragment)가 들어
있는데, fragment는 서버에게 보이지 않으므로 항상 비어 있다. '''
env = self.environ
http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
if not host:
# HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
host = env.get('SERVER_NAME', '127.0.0.1')
port = env.get('SERVER_PORT')
if port and port != ('80' if http == 'http' else '443'):
host += ':' + port
path = urlquote(self.fullpath)
return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
@property
def fullpath(self):
""" (존재 시) :attr:`script_name`\을 포함한 요청 경로. """
return urljoin(self.script_name, self.path.lstrip('/'))
@property
def query_string(self):
""" URL의 :attr:`query` 부분(``?``\와 ``#`` 사이 전부)을 그대로
문자열로 만든 것. """
return self.environ.get('QUERY_STRING', '')
@property
def script_name(self):
''' 응용을 호출하기 전에 상위 단계(서버 또는 라우팅 미들웨어)에서
제거한 URL `path` 부분 원래 값. 그 스크립트 경로 앞과 뒤에
슬래시를 붙여서 반환한다. '''
script_name = self.environ.get('SCRIPT_NAME', '').strip('/')
return '/' + script_name + '/' if script_name else '/'
[docs] def path_shift(self, shift=1):
''' :attr:`path`\에서 :attr:`script_name`\으로 또는 반대로 경로
분절들을 이동.
:param shift: 옮길 경로 분절 수. 음수로 해서 이동 방향을 바꿀
수도 있다. (기본값: 1)
'''
script = self.environ.get('SCRIPT_NAME','/')
self['SCRIPT_NAME'], self['PATH_INFO'] = path_shift(script, self.path, shift)
@property
def content_length(self):
''' 정수로 된 요청 바디 길이. 이 헤더를 설정할 책임이 클라이언트에게
있다. 설정하지 않았으면 바디의 실제 길이를 알 수 없으므로 -1을
반환한다. 그 경우 :attr:`body`\가 비어 있게 된다. '''
return int(self.environ.get('CONTENT_LENGTH') or -1)
@property
def content_type(self):
''' 소문자로 된 Content-Type 헤더. (기본값: 빈 문자열) '''
return self.environ.get('CONTENT_TYPE', '').lower()
@property
def is_xhr(self):
''' XMLHttpRequest로 시작된 요청이면 True. 자바스크립트 라이브러리에서
`X-Requested-With` 헤더를 지원하는 경우에만 동작한다. (인기 있는
라이브러리들은 대부분 지원한다.) '''
requested_with = self.environ.get('HTTP_X_REQUESTED_WITH','')
return requested_with.lower() == 'xmlhttprequest'
@property
def is_ajax(self):
''' :attr:`is_xhr`\의 별칭. "Ajax"는 올바른 용어가 아니다. '''
return self.is_xhr
@property
def auth(self):
""" (user, password) 튜플로 된 HTTP 인증 데이터. 이 구현체에선
현재 basic 인증만 지원한다. (digest는 지원하지 않는다.)
상위 수준에서 (가령 프론트엔드 웹 서버나 미들웨어에서) 인증이
이뤄지는 경우 password 필드가 None이지만 user 필드는
환경 변수 ``REMOTE_USER``\에서 찾아 넣는다. 오류 발생 시
None을 반환한다. """
basic = parse_auth(self.environ.get('HTTP_AUTHORIZATION',''))
if basic: return basic
ruser = self.environ.get('REMOTE_USER')
if ruser: return (ruser, None)
return None
@property
def remote_route(self):
""" 이 요청에 관련된 모든 IP 주소 목록. 클라이언트 IP로 시작해서
0개 이상의 프록시가 따라온다. 모든 프록시에서 ``X-Forwarded-For``
헤더를 지원하는 경우에만 동작한다. 악의적 클라이언트가 이 정보를
위조할 수 있다는 점에 유의하자. """
proxy = self.environ.get('HTTP_X_FORWARDED_FOR')
if proxy: return [ip.strip() for ip in proxy.split(',')]
remote = self.environ.get('REMOTE_ADDR')
return [remote] if remote else []
@property
def remote_addr(self):
""" 문자열로 된 클라이언트 IP. 악의적 클라이언트가 이 정보를
위조할 수 있다는 점에 유의하자. """
route = self.remote_route
return route[0] if route else None
[docs] def copy(self):
""" :attr:`environ`\을 얕게 복사한 새 :class:`Request` 반환. """
return Request(self.environ.copy())
def get(self, value, default=None): return self.environ.get(value, default)
def __getitem__(self, key): return self.environ[key]
def __delitem__(self, key): self[key] = ""; del(self.environ[key])
def __iter__(self): return iter(self.environ)
def __len__(self): return len(self.environ)
def keys(self): return self.environ.keys()
def __setitem__(self, key, value):
""" Change an environ value and clear all caches that depend on it. """
if self.environ.get('bottle.request.readonly'):
raise KeyError('The environ dictionary is read-only.')
self.environ[key] = value
todelete = ()
if key == 'wsgi.input':
todelete = ('body', 'forms', 'files', 'params', 'post', 'json')
elif key == 'QUERY_STRING':
todelete = ('query', 'params')
elif key.startswith('HTTP_'):
todelete = ('headers', 'cookies')
for key in todelete:
self.environ.pop('bottle.request.'+key, None)
def __repr__(self):
return '<%s: %s %s>' % (self.__class__.__name__, self.method, self.url)
def __getattr__(self, name):
''' Search in self.environ for additional user defined attributes. '''
try:
var = self.environ['bottle.request.ext.%s'%name]
return var.__get__(self) if hasattr(var, '__get__') else var
except KeyError:
raise AttributeError('Attribute %r not defined.' % name)
def __setattr__(self, name, value):
if name == 'environ': return object.__setattr__(self, name, value)
self.environ['bottle.request.ext.%s'%name] = value
def _hkey(key):
if '\n' in key or '\r' in key or '\0' in key:
raise ValueError("Header names must not contain control characters: %r" % key)
return key.title().replace('_', '-')
def _hval(value):
value = tonat(value)
if '\n' in value or '\r' in value or '\0' in value:
raise ValueError("Header value must not contain control characters: %r" % value)
return value
class HeaderProperty(object):
def __init__(self, name, reader=None, writer=None, default=''):
self.name, self.default = name, default
self.reader, self.writer = reader, writer
self.__doc__ = '%r 헤더의 현재 값.' % name.title()
def __get__(self, obj, cls):
if obj is None: return self
value = obj.get_header(self.name, self.default)
return self.reader(value) if self.reader else value
def __set__(self, obj, value):
obj[self.name] = self.writer(value) if self.writer else value
def __delete__(self, obj):
del obj[self.name]
[docs]class BaseResponse(object):
""" 응답 바디, 헤더, 쿠키 저장 클래스.
이 클래스는 분명 헤더에 대한 대소문자 구별 없는 dict스러운 항목
접근을 지원하지만 dict가 아니다. 무엇보다 응답에 반복문을 돌리면
헤더가 아니라 바디 조각들을 내놓는다.
:param body: 지원 타입들 중 하나로 된 응답 바디.
:param status: HTTP 상태 코드(예: 200) 또는 이유 문구를
포함한 상태 행(예: '200 OK').
:param headers: 이름-값 짝의 딕셔너리 또는 리스트.
추가로 준 키워드 인자들은 헤더 목록에 추가된다. 헤더 이름의
밑줄이 대시로 바뀐다.
"""
default_status = 200
default_content_type = 'text/html; charset=UTF-8'
# Header blacklist for specific response codes
# (rfc2616 section 10.2.3 and 10.3.5)
bad_headers = {
204: set(('Content-Type',)),
304: set(('Allow', 'Content-Encoding', 'Content-Language',
'Content-Length', 'Content-Range', 'Content-Type',
'Content-Md5', 'Last-Modified'))}
def __init__(self, body='', status=None, headers=None, **more_headers):
self._cookies = None
self._headers = {}
self.body = body
self.status = status or self.default_status
if headers:
if isinstance(headers, dict):
headers = headers.items()
for name, value in headers:
self.add_header(name, value)
if more_headers:
for name, value in more_headers.items():
self.add_header(name, value)
[docs] def copy(self, cls=None):
''' 자기 사본을 반환. '''
cls = cls or BaseResponse
assert issubclass(cls, BaseResponse)
copy = cls()
copy.status = self.status
copy._headers = dict((k, v[:]) for (k, v) in self._headers.items())
if self._cookies:
copy._cookies = SimpleCookie()
copy._cookies.load(self._cookies.output(header=''))
return copy
def __iter__(self):
return iter(self.body)
def close(self):
if hasattr(self.body, 'close'):
self.body.close()
@property
def status_line(self):
''' 문자열로 된 HTTP 상태 행. (예: ``404 Not Found``)'''
return self._status_line
@property
def status_code(self):
''' 정수로 된 HTTP 상태 코드. (예: 404)'''
return self._status_code
def _set_status(self, status):
if isinstance(status, int):
code, status = status, _HTTP_STATUS_LINES.get(status)
elif ' ' in status:
status = status.strip()
code = int(status.split()[0])
else:
raise ValueError('String status line without a reason phrase.')
if not 100 <= code <= 999: raise ValueError('Status code out of range.')
self._status_code = code
self._status_line = str(status or ('%d Unknown' % code))
def _get_status(self):
return self._status_line
status = property(_get_status, _set_status, None,
''' HTTP 응답 상태를 바꾸기 위한 쓰기 가능 프로퍼티. 숫자 코드
(100-999) 또는 자체 이유 문구(예: "404 Brain not found")가 있는
문자열을 받는다. 그에 따라 :data:`status_line`\과
:data:`status_code`\가 모두 갱신된다. 반환 값은 항상 상태 행
문자열이다. ''')
del _get_status, _set_status
@property
def headers(self):
''' :class:`HeaderDict` 인스턴스. 응답 헤더들에 대한 대소문자
구별 없는 dict스러운 뷰. '''
hdict = HeaderDict()
hdict.dict = self._headers
return hdict
def __contains__(self, name): return _hkey(name) in self._headers
def __delitem__(self, name): del self._headers[_hkey(name)]
def __getitem__(self, name): return self._headers[_hkey(name)][-1]
def __setitem__(self, name, value): self._headers[_hkey(name)] = [_hval(value)]
@property
def headerlist(self):
""" WSGI에 부합하는 (헤더, 값) 튜플 리스트. """
out = []
headers = list(self._headers.items())
if 'Content-Type' not in self._headers:
headers.append(('Content-Type', [self.default_content_type]))
if self._status_code in self.bad_headers:
bad_headers = self.bad_headers[self._status_code]
headers = [h for h in headers if h[0] not in bad_headers]
out += [(name, val) for (name, vals) in headers for val in vals]
if self._cookies:
for c in self._cookies.values():
out.append(('Set-Cookie', _hval(c.OutputString())))
if py3k:
out = [(k, v.encode('utf8').decode('latin1')) for (k, v) in out]
return out
content_type = HeaderProperty('Content-Type')
content_length = HeaderProperty('Content-Length', reader=int)
expires = HeaderProperty('Expires',
reader=lambda x: datetime.utcfromtimestamp(parse_date(x)),
writer=lambda x: http_date(x))
@property
def charset(self, default='UTF-8'):
""" content-type 헤더에 지정된 문자셋 반환. (기본값: utf8) """
if 'charset=' in self.content_type:
return self.content_type.split('charset=')[-1].split(';')[0].strip()
return default
[docs] def set_cookie(self, name, value, secret=None, **options):
''' 새 쿠키를 만들거나 기존 쿠키를 교체. `secret` 매개변수가
설정돼 있으면 `서명된 쿠키`\(아래에서 설명)를 만든다.
:param name: 쿠키 이름.
:param value: 쿠키 값.
:param secret: 서명된 쿠키에 필요한 서명 키.
더불어 이 메소드는 :class:`cookie.Morsel`\에서 지원하는 모든
RFC 2109 속성을 받는다. 다음이 포함된다.
:param max_age: 초 단위 최대 수명. (기본값: None)
:param expires: datetime 객체 또는 유닉스 타임스탬프. (기본값: None)
:param domain: 쿠키 읽는 게 허용되는 도메인.
(기본값: 현재 도메인)
:param path: 쿠키를 지정한 경로로 제한. (기본값: 현재 경로)
:param secure: 쿠키를 HTTPS 연결로 제한. (기본값: 꺼짐)
:param httponly: 클라이언트 측 자바스크립트에서 이 쿠키를 읽지
못하게 하기. (기본값: 꺼짐. 파이썬 2.6 이상 필요.)
`expires`\와 `max_age` 어느 쪽도 설정하지 않으면 (기본값) 브라우저
세션이 끝날 때 (브라우저 창이 닫히자마자) 쿠키가 만료된다.
서명된 쿠키에는 pickle 가능한 어떤 객체든 저장할 수 있으며
암호학적 서명을 해서 변조를 막는다. 대부분 브라우저에서 쿠키가
4kb로 제한돼 있다는 걸 잊지 말자.
경고: 서명된 쿠키가 암호화되는 건 아니다. (클라이언트가 여전히
내용물을 볼 수 있다.) 그리고 복사 방지가 되는 것도 아니다.
(클라이언트가 이전 쿠키를 재사용할 수 있다.) 주된 의도는
pickle 처리를 안전하게 만드는 것이지, 클라이언트 쪽에 비밀 정보를
저장하는 게 아니다.
'''
if not self._cookies:
self._cookies = SimpleCookie()
if secret:
value = touni(cookie_encode((name, value), secret))
elif not isinstance(value, basestring):
raise TypeError('Secret key missing for non-string Cookie.')
if len(value) > 4096: raise ValueError('Cookie value to long.')
self._cookies[name] = value
for key, value in options.items():
if key == 'max_age':
if isinstance(value, timedelta):
value = value.seconds + value.days * 24 * 3600
if key == 'expires':
if isinstance(value, (datedate, datetime)):
value = value.timetuple()
elif isinstance(value, (int, float)):
value = time.gmtime(value)
value = time.strftime("%a, %d %b %Y %H:%M:%S GMT", value)
self._cookies[name][key.replace('_', '-')] = value
[docs] def delete_cookie(self, key, **kwargs):
''' 쿠키 삭제. 쿠키를 만들 때와 같은 `domain` 및 `path` 설정을
쓰도록 하자. '''
kwargs['max_age'] = -1
kwargs['expires'] = 0
self.set_cookie(key, '', **kwargs)
def __repr__(self):
out = ''
for name, value in self.headerlist:
out += '%s: %s\n' % (name.title(), value.strip())
return out
def local_property(name=None):
if name: depr('local_property() is deprecated and will be removed.') #0.12
ls = threading.local()
def fget(self):
try: return ls.var
except AttributeError:
raise RuntimeError("Request context not initialized.")
def fset(self, value): ls.var = value
def fdel(self): del ls.var
return property(fget, fset, fdel, '스레드 로컬 프로퍼티')
[docs]class LocalRequest(BaseRequest):
''' 스레드마다 각기 다른 속성 집합이 있는 :class:`BaseRequest`\의
스레드 로컬 서브클래스. 일반적으로 이 클래스의 전역 인스턴스는
하나만(:data:`request`) 있다. 요청/응답 처리 사이클 중에 접근 시
그 인스턴스는 (다중 스레드 서버에서도) 항상 *현재* 요청을
가리킨다. '''
bind = BaseRequest.__init__
environ = local_property()
[docs]class LocalResponse(BaseResponse):
''' 스레드마다 각기 다른 속성 집합이 있는 :class:`BaseResponse`\의
스레드 로컬 서브클래스. 일반적으로 이 클래스의 전역 인스턴스는
하나만(:data:`response`) 있다. 요청/응답 처리 사이클 마지막에
그 속성들을 사용해 HTTP 응답을 만든다.
'''
bind = BaseResponse.__init__
_status_line = local_property()
_status_code = local_property()
_cookies = local_property()
_headers = local_property()
body = local_property()
Request = BaseRequest
Response = BaseResponse
[docs]class HTTPResponse(Response, BottleException):
def __init__(self, body='', status=None, headers=None, **more_headers):
super(HTTPResponse, self).__init__(body, status, headers, **more_headers)
def apply(self, response):
response._status_code = self._status_code
response._status_line = self._status_line
response._headers = self._headers
response._cookies = self._cookies
response.body = self.body
[docs]class HTTPError(HTTPResponse):
default_status = 500
def __init__(self, status=None, body=None, exception=None, traceback=None,
**options):
self.exception = exception
self.traceback = traceback
super(HTTPError, self).__init__(body, status, **options)
###############################################################################
# Plugins ######################################################################
###############################################################################
class PluginError(BottleException): pass
class JSONPlugin(object):
name = 'json'
api = 2
def __init__(self, json_dumps=json_dumps):
self.json_dumps = json_dumps
def apply(self, callback, route):
dumps = self.json_dumps
if not dumps: return callback
def wrapper(*a, **ka):
try:
rv = callback(*a, **ka)
except HTTPError:
rv = _e()
if isinstance(rv, dict):
#Attempt to serialize, raises exception on failure
json_response = dumps(rv)
#Set content type only if serialization succesful
response.content_type = 'application/json'
return json_response
elif isinstance(rv, HTTPResponse) and isinstance(rv.body, dict):
rv.body = dumps(rv.body)
rv.content_type = 'application/json'
return rv
return wrapper
class TemplatePlugin(object):
''' This plugin applies the :func:`view` decorator to all routes with a
`template` config parameter. If the parameter is a tuple, the second
element must be a dict with additional options (e.g. `template_engine`)
or default variables for the template. '''
name = 'template'
api = 2
def apply(self, callback, route):
conf = route.config.get('template')
if isinstance(conf, (tuple, list)) and len(conf) == 2:
return view(conf[0], **conf[1])(callback)
elif isinstance(conf, str):
return view(conf)(callback)
else:
return callback
#: Not a plugin, but part of the plugin API. TODO: Find a better place.
class _ImportRedirect(object):
def __init__(self, name, impmask):
''' Create a virtual package that redirects imports (see PEP 302). '''
self.name = name
self.impmask = impmask
self.module = sys.modules.setdefault(name, new_module(name))
self.module.__dict__.update({'__file__': __file__, '__path__': [],
'__all__': [], '__loader__': self})
sys.meta_path.append(self)
def find_module(self, fullname, path=None):
if '.' not in fullname: return
packname = fullname.rsplit('.', 1)[0]
if packname != self.name: return
return self
def load_module(self, fullname):
if fullname in sys.modules: return sys.modules[fullname]
modname = fullname.rsplit('.', 1)[1]
realname = self.impmask % modname
__import__(realname)
module = sys.modules[fullname] = sys.modules[realname]
setattr(self.module, modname, module)
module.__loader__ = self
return module
###############################################################################
# Common Utilities #############################################################
###############################################################################
[docs]class MultiDict(DictMixin):
""" 이 dict는 키별로 여러 값을 저장한다는 점을 빼면 일반 dict와
똑같이 동작한다. 키를 주면 최신 값만 반환한다. 전체 값 리스트에
접근하는 데 쓸 수 있는 메소드들이 따로 있다.
"""
def __init__(self, *a, **k):
self.dict = dict((k, [v]) for (k, v) in dict(*a, **k).items())
def __len__(self): return len(self.dict)
def __iter__(self): return iter(self.dict)
def __contains__(self, key): return key in self.dict
def __delitem__(self, key): del self.dict[key]
def __getitem__(self, key): return self.dict[key][-1]
def __setitem__(self, key, value): self.append(key, value)
[docs] def keys(self): return self.dict.keys()
if py3k:
def values(self): return (v[-1] for v in self.dict.values())
def items(self): return ((k, v[-1]) for k, v in self.dict.items())
def allitems(self):
return ((k, v) for k, vl in self.dict.items() for v in vl)
iterkeys = keys
itervalues = values
iteritems = items
iterallitems = allitems
else:
[docs] def values(self): return [v[-1] for v in self.dict.values()]
[docs] def items(self): return [(k, v[-1]) for k, v in self.dict.items()]
[docs] def iterkeys(self): return self.dict.iterkeys()
[docs] def itervalues(self): return (v[-1] for v in self.dict.itervalues())
[docs] def iteritems(self):
return ((k, v[-1]) for k, v in self.dict.iteritems())
def iterallitems(self):
return ((k, v) for k, vl in self.dict.iteritems() for v in vl)
def allitems(self):
return [(k, v) for k, vl in self.dict.iteritems() for v in vl]
[docs] def get(self, key, default=None, index=-1, type=None):
''' 키에 대한 최신 값을 반환.
:param default: 키가 없거나 타입 변환이 실패한 경우 반환할
기본값.
:param index: 값 리스트에 대한 색인.
:param type: 지정돼 있으면 그 콜러블을 이용해 값을 특정 타입으로
캐스트한다. 콜러블에서 예외를 던지면 막고서 기본값을 반환한다.
'''
try:
val = self.dict[key][index]
return type(val) if type else val
except Exception:
pass
return default
[docs] def append(self, key, value):
''' 이 키에 대한 값 리스트에 새 값을 추가. '''
self.dict.setdefault(key, []).append(value)
[docs] def replace(self, key, value):
''' 값 리스트를 값 하나로 교체. '''
self.dict[key] = [value]
[docs] def getall(self, key):
''' 키에 대한 (비어 있을 수 있는) 값 리스트 반환. '''
return self.dict.get(key) or []
#: 다른 multi-dict API (Django)를 흉내내기 위한 WTForms 에일리어스.
getone = get
getlist = getall
[docs]class ConfigDict(dict):
''' dict스러운 설정 저장소. 네임스페이스, 값 검사, 메타데이터,
변경 주시 등을 추가로 지원.
이 저장소는 빠른 읽기 접근이 가능하게 최적화돼 있다. 키를
가져오거나 비변경 dict 메소드(예: `dict.get()`) 사용 시
원래 dict 대비 오버헤드가 전혀 없다.
'''
__slots__ = ('_meta', '_on_change')
class Namespace(DictMixin):
def __init__(self, config, namespace):
self._config = config
self._prefix = namespace
def __getitem__(self, key):
depr('Accessing namespaces as dicts is discouraged. '
'Only use flat item access: '
'cfg["names"]["pace"]["key"] -> cfg["name.space.key"]') #0.12
return self._config[self._prefix + '.' + key]
def __setitem__(self, key, value):
self._config[self._prefix + '.' + key] = value
def __delitem__(self, key):
del self._config[self._prefix + '.' + key]
def __iter__(self):
ns_prefix = self._prefix + '.'
for key in self._config:
ns, dot, name = key.rpartition('.')
if ns == self._prefix and name:
yield name
def keys(self): return [x for x in self]
def __len__(self): return len(self.keys())
def __contains__(self, key): return self._prefix + '.' + key in self._config
def __repr__(self): return '<Config.Namespace %s.*>' % self._prefix
def __str__(self): return '<Config.Namespace %s.*>' % self._prefix
# Deprecated ConfigDict features
def __getattr__(self, key):
depr('Attribute access is deprecated.') #0.12
if key not in self and key[0].isupper():
self[key] = ConfigDict.Namespace(self._config, self._prefix + '.' + key)
if key not in self and key.startswith('__'):
raise AttributeError(key)
return self.get(key)
def __setattr__(self, key, value):
if key in ('_config', '_prefix'):
self.__dict__[key] = value
return
depr('Attribute assignment is deprecated.') #0.12
if hasattr(DictMixin, key):
raise AttributeError('Read-only attribute.')
if key in self and self[key] and isinstance(self[key], self.__class__):
raise AttributeError('Non-empty namespace attribute.')
self[key] = value
def __delattr__(self, key):
if key in self:
val = self.pop(key)
if isinstance(val, self.__class__):
prefix = key + '.'
for key in self:
if key.startswith(prefix):
del self[prefix+key]
def __call__(self, *a, **ka):
depr('Calling ConfDict is deprecated. Use the update() method.') #0.12
self.update(*a, **ka)
return self
def __init__(self, *a, **ka):
self._meta = {}
self._on_change = lambda name, value: None
if a or ka:
depr('Constructor does no longer accept parameters.') #0.12
self.update(*a, **ka)
[docs] def load_config(self, filename):
''' \*.ini 방식 설정 파일에서 값 읽어 들이기.
설정 파일에 섹션이 있으면 그 이름을 네임스페이스로 해서
값을 넣는다. 두 가지 특수 섹션 ``DEFAULT``\와 ``bottle``\은
루트 네임스페이스(프리픽스 없음)를 가리킨다.
'''
conf = ConfigParser()
conf.read(filename)
for section in conf.sections():
for key, value in conf.items(section):
if section not in ('DEFAULT', 'bottle'):
key = section + '.' + key
self[key] = value
return self
[docs] def load_dict(self, source, namespace='', make_namespaces=False):
''' 딕셔너리 구조에서 값 가져오기. 중첩 구조를 써서 네임스페이스를
나타낼 수 있다.
>>> ConfigDict().load_dict({'name': {'space': {'key': 'value'}}})
{'name.space.key': 'value'}
'''
stack = [(namespace, source)]
while stack:
prefix, source = stack.pop()
if not isinstance(source, dict):
raise TypeError('Source is not a dict (r)' % type(key))
for key, value in source.items():
if not isinstance(key, basestring):
raise TypeError('Key is not a string (%r)' % type(key))
full_key = prefix + '.' + key if prefix else key
if isinstance(value, dict):
stack.append((full_key, value))
if make_namespaces:
self[full_key] = self.Namespace(self, full_key)
else:
self[full_key] = value
return self
[docs] def update(self, *a, **ka):
''' 첫 번째 매개변수가 문자열이면 모든 키 이름 앞에 그 네임스페이스를
붙인다. 그 경우 외에는 일반 dict.update()처럼 동작한다.
예: ``update('some.namespace', key='value')`` '''
prefix = ''
if a and isinstance(a[0], basestring):
prefix = a[0].strip('.') + '.'
a = a[1:]
for key, value in dict(*a, **ka).items():
self[prefix+key] = value
[docs] def setdefault(self, key, value):
if key not in self:
self[key] = value
return self[key]
def __setitem__(self, key, value):
if not isinstance(key, basestring):
raise TypeError('Key has type %r (not a string)' % type(key))
value = self.meta_get(key, 'filter', lambda x: x)(value)
if key in self and self[key] is value:
return
self._on_change(key, value)
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self, key)
[docs] def clear(self):
for key in self:
del self[key]
# Deprecated ConfigDict features
def __getattr__(self, key):
depr('Attribute access is deprecated.') #0.12
if key not in self and key[0].isupper():
self[key] = self.Namespace(self, key)
if key not in self and key.startswith('__'):
raise AttributeError(key)
return self.get(key)
def __setattr__(self, key, value):
if key in self.__slots__:
return dict.__setattr__(self, key, value)
depr('Attribute assignment is deprecated.') #0.12
if hasattr(dict, key):
raise AttributeError('Read-only attribute.')
if key in self and self[key] and isinstance(self[key], self.Namespace):
raise AttributeError('Non-empty namespace attribute.')
self[key] = value
def __delattr__(self, key):
if key in self:
val = self.pop(key)
if isinstance(val, self.Namespace):
prefix = key + '.'
for key in self:
if key.startswith(prefix):
del self[prefix+key]
def __call__(self, *a, **ka):
depr('Calling ConfDict is deprecated. Use the update() method.') #0.12
self.update(*a, **ka)
return self
[docs]class AppStack(list):
""" 스택스러운 리스트. 호출하면 스택 상단 항목 반환. """
def __call__(self):
""" Return the current default application. """
return self[-1]
[docs] def push(self, value=None):
""" 스택에 새 :class:`Bottle` 인스턴스 추가. """
if not isinstance(value, Bottle):
value = Bottle()
self.append(value)
return value
class WSGIFileWrapper(object):
def __init__(self, fp, buffer_size=1024*64):
self.fp, self.buffer_size = fp, buffer_size
for attr in ('fileno', 'close', 'read', 'readlines', 'tell', 'seek'):
if hasattr(fp, attr): setattr(self, attr, getattr(fp, attr))
def __iter__(self):
buff, read = self.buffer_size, self.read
while True:
part = read(buff)
if not part: return
yield part
class _closeiter(object):
''' This only exists to be able to attach a .close method to iterators that
do not support attribute assignment (most of itertools). '''
def __init__(self, iterator, close=None):
self.iterator = iterator
self.close_callbacks = makelist(close)
def __iter__(self):
return iter(self.iterator)
def close(self):
for func in self.close_callbacks:
func()
[docs]class ResourceManager(object):
''' 탐색 경로 목록을 관리하고 응용 관련 자원(파일)을 찾아서
여는 걸 돕는 클래스.
:param base: :meth:`add_path` 호출을 위한 기본값.
:param opener: 자원 여는 데 쓰는 콜러블.
:param cachemode: 어떤 탐색 결과를 캐싱할지 제어. 'all',
'found', 'none' 중 하나.
'''
def __init__(self, base='./', opener=open, cachemode='all'):
self.opener = open
self.base = base
self.cachemode = cachemode
#: 탐색 경로 목록. 자세한 건 :meth:`add_path` 참고.
self.path = []
#: 결정된 경로들의 캐시. ``res.cache.clear()``\로 캐시 비울 수 있음.
self.cache = {}
[docs] def add_path(self, path, base=None, index=None, create=False):
''' 탐색 경로 목록에 새 경로 추가. 존재하지 않는 경로면
False 반환.
:param path: 새 탐색 경로. 상대 경로는 정규화된 절대 경로로
바뀐다. 경로가 파일처럼 생겼으면 (`/`\로 끝나지 않으면)
파일명 부분을 없앤다.
:param base: 상대 탐색 경로를 절대 경로로 바꾸는 데 쓰는 경로.
기본은 :attr:`base` 속성 값인데, 그 기본값은 ``os.getcwd()``.
:param index: 탐색 경로 목록 내 위치. 기본은 마지막 색인 위치
(리스트에 덧붙이기).
`base` 매개변수는 파이썬 모듈 내지 패키지와 함께 설치된
파일을 쉽게 참조할 수 있게 해 준다. ::
res.add_path('./resources/', __file__)
'''
base = os.path.abspath(os.path.dirname(base or self.base))
path = os.path.abspath(os.path.join(base, os.path.dirname(path)))
path += os.sep
if path in self.path:
self.path.remove(path)
if create and not os.path.isdir(path):
os.makedirs(path)
if index is None:
self.path.append(path)
else:
self.path.insert(index, path)
self.cache.clear()
return os.path.exists(path)
def __iter__(self):
''' Iterate over all existing files in all registered paths. '''
search = self.path[:]
while search:
path = search.pop()
if not os.path.isdir(path): continue
for name in os.listdir(path):
full = os.path.join(path, name)
if os.path.isdir(full): search.append(full)
else: yield full
[docs] def lookup(self, name):
''' 자원을 탐색해서 파일 절대 경로를 반환. 없으면 `None` 반환.
:attr:`path` 목록을 차례로 탐색한다. 먼저 걸린 경로를 반환한다.
심볼릭 링크를 따라간다. 결과를 캐싱해서 이후 검색 속도를
높인다. '''
if name not in self.cache or DEBUG:
for path in self.path:
fpath = os.path.join(path, name)
if os.path.isfile(fpath):
if self.cachemode in ('all', 'found'):
self.cache[name] = fpath
return fpath
if self.cachemode == 'all':
self.cache[name] = None
return self.cache[name]
[docs] def open(self, name, mode='r', *args, **kwargs):
''' 자원을 찾아서 파일 객체 반환, 또는 IOError 던짐. '''
fname = self.lookup(name)
if not fname: raise IOError("Resource %r not found." % name)
return self.opener(fname, mode=mode, *args, **kwargs)
[docs]class FileUpload(object):
def __init__(self, fileobj, name, filename, headers=None):
''' Wrapper for file uploads. '''
#: 열린 파일(스러운) 객체 (BytesIO 버퍼나 임시 파일)
self.file = fileobj
#: 업로드 양식 필드 이름
self.name = name
#: 클라이언트가 보낸 그대로의 파일 이름 (안전하지 않은 문자 있을 수 있음)
self.raw_filename = filename
#: 추가 헤더들(예: content-type)이 있는 :class:`HeaderDict`
self.headers = HeaderDict(headers) if headers else HeaderDict()
content_type = HeaderProperty('Content-Type')
content_length = HeaderProperty('Content-Length', reader=int, default=-1)
[docs] @cached_property
def filename(self):
''' 클라이언트 파일 시스템 상의 파일 이름. 단 파일 시스템 호환성이
보장되도록 정규화 돼 있음. 빈 파일 이름은 'empty'로 반환.
최종 파일 이름에서는 ASCII 문자, 숫자, 대시, 밑줄, 마침표만
허용한다. 가능한 경우 강세 표시를 없앤다. 공백을 대시 한 개로
바꾼다. 앞이나 뒤의 마침표 및 대시를 제거한다. 파일 이름
길이를 255문자로 제한한다.
'''
fname = self.raw_filename
if not isinstance(fname, unicode):
fname = fname.decode('utf8', 'ignore')
fname = normalize('NFKD', fname).encode('ASCII', 'ignore').decode('ASCII')
fname = os.path.basename(fname.replace('\\', os.path.sep))
fname = re.sub(r'[^a-zA-Z0-9-_.\s]', '', fname).strip()
fname = re.sub(r'[-\s]+', '-', fname).strip('.-')
return fname[:255] or 'empty'
def _copy_file(self, fp, chunk_size=2**16):
read, write, offset = self.file.read, fp.write, self.file.tell()
while 1:
buf = read(chunk_size)
if not buf: break
write(buf)
self.file.seek(offset)
[docs] def save(self, destination, overwrite=False, chunk_size=2**16):
''' 파일을 디스크에 저장하거나 내용물을 열린 파일(스러운) 객체로
복사한다. *destination*\이 디렉터리면 경로에 :attr:`filename`\을
덧붙인다. 기본적으로 기존 파일을 덮어 쓰지 않는다 (IOError).
:param destination: 파일 경로나 디렉터리, 파일(스러운) 객체.
:param overwrite: 참이면 기본 파일을 교체. (기본값: False)
:param chunk_size: 한 번에 읽을 바이트 수. (기본값: 64kb)
'''
if isinstance(destination, basestring): # Except file-likes here
if os.path.isdir(destination):
destination = os.path.join(destination, self.filename)
if not overwrite and os.path.exists(destination):
raise IOError('File exists.')
with open(destination, 'wb') as fp:
self._copy_file(fp, chunk_size)
else:
self._copy_file(destination, chunk_size)
###############################################################################
# Application Helper ###########################################################
###############################################################################
def abort(code=500, text='Unknown Error.'):
""" Aborts execution and causes a HTTP error. """
raise HTTPError(code, text)
def redirect(url, code=None):
""" Aborts execution and causes a 303 or 302 redirect, depending on
the HTTP protocol version. """
if not code:
code = 303 if request.get('SERVER_PROTOCOL') == "HTTP/1.1" else 302
res = response.copy(cls=HTTPResponse)
res.status = code
res.body = ""
res.set_header('Location', urljoin(request.url, url))
raise res
def _file_iter_range(fp, offset, bytes, maxread=1024*1024):
''' Yield chunks from a range in a file. No chunk is bigger than maxread.'''
fp.seek(offset)
while bytes > 0:
part = fp.read(min(bytes, maxread))
if not part: break
bytes -= len(part)
yield part
def static_file(filename, root, mimetype='auto', download=False, charset='UTF-8'):
""" Open a file in a safe way and return :exc:`HTTPResponse` with status
code 200, 305, 403 or 404. The ``Content-Type``, ``Content-Encoding``,
``Content-Length`` and ``Last-Modified`` headers are set if possible.
Special support for ``If-Modified-Since``, ``Range`` and ``HEAD``
requests.
:param filename: Name or path of the file to send.
:param root: Root path for file lookups. Should be an absolute directory
path.
:param mimetype: Defines the content-type header (default: guess from
file extension)
:param download: If True, ask the browser to open a `Save as...` dialog
instead of opening the file with the associated program. You can
specify a custom filename as a string. If not specified, the
original filename is used (default: False).
:param charset: The charset to use for files with a ``text/*``
mime-type. (default: UTF-8)
"""
root = os.path.abspath(root) + os.sep
filename = os.path.abspath(os.path.join(root, filename.strip('/\\')))
headers = dict()
if not filename.startswith(root):
return HTTPError(403, "Access denied.")
if not os.path.exists(filename) or not os.path.isfile(filename):
return HTTPError(404, "File does not exist.")
if not os.access(filename, os.R_OK):
return HTTPError(403, "You do not have permission to access this file.")
if mimetype == 'auto':
mimetype, encoding = mimetypes.guess_type(filename)
if encoding: headers['Content-Encoding'] = encoding
if mimetype:
if mimetype[:5] == 'text/' and charset and 'charset' not in mimetype:
mimetype += '; charset=%s' % charset
headers['Content-Type'] = mimetype
if download:
download = os.path.basename(filename if download == True else download)
headers['Content-Disposition'] = 'attachment; filename="%s"' % download
stats = os.stat(filename)
headers['Content-Length'] = clen = stats.st_size
lm = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(stats.st_mtime))
headers['Last-Modified'] = lm
ims = request.environ.get('HTTP_IF_MODIFIED_SINCE')
if ims:
ims = parse_date(ims.split(";")[0].strip())
if ims is not None and ims >= int(stats.st_mtime):
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
return HTTPResponse(status=304, **headers)
body = '' if request.method == 'HEAD' else open(filename, 'rb')
headers["Accept-Ranges"] = "bytes"
ranges = request.environ.get('HTTP_RANGE')
if 'HTTP_RANGE' in request.environ:
ranges = list(parse_range_header(request.environ['HTTP_RANGE'], clen))
if not ranges:
return HTTPError(416, "Requested Range Not Satisfiable")
offset, end = ranges[0]
headers["Content-Range"] = "bytes %d-%d/%d" % (offset, end-1, clen)
headers["Content-Length"] = str(end-offset)
if body: body = _file_iter_range(body, offset, end-offset)
return HTTPResponse(body, status=206, **headers)
return HTTPResponse(body, **headers)
###############################################################################
# HTTP Utilities and MISC (TODO) ###############################################
###############################################################################
[docs]def debug(mode=True):
""" 디버그 수준 변경.
현재 지원하는 디버그 수준은 한 가지뿐이다."""
global DEBUG
if mode: warnings.simplefilter('default')
DEBUG = bool(mode)
def http_date(value):
if isinstance(value, (datedate, datetime)):
value = value.utctimetuple()
elif isinstance(value, (int, float)):
value = time.gmtime(value)
if not isinstance(value, basestring):
value = time.strftime("%a, %d %b %Y %H:%M:%S GMT", value)
return value
[docs]def parse_date(ims):
""" rfc1123, rfc850, asctime 형식 타임스탬프를 파싱해서 UTC 에포크 시간 반환. """
try:
ts = email.utils.parsedate_tz(ims)
return time.mktime(ts[:8] + (0,)) - (ts[9] or 0) - time.timezone
except (TypeError, ValueError, IndexError, OverflowError):
return None
[docs]def parse_auth(header):
""" rfc2617 HTTP 인증 헤더 문자열(basic)을 파싱해서 (user,pass) 튜플 또는 None 반환."""
try:
method, data = header.split(None, 1)
if method.lower() == 'basic':
user, pwd = touni(base64.b64decode(tob(data))).split(':',1)
return user, pwd
except (KeyError, ValueError):
return None
def parse_range_header(header, maxlen=0):
''' Yield (start, end) ranges parsed from a HTTP Range header. Skip
unsatisfiable ranges. The end index is non-inclusive.'''
if not header or header[:6] != 'bytes=': return
ranges = [r.split('-', 1) for r in header[6:].split(',') if '-' in r]
for start, end in ranges:
try:
if not start: # bytes=-100 -> last 100 bytes
start, end = max(0, maxlen-int(end)), maxlen
elif not end: # bytes=100- -> all but the first 99 bytes
start, end = int(start), maxlen
else: # bytes=100-200 -> bytes 100-200 (inclusive)
start, end = int(start), min(int(end)+1, maxlen)
if 0 <= start < end <= maxlen:
yield start, end
except ValueError:
pass
def _parse_qsl(qs):
r = []
for pair in qs.replace(';','&').split('&'):
if not pair: continue
nv = pair.split('=', 1)
if len(nv) != 2: nv.append('')
key = urlunquote(nv[0].replace('+', ' '))
value = urlunquote(nv[1].replace('+', ' '))
r.append((key, value))
return r
def _lscmp(a, b):
''' Compares two strings in a cryptographically safe way:
Runtime is not affected by length of common prefix. '''
return not sum(0 if x==y else 1 for x, y in zip(a, b)) and len(a) == len(b)
[docs]def cookie_encode(data, key):
''' pickle 가능 객체를 인코딩 및 서명. (바이트)열 반환. '''
msg = base64.b64encode(pickle.dumps(data, -1))
sig = base64.b64encode(hmac.new(tob(key), msg, digestmod=hashlib.md5).digest())
return tob('!') + sig + tob('?') + msg
[docs]def cookie_decode(data, key):
''' 인코딩된 문자열을 검사 및 디코딩. 객체 또는 None 반환.'''
data = tob(data)
if cookie_is_encoded(data):
sig, msg = data.split(tob('?'), 1)
if _lscmp(sig[1:], base64.b64encode(hmac.new(tob(key), msg, digestmod=hashlib.md5).digest())):
return pickle.loads(base64.b64decode(msg))
return None
[docs]def cookie_is_encoded(data):
''' 인자가 인코딩된 쿠키처럼 보이면 True 반환.'''
return bool(data.startswith(tob('!')) and tob('?') in data)
def html_escape(string):
''' Escape HTML special characters ``&<>`` and quotes ``'"``. '''
return string.replace('&','&').replace('<','<').replace('>','>')\
.replace('"','"').replace("'",''')
def html_quote(string):
''' Escape and quote a string to be used as an HTTP attribute.'''
return '"%s"' % html_escape(string).replace('\n',' ')\
.replace('\r',' ').replace('\t','	')
[docs]def yieldroutes(func):
""" func 매개변수의 시그너처(이름, 인자)에 맞는 라우트를 내놓는
제너레이터 반환. 함수가 선택적 키워드 인자를 받는 경우 여러 라우트를
내놓을 수도 있다. 출력 예를 보자. ::
a() -> '/a'
b(x, y) -> '/b/<x>/<y>'
c(x, y=5) -> '/c/<x>' 및 '/c/<x>/<y>'
d(x=5, y=6) -> '/d' 및 '/d/<x>' 및 '/d/<x>/<y>'
"""
path = '/' + func.__name__.replace('__','/').lstrip('/')
spec = getargspec(func)
argc = len(spec[0]) - len(spec[3] or [])
path += ('/<%s>' * argc) % tuple(spec[0][:argc])
yield path
for arg in spec[0][argc:]:
path += '/<%s>' % arg
yield path
[docs]def path_shift(script_name, path_info, shift=1):
''' PATH_INFO에서 SCRIPT_NAME으로 또는 반대로 경로 분절들을 이동.
:return: 수정된 경로들.
:param script_name: SCRIPT_NAME 경로.
:param path_info: PATH_INFO 경로.
:param shift: 옮길 경로 분절 수. 음수로 해서 이동 방향을
바꿀 수도 있다. (기본값: 1)
'''
if shift == 0: return script_name, path_info
pathlist = path_info.strip('/').split('/')
scriptlist = script_name.strip('/').split('/')
if pathlist and pathlist[0] == '': pathlist = []
if scriptlist and scriptlist[0] == '': scriptlist = []
if shift > 0 and shift <= len(pathlist):
moved = pathlist[:shift]
scriptlist = scriptlist + moved
pathlist = pathlist[shift:]
elif shift < 0 and shift >= -len(scriptlist):
moved = scriptlist[shift:]
pathlist = moved + pathlist
scriptlist = scriptlist[:shift]
else:
empty = 'SCRIPT_NAME' if shift < 0 else 'PATH_INFO'
raise AssertionError("Cannot shift. Nothing left from %s" % empty)
new_script_name = '/' + '/'.join(scriptlist)
new_path_info = '/' + '/'.join(pathlist)
if path_info.endswith('/') and pathlist: new_path_info += '/'
return new_script_name, new_path_info
def auth_basic(check, realm="private", text="Access denied"):
''' Callback decorator to require HTTP auth (basic).
TODO: Add route(check_auth=...) parameter. '''
def decorator(func):
def wrapper(*a, **ka):
user, password = request.auth or (None, None)
if user is None or not check(user, password):
err = HTTPError(401, text)
err.add_header('WWW-Authenticate', 'Basic realm="%s"' % realm)
return err
return func(*a, **ka)
return wrapper
return decorator
# Shortcuts for common Bottle methods.
# They all refer to the current default application.
def make_default_app_wrapper(name):
''' Return a callable that relays calls to the current default app. '''
@functools.wraps(getattr(Bottle, name))
def wrapper(*a, **ka):
return getattr(app(), name)(*a, **ka)
return wrapper
route = make_default_app_wrapper('route')
get = make_default_app_wrapper('get')
post = make_default_app_wrapper('post')
put = make_default_app_wrapper('put')
delete = make_default_app_wrapper('delete')
error = make_default_app_wrapper('error')
mount = make_default_app_wrapper('mount')
hook = make_default_app_wrapper('hook')
install = make_default_app_wrapper('install')
uninstall = make_default_app_wrapper('uninstall')
url = make_default_app_wrapper('get_url')
###############################################################################
# Server Adapter ###############################################################
###############################################################################
class ServerAdapter(object):
quiet = False
def __init__(self, host='127.0.0.1', port=8080, **options):
self.options = options
self.host = host
self.port = int(port)
def run(self, handler): # pragma: no cover
pass
def __repr__(self):
args = ', '.join(['%s=%s'%(k,repr(v)) for k, v in self.options.items()])
return "%s(%s)" % (self.__class__.__name__, args)
class CGIServer(ServerAdapter):
quiet = True
def run(self, handler): # pragma: no cover
from wsgiref.handlers import CGIHandler
def fixed_environ(environ, start_response):
environ.setdefault('PATH_INFO', '')
return handler(environ, start_response)
CGIHandler().run(fixed_environ)
class FlupFCGIServer(ServerAdapter):
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
class WSGIRefServer(ServerAdapter):
def run(self, app): # pragma: no cover
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
from wsgiref.simple_server import make_server
import socket
class FixedHandler(WSGIRequestHandler):
def address_string(self): # Prevent reverse DNS lookups please.
return self.client_address[0]
def log_request(*args, **kw):
if not self.quiet:
return WSGIRequestHandler.log_request(*args, **kw)
handler_cls = self.options.get('handler_class', FixedHandler)
server_cls = self.options.get('server_class', WSGIServer)
if ':' in self.host: # Fix wsgiref for IPv6 addresses.
if getattr(server_cls, 'address_family') == socket.AF_INET:
class server_cls(server_cls):
address_family = socket.AF_INET6
srv = make_server(self.host, self.port, app, server_cls, handler_cls)
srv.serve_forever()
class CherryPyServer(ServerAdapter):
def run(self, handler): # pragma: no cover
from cherrypy import wsgiserver
self.options['bind_addr'] = (self.host, self.port)
self.options['wsgi_app'] = handler
certfile = self.options.get('certfile')
if certfile:
del self.options['certfile']
keyfile = self.options.get('keyfile')
if keyfile:
del self.options['keyfile']
server = wsgiserver.CherryPyWSGIServer(**self.options)
if certfile:
server.ssl_certificate = certfile
if keyfile:
server.ssl_private_key = keyfile
try:
server.start()
finally:
server.stop()
class WaitressServer(ServerAdapter):
def run(self, handler):
from waitress import serve
serve(handler, host=self.host, port=self.port)
class PasteServer(ServerAdapter):
def run(self, handler): # pragma: no cover
from paste import httpserver
from paste.translogger import TransLogger
handler = TransLogger(handler, setup_console_handler=(not self.quiet))
httpserver.serve(handler, host=self.host, port=str(self.port),
**self.options)
class MeinheldServer(ServerAdapter):
def run(self, handler):
from meinheld import server
server.listen((self.host, self.port))
server.run(handler)
class FapwsServer(ServerAdapter):
""" Extremely fast webserver using libev. See http://www.fapws.org/ """
def run(self, handler): # pragma: no cover
import fapws._evwsgi as evwsgi
from fapws import base, config
port = self.port
if float(config.SERVER_IDENT[-2:]) > 0.4:
# fapws3 silently changed its API in 0.5
port = str(port)
evwsgi.start(self.host, port)
# fapws3 never releases the GIL. Complain upstream. I tried. No luck.
if 'BOTTLE_CHILD' in os.environ and not self.quiet:
_stderr("WARNING: Auto-reloading does not work with Fapws3.\n")
_stderr(" (Fapws3 breaks python thread support)\n")
evwsgi.set_base_module(base)
def app(environ, start_response):
environ['wsgi.multiprocess'] = False
return handler(environ, start_response)
evwsgi.wsgi_cb(('', app))
evwsgi.run()
class TornadoServer(ServerAdapter):
""" The super hyped asynchronous server by facebook. Untested. """
def run(self, handler): # pragma: no cover
import tornado.wsgi, tornado.httpserver, tornado.ioloop
container = tornado.wsgi.WSGIContainer(handler)
server = tornado.httpserver.HTTPServer(container)
server.listen(port=self.port,address=self.host)
tornado.ioloop.IOLoop.instance().start()
class AppEngineServer(ServerAdapter):
""" Adapter for Google App Engine. """
quiet = True
def run(self, handler):
from google.appengine.ext.webapp import util
# A main() function in the handler script enables 'App Caching'.
# Lets makes sure it is there. This _really_ improves performance.
module = sys.modules.get('__main__')
if module and not hasattr(module, 'main'):
module.main = lambda: util.run_wsgi_app(handler)
util.run_wsgi_app(handler)
class TwistedServer(ServerAdapter):
""" Untested. """
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
reactor.run()
class DieselServer(ServerAdapter):
""" Untested. """
def run(self, handler):
from diesel.protocols.wsgi import WSGIApplication
app = WSGIApplication(handler, port=self.port)
app.run()
class GeventServer(ServerAdapter):
""" Untested. Options:
* `fast` (default: False) uses libevent's http server, but has some
issues: No streaming, no pipelining, no SSL.
* See gevent.wsgi.WSGIServer() documentation for more options.
"""
def run(self, handler):
from gevent import pywsgi, local
if not isinstance(threading.local(), local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if self.options.pop('fast', None):
depr('The "fast" option has been deprecated and removed by Gevent.')
if self.quiet:
self.options['log'] = None
address = (self.host, self.port)
server = pywsgi.WSGIServer(address, handler, **self.options)
if 'BOTTLE_CHILD' in os.environ:
import signal
signal.signal(signal.SIGINT, lambda s, f: server.stop())
server.serve_forever()
class GeventSocketIOServer(ServerAdapter):
def run(self,handler):
from socketio import server
address = (self.host, self.port)
server.SocketIOServer(address, handler, **self.options).serve_forever()
class GunicornServer(ServerAdapter):
""" Untested. See http://gunicorn.org/configure.html for options. """
def run(self, handler):
from gunicorn.app.base import Application
config = {'bind': "%s:%d" % (self.host, int(self.port))}
config.update(self.options)
class GunicornApplication(Application):
def init(self, parser, opts, args):
return config
def load(self):
return handler
GunicornApplication().run()
class EventletServer(ServerAdapter):
""" Untested """
def run(self, handler):
from eventlet import wsgi, listen
try:
wsgi.server(listen((self.host, self.port)), handler,
log_output=(not self.quiet))
except TypeError:
# Fallback, if we have old version of eventlet
wsgi.server(listen((self.host, self.port)), handler)
class RocketServer(ServerAdapter):
""" Untested. """
def run(self, handler):
from rocket import Rocket
server = Rocket((self.host, self.port), 'wsgi', { 'wsgi_app' : handler })
server.start()
class BjoernServer(ServerAdapter):
""" Fast server written in C: https://github.com/jonashaag/bjoern """
def run(self, handler):
from bjoern import run
run(handler, self.host, self.port)
class AutoServer(ServerAdapter):
""" Untested. """
adapters = [WaitressServer, PasteServer, TwistedServer, CherryPyServer, WSGIRefServer]
def run(self, handler):
for sa in self.adapters:
try:
return sa(self.host, self.port, **self.options).run(handler)
except ImportError:
pass
server_names = {
'cgi': CGIServer,
'flup': FlupFCGIServer,
'wsgiref': WSGIRefServer,
'waitress': WaitressServer,
'cherrypy': CherryPyServer,
'paste': PasteServer,
'fapws3': FapwsServer,
'tornado': TornadoServer,
'gae': AppEngineServer,
'twisted': TwistedServer,
'diesel': DieselServer,
'meinheld': MeinheldServer,
'gunicorn': GunicornServer,
'eventlet': EventletServer,
'gevent': GeventServer,
'geventSocketIO':GeventSocketIOServer,
'rocket': RocketServer,
'bjoern' : BjoernServer,
'auto': AutoServer,
}
###############################################################################
# Application Control ##########################################################
###############################################################################
[docs]def load(target, **namespace):
""" 모듈을 임포트하거나 모듈에서 객체 가져오기.
* ``package.module``: `module`\을 모듈 객체로 반환.
* ``pack.mod:name``: `pack.mod`\의 모듈 변수 `name` 반환.
* ``pack.mod:func()``:`pack.mod.func()` 호출하고 결과 반환.
마지막 형식은 함수 호출뿐 아니라 임의 타입의 식도 받는다.
이 함수에 전달한 키워드 인자들을 지역 변수로 사용할 수 있게 된다.
예: ``import_string('re:compile(x)', x='[a-z]')``
"""
module, target = target.split(":", 1) if ':' in target else (target, None)
if module not in sys.modules: __import__(module)
if not target: return sys.modules[module]
if target.isalnum(): return getattr(sys.modules[module], target)
package_name = module.split('.')[0]
namespace[package_name] = sys.modules[package_name]
return eval('%s.%s' % (module, target), namespace)
[docs]def load_app(target):
""" 모듈에서 보틀 응용을 적재하면서 임포트가 현재 기본 응용에 영향을
주지 않도록 하고, 응용 객체를 따로 반환한다. target 매개변수에
대해선 :func:`load` 참고. """
global NORUN; NORUN, nr_old = True, NORUN
try:
tmp = default_app.push() # Create a new "default application"
rv = load(target) # Import the target module
return rv if callable(rv) else tmp
finally:
default_app.remove(tmp) # Remove the temporary added default application
NORUN = nr_old
_debug = debug
[docs]def run(app=None, server='wsgiref', host='127.0.0.1', port=8080,
interval=1, reloader=False, quiet=False, plugins=None,
debug=None, **kargs):
""" 서버 인스턴스 시작. 서버가 종료할 때까지 이 메소드는 블록된다.
:param app: WSGI 응용 또는 :func:`load_app`\에서 지원하는
대상 문자열. (기본값: :func:`default_app`)
:param server: 사용할 서버 어댑터. 유효한 이름은
:data:`server_names`\의 키 참고. :class:`ServerAdapter`
서브클래스 전달해도 됨. (기본값: `wsgiref`)
:param host: 바인드할 서버 주소. 외부 인터페이스 포함 모든
인터페이스에 리슨하려면 ``0.0.0.0``. (기본값: 127.0.0.1)
:param port: 바인드할 서버 포트. 1024 아래 값에는 루트 권한
필요. (기본값: 8080)
:param reloader: 자동 재적재 모드로 서버 시작? (기본값: False)
:param interval: 자동 재적재 확인 간격. 초 단위. (기본값: 1)
:param quiet: stdout 및 stderr 출력 끄기? (기본값: False)
:param options: 서버 어댑터로 전달할 옵션들.
"""
if NORUN: return
if reloader and not os.environ.get('BOTTLE_CHILD'):
try:
lockfile = None
fd, lockfile = tempfile.mkstemp(prefix='bottle.', suffix='.lock')
os.close(fd) # We only need this file to exist. We never write to it
while os.path.exists(lockfile):
args = [sys.executable] + sys.argv
environ = os.environ.copy()
environ['BOTTLE_CHILD'] = 'true'
environ['BOTTLE_LOCKFILE'] = lockfile
p = subprocess.Popen(args, env=environ)
while p.poll() is None: # Busy wait...
os.utime(lockfile, None) # I am alive!
time.sleep(interval)
if p.poll() != 3:
if os.path.exists(lockfile): os.unlink(lockfile)
sys.exit(p.poll())
except KeyboardInterrupt:
pass
finally:
if os.path.exists(lockfile):
os.unlink(lockfile)
return
try:
if debug is not None: _debug(debug)
app = app or default_app()
if isinstance(app, basestring):
app = load_app(app)
if not callable(app):
raise ValueError("Application is not callable: %r" % app)
for plugin in plugins or []:
app.install(plugin)
if server in server_names:
server = server_names.get(server)
if isinstance(server, basestring):
server = load(server)
if isinstance(server, type):
server = server(host=host, port=port, **kargs)
if not isinstance(server, ServerAdapter):
raise ValueError("Unknown or unsupported server: %r" % server)
server.quiet = server.quiet or quiet
if not server.quiet:
_stderr("Bottle v%s server starting up (using %s)...\n" % (__version__, repr(server)))
_stderr("Listening on http://%s:%d/\n" % (server.host, server.port))
_stderr("Hit Ctrl-C to quit.\n\n")
if reloader:
lockfile = os.environ.get('BOTTLE_LOCKFILE')
bgcheck = FileCheckerThread(lockfile, interval)
with bgcheck:
server.run(app)
if bgcheck.status == 'reload':
sys.exit(3)
else:
server.run(app)
except KeyboardInterrupt:
pass
except (SystemExit, MemoryError):
raise
except:
if not reloader: raise
if not getattr(server, 'quiet', quiet):
print_exc()
time.sleep(interval)
sys.exit(3)
class FileCheckerThread(threading.Thread):
''' Interrupt main-thread as soon as a changed module file is detected,
the lockfile gets deleted or gets to old. '''
def __init__(self, lockfile, interval):
threading.Thread.__init__(self)
self.lockfile, self.interval = lockfile, interval
#: Is one of 'reload', 'error' or 'exit'
self.status = None
def run(self):
exists = os.path.exists
mtime = lambda path: os.stat(path).st_mtime
files = dict()
for module in list(sys.modules.values()):
path = getattr(module, '__file__', '') or ''
if path[-4:] in ('.pyo', '.pyc'): path = path[:-1]
if path and exists(path): files[path] = mtime(path)
while not self.status:
if not exists(self.lockfile)\
or mtime(self.lockfile) < time.time() - self.interval - 5:
self.status = 'error'
thread.interrupt_main()
for path, lmtime in list(files.items()):
if not exists(path) or mtime(path) > lmtime:
self.status = 'reload'
thread.interrupt_main()
break
time.sleep(self.interval)
def __enter__(self):
self.start()
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.status: self.status = 'exit' # silent exit
self.join()
return exc_type is not None and issubclass(exc_type, KeyboardInterrupt)
###############################################################################
# Template Adapters ############################################################
###############################################################################
class TemplateError(HTTPError):
def __init__(self, message):
HTTPError.__init__(self, 500, message)
[docs]class BaseTemplate(object):
""" 템플릿 어댑터 기반 클래스이자 최소 API """
extensions = ['tpl','html','thtml','stpl']
settings = {} #used in prepare()
defaults = {} #used in render()
[docs] def __init__(self, source=None, name=None, lookup=[], encoding='utf8', **settings):
""" 새 템플릿 만들기.
source 매개변수(str 또는 buffer)가 없으면 name 인자를 이용해 템플릿
파일 이름을 추측한다. 서브클래스에선 self.source 및/또는 self.filename이
설정돼 있다고 상정할 수 있다. 둘 모두 문자열이다.
lookup, encoding, settings 매개변수는 인스턴스 변수로 저장된다.
lookup 매개변수는 디렉터리 경로들을 담은 리스트다.
encoding 매개변수는 바이트열이나 파일을 디코딩하는 데 쓰게 된다.
settings 매개변수는 엔진별 설정들의 딕셔너리다.
"""
self.name = name
self.source = source.read() if hasattr(source, 'read') else source
self.filename = source.filename if hasattr(source, 'filename') else None
self.lookup = [os.path.abspath(x) for x in lookup]
self.encoding = encoding
self.settings = self.settings.copy() # Copy from class variable
self.settings.update(settings) # Apply
if not self.source and self.name:
self.filename = self.search(self.name, self.lookup)
if not self.filename:
raise TemplateError('Template %s not found.' % repr(name))
if not self.source and not self.filename:
raise TemplateError('No template specified.')
self.prepare(**self.settings)
[docs] @classmethod
def search(cls, name, lookup=[]):
""" lookup에 지정된 모든 디렉터리에서 이름을 탐색.
일단 확장자 없이, 다음엔 많이 쓰는 확장자들로. 먼저 걸린 것 반환. """
if not lookup:
depr('The template lookup path list should not be empty.') #0.12
lookup = ['.']
if os.path.isabs(name) and os.path.isfile(name):
depr('Absolute template path names are deprecated.') #0.12
return os.path.abspath(name)
for spath in lookup:
spath = os.path.abspath(spath) + os.sep
fname = os.path.abspath(os.path.join(spath, name))
if not fname.startswith(spath): continue
if os.path.isfile(fname): return fname
for ext in cls.extensions:
if os.path.isfile('%s.%s' % (fname, ext)):
return '%s.%s' % (fname, ext)
[docs] @classmethod
def global_config(cls, key, *args):
''' class.settings에 저장된 전역 설정을 읽거나 설정한다. '''
if args:
cls.settings = cls.settings.copy() # Make settings local to class
cls.settings[key] = args[0]
else:
return cls.settings[key]
[docs] def prepare(self, **options):
""" 준비 동작(파싱, 캐싱, ...) 수행.
템플릿을 갱신하거나 설정을 바꾸기 위해 다시 호출하는 게
가능해야 한다.
"""
raise NotImplementedError
[docs] def render(self, *args, **kwargs):
""" 지정한 지역 변수들로 템플릿을 렌더링해서 바이트열 또는
유니코드열 하나를 반환한다. 바이트열인 경우 인코딩이
self.encoding과 일치해야 한다. 이 메소드는 스레드에 안전해야
한다. 지역 변수들이 딕셔너리로(args) 제공될 수도 있고 바로
키워드로(kwargs) 제공될 수도 있다.
"""
raise NotImplementedError
class MakoTemplate(BaseTemplate):
def prepare(self, **options):
from mako.template import Template
from mako.lookup import TemplateLookup
options.update({'input_encoding':self.encoding})
options.setdefault('format_exceptions', bool(DEBUG))
lookup = TemplateLookup(directories=self.lookup, **options)
if self.source:
self.tpl = Template(self.source, lookup=lookup, **options)
else:
self.tpl = Template(uri=self.name, filename=self.filename, lookup=lookup, **options)
def render(self, *args, **kwargs):
for dictarg in args: kwargs.update(dictarg)
_defaults = self.defaults.copy()
_defaults.update(kwargs)
return self.tpl.render(**_defaults)
class CheetahTemplate(BaseTemplate):
def prepare(self, **options):
from Cheetah.Template import Template
self.context = threading.local()
self.context.vars = {}
options['searchList'] = [self.context.vars]
if self.source:
self.tpl = Template(source=self.source, **options)
else:
self.tpl = Template(file=self.filename, **options)
def render(self, *args, **kwargs):
for dictarg in args: kwargs.update(dictarg)
self.context.vars.update(self.defaults)
self.context.vars.update(kwargs)
out = str(self.tpl)
self.context.vars.clear()
return out
class Jinja2Template(BaseTemplate):
def prepare(self, filters=None, tests=None, globals={}, **kwargs):
from jinja2 import Environment, FunctionLoader
if 'prefix' in kwargs: # TODO: to be removed after a while
raise RuntimeError('The keyword argument `prefix` has been removed. '
'Use the full jinja2 environment name line_statement_prefix instead.')
self.env = Environment(loader=FunctionLoader(self.loader), **kwargs)
if filters: self.env.filters.update(filters)
if tests: self.env.tests.update(tests)
if globals: self.env.globals.update(globals)
if self.source:
self.tpl = self.env.from_string(self.source)
else:
self.tpl = self.env.get_template(self.filename)
def render(self, *args, **kwargs):
for dictarg in args: kwargs.update(dictarg)
_defaults = self.defaults.copy()
_defaults.update(kwargs)
return self.tpl.render(**_defaults)
def loader(self, name):
fname = self.search(name, self.lookup)
if not fname: return
with open(fname, "rb") as f:
return f.read().decode(self.encoding)
[docs]class SimpleTemplate(BaseTemplate):
[docs] def prepare(self, escape_func=html_escape, noescape=False, syntax=None, **ka):
self.cache = {}
enc = self.encoding
self._str = lambda x: touni(x, enc)
self._escape = lambda x: escape_func(touni(x, enc))
self.syntax = syntax
if noescape:
self._str, self._escape = self._escape, self._str
@cached_property
def co(self):
return compile(self.code, self.filename or '<string>', 'exec')
@cached_property
def code(self):
source = self.source
if not source:
with open(self.filename, 'rb') as f:
source = f.read()
try:
source, encoding = touni(source), 'utf8'
except UnicodeError:
depr('Template encodings other than utf8 are no longer supported.') #0.11
source, encoding = touni(source, 'latin1'), 'latin1'
parser = StplParser(source, encoding=encoding, syntax=self.syntax)
code = parser.translate()
self.encoding = parser.encoding
return code
def _rebase(self, _env, _name=None, **kwargs):
if _name is None:
depr('Rebase function called without arguments.'
' You were probably looking for {{base}}?', True) #0.12
_env['_rebase'] = (_name, kwargs)
def _include(self, _env, _name=None, **kwargs):
if _name is None:
depr('Rebase function called without arguments.'
' You were probably looking for {{base}}?', True) #0.12
env = _env.copy()
env.update(kwargs)
if _name not in self.cache:
self.cache[_name] = self.__class__(name=_name, lookup=self.lookup)
return self.cache[_name].execute(env['_stdout'], env)
def execute(self, _stdout, kwargs):
env = self.defaults.copy()
env.update(kwargs)
env.update({'_stdout': _stdout, '_printlist': _stdout.extend,
'include': functools.partial(self._include, env),
'rebase': functools.partial(self._rebase, env), '_rebase': None,
'_str': self._str, '_escape': self._escape, 'get': env.get,
'setdefault': env.setdefault, 'defined': env.__contains__ })
eval(self.co, env)
if env.get('_rebase'):
subtpl, rargs = env.pop('_rebase')
rargs['base'] = ''.join(_stdout) #copy stdout
del _stdout[:] # clear stdout
return self._include(env, subtpl, **rargs)
return env
[docs] def render(self, *args, **kwargs):
""" 키워드 인자들을 지역 변수로 해서 템플릿을 렌더링한다. """
env = {}; stdout = []
for dictarg in args: env.update(dictarg)
env.update(kwargs)
self.execute(stdout, env)
return ''.join(stdout)
class StplSyntaxError(TemplateError): pass
class StplParser(object):
''' Parser for stpl templates. '''
_re_cache = {} #: Cache for compiled re patterns
# This huge pile of voodoo magic splits python code into 8 different tokens.
# 1: All kinds of python strings (trust me, it works)
_re_tok = '([urbURB]?(?:\'\'(?!\')|""(?!")|\'{6}|"{6}' \
'|\'(?:[^\\\\\']|\\\\.)+?\'|"(?:[^\\\\"]|\\\\.)+?"' \
'|\'{3}(?:[^\\\\]|\\\\.|\\n)+?\'{3}' \
'|"{3}(?:[^\\\\]|\\\\.|\\n)+?"{3}))'
_re_inl = _re_tok.replace('|\\n','') # We re-use this string pattern later
# 2: Comments (until end of line, but not the newline itself)
_re_tok += '|(#.*)'
# 3,4: Open and close grouping tokens
_re_tok += '|([\\[\\{\\(])'
_re_tok += '|([\\]\\}\\)])'
# 5,6: Keywords that start or continue a python block (only start of line)
_re_tok += '|^([ \\t]*(?:if|for|while|with|try|def|class)\\b)' \
'|^([ \\t]*(?:elif|else|except|finally)\\b)'
# 7: Our special 'end' keyword (but only if it stands alone)
_re_tok += '|((?:^|;)[ \\t]*end[ \\t]*(?=(?:%(block_close)s[ \\t]*)?\\r?$|;|#))'
# 8: A customizable end-of-code-block template token (only end of line)
_re_tok += '|(%(block_close)s[ \\t]*(?=\\r?$))'
# 9: And finally, a single newline. The 10th token is 'everything else'
_re_tok += '|(\\r?\\n)'
# Match the start tokens of code areas in a template
_re_split = '(?m)^[ \t]*(\\\\?)((%(line_start)s)|(%(block_start)s))(%%?)'
# Match inline statements (may contain python strings)
_re_inl = '(?m)%%(inline_start)s((?:%s|[^\'"\n]*?)+)%%(inline_end)s' % _re_inl
_re_tok = '(?m)' + _re_tok
default_syntax = '<% %> % {{ }}'
def __init__(self, source, syntax=None, encoding='utf8'):
self.source, self.encoding = touni(source, encoding), encoding
self.set_syntax(syntax or self.default_syntax)
self.code_buffer, self.text_buffer = [], []
self.lineno, self.offset = 1, 0
self.indent, self.indent_mod = 0, 0
self.paren_depth = 0
def get_syntax(self):
''' Tokens as a space separated string (default: <% %> % {{ }}) '''
return self._syntax
def set_syntax(self, syntax):
self._syntax = syntax
self._tokens = syntax.split()
if not syntax in self._re_cache:
names = 'block_start block_close line_start inline_start inline_end'
etokens = map(re.escape, self._tokens)
pattern_vars = dict(zip(names.split(), etokens))
patterns = (self._re_split, self._re_tok, self._re_inl)
patterns = [re.compile(p%pattern_vars) for p in patterns]
self._re_cache[syntax] = patterns
self.re_split, self.re_tok, self.re_inl = self._re_cache[syntax]
syntax = property(get_syntax, set_syntax)
def translate(self):
if self.offset: raise RuntimeError('Parser is a one time instance.')
while True:
m = self.re_split.search(self.source[self.offset:])
if m:
text = self.source[self.offset:self.offset+m.start()]
self.text_buffer.append(text)
self.offset += m.end()
if m.group(1): # New escape syntax
line, sep, _ = self.source[self.offset:].partition('\n')
self.text_buffer.append(m.group(2)+m.group(5)+line+sep)
self.offset += len(line+sep)+1
continue
elif m.group(5): # Old escape syntax
depr('Escape code lines with a backslash.') #0.12
line, sep, _ = self.source[self.offset:].partition('\n')
self.text_buffer.append(m.group(2)+line+sep)
self.offset += len(line+sep)+1
continue
self.flush_text()
self.read_code(multiline=bool(m.group(4)))
else: break
self.text_buffer.append(self.source[self.offset:])
self.flush_text()
return ''.join(self.code_buffer)
def read_code(self, multiline):
code_line, comment = '', ''
while True:
m = self.re_tok.search(self.source[self.offset:])
if not m:
code_line += self.source[self.offset:]
self.offset = len(self.source)
self.write_code(code_line.strip(), comment)
return
code_line += self.source[self.offset:self.offset+m.start()]
self.offset += m.end()
_str, _com, _po, _pc, _blk1, _blk2, _end, _cend, _nl = m.groups()
if (code_line or self.paren_depth > 0) and (_blk1 or _blk2): # a if b else c
code_line += _blk1 or _blk2
continue
if _str: # Python string
code_line += _str
elif _com: # Python comment (up to EOL)
comment = _com
if multiline and _com.strip().endswith(self._tokens[1]):
multiline = False # Allow end-of-block in comments
elif _po: # open parenthesis
self.paren_depth += 1
code_line += _po
elif _pc: # close parenthesis
if self.paren_depth > 0:
# we could check for matching parentheses here, but it's
# easier to leave that to python - just check counts
self.paren_depth -= 1
code_line += _pc
elif _blk1: # Start-block keyword (if/for/while/def/try/...)
code_line, self.indent_mod = _blk1, -1
self.indent += 1
elif _blk2: # Continue-block keyword (else/elif/except/...)
code_line, self.indent_mod = _blk2, -1
elif _end: # The non-standard 'end'-keyword (ends a block)
self.indent -= 1
elif _cend: # The end-code-block template token (usually '%>')
if multiline: multiline = False
else: code_line += _cend
else: # \n
self.write_code(code_line.strip(), comment)
self.lineno += 1
code_line, comment, self.indent_mod = '', '', 0
if not multiline:
break
def flush_text(self):
text = ''.join(self.text_buffer)
del self.text_buffer[:]
if not text: return
parts, pos, nl = [], 0, '\\\n'+' '*self.indent
for m in self.re_inl.finditer(text):
prefix, pos = text[pos:m.start()], m.end()
if prefix:
parts.append(nl.join(map(repr, prefix.splitlines(True))))
if prefix.endswith('\n'): parts[-1] += nl
parts.append(self.process_inline(m.group(1).strip()))
if pos < len(text):
prefix = text[pos:]
lines = prefix.splitlines(True)
if lines[-1].endswith('\\\\\n'): lines[-1] = lines[-1][:-3]
elif lines[-1].endswith('\\\\\r\n'): lines[-1] = lines[-1][:-4]
parts.append(nl.join(map(repr, lines)))
code = '_printlist((%s,))' % ', '.join(parts)
self.lineno += code.count('\n')+1
self.write_code(code)
def process_inline(self, chunk):
if chunk[0] == '!': return '_str(%s)' % chunk[1:]
return '_escape(%s)' % chunk
def write_code(self, line, comment=''):
line, comment = self.fix_backward_compatibility(line, comment)
code = ' ' * (self.indent+self.indent_mod)
code += line.lstrip() + comment + '\n'
self.code_buffer.append(code)
def fix_backward_compatibility(self, line, comment):
parts = line.strip().split(None, 2)
if parts and parts[0] in ('include', 'rebase'):
depr('The include and rebase keywords are functions now.') #0.12
if len(parts) == 1: return "_printlist([base])", comment
elif len(parts) == 2: return "_=%s(%r)" % tuple(parts), comment
else: return "_=%s(%r, %s)" % tuple(parts), comment
if self.lineno <= 2 and not line.strip() and 'coding' in comment:
m = re.match(r"#.*coding[:=]\s*([-\w.]+)", comment)
if m:
depr('PEP263 encoding strings in templates are deprecated.') #0.12
enc = m.group(1)
self.source = self.source.encode(self.encoding).decode(enc)
self.encoding = enc
return line, comment.replace('coding','coding*')
return line, comment
[docs]def template(*args, **kwargs):
'''
렌더링된 템플릿을 문자열 이터레이터로 얻기.
첫 번째 매개변수로 이름이나 파일 이름, 템플릿 문자열을 쓸 수 있다.
템플릿 렌더링 인자를 딕셔너리로 또는 (키워드 인자로) 직접
줄 수 있다.
'''
tpl = args[0] if args else None
adapter = kwargs.pop('template_adapter', SimpleTemplate)
lookup = kwargs.pop('template_lookup', TEMPLATE_PATH)
tplid = (id(lookup), tpl)
if tplid not in TEMPLATES or DEBUG:
settings = kwargs.pop('template_settings', {})
if isinstance(tpl, adapter):
TEMPLATES[tplid] = tpl
if settings: TEMPLATES[tplid].prepare(**settings)
elif "\n" in tpl or "{" in tpl or "%" in tpl or '$' in tpl:
TEMPLATES[tplid] = adapter(source=tpl, lookup=lookup, **settings)
else:
TEMPLATES[tplid] = adapter(name=tpl, lookup=lookup, **settings)
if not TEMPLATES[tplid]:
abort(500, 'Template (%s) not found' % tpl)
for dictarg in args[1:]: kwargs.update(dictarg)
return TEMPLATES[tplid].render(kwargs)
mako_template = functools.partial(template, template_adapter=MakoTemplate)
cheetah_template = functools.partial(template, template_adapter=CheetahTemplate)
jinja2_template = functools.partial(template, template_adapter=Jinja2Template)
[docs]def view(tpl_name, **defaults):
''' 데코레이터: 핸들러 대신 템플릿을 렌더링.
핸들러에서 다음처럼 동작을 제어할 수 있다.
- 템플릿 변수들의 dict를 반환하면 그걸로 템플릿을 채운다.
- dict 아닌 뭔가를 반환하면 view 데코레이터가 템플릿을 처리하지
않고 핸들러 결과를 그대로 반환한다. 예를 들어
HTTPResponse(dict)를 반환해서 autojson이나 기타 castfilter로
JSON을 얻을 수 있다.
'''
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if isinstance(result, (dict, DictMixin)):
tplvars = defaults.copy()
tplvars.update(result)
return template(tpl_name, **tplvars)
elif result is None:
return template(tpl_name, defaults)
return result
return wrapper
return decorator
mako_view = functools.partial(view, template_adapter=MakoTemplate)
cheetah_view = functools.partial(view, template_adapter=CheetahTemplate)
jinja2_view = functools.partial(view, template_adapter=Jinja2Template)
###############################################################################
# Constants and Globals ########################################################
###############################################################################
TEMPLATE_PATH = ['./', './views/']
TEMPLATES = {}
DEBUG = False
NORUN = False # If set, run() does nothing. Used by load_app()
#: HTTP 상태 코드(예: 404)를 문구(예: 'Not Found')로 매핑하는 dict
HTTP_CODES = httplib.responses
HTTP_CODES[418] = "I'm a teapot" # RFC 2324
HTTP_CODES[422] = "Unprocessable Entity" # RFC 4918
HTTP_CODES[428] = "Precondition Required"
HTTP_CODES[429] = "Too Many Requests"
HTTP_CODES[431] = "Request Header Fields Too Large"
HTTP_CODES[511] = "Network Authentication Required"
_HTTP_STATUS_LINES = dict((k, '%d %s'%(k,v)) for (k,v) in HTTP_CODES.items())
#: The default template used for error pages. Override with @error()
ERROR_PAGE_TEMPLATE = """
%%try:
%%from %s import DEBUG, HTTP_CODES, request, touni
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head>
<title>Error: {{e.status}}</title>
<style type="text/css">
html {background-color: #eee; font-family: sans;}
body {background-color: #fff; border: 1px solid #ddd;
padding: 15px; margin: 15px;}
pre {background-color: #eee; border: 1px solid #ddd; padding: 5px;}
</style>
</head>
<body>
<h1>Error: {{e.status}}</h1>
<p>Sorry, the requested URL <tt>{{repr(request.url)}}</tt>
caused an error:</p>
<pre>{{e.body}}</pre>
%%if DEBUG and e.exception:
<h2>Exception:</h2>
<pre>{{repr(e.exception)}}</pre>
%%end
%%if DEBUG and e.traceback:
<h2>Traceback:</h2>
<pre>{{e.traceback}}</pre>
%%end
</body>
</html>
%%except ImportError:
<b>ImportError:</b> Could not generate the error page. Please add bottle to
the import path.
%%end
""" % __name__
#: 스레드에 안전한 :class:`LocalRequest` 인스턴스. 요청 콜백 내에서
#: 접근 시 이 인스턴스는 (다중 스레드 서버에서도) 항상 *현재* 요청을
#: 가리킨다.
request = LocalRequest()
#: 스레드에 안전한 :class:`LocalResponse` 인스턴스. *현재* 요청에 대한
#: HTTP 응답을 바꾸는 데 쓴다.
response = LocalResponse()
#: A thread-safe namespace. Not used by Bottle.
local = threading.local()
# Initialize app stack (create first empty Bottle app)
# BC: 0.6.4 and needed for run()
app = default_app = AppStack()
app.push()
#: A virtual package that redirects import statements.
#: Example: ``import bottle.ext.sqlite`` actually imports `bottle_sqlite`.
ext = _ImportRedirect('bottle.ext' if __name__ == '__main__' else __name__+".ext", 'bottle_%s').module
if __name__ == '__main__':
opt, args, parser = _cmd_options, _cmd_args, _cmd_parser
if opt.version:
_stdout('Bottle %s\n'%__version__)
sys.exit(0)
if not args:
parser.print_help()
_stderr('\nError: No application specified.\n')
sys.exit(1)
sys.path.insert(0, '.')
sys.modules.setdefault('bottle', sys.modules['__main__'])
host, port = (opt.bind or 'localhost'), 8080
if ':' in host and host.rfind(']') < host.rfind(':'):
host, port = host.rsplit(':', 1)
host = host.strip('[]')
run(args[0], host=host, port=int(port), server=opt.server,
reloader=opt.reload, plugins=opt.plugin, debug=opt.debug)
# THE END