Simple Python WebDAV Server
A generic, stand-alone, tiny WebDAV server written in Python 2. Runs on Windows, macOS, Linux, iOS (Pythonista), Raspberry Pi and more.
Config
Open the webdav.py file and edit these variables:
##########################
## SERVER CONFIGURATION ##
##########################
# ADDRESS FOR FQDN
# import socket
# domain = socket.getfqdn(socket.gethostname())
# address = socket.gethostbyname(domain)
# MANUAL ADDRESS
address = '127.0.0.1'
# SERVER PORT
port = 8000
# ROOT & VIRTUAL DIRECTORY
# First path is the root the second
# is the virtual directory.
#
root = DirCollection('./', '/')
####### END CONFIG #######
Users
The script will look for a file called webdav.conf in the directory the script is run from. Use this file to define users, one per line, using the format:
user1:pass
user2:duck
john:energy
If the file is absent, guest login will be used.
Example
MTeam7 [~] $ webdav
Generic Python 2 WebDav Server
Serving on http://127.0.0.1:8000
Accepting users from webdav.conf
Download
Copy the script below or download it here: webdav.py.zip
webdav.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# WEBDAV PYTHON SERVER
#
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
from StringIO import StringIO
from time import time, timezone, strftime, localtime, gmtime
import sys
import urllib
import re
import urlparse
import os
import shutil
import uuid
import md5
import mimetypes
import base64
class Member:
    """Common base for every resource the server can expose."""

    # Kind tags; concrete resources store one of them in their ``type``
    # attribute.
    M_MEMBER, M_COLLECTION = 1, 2

    def getProperties(self):
        """Return this resource's WebDAV properties (none for the base)."""
        return dict()
class Collection(Member):
    """A named container of members; subclasses override getMembers()."""

    def __init__(self, name):
        """Store the collection's name."""
        self.name = name

    def getMembers(self):
        """Return the immediate children; the base collection has none."""
        return list()
class FileMember(Member):
    """A regular file on disk exposed as a WebDAV resource.

    ``fsname`` is the path on the local file system and ``virname`` the
    path clients see; both are derived by appending ``name`` to the
    corresponding attribute of ``parent``.
    """

    # Largest number of bytes read from disk per write to the client.
    CHUNK_SIZE = 65536

    def __init__(self, name, parent):
        """Create the member called ``name`` inside collection ``parent``."""
        self.name = name
        self.parent = parent
        self.fsname = parent.fsname + name
        self.virname = parent.virname + name
        self.type = Member.M_MEMBER

    def __str__(self):
        return '%s -> %s' % (self.virname, self.fsname)

    def getProperties(self):
        """Return a dictionary of WebDAV properties for this resource.

        Values are formatted as the WebDAV specs expect. Raises OSError
        when the underlying path cannot be stat'ed.
        """
        st = os.stat(self.fsname)
        p = {}
        p['creationdate'] = unixdate2iso8601(st.st_ctime)
        p['getlastmodified'] = unixdate2httpdate(st.st_mtime)
        p['displayname'] = self.name
        # Mix modification time and size into the tag so that it changes
        # whenever the content does; hashing the path alone never would.
        p['getetag'] = md5.new('%s:%s:%s' % (self.fsname, st.st_mtime,
                                             st.st_size)).hexdigest()
        if self.type == Member.M_MEMBER:
            p['getcontentlength'] = st.st_size
            (p['getcontenttype'], z) = mimetypes.guess_type(self.name)
            p['getcontentlanguage'] = None
        else:
            p['resourcetype'] = '<D:collection/>'
        # Slicing instead of indexing keeps an empty name from raising.
        if self.name[:1] == '.':
            p['ishidden'] = 1
        if not os.access(self.fsname, os.W_OK):
            p['isreadonly'] = 1
        if self.name == '/':
            p['isroot'] = 1
        return p

    def sendData(self, wfile, bpoint=0, epoint=0):
        """Write the file, or an inclusive byte range of it, to ``wfile``.

        ``bpoint`` is the first byte offset and ``epoint`` the last one.
        When ``epoint`` is not greater than ``bpoint`` everything from
        ``bpoint`` to the end of the file is sent.
        """
        size = os.stat(self.fsname).st_size
        f = open(self.fsname, 'rb')
        try:
            if 0 < bpoint < size:
                f.seek(bpoint)
            if epoint > bpoint:
                # Clamp the end of the range to the last byte of the file.
                remaining = min(epoint, size - 1) - bpoint + 1
            else:
                remaining = size
            while remaining > 0:
                # Never ask for more than what is left of the range; the
                # final chunk would otherwise run past the requested end.
                buf = f.read(min(self.CHUNK_SIZE, remaining))
                if not buf:
                    break
                wfile.write(buf)
                remaining -= len(buf)
        finally:
            f.close()
class DirCollection(FileMember, Collection):
    """A directory on disk exposed as a WebDAV collection.

    ``fsname`` always ends with ``os.sep`` and ``virname`` always ends
    with '/', so member paths can be built by plain concatenation.
    """

    COLLECTION_MIME_TYPE = 'httpd/unix-directory'

    def __init__(self, fsdir, virdir, parent=None):
        """Map local directory ``fsdir`` onto virtual path ``virdir``.

        Raises IOError when ``fsdir`` does not exist.
        """
        if not os.path.exists(fsdir):
            # String exceptions are rejected by Python 2.6+, so raise a
            # real exception type.
            raise IOError('Local directory (fsdir) not found: ' + fsdir)
        self.fsname = fsdir
        self.name = virdir
        if self.fsname[-1] != os.sep:
            if self.fsname[-1] == '/':
                self.fsname = self.fsname[:-1] + os.sep
            else:
                self.fsname += os.sep
        self.virname = virdir
        if self.virname[-1] != '/':
            self.virname += '/'
        self.parent = parent
        self.type = Member.M_COLLECTION

    def getProperties(self):
        """Return the file properties plus the collection-specific ones."""
        p = FileMember.getProperties(self)
        p['iscollection'] = 1
        p['getcontenttype'] = DirCollection.COLLECTION_MIME_TYPE
        return p

    def _listNames(self):
        """Return the entry names, with '/' appended to non-files."""
        names = []
        for entry in os.listdir(self.fsname):
            if os.path.isfile(self.fsname + entry):
                names.append(entry)
            else:
                names.append(entry + '/')
        return names

    def _makeMember(self, name):
        """Build the member object for ``name`` as listed by _listNames()."""
        if name[-1] != '/':
            return FileMember(name, self)
        return DirCollection(self.fsname + name, self.virname + name, self)

    def getMembers(self):
        """Get immediate members of this collection."""
        r = []
        for name in self._listNames():
            try:
                r.append(self._makeMember(name))
            except IOError:
                # Entry vanished or is a dangling link: leave it out
                # rather than failing the whole listing.
                continue
        return r

    def rootdir(self):
        """Return the local directory path (with trailing separator)."""
        return self.fsname

    def findMember(self, name):
        """Search for a particular member; return None when not found.

        ``name`` may be given with or without a trailing '/' for
        sub-collections.
        """
        names = self._listNames()
        if name not in names:
            if name[-1:] == '/' or name + '/' not in names:
                return None
            name += '/'
        try:
            return self._makeMember(name)
        except IOError:
            return None

    def _htmlEscape(self, text):
        """Escape the characters that are special in HTML text."""
        return str(text).replace('&', '&amp;').replace('<', '&lt;') \
            .replace('>', '&gt;')

    def sendData(self, wfile, bpoint=0, epoint=0):
        """Send "file" to the client. Since this is a directory, build
        some arbitrary HTML.

        ``bpoint`` and ``epoint`` are accepted only so the signature
        matches FileMember.sendData(); they are ignored.
        """
        parts = ['<html><head><title>%s</title></head><body>'
                 % self._htmlEscape(self.virname),
                 '<table><tr><th>Name</th><th>Size</th><th>Timestamp</th></tr>']
        for m in self.getMembers():
            p = m.getProperties()
            if 'getcontentlength' in p:
                size = int(p['getcontentlength'])
                timestamp = p['getlastmodified']
            else:
                size = 0
                timestamp = '-DIR-'
            # File names come from disk and may hold markup characters.
            parts.append('<tr><td>%s</td><td>%d</td><td>%s</td></tr>'
                         % (self._htmlEscape(p['displayname']), size,
                            timestamp))
        parts.append('</table></body></html>')
        wfile.write(''.join(parts))

    def recvMember(self, rfile, name, size, req):
        """Receive (save) a member file.

        ``size`` is the body length in bytes, -2 for a chunked body, or
        any other non-positive value to just create an empty file.
        ``req`` is unused. Raises IOError for a name that would escape
        this directory.
        """
        name = urllib.unquote(name)
        # The name is URL-decoded here even though callers may already
        # have decoded it, so make sure the result is still a plain
        # file name and cannot reach outside this directory.
        if name in ('', '.', '..') or '/' in name or os.sep in name \
                or '\0' in name:
            raise IOError('Invalid member name: %r' % name)
        f = open(os.path.join(self.fsname, name), 'wb')
        try:
            if size == -2:
                l = self._readChunkSize(rfile)
                while l > 0:
                    f.write(rfile.read(l))
                    rfile.readline()  # consume the CRLF ending the chunk
                    l = self._readChunkSize(rfile)
            elif size > 0:
                remaining = size
                while remaining > 0:
                    buf = rfile.read(min(65536, remaining))
                    if len(buf) == 0:
                        break
                    f.write(buf)
                    remaining -= len(buf)
        finally:
            f.close()

    def _readChunkSize(self, rfile):
        """Read one chunk-size line, ignoring any chunk extension."""
        return int(rfile.readline().split(';')[0].strip(), 16)
def unixdate2iso8601(d):
    """Format Unix timestamp ``d`` as an ISO 8601 date-time in UTC.

    The result looks like '1970-01-01T00:00:00Z'. UTC is used because
    an offset built from ``time.timezone`` would have the wrong sign
    (the value counts seconds *west* of UTC) and would ignore daylight
    saving time and zones that are not whole hours.
    """
    return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(d))
def unixdate2httpdate(d):
    """Format Unix timestamp ``d`` as an HTTP date string in GMT."""
    utc_fields = gmtime(d)
    http_layout = '%a, %d %b %Y %H:%M:%S GMT'
    return strftime(http_layout, utc_fields)
class Tag:
    """One XML element: name, attributes, text and child elements.

    Children are kept in ``d``, keyed by their (prefix-stripped) name.
    A name that occurs once maps to a Tag, a name that occurs several
    times maps to a list of Tags. The container methods (``len``,
    ``in``, indexing, iteration) operate on that dictionary.

    Attributes:
        rawname: the name exactly as written, including any prefix.
        name: the local name when the prefix is a known namespace,
            otherwise the same as ``rawname``.
        ns: the namespace URI of a known prefix, otherwise ''.
        attrs: dictionary of attribute names to values.
        data: text content of the element.
    """

    def __init__(self, name, attrs, data='', parser=None):
        """Create a tag.

        ``attrs`` is either a dictionary or the raw attribute string.
        ``parser`` supplies the shared ``namespaces`` prefix table; when
        it is None only declarations made on this very tag are known.
        """
        self.d = {}
        self.name = name
        self.attrs = attrs
        if isinstance(self.attrs, str):
            self.attrs = splitattrs(self.attrs)
        if parser is not None:
            namespaces = parser.namespaces
        else:
            namespaces = {}
        for a in self.attrs:
            # 'xmlns' declares the default namespace (stored under ''),
            # 'xmlns:prefix' declares a prefixed one.
            if a == 'xmlns' or a.startswith('xmlns:'):
                namespaces[a[6:]] = self.attrs[a]
        self.rawname = self.name
        # Always defined, so callers can read it for unprefixed names.
        self.ns = ''
        p = name.find(':')
        if p > 0:
            nsname = name[0:p]
            if nsname in namespaces:
                self.ns = namespaces[nsname]
                self.name = self.rawname[p + 1:]
        self.data = data

    def __len__(self):
        return len(self.d)

    def __getitem__(self, key):
        return self.d[key]

    def __setitem__(self, key, value):
        self.d[key] = value

    def __delitem__(self, key):
        del self.d[key]

    def __iter__(self):
        return iter(self.d)

    def __contains__(self, key):
        return key in self.d

    def __str__(self):
        """Returns unicode semi human-readable representation of the structure"""
        if self.attrs:
            s = u'<%s %s> %s ' % (self.name, self.attrs, self.data)
        else:
            s = u'<%s> %s ' % (self.name, self.data)
        for k in self.d:
            if isinstance(self.d[k], Tag):
                s += u'|%s: %s|' % (k, str(self.d[k]))
            else:
                s += u'|' + u','.join([str(x) for x in self.d[k]]) + u'|'
        return s

    def addChild(self, tag):
        """Adds a child to self. tag must be instance of Tag"""
        if tag.name in self.d:
            # Second occurrence of a name: turn the entry into a list.
            if isinstance(self.d[tag.name], Tag):
                self.d[tag.name] = [self.d[tag.name]]
            self.d[tag.name].append(tag)
        else:
            self.d[tag.name] = tag
        return tag
class XMLDict_Parser:
    """Minimal XML reader that turns a document into nested Tag objects.

    It scans for '<' and '>' only; comments, CDATA sections and entity
    references are not interpreted.
    """

    def __init__(self, xml):
        """Prepare to parse the document held in string ``xml``."""
        self.xml = xml
        self.p = 0  # current scan position inside self.xml
        self.encoding = sys.getdefaultencoding()
        self.namespaces = {}  # prefix -> namespace URI, filled by Tag

    def getnexttag(self):
        """Return ``(tag, attrs, data)`` for the next tag in the input.

        ``data`` is the stripped text found before the tag. An empty
        element is reported with a trailing '/' on ``tag`` whether or
        not it has attributes. At the end of the input ``tag`` and
        ``attrs`` are None. Raises ValueError for a '<' without '>'.
        """
        ptag = self.xml.find('<', self.p)
        if ptag < 0:
            return (None, None, self.xml[self.p:].strip())
        data = self.xml[self.p:ptag].strip()
        self.p = ptag
        self.tagbegin = ptag
        p2 = self.xml.find('>', self.p + 1)
        if p2 < 0:
            raise ValueError('Malformed XML - unclosed tag?')
        tag = self.xml[ptag + 1:p2]
        self.p = p2 + 1
        self.tagend = p2 + 1
        # Any whitespace, not just a space, may follow the tag name.
        parts = tag.split(None, 1)
        if len(parts) == 2:
            (tag, attrs) = parts
        else:
            tag = tag.strip()
            attrs = ''
        # '<a b="c"/>' and '<a />' leave the '/' at the end of attrs;
        # move it onto the name so callers see an empty element.
        attrs = attrs.rstrip()
        if attrs.endswith('/') and not tag.endswith('/'):
            attrs = attrs[:-1].rstrip()
            tag += '/'
        return (tag, attrs, data)

    def builddict(self):
        """Builds a nested-dictionary-like structure from the xml. This method
        picks up tags on the main level and calls processTag() for nested tags."""
        d = Tag('<root>', '')
        while True:
            (tag, attrs, data) = self.getnexttag()
            if data != '':
                sys.stderr.write('Warning: inline data between tags?!\n')
            if not tag:
                break
            if tag.endswith('/'):
                d.addChild(Tag(tag[:-1], attrs, parser=self))
                continue
            elif tag[0] == '?':
                t = d.addChild(Tag(tag, attrs, parser=self))
                if tag == '?xml' and 'encoding' in t.attrs:
                    self.encoding = t.attrs['encoding']
            else:
                try:
                    self.processTag(d.addChild(Tag(tag, attrs,
                                                   parser=self)))
                except Exception:
                    # Best effort: report the problem and keep whatever
                    # was parsed so far.
                    sys.stderr.write('Error processing tag %s\n' % tag)
        d.encoding = self.encoding
        return d

    def processTag(self, dtag):
        """Process single tag's data"""
        until = '/' + dtag.rawname
        while True:
            (tag, attrs, data) = self.getnexttag()
            if data:
                dtag.data += data
            if tag is None:
                sys.stderr.write("Unterminated tag '" + dtag.rawname
                                 + "'?\n")
                break
            if tag == until:
                break
            if tag.endswith('/'):
                dtag.addChild(Tag(tag[:-1], attrs, parser=self))
                continue
            self.processTag(dtag.addChild(Tag(tag, attrs, parser=self)))
def splitattrs(att):
    """Extracts name="value" pairs from string; returns them as dictionary

    Values may be enclosed in double or single quotes and may be empty.
    Text that does not look like an attribute is ignored.
    """
    # Group 1 is the name; group 2 or 3 holds the value depending on
    # which kind of quote was used (the other one is left empty).
    pattern = r'''([a-zA-Z_][a-zA-Z_:0-9.\-]*)\s*=\s*(?:"([^"]*)"|'([^']*)')'''
    d = {}
    for (name, dquoted, squoted) in re.findall(pattern, att):
        d[name] = dquoted or squoted
    return d
def builddict(xml):
    """Parse string ``xml`` in one call and return the resulting tree."""
    return XMLDict_Parser(xml).builddict()
class DAVRequestHandler(BaseHTTPRequestHandler):
    """HTTP request handler implementing the WebDAV methods.

    The handler expects ``self.server`` to provide ``root`` (the root
    collection), ``auth_enable`` and ``userpwd`` (the accepted Basic
    credentials, base64-encoded).
    """

    server_version = 'Pythonista_dav'
    all_props = [
        'name',
        'parentname',
        'href',
        'ishidden',
        'isreadonly',
        'getcontenttype',
        'contentclass',
        'getcontentlanguage',
        'creationdate',
        'lastaccessed',
        'getlastmodified',
        'getcontentlength',
        'iscollection',
        'isstructureddocument',
        'defaultdocument',
        'displayname',
        'isroot',
        'resourcetype',
    ]
    basic_props = ['name', 'getcontenttype', 'getcontentlength',
                   'creationdate', 'iscollection']
    auth_file = False
    auth_enable = False
    Auserlist = []

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _send_status(self, code, message):
        """Send a complete response that has no body."""
        self.send_response(code, message)
        self.send_header('Content-length', '0')
        self.end_headers()

    def _read_body(self):
        """Read the request body, honouring Content-length when given."""
        if 'Content-length' in self.headers:
            return self.rfile.read(int(self.headers['Content-length']))
        return self.rfile.read()

    def _root_path(self):
        """Return the absolute local path of the served root directory."""
        return os.path.abspath(self.server.root.rootdir())

    def _fs_path(self, url_path):
        """Map a decoded URL path to a local path inside the root.

        Returns None when the path would leave the served directory,
        for instance through '..' components.
        """
        root = self._root_path()
        candidate = os.path.abspath(os.path.join(root,
                                                 url_path.lstrip('/')))
        if root.endswith(os.sep):
            prefix = root
        else:
            prefix = root + os.sep
        if candidate != root and not candidate.startswith(prefix):
            return None
        return candidate

    def _transfer_paths(self):
        """Resolve source and destination for MOVE and COPY.

        Returns ``(source, target)``. When the request cannot be served
        the error response is sent and ``(None, None)`` is returned.
        """
        destination = self.headers.get('Destination')
        if not destination:
            self._send_status(400, 'Bad Request')
            return (None, None)
        source = self._fs_path(urllib.unquote(self.path))
        # Split the URL before decoding it, so an encoded '?' or '#' in
        # a file name is not mistaken for a query or fragment.
        target = self._fs_path(
            urllib.unquote(urlparse.urlparse(destination).path))
        root = self._root_path()
        if source is None or target is None or source == root \
                or target == root:
            self._send_status(403, 'Forbidden')
            return (None, None)
        if not os.path.exists(source):
            self._send_status(404, 'Not Found')
            return (None, None)
        return (source, target)

    def _parse_range(self, value, total):
        """Parse a single-range ``Range`` header value.

        Returns ``(first, last)`` as inclusive byte offsets, or None
        when the header should be ignored and the whole resource sent.
        Ranges of a single byte are ignored as well because
        ``sendData`` cannot express them.
        """
        if not value.lower().startswith('bytes=') or ',' in value:
            return None
        (start_text, dash, end_text) = value[6:].partition('-')
        if not dash:
            return None
        try:
            if start_text.strip() == '':
                # 'bytes=-N' asks for the last N bytes.
                suffix = int(end_text)
                if suffix <= 0:
                    return None
                first = max(0, total - suffix)
                last = total - 1
            else:
                first = int(start_text)
                if end_text.strip():
                    last = int(end_text)
                else:
                    last = total - 1
        except ValueError:
            return None
        last = min(last, total - 1)
        if first < 0 or last <= first:
            return None
        return (first, last)

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def WebAuth(self):
        """Check HTTP Basic credentials.

        Returns False when the request may proceed. Otherwise a 401
        response is sent and True is returned.
        """
        if not self.server.auth_enable:
            return False
        header = self.headers.get('Authorization', '')
        # Skip the leading 'Basic ' and compare the encoded credentials.
        if header and header[6:] in self.server.userpwd:
            return False
        self.send_response(401, 'Authorization Required')
        self.send_header('WWW-Authenticate', 'Basic realm="WebDav Auth"')
        self.send_header('Content-type', 'text/html')
        self.send_header('Content-length', '0')
        self.end_headers()
        return True

    # ------------------------------------------------------------------
    # HTTP / WebDAV methods
    # ------------------------------------------------------------------

    def do_OPTIONS(self):
        """Advertise the supported methods and DAV compliance classes."""
        if self.WebAuth():
            return
        self.send_response(200, DAVRequestHandler.server_version)
        self.send_header('Allow',
                         'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY'
                         )
        self.send_header('Content-length', '0')
        self.send_header('X-Server-Copyright',
                         DAVRequestHandler.server_version)
        self.send_header('DAV', '1, 2')
        self.send_header('MS-Author-Via', 'DAV')
        self.end_headers()

    def do_DELETE(self):
        """Delete a file or a whole directory tree."""
        if self.WebAuth():
            return
        path = urllib.unquote(self.path)
        if path == '':
            self._send_status(404, 'Object not found')
            return
        target = self._fs_path(path)
        # Never delete the served root itself or anything outside it.
        if target is None or target == self._root_path():
            self._send_status(403, 'Forbidden')
            return
        try:
            if os.path.isfile(target):
                os.remove(target)
            elif os.path.isdir(target):
                shutil.rmtree(target)
            else:
                self._send_status(404, 'Not Found')
                return
        except (OSError, IOError):
            self._send_status(500, 'Cannot delete')
            return
        self._send_status(204, 'No Content')

    def do_MKCOL(self):
        """Create a new directory."""
        if self.WebAuth():
            return
        target = self._fs_path(urllib.unquote(self.path))
        if target is None:
            self._send_status(403, 'Forbidden')
            return
        if os.path.exists(target):
            self._send_status(405, 'Method Not Allowed')
            return
        if not os.path.isdir(os.path.dirname(target)):
            # The parent collection has to exist first.
            self._send_status(409, 'Conflict')
            return
        try:
            os.mkdir(target)
        except OSError:
            self._send_status(403, 'Forbidden')
            return
        self._send_status(201, 'Created')

    def do_MOVE(self):
        """Move or rename a file or directory.

        An existing destination is never overwritten; 412 is returned
        instead.
        """
        if self.WebAuth():
            return
        (source, target) = self._transfer_paths()
        if source is None:
            return
        if os.path.exists(target):
            self._send_status(412, 'Precondition Failed')
            return
        try:
            shutil.move(source, target)
        except (IOError, OSError, shutil.Error):
            self._send_status(409, 'Conflict')
            return
        self._send_status(201, 'Created')

    def do_COPY(self):
        """Copy a file (replacing the destination) or a directory tree.

        A directory is only copied when the destination does not exist.
        """
        if self.WebAuth():
            return
        (source, target) = self._transfer_paths()
        if source is None:
            return
        existed = os.path.exists(target)
        try:
            if os.path.isfile(source):
                shutil.copyfile(source, target)
            elif existed:
                self._send_status(412, 'Precondition Failed')
                return
            else:
                shutil.copytree(source, target)
        except (IOError, OSError, shutil.Error):
            self._send_status(409, 'Conflict')
            return
        if existed:
            self._send_status(204, 'No Content')
        else:
            self._send_status(201, 'Created')

    def do_LOCK(self):
        """Answer a lock request with a freshly generated lock token.

        Locks are not recorded or enforced; the response only satisfies
        clients that insist on locking before they write.
        """
        if self.WebAuth():
            return
        from xml.sax.saxutils import escape
        d = builddict(self._read_body())
        clientid = ''
        try:
            owner = d['lockinfo']['owner']
            if 'href' in owner:
                clientid = owner['href'].data
            else:
                clientid = owner.data
        except (KeyError, TypeError, AttributeError):
            # No usable owner in the request (e.g. a lock refresh).
            pass
        lockid = str(uuid.uuid1())
        retstr = \
            '''<?xml version="1.0" encoding="utf-8" ?>
<D:prop xmlns:D="DAV:">
<D:lockdiscovery>
<D:activelock>
<D:locktype><D:write/></D:locktype>
<D:lockscope><D:exclusive/></D:lockscope>
<D:depth>Infinity</D:depth>
<D:owner>
<D:href>''' \
            + escape(clientid) \
            + '''</D:href>
</D:owner>
<D:timeout>Infinite</D:timeout>
<D:locktoken><D:href>opaquelocktoken:''' \
            + lockid \
            + '''</D:href></D:locktoken>
</D:activelock>
</D:lockdiscovery>
</D:prop>
'''
        self.send_response(201, 'Created')
        self.send_header('Content-type', 'text/xml; charset="utf-8"')
        self.send_header('Lock-Token', '<opaquelocktoken:' + lockid + '>')
        self.send_header('Content-Length', str(len(retstr)))
        self.end_headers()
        self.wfile.write(retstr)
        self.wfile.flush()

    def do_UNLOCK(self):
        """Acknowledge an unlock request (no lock state is kept)."""
        if self.WebAuth():
            return
        self._send_status(204, 'No Content')

    def _wished_props(self, d):
        """Return the property names requested by parsed body ``d``."""
        if len(d) == 0 or 'propfind' not in d:
            return DAVRequestHandler.basic_props
        propfind = d['propfind']
        if 'allprop' in propfind:
            return DAVRequestHandler.all_props
        if 'prop' not in propfind:
            return DAVRequestHandler.basic_props
        prop = propfind['prop']
        if isinstance(prop, list):
            # Several <prop> elements were sent: use the first one.
            prop = prop[0]
        return list(prop)

    def _write_props_member(self, w, m, wished_props):
        """Write the <D:response> element describing member ``m``."""
        from xml.sax.saxutils import escape
        w.write('''<D:response>
<D:href>%s</D:href>
<D:propstat>
<D:prop>
'''
                % urllib.quote(m.virname))  # quoted for non-ASCII names
        props = m.getProperties()
        #
        # For macOS getlastmodified,getcontentlength and resourceType
        #
        if ('quota-available-bytes' in wished_props
                or 'quota-used-bytes' in wished_props
                or 'quota' in wished_props
                or 'quotaused' in wished_props) \
                and hasattr(os, 'statvfs'):
            # Report the volume that holds the served directory.
            sDisk = os.statvfs(self.server.root.rootdir())
            used = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
            free = sDisk.f_bavail * sDisk.f_frsize
            props['quota-used-bytes'] = used
            props['quotaused'] = used
            props['quota-available-bytes'] = free
            props['quota'] = free
        for wp in wished_props:
            value = props.get(wp)
            if value is None:
                w.write(' <D:%s/>\n' % wp)
            elif wp == 'resourcetype':
                # This value already is an XML fragment.
                w.write(' <D:%s>%s</D:%s>\n' % (wp, value, wp))
            else:
                w.write(' <D:%s>%s</D:%s>\n' % (wp, escape(str(value)),
                                                 wp))
        w.write('''</D:prop>
<D:status>HTTP/1.1 200 OK</D:status>
</D:propstat>
</D:response>
''')

    def do_PROPFIND(self):
        """Report properties of a resource and, for Depth 1, its members."""
        if self.WebAuth():
            return
        depth = self.headers.get('Depth', 'infinity').lower()
        wished_props = self._wished_props(builddict(self._read_body()))
        (path, elem) = self.path_elem()
        if elem is None:
            self._send_status(404, 'Not Found')
            return
        #
        # Set to True to debug XML output
        #
        w = BufWriter(self.wfile, False)
        w.write('<?xml version="1.0" encoding="utf-8" ?>\n')
        w.write('<D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:">\n'
                )
        self._write_props_member(w, elem, wished_props)
        if depth == '1':
            for m in elem.getMembers():
                self._write_props_member(w, m, wished_props)
        w.write('</D:multistatus>')
        self.send_response(207, 'Multi-Status')
        self.send_header('Content-Type', 'text/xml; charset="utf-8"')
        self.send_header('Content-Length', str(w.getSize()))
        self.end_headers()
        w.flush()

    def _send_collection(self, elem, props, onlyhead):
        """Answer GET/HEAD for a collection with its generated listing."""
        listing = StringIO()
        # Render first so the length can be announced in the headers.
        elem.sendData(listing)
        body = listing.getvalue()
        self.send_response(200, 'OK')
        self.send_header('Content-type',
                         props.get('getcontenttype')
                         or DirCollection.COLLECTION_MIME_TYPE)
        self.send_header('Content-length', str(len(body)))
        self.end_headers()
        if not onlyhead:
            self.wfile.write(body)

    def do_GET(self, onlyhead=False):
        """Send a resource; with ``onlyhead`` only the headers are sent."""
        if self.WebAuth():
            return
        (path, elem) = self.path_elem()
        if elem is None:
            self.send_error(404, 'Object not found')
            return
        try:
            props = elem.getProperties()
        except Exception:
            self.send_response(500, 'Error retrieving properties')
            self.end_headers()
            return
        if elem.type != Member.M_MEMBER:
            # Collections carry no 'getcontentlength' property.
            self._send_collection(elem, props, onlyhead)
            return
        total = props['getcontentlength']
        byte_range = None
        if 'Range' in self.headers:
            byte_range = self._parse_range(self.headers['Range'], total)
        if byte_range is None:
            (bpoint, epoint) = (0, 0)
            length = total
            self.send_response(200, 'OK')
        else:
            (bpoint, epoint) = byte_range
            length = epoint - bpoint + 1
            self.send_response(206, 'Partial Content')
            self.send_header('Content-Range', 'bytes %d-%d/%d'
                             % (bpoint, epoint, total))
        self.send_header('Content-type', props['getcontenttype']
                         or 'application/octet-stream')
        self.send_header('Last-modified', props['getlastmodified'])
        self.send_header('Content-length', str(length))
        self.end_headers()
        if not onlyhead and length > 0:
            elem.sendData(self.wfile, bpoint, epoint)

    def do_HEAD(self):
        """Same as GET, without the body."""
        self.do_GET(True)

    def do_PUT(self):
        """Store the request body as a file in an existing collection."""
        if self.WebAuth():
            return
        try:
            # -2 marks a chunked body; -1 means the length is unknown,
            # in which case only an empty file is created.
            size = -1
            if 'Content-length' in self.headers:
                size = int(self.headers['Content-length'])
            elif self.headers.get('Transfer-Encoding', '').lower() \
                    == 'chunked':
                size = -2
            (path, elem) = self.path_elem_prev()
            ename = path[-1]
        except Exception:
            self._send_status(400, 'Cannot parse request')
            return
        if elem is None:
            # The parent collection does not exist.
            self._send_status(409, 'Conflict')
            return
        if ename == '.DS_Store':
            self._send_status(403, 'Forbidden')
            return
        try:
            elem.recvMember(self.rfile, ename, size, self)
        except Exception:
            self._send_status(500, 'Cannot save file')
            return
        #
        # macOS sends a 0 byte file first and
        # waits for a 201 response code.
        #
        if size == 0:
            self._send_status(201, 'Created')
        else:
            self._send_status(200, 'OK')

    # ------------------------------------------------------------------
    # Path resolution
    # ------------------------------------------------------------------

    def split_path(self, path):
        """Split '/dir1/dir2/file' into its components.

        When the path ends with '/', the last component keeps a single
        trailing '/' to mark it as a collection.
        """
        p = path.split('/')[1:]
        while p and p[-1] in ('', '/'):
            p = p[:-1]
            if len(p) > 0:
                p[-1] += '/'
        return p

    def path_elem(self):
        """Return ``(components, member)`` for the request path.

        ``member`` is None when the path does not exist.
        """
        path = self.split_path(urllib.unquote(self.path))
        elem = self.server.root
        for e in path:
            elem = elem.findMember(e)
            if elem is None:
                break
        return (path, elem)

    def path_elem_prev(self):
        """Like path_elem(), but resolve the parent of the last component."""
        path = self.split_path(urllib.unquote(self.path))
        elem = self.server.root
        for e in path[:-1]:
            elem = elem.findMember(e)
            if elem is None:
                break
        return (path, elem)

    #
    # Comment to enable log info to stdout
    #
    def log_message(self, format, *args):
        pass
class BufWriter:
    """Collect output in memory so its size is known before sending.

    Text is stored UTF-8 encoded; byte strings are stored unchanged.
    """

    def __init__(self, w, debug=True):
        """Buffer output for ``w``; with ``debug`` also echo to stderr."""
        self.w = w
        self.debug = debug
        self._chunks = []  # encoded pieces, in write order
        self._size = 0     # total number of buffered bytes

    def write(self, s):
        """Append ``s`` (text or bytes) to the buffer."""
        if self.debug:
            sys.stderr.write(s)
        # Encode text once, here, instead of on every size query.
        if isinstance(s, type(u'')):
            s = s.encode('utf-8')
        self._chunks.append(s)
        self._size += len(s)

    def flush(self):
        """Write everything buffered so far to the wrapped stream."""
        self.w.write(b''.join(self._chunks))
        self.w.flush()

    def getSize(self):
        """Return the number of bytes flush() would write."""
        return self._size
class DAVServer(ThreadingMixIn, HTTPServer):
    """Threaded HTTP server carrying the WebDAV root and credentials."""

    # Let the process exit even while request threads are still busy.
    daemon_threads = True

    def __init__(self, addr, handler, root, userpwd):
        """Start listening on ``addr``.

        ``root`` is the root collection to serve and ``userpwd`` the
        list of accepted credentials; authentication is enabled when
        that list is not empty.
        """
        HTTPServer.__init__(self, addr, handler)
        self.root = root
        self.userpwd = userpwd
        self.auth_enable = len(userpwd) > 0

    #
    # Disable Broken Pipe Error Message
    #
    def finish_request(self, request, client_address):
        """Handle one request, ignoring socket errors such as a client
        that disconnected early."""
        # Imported here because the module does not import socket at
        # the top of the file.
        import socket
        try:
            HTTPServer.finish_request(self, request, client_address)
        except socket.error:
            pass
if __name__ == '__main__':

    ##########################
    ## SERVER CONFIGURATION ##
    ##########################

    # ADDRESS FOR FQDN
    # import socket
    # domain = socket.getfqdn(socket.gethostname())
    # address = socket.gethostbyname(domain)

    # MANUAL ADDRESS
    # The server listens on this address only. Use '' to accept
    # connections on every network interface.
    address = '127.0.0.1'

    # SERVER PORT
    port = 8000

    # ROOT & VIRTUAL DIRECTORIES
    # The first path is the local directory to serve, the second
    # is the virtual directory clients see.
    #
    root = DirCollection('./', '/')

    ####### END CONFIG #######

    print('\n Generic Python 2 WebDav Server')
    print(' Serving on http://' + address + ':' + str(port))

    # Look for webdav.conf in the current directory first, then next
    # to the script, and read the first one found.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    userpwd = []
    for conf_path in ('webdav.conf',
                      os.path.join(script_dir, 'webdav.conf')):
        if not os.path.isfile(conf_path):
            continue
        try:
            f = open(conf_path, 'r')
            try:
                for uinfo in f:
                    # One 'user:password' entry per line.
                    uinfo = uinfo.rstrip('\r\n')
                    if len(uinfo) > 2:
                        userpwd.append(base64.b64encode(uinfo))
            finally:
                f.close()
        except IOError:
            sys.stderr.write(' Cannot read ' + conf_path + '\n')
        break

    # Report what will actually be enforced.
    if userpwd:
        print(' Accepting users from webdav.conf')
    else:
        print(' Accepting guest users')

    server_address = (address, port)
    try:
        httpd = DAVServer(server_address, DAVRequestHandler, root, userpwd)
        httpd.serve_forever()
    except KeyboardInterrupt:
        sys.exit(0)
{{ 'Comments (%count%)' | trans {count:count} }}
{{ 'Comments are closed.' | trans }}