implemented dlna. Testing on my own server now

This commit is contained in:
Yatharth Sood 2026-04-05 21:00:27 +05:30
parent a3ce1ecde5
commit 708ba3bf57

View file

@ -1,13 +1,18 @@
# coding: utf-8 # coding: utf-8
from __future__ import print_function, unicode_literals from __future__ import print_function, unicode_literals
import base64
import errno import errno
import os
import re import re
import select import select
import socket import socket
import stat
import time import time
import xml.etree.ElementTree as ET
from .__init__ import TYPE_CHECKING from .__init__ import TYPE_CHECKING
from .authsrv import LEELOO_DALLAS
from .multicast import MC_Sck, MCast from .multicast import MC_Sck, MCast
from .util import CachedSet, formatdate, html_escape, min_ex from .util import CachedSet, formatdate, html_escape, min_ex
@ -22,6 +27,71 @@ if True: # pylint: disable=using-constant-test
GRP = "239.255.255.250" GRP = "239.255.255.250"
# DLNA Protocol Info for media types with DLNA.ORG_PN and OP flags
DLNA_PROTOCOL_INFO = {
# photo
".jpg": "http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_SM;DLNA.ORG_OP=01",
".jpeg": "http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_SM;DLNA.ORG_OP=01",
".png": "http-get:*:image/png:DLNA.ORG_PN=PNG_LRG;DLNA.ORG_OP=01",
# audio
".mp3": "http-get:*:audio/mpeg:DLNA.ORG_PN=MP3;DLNA.ORG_OP=01",
".flac": "http-get:*:audio/flac:*",
".wav": "http-get:*:audio/wav:*",
".ogg": "http-get:*:audio/ogg:*",
".aac": "http-get:*:audio/mp4:DLNA.ORG_PN=AAC_ISO_320;DLNA.ORG_OP=01",
".m4a": "http-get:*:audio/mp4:DLNA.ORG_PN=AAC_ISO_320;DLNA.ORG_OP=01",
# video
".mp4": "http-get:*:video/mp4:DLNA.ORG_PN=AVC_MP4_MP_SD;DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000",
".m4v": "http-get:*:video/mp4:DLNA.ORG_PN=AVC_MP4_MP_SD;DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000",
".mkv": "http-get:*:video/x-matroska:DLNA.ORG_PN=MATROSKA;DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000",
".webm": "http-get:*:video/x-matroska:DLNA.ORG_PN=MATROSKA;DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000",
".avi": "http-get:*:video/x-msvideo:*",
".wmv": "http-get:*:video/x-ms-wmv:*",
}
CONTAINER_MIME = "object.container.storageFolder"
IMAGE_MIME = "object.item.imageItem.photo"
AUDIO_MIME = "object.item.audioItem.musicTrack"
VIDEO_MIME = "object.item.videoItem"
def _esc_xml(s):
return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace('"', "&quot;").replace("'", "&apos;")
def _oid_encode(vpath):
if not vpath:
return "0"
return base64.urlsafe_b64encode(vpath.encode("utf-8")).decode("ascii").rstrip("=")
def _oid_decode(oid):
if oid == "0":
return ""
padded = oid + "=" * (4 - len(oid) % 4) if len(oid) % 4 else oid
return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")
def _mime_for_ext(ext):
return DLNA_PROTOCOL_INFO.get(ext.lower(), "http-get:*:application/octet-stream:*")
def _dlna_class_for_ext(ext):
ext = ext.lower()
if ext in (".mp4", ".mkv", ".avi", ".wmv", ".webm", ".mov", ".ts", ".mpg", ".mpeg"):
return VIDEO_MIME
if ext in (".mp3", ".flac", ".wav", ".ogg", ".aac", ".m4a", ".wma"):
return AUDIO_MIME
if ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"):
return IMAGE_MIME
return "object.item"
def _url_quote(s):
from urllib.parse import quote as _q
return _q(s, safe="")
class DLNA_Sck(MC_Sck): class DLNA_Sck(MC_Sck):
def __init__(self, *a): def __init__(self, *a):
@ -40,39 +110,65 @@ class DLNAr(object):
if hc.vpath.endswith("device.xml"): if hc.vpath.endswith("device.xml"):
return self.tx_device(hc) return self.tx_device(hc)
if hc.vpath.endswith("ConnectionMgr.xml"):
return self.tx_connmgr_scpd(hc)
if hc.vpath.endswith("ContentDir.xml"):
return self.tx_contentdir_scpd(hc)
if "/ctl/ConnectionMgr" in hc.vpath:
return self.tx_connmgr_ctl(hc)
if "/ctl/ContentDir" in hc.vpath:
return self.tx_contentdir_ctl(hc)
if "/ctl/" in hc.vpath:
return self.tx_soap_fault(hc, 401, "Invalid Action")
hc.reply(b"unknown request", 400) hc.reply(b"unknown request", 400)
return False return False
# device description (MediaServer:1 + ConnectionManager:1 and ContentDirectory:1 services)
def tx_device(self, hc: "HttpCli") -> bool: def tx_device(self, hc: "HttpCli") -> bool:
'''UPnP-spec taken from running miniDLNA instance'''
zs = """ zs = """
<?xml version="1.0"?> <?xml version="1.0"?>
<root xmlns="urn:schemas-upnp-org:device-1-0"> <root xmlns="urn:schemas-upnp-org:device-1-0" xmlns:dlna="urn:schemas-dlna-org:device-1-0">
<specVersion> <specVersion>
<major>1</major> <major>1</major>
<minor>0</minor> <minor>0</minor>
</specVersion> </specVersion>
<URLBase>{}</URLBase> <URLBase>{}</URLBase>
<device> <device>
<presentationURL>{}</presentationURL> <deviceType>urn:schemas-upnp-org:device:MediaServer:1</deviceType>
<deviceType>urn:schemas-upnp-org:device:Basic:1</deviceType> <friendlyName>{}</friendlyName>
<friendlyName>{}</friendlyName> <modelDescription>file server</modelDescription>
<modelDescription>file server</modelDescription> <manufacturer>ed</manufacturer>
<manufacturer>ed</manufacturer> <manufacturerURL>https://ocv.me/</manufacturerURL>
<manufacturerURL>https://ocv.me/</manufacturerURL> <modelName>copyparty</modelName>
<modelName>copyparty</modelName> <modelNumber>1.0</modelNumber>
<modelURL>https://github.com/9001/copyparty/</modelURL> <modelURL>https://github.com/9001/copyparty/</modelURL>
<UDN>{}</UDN> <UDN>{}</UDN>
<serviceList> <dlna:X_DLNADOC>DMS-1.50</dlna:X_DLNADOC>
<service> <serviceList>
<serviceType>urn:schemas-upnp-org:device:Basic:1</serviceType> <service>
<serviceId>urn:schemas-upnp-org:device:Basic</serviceId> <serviceType>urn:schemas-upnp-org:service:ConnectionManager:1</serviceType>
<controlURL>/.cpr/dlna/services.xml</controlURL> <serviceId>urn:upnp-org:serviceId:ConnectionManager</serviceId>
<eventSubURL>/.cpr/dlna/services.xml</eventSubURL> <controlURL>/.cpr/dlna/ctl/ConnectionMgr</controlURL>
<SCPDURL>/.cpr/dlna/services.xml</SCPDURL> <eventSubURL>/.cpr/dlna/evt/ConnectionMgr</eventSubURL>
</service> <SCPDURL>/.cpr/dlna/ConnectionMgr.xml</SCPDURL>
</serviceList> </service>
</device> <service>
</root>""" <serviceType>urn:schemas-upnp-org:service:ContentDirectory:1</serviceType>
<serviceId>urn:upnp-org:serviceId:ContentDirectory</serviceId>
<controlURL>/.cpr/dlna/ctl/ContentDir</controlURL>
<eventSubURL>/.cpr/dlna/evt/ContentDir</eventSubURL>
<SCPDURL>/.cpr/dlna/ContentDir.xml</SCPDURL>
</service>
</serviceList>
</device>
</root>
"""
c = html_escape c = html_escape
sip, sport = hc.s.getsockname()[:2] sip, sport = hc.s.getsockname()[:2]
@ -82,19 +178,418 @@ class DLNAr(object):
zsl = self.args.zsl zsl = self.args.zsl
url = zsl if "://" in zsl else ubase + "/" + zsl.lstrip("/") url = zsl if "://" in zsl else ubase + "/" + zsl.lstrip("/")
name = self.args.doctitle name = self.args.doctitle
zs = zs.strip().format(c(ubase), c(url), c(name), c(self.args.zsid)) zs = zs.strip().format(c(ubase), c(name), c(self.args.zsid))
hc.reply(zs.encode("utf-8", "replace")) hc.reply(zs.encode("utf-8", "replace"))
return False # close connection return False # close connection
# SCPD stuff
def tx_connmgr_scpd(self, hc: "HttpCli") -> bool:
'''UPnP-spec taken from running miniDLNA instance'''
zs = """
<?xml version="1.0"?>
<scpd xmlns="urn:schemas-upnp-org:service-1-0">
<specVersion><major>1</major><minor>0</minor></specVersion>
<actionList>
<action>
<name>GetProtocolInfo</name>
<argumentList>
<argument><name>Source</name><direction>out</direction><relatedStateVariable>SourceProtocolInfo</relatedStateVariable></argument>
<argument><name>Sink</name><direction>out</direction><relatedStateVariable>SinkProtocolInfo</relatedStateVariable></argument>
</argumentList>
</action>
<action>
<name>GetCurrentConnectionIDs</name>
<argumentList>
<argument><name>ConnectionIDs</name><direction>out</direction><relatedStateVariable>CurrentConnectionIDs</relatedStateVariable></argument>
</argumentList>
</action>
<action>
<name>GetCurrentConnectionInfo</name>
<argumentList>
<argument><name>ConnectionID</name><direction>in</direction><relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable></argument>
<argument><name>RcsID</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_RcsID</relatedStateVariable></argument>
<argument><name>AVTransportID</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_AVTransportID</relatedStateVariable></argument>
<argument><name>ProtocolInfo</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_ProtocolInfo</relatedStateVariable></argument>
<argument><name>PeerConnectionManager</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_ConnectionManager</relatedStateVariable></argument>
<argument><name>PeerConnectionID</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_ConnectionID</relatedStateVariable></argument>
<argument><name>Direction</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_Direction</relatedStateVariable></argument>
<argument><name>Status</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_ConnectionStatus</relatedStateVariable></argument>
</argumentList>
</action>
</actionList>
<serviceStateTable>
<stateVariable sendEvents="yes"><name>SourceProtocolInfo</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="yes"><name>SinkProtocolInfo</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="yes"><name>CurrentConnectionIDs</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_ConnectionStatus</name><dataType>string</dataType><allowedValueList><allowedValue>OK</allowedValue><allowedValue>ContentFormatMismatch</allowedValue><allowedValue>InsufficientBandwidth</allowedValue><allowedValue>UnreliableChannel</allowedValue><allowedValue>Unknown</allowedValue></allowedValueList></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_ConnectionManager</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_Direction</name><dataType>string</dataType><allowedValueList><allowedValue>Input</allowedValue><allowedValue>Output</allowedValue></allowedValueList></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_ProtocolInfo</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_ConnectionID</name><dataType>i4</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_AVTransportID</name><dataType>i4</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_RcsID</name><dataType>i4</dataType></stateVariable>
</serviceStateTable>
</scpd>
"""
hc.reply(zs.encode("utf-8", "replace"))
return False
def tx_contentdir_scpd(self, hc: "HttpCli") -> bool:
zs = """
<?xml version="1.0"?>
<scpd xmlns="urn:schemas-upnp-org:service-1-0">
<specVersion><major>1</major><minor>0</minor></specVersion>
<actionList>
<action>
<name>Browse</name>
<argumentList>
<argument><name>ObjectID</name><direction>in</direction><relatedStateVariable>A_ARG_TYPE_ObjectID</relatedStateVariable></argument>
<argument><name>BrowseFlag</name><direction>in</direction><relatedStateVariable>A_ARG_TYPE_BrowseFlag</relatedStateVariable></argument>
<argument><name>Filter</name><direction>in</direction><relatedStateVariable>A_ARG_TYPE_Filter</relatedStateVariable></argument>
<argument><name>StartingIndex</name><direction>in</direction><relatedStateVariable>A_ARG_TYPE_Index</relatedStateVariable></argument>
<argument><name>RequestedCount</name><direction>in</direction><relatedStateVariable>A_ARG_TYPE_Count</relatedStateVariable></argument>
<argument><name>SortCriteria</name><direction>in</direction><relatedStateVariable>A_ARG_TYPE_SortCriteria</relatedStateVariable></argument>
<argument><name>Result</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_Result</relatedStateVariable></argument>
<argument><name>NumberReturned</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_Count</relatedStateVariable></argument>
<argument><name>TotalMatches</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_Count</relatedStateVariable></argument>
<argument><name>UpdateID</name><direction>out</direction><relatedStateVariable>A_ARG_TYPE_UpdateID</relatedStateVariable></argument>
</argumentList>
</action>
<action>
<name>GetSystemUpdateID</name>
<argumentList>
<argument><name>Id</name><direction>out</direction><relatedStateVariable>SystemUpdateID</relatedStateVariable></argument>
</argumentList>
</action>
<action>
<name>GetSortCapabilities</name>
<argumentList>
<argument><name>SortCaps</name><direction>out</direction><relatedStateVariable>SortCapabilities</relatedStateVariable></argument>
</argumentList>
</action>
<action>
<name>GetSearchCapabilities</name>
<argumentList>
<argument><name>SearchCaps</name><direction>out</direction><relatedStateVariable>SearchCapabilities</relatedStateVariable></argument>
</argumentList>
</action>
</actionList>
<serviceStateTable>
<stateVariable sendEvents="yes"><name>SystemUpdateID</name><dataType>ui4</dataType></stateVariable>
<stateVariable sendEvents="no"><name>SortCapabilities</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>SearchCapabilities</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_ObjectID</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_BrowseFlag</name><dataType>string</dataType><allowedValueList><allowedValue>BrowseMetadata</allowedValue><allowedValue>BrowseDirectChildren</allowedValue></allowedValueList></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_Filter</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_Index</name><dataType>ui4</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_Count</name><dataType>ui4</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_SortCriteria</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_Result</name><dataType>string</dataType></stateVariable>
<stateVariable sendEvents="no"><name>A_ARG_TYPE_UpdateID</name><dataType>ui4</dataType></stateVariable>
</serviceStateTable>
</scpd>
"""
hc.reply(zs.encode("utf-8", "replace"))
return False
# SOAP stuff
# Helpers for SOAP
def _read_soap_body(self, hc):
'''to SOAP XML from the HTTP request'''
try:
clen = int(hc.headers.get("content-length", 0))
if clen <= 0 or clen >= 1_000_000:
return None
data = b""
while len(data) < clen:
chunk = hc.sr.recv(min(clen - len(data), 65536))
if not chunk:
break
data += chunk
return data
except Exception:
return None
def _parse_soap_action(self, body_bytes):
'''parses SOAP XML, returns (service_type, action_name, params_dict)'''
try:
root = ET.fromstring(body_bytes)
except ET.ParseError:
return None, None, {}
# find first element in the body
ns_soap = "http://schemas.xmlsoap.org/soap/envelope/"
body = root.find(f"{{{ns_soap}}}Body")
if body is None:
return None, None, {}
# find action element in the body
action_elem = None
for child in body:
action_elem = child
break
if action_elem is None:
return None, None, {}
# then find service type from action element
service_type = ""
tag = action_elem.tag
if "{" in tag:
service_type = tag.split("}")[0].lstrip("{")
# action name is the local part
action_name = tag.split("}")[-1] if "}" in tag else tag
# get params
params = {}
for param in action_elem:
local_name = param.tag.split("}")[-1] if "}" in param.tag else param.tag
params[local_name] = param.text or ""
return service_type, action_name, params
def _soap_response(self, service_type, action_name, body_xml):
"""Wrap body_xml in a SOAP envelope response."""
return (
'<?xml version="1.0"?>\n'
'<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" '
's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">\n'
" <s:Body>\n"
" <u:{0}Response xmlns:u=\"{1}\">\n"
" {2}\n"
" </u:{0}Response>\n"
" </s:Body>\n"
"</s:Envelope>"
).format(action_name, service_type, body_xml)
def _soap_fault(self, code, desc):
"""Generate a SOAP fault response."""
return (
'<?xml version="1.0"?>\n'
'<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" '
's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">\n'
" <s:Body>\n"
" <s:Fault>\n"
" <faultcode>s:Client</faultcode>\n"
" <faultstring>UPnPError</faultstring>\n"
" <detail>\n"
' <UPnPError xmlns="urn:schemas-upnp-org:control-1-0">\n'
" <errorCode>{}</errorCode>\n"
" <errorDescription>{}</errorDescription>\n"
" </UPnPError>\n"
" </detail>\n"
" </s:Fault>\n"
" </s:Body>\n"
"</s:Envelope>"
).format(code, desc)
def _reply_soap(self, hc, xml_str, status=200):
"""Send a SOAP XML response."""
body = xml_str.encode("utf-8", "replace")
hc.reply(body, status, "text/xml; charset=utf-8", {"EXT": ""})
return False
def tx_soap_fault(self, hc, code=401, desc="Invalid Action"):
return self._reply_soap(hc, self._soap_fault(code, desc), 500)
# ConnectionManager control
def tx_connmgr_ctl(self, hc: "HttpCli") -> bool:
svc = "urn:schemas-upnp-org:service:ConnectionManager:1"
soap_body = self._read_soap_body(hc)
if not soap_body:
return self.tx_soap_fault(hc, 401, "Invalid Action")
_, action, params = self._parse_soap_action(soap_body)
if action == "GetProtocolInfo":
sources = []
for ext, info in DLNA_PROTOCOL_INFO.items():
sources.append(info)
sources.append("http-get:*:application/octet-stream:*")
resp_body = "<Source>{}</Source>\n <Sink></Sink>".format(",".join(sources))
resp = self._soap_response(svc, action, resp_body)
return self._reply_soap(hc, resp)
if action == "GetCurrentConnectionIDs":
resp_body = "<ConnectionIDs></ConnectionIDs>"
resp = self._soap_response(svc, action, resp_body)
return self._reply_soap(hc, resp)
if action == "GetCurrentConnectionInfo":
resp_body = (
"<RcsID>0</RcsID>\n"
" <AVTransportID>0</AVTransportID>\n"
" <ProtocolInfo></ProtocolInfo>\n"
" <PeerConnectionManager></PeerConnectionManager>\n"
" <PeerConnectionID>-1</PeerConnectionID>\n"
" <Direction>Output</Direction>\n"
" <Status>Unknown</Status>"
)
resp = self._soap_response(svc, action, resp_body)
return self._reply_soap(hc, resp)
return self.tx_soap_fault(hc, 401, "Invalid Action")
# ContentDirectory control
def tx_contentdir_ctl(self, hc: "HttpCli") -> bool:
svc = "urn:schemas-upnp-org:service:ContentDirectory:1"
soap_body = self._read_soap_body(hc)
if not soap_body:
return self.tx_soap_fault(hc, 401, "Invalid Action")
_, action, params = self._parse_soap_action(soap_body)
if action == "GetSystemUpdateID":
resp_body = "<Id>0</Id>"
resp = self._soap_response(svc, action, resp_body)
return self._reply_soap(hc, resp)
if action == "GetSortCapabilities":
resp_body = "<SortCaps></SortCaps>"
resp = self._soap_response(svc, action, resp_body)
return self._reply_soap(hc, resp)
if action == "GetSearchCapabilities":
resp_body = "<SearchCaps></SearchCaps>"
resp = self._soap_response(svc, action, resp_body)
return self._reply_soap(hc, resp)
if action == "Browse":
return self._handle_browse(hc, svc, params)
return self.tx_soap_fault(hc, 401, "Invalid Action")
def _handle_browse(self, hc, svc, params):
oid = params.get("ObjectID", "0")
start = int(params.get("StartingIndex", "0"))
count = int(params.get("RequestedCount", "0"))
vp = _oid_decode(oid)
base = self._base_url(hc)
try:
vfs, rem = self.broker.asrv.vfs.get(vp, LEELOO_DALLAS, True, False)
except Exception:
return self._reply_soap(hc, self._soap_fault(701, "No such object"), 500)
dirs, files = self._collect(vfs, rem, vp)
all_items = []
for nm, coid in dirs:
all_items.append(("dir", nm, coid, "", 0, 0))
for nm, coid, ext, sz, mt in files:
all_items.append(("file", nm, coid, ext, sz, mt))
n_total = len(all_items)
end = n_total if count == 0 else min(start + count, n_total)
page = all_items[start:end]
xml = self._build_didl(page, oid, vp, base)
resp = self._soap_response(svc, "Browse",
"<Result>{}</Result>\n <NumberReturned>{}</NumberReturned>\n"
" <TotalMatches>{}</TotalMatches>\n <UpdateID>0</UpdateID>"
.format(_esc_xml(xml), len(page), n_total))
return self._reply_soap(hc, resp)
def _base_url(self, hc):
'''constructs http://ip:port from the socket info'''
sip, sport = hc.s.getsockname()[:2]
sip = sip.replace("::ffff:", "")
proto = "https" if self.args.https_only else "http"
return "{}://{}:{}".format(proto, sip, sport)
def _collect(self, vfs, rem, vp):
'''walks vfs, returns (dirs, files) in lists'''
dirs = []
files = []
try:
_, entries, vnodes = vfs._ls(rem, LEELOO_DALLAS, True, [[True]])
except Exception:
entries, vnodes = [], {}
if not vp:
for nm, vn in sorted(self.broker.asrv.vfs.nodes.items()):
# root level: lists volumes from vfs.nodes
dirs.append((nm, _oid_encode(nm)))
for nm in sorted(vnodes):
c_vp = "{}/{}".format(vp, nm) if vp else nm
dirs.append((nm, _oid_encode(c_vp)))
for fname, st in entries:
if fname.startswith("."):
continue
c_vp = "{}/{}".format(vp, fname) if vp else fname
c_oid = _oid_encode(c_vp)
if stat.S_ISDIR(st.st_mode):
dirs.append((fname, c_oid))
else:
ext = os.path.splitext(fname)[1]
files.append((fname, c_oid, ext, st.st_size, int(st.st_mtime)))
return dirs, files
def _build_didl(self, page, parent_oid, vp, base):
'''build didl-lite xml for actual browsing'''
ns = 'xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/"'
ns += ' xmlns:dc="http://purl.org/dc/elements/1.1/"'
ns += ' xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/"'
ns += ' xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/"'
parts = ['<?xml version="1.0"?>\n<DIDL-Lite {}>\n'.format(ns)]
for kind, nm, coid, ext, sz, mt in page:
esc = _esc_xml(nm)
if kind == "dir":
parts.append(
' <container id="{}" parentID="{}" restricted="1" childCount="0">\n'
" <dc:title>{}</dc:title>\n"
" <upnp:class>{}</upnp:class>\n"
" </container>\n".format(coid, parent_oid, esc, CONTAINER_MIME)
)
else:
fpath = "{}/{}".format(vp, nm) if vp else nm
url = "{}/{}".format(base, "/".join(_url_quote(s) for s in fpath.split("/")))
proto = _mime_for_ext(ext)
cls = _dlna_class_for_ext(ext)
date = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(mt)) if mt else ""
parts.append(
' <item id="{}" parentID="{}" restricted="1">\n'
" <dc:title>{}</dc:title>\n"
" <upnp:class>{}</upnp:class>\n".format(coid, parent_oid, esc, cls)
)
if date:
parts.append(" <dc:date>{}</dc:date>\n".format(date))
parts.append(
' <res protocolInfo="{}" size="{}">{}</res>\n'
" </item>\n".format(proto, sz, _esc_xml(url))
)
parts.append("</DIDL-Lite>")
return "".join(parts)
class DLNAd(MCast): class DLNAd(MCast):
"""communicates with dlna clients over multicast""" """communicates with dlna clients over multicast, same as SSDP"""
def __init__(self, hub: "SvcHub", ngen: int) -> None: def __init__(self, hub: "SvcHub", ngen: int) -> None:
al = hub.args al = hub.args
vinit = al.zsv and not al.zmv vinit = al.zdv and not al.zmv
super(DLNAd, self).__init__( super(DLNAd, self).__init__(
hub, DLNA_Sck, al.zs_on, al.zs_off, GRP, "", 1900, vinit hub, DLNA_Sck, getattr(al, "zs_on", []), getattr(al, "zs_off", []), GRP, "", 1900, vinit
) )
self.srv: dict[socket.socket, DLNA_Sck] = {} self.srv: dict[socket.socket, DLNA_Sck] = {}
self.logsrc = "DLNA-{}".format(ngen) self.logsrc = "DLNA-{}".format(ngen)
@ -209,7 +704,7 @@ class DLNAd(MCast):
if not self.ptn_st.search(buf): if not self.ptn_st.search(buf):
return return
if self.args.zsv: if self.args.zdv:
t = "{} [{}] \033[36m{} \033[0m|{}|" t = "{} [{}] \033[36m{} \033[0m|{}|"
self.log(t.format(srv.name, srv.ip, cip, len(buf)), "90") self.log(t.format(srv.name, srv.ip, cip, len(buf)), "90")