时间: 2020-11-21|43次围观|0 条评论

使用policykit 的程序一般都有一个dbus daemon程序来完成相关操作,这个dbus daemon 会在系统注册一个system bus 服务名,用于响应要求root privileged的操作,当dbus请求到达时会先验证请求程序是否有相应的权限来调用这个操作(方法),而这是在.conf文件中定义的(后面说明)。

首先定义个System Dbus daemon,写一个.service文件来启动我们的daemon

 org.example.foo.service

文件放置目录:/usr/share/dbus-1/system-services

1 
[D
-
BUS Service]

2 
Name
=
org.example.foo

3 
Exec
=/
usr
/
local
/
libexec
/
policykit_dbus_foo_daemon.py

4 
User
=
root

 

其中Name是注册的SystemBus 服务名

Exec 是daemon 程序所在路径

我们以root权限启动

当有程序请求org.example.foo服务时,系统会自动以root启动我们的daemon。

相关信息看这里D-Bus system bus activation

 注: SessionBus 的 'org.freedesktop.PolicyKit.AuthenticationAgent' 的服务,只有在请求认证的时候才自动启动,打开过一段时间会自动关闭。

再看我们的daemon程序

policykit_dbus_foo_daemon.py

文件放置目录:/usr/local/libexec

 1 
#
!/usr/bin/python


 2 
#
 -*- coding: UTF-8 -*-


 3 
"""


 4 
Author: joe.zhou

 5 
"""


 6 
import
 os

 7 
import
 sys

 8 
import
 gobject

 9 
import
 dbus

10 
import
 dbus.service

11 
import
 dbus.mainloop.glib

12 


13 
class
 NotPrivilegedException (dbus.DBusException):

14 
    _dbus_error_name 
=
 
"
org.example.foo.dbus.service.PolKit.NotPrivilegedException
"


15 
    
def
 
__init__
 (self, action_id, 
*
p, 
**
k):

16 
        self._dbus_error_name 
=
 self.
__class__
._dbus_error_name 
+
 
"
.
"
 
+
 action_id

17 
        super (NotPrivilegedException, self).
__init__
 (
*
p, 
**
k)

18 
        

19 
def
 require_auth (action_id):

20 
    
def
 require_auth_decorator(func):        

21 
        
def
 _func(
*
args,
**
kwds):

22 
            revoke_if_one_shot 
=
 True

23 
            system_bus 
=
 dbus.SystemBus()

24 
            auth_obj 
=
 system_bus.get_object(
'
org.freedesktop.PolicyKit
'
,
'
/
'
)

25 
            auth_interface 
=
 dbus.Interface(auth_obj,
'
org.freedesktop.PolicyKit
'
)

26 
            
try
:

27 
                dbus_name 
=
 kwds[
'
sender_keyword
'
]                

28 
            
except
:

29 
                
raise
 NotPrivilegedException (action_id)

30 
            granted 
=
 auth_interface.IsSystemBusNameAuthorized(action_id,dbus_name,revoke_if_one_shot)

31 
            
if
 granted 
!=
 
'
yes
'
:

32 
                
raise
 NotPrivilegedException (action_id)

33 


34 
            
return
 func(
*
args,
**
kwds)

35 
            

36 
        _func.func_name 
=
 func.func_name

37 
        _func.
__name__
 
=
 func.
__name__


38 
        _func.
__doc__
 
=
 func.
__doc__
        

39 
        
return
 _func    

40 
    
return
 require_auth_decorator

41 


42 
'''


43 
A D-Bus service that PolicyKit controls access to.

44 
'''


45 
class
 PolicyKitFooMechanism(dbus.service.Object):      

46 
    SERVICE_NAME 
=
 
'
org.example.foo
'


47 
    SERVICE_PATH 
=
 
'
/org/example/foo
'


48 
    INTERFACE_NAME 
=
 
'
org.example.foo
'


49 


50 
    
def
 
__init__
(self, conn, object_path
=
SERVICE_PATH):

51 
        dbus.service.Object.
__init__
(self, conn, object_path)

52 


53 
    @dbus.service.method(dbus_interface
=
INTERFACE_NAME, in_signature
=
'
ss
'
,out_signature
=
'
s
'
,sender_keyword
=
'
sender
'
)

54 
    
def
 WriteFile(self, filepath, contents,sender
=
None):

55 
        
'''


56 
        Write the contents to a file that requires sudo/root access to do so.

57 
        PolicyKit will not allow this function to be called without sudo/root 

58 
        access, and will ask the user to authenticate if necessary, when 

59 
        the application calls PolicyKit's ObtainAuthentication().

60 
        
'''


61 
        @require_auth(
'
org.example.foo.modify
'
)

62 
        
def
 _write_file(filepath,contents,sender_keyword 
=
 None):

63 
            f 
=
 open(filepath, 
'
w
'
)

64 
            f.write(contents)

65 
            f.close()

66 
            
return
 
'
done
'


67 
        
return
 _write_file(filepath,contents,sender_keyword 
=
 sender)     

68 
    

69 
    @dbus.service.method(dbus_interface
=
INTERFACE_NAME, in_signature
=
'
s
'
,out_signature
=
'
as
'
,sender_keyword
=
'
sender
'
)

70 
    
def
 RunCmd(self, cmdStr, sender
=
None):

71 
        @require_auth(
'
org.example.foo.sys
'
)

72 
        
def
 _run_cmd(cmdStr,sender_keyword 
=
 None):

73 
            f 
=
 os.popen(cmdStr)

74 
            output 
=
 f.readlines()

75 
            f.close()

76 
            
return
 output        

77 
        
return
 _run_cmd(cmdStr,sender_keyword 
=
 sender)

78 


79 
    @dbus.service.method(dbus_interface
=
INTERFACE_NAME,in_signature
=
''
, out_signature
=
''
,sender_keyword
=
'
sender
'
)

80 
    
def
 Exit(self, sender
=
None):

81 
        @require_auth(
'
org.example.foo.sys
'
)

82 
        
def
 _exit(sender_keyword 
=
 None):

83 
            loop.quit()

84 
        
return
 _exit(sender_keyword 
=
 sender)

85 
        

86 
    @dbus.service.method(dbus_interface
=
INTERFACE_NAME,in_signature
=
''
, out_signature
=
'
s
'
)    

87 
    
def
 hello(self):

88 
        
return
 
'
hello
'


89 


90 
if
 
__name__
 
==
 
'
__main__
'
:

91 
    dbus.mainloop.glib.DBusGMainLoop(set_as_default
=
True)

92 
    bus 
=
 dbus.SystemBus()

93 
    name 
=
 dbus.service.BusName(PolicyKitFooMechanism.SERVICE_NAME, bus)

94 
    the_object 
=
 PolicyKitFooMechanism(bus)

95 
    loop 
=
 gobject.MainLoop()

96 
    loop.run()

97 


98 

 

在daemon程序中定义了三个需要权限的操作,一个不需要权限的操作,也定义了操作(方法)要求验证的action id,程序請求写文件操作时需先向org.freedesktop.PolicyKit.AuthenticationAgent 请求对应的 action id权限才能再进行调用,

否则会提示没有权限,hello操作不需要操作权限,两者区别在于来请求时是否先向 org.freedesktop.Policykit 检测这个 dbus 請求是否有 previleged。

具体是调用  IsSystemBusNameAuthorized 方法来验证,通过则返回'yes',否则 返回其它字符串

使用命令以下命令查看方法的 IsSystemBusNameAuthorized 详细 introspec

dbus
-
send 
--
system 
--
print
-
reply 
--
dest
=
org.freedesktop.PolicyKit 
/
 org.freedesktop.DBus.Introspectable.Introspect

 

在这里值得注意的是如果你定义了一系列的系统级调用操作(以root方式启动前面的程序,但去除了前面的@require_auth 部分),你必须保证每个操作要进行权限验证,即加上这个东西@require_auth('org.example.foo.sys')

如果你定义了写文件的dbus操作,但是没有进行权限验证的话,一个普通用户的dbus 调用請求也会调用通过,即普通用户可以随意改写任何文件,这是很危险的

你也可以尝试把前面的@require_auth部分去掉,再启动服务,用 d-feet  就可以调用WriteFile方法随意地在根目录上写入文件

 

 

--题外话——

本想将程序写成这种形式的

1 
@require_auth(
'
org.example.foo.sys
'
)

2 
@dbus.service.method(dbus_interface
=
INTERFACE_NAME,in_signature
=
''
, out_signature
=
''
,sender_keyword
=
'
sender
'
)

3 
def
 Exit(self, sender
=
None):

4 
    loop.quit()

 这样写,用d-feet 看了下,服务起不来,调了很久也找不出原因,无奈写成这种冗余的方式 --!,看能否有高手指点下,不尽感激!!

 

接着定义谁可以调用这些操作(方法),在.conf 文件定义

org.example.foo.conf

文件放置目录:/etc/dbus-1/system.d

 1 
<
?xml version
=
"
1.0
"
 encoding
=
"
UTF-8
"
?
>
 
<
!
--
 
-*-
 XML 
-*-
 
-->


 2 


 3 
<
!DOCTYPE busconfig PUBLIC

 4 
 
"
-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN
"


 5 
 
"
http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd
"
>


 6 
<
busconfig
>


 7 


 8 
  
<
!
--
 Only root can own the service 
-->


 9 
  
<
policy user
=
"
root
"
>


10 
    
<
allow own
=
"
org.example.foo
"
/>


11 
    
<
allow send_interface
=
"
org.example.foo
"
/>


12 
  
</
policy
>


13 


14 
  
<
!
--
 allow Introspectable 
-->
<!-- 任何人都可以调用,在后面使用.policy进行約束-->

15 
  
<
policy context
=
"
default
"
>


16 
    
<
allow send_interface
=
"
org.example.foo
"
/>


17 
    
<
allow send_interface
=
"
org.freedesktop.DBus.Introspectable
"
/>


18 
  
</
policy
>


19 


20 
</
busconfig
>


21 

 

 

再跟着是定义相关的 action id了,在.policy 文件定义

其中定义授权认证的方式和时效可以有以下几种

no 
auth_self$$
auth_admin$$
yes

其中加$$的可以附加后缀 _one_shot,_keep_session,_keep_always
其意义字面已经很清楚了

 另外也可以看看 man policykit.conf

不会写?参照/usr/share/PolicyKit/policy 目录下一堆 .policy文件总会了吧

写好后可以用工具 polkit-policy-file-validate 验证下是否有效

org.example.foo.policy

文件放置目录:/usr/share/PolicyKit/policy

 1 
<
?xml version
=
"
1.0
"
 encoding
=
"
UTF-8
"
?
>


 2 
<
!DOCTYPE policyconfig PUBLIC

 3 
 
"
-//freedesktop//DTD PolicyKit Policy Configuration 1.0//EN
"


 4 
 
"
http://www.freedesktop.org/standards/PolicyKit/1.0/policyconfig.dtd
"
>


 5 
<
policyconfig
>


 6 


 7 
  
<
vendor
>
Example Application
</
vendor
>


 8 
  
<
vendor_url
>
http:
//
fedoraproject.org
/
example
</
vendor_url
>


 9 


10 
   
<
action id
=
"
org.example.foo.modify
"
>


11 
    
<
description
>
Example Write Access
</
description
>


12 
    
<
message
>
System policy prevents write access to the Example service
</
message
>


13 
    
<
defaults
>


14 
      
<
allow_inactive
>
no
</
allow_inactive
>


15 
      
<
allow_active
>
auth_admin
</
allow_active
>


16 
    
</
defaults
>


17 
  
</
action
>


18 


19 
  
<
action id
=
"
org.example.foo.sys
"
>


20 
    
<
description
>
Example system action
</
description
>


21 
    
<
message
>
System policy prevents do system action to the Example service
</
message
>


22 
    
<
defaults
>


23 
      
<
allow_inactive
>
no
</
allow_inactive
>


24 
      
<
allow_active
>
auth_admin
</
allow_active
>


25 
    
</
defaults
>


26 
  
</
action
>


27 


28 
 

29 
</
policyconfig
>

 

做好以上工作,我们可以写我们的调用端程序了

 

 1 
#
!/usr/bin/python


 2 
#
 -*- coding: UTF-8 -*-


 3 
import
 os

 4 
import
 sys

 5 
import
 gobject

 6 
import
 dbus

 7 
import
 dbus.service

 8 
import
 dbus.mainloop.glib

 9 
import
 traceback

10 


11 
def
 auth_proxy (func):

12 
    DBUSNAME 
=
 
'
org.freedesktop.PolicyKit.AuthenticationAgent
'


13 
    DBUSPATH 
=
 
'
/
'
 

14 
    DBUSINTERFACE 
=
 
'
org.freedesktop.PolicyKit.AuthenticationAgent
'


15 
    EXC_NAME 
=
  
"
org.example.foo.dbus.service.PolKit.NotPrivilegedException
"
  

16 
    
def
 auth_proxy_wrapper (
*
args, 
**
kwds):

17 
        
try
:

18 
            
return
 func (
*
args, 
**
kwds)

19 
        
except
 dbus.DBusException, e:

20 
            exc_name 
=
 e.get_dbus_name ()

21 
            
if
 exc_name.startswith (EXC_NAME 
+
 
"
.
"
):

22 
                session_bus 
=
 dbus.SessionBus ()

23 
                auth_obj 
=
 session_bus.get_object (DBUSNAME, DBUSPATH)

24 
                auth_interface 
=
 dbus.Interface(auth_obj,DBUSINTERFACE)

25 
                action_id 
=
 exc_name[len (EXC_NAME)
+
1
:]

26 
                granted 
=
 auth_interface.ObtainAuthorization (action_id, dbus.UInt32 (0),dbus.UInt32 (os.getpid ()))

27 
                
if
 
not
 granted:

28 
                    
raise


29 
            
else
:

30 
                
raise


31 


32 
        
return
 func(
*
args, 
**
kwds)

33 
    
return
 auth_proxy_wrapper

34 


35 
class
 DbusTestProxy:

36 
    SERVICE_NAME 
=
 
'
org.example.foo
'


37 
    SERVICE_PATH 
=
 
'
/org/example/foo
'


38 
    INTERFACE_NAME 
=
 
'
org.example.foo
'


39 
    
def
 
__init__
(self):

40 
        self.bus 
=
 dbus.SystemBus()

41 
        self.o 
=
 self.bus.get_object(self.SERVICE_NAME,self.SERVICE_PATH)

42 
        self.i 
=
 dbus.Interface(self.o,self.INTERFACE_NAME)

43 
    

44 
    @auth_proxy

45 
    
def
 WriteFileWithAuth(self,filePath,contents):

46 
        
return
 self.i.WriteFile(filePath,contents)

47 


48 
    
def
 WriteFileWithoutAuth(self,filePath,contents):

49 
        
return
 self.i.WriteFile(filePath,contents)

50 
   

51 
    @auth_proxy

52 
    
def
 RunCmd(self,cmdStr):

53 
        
return
 self.i.RunCmd(cmdStr)

54 
    

55 
    @auth_proxy

56 
    
def
 Exit(self):

57 
        
return
 self.i.Exit()

58 
    

59 
    
#
do not need to auth


60 
    
def
 hello(self):

61 
        
return
 self.i.hello()

62 
    

63 
    

64 
if
 
__name__
 
==
 
"
__main__
"
:

65 
    p 
=
 DbusTestProxy()

66 
    
#
print p.RunCmd('ls -al')


67 
    
print
 p.WriteFileWithAuth(
'
/text
'
,
'
test\n
'
)

68 
    
#
print p.WriteFileWithoutAuth('/text','test\n')


69 
    
#
p.Exit()


70 
    
print
 p.hello()

 

运行上面的程序尝试WriteFileWithAuth 方法会弹出验证的对话框,口令正确的话会在根目录写入文件,调用WriteFileWithoutAuth会因为没有调用权限验证

而返回没有privileged的 异常,因为WriteFile操作是需要权限的。

 以上程序相当的简单,因为我也是python新手,相信你也看得明白。

最后打个包,点击下载 policykit_dbus_foo.7z

 

#
install


sudo .
/
install.sh install


#
remove


sudo .
/
install.sh uninstall


#
test


.
/
policykit_dbus_foo_client.py

以上系统环境为ubuntu 8.04

Reference

 

http://hal.freedesktop.org/docs/PolicyKit 

Policy, Mechanism and Time zones

http://dbus.freedesktop.org/doc/dbus-specification.html

http://dbus.freedesktop.org/doc/dbus-python/doc/tutorial.html

python Decorator for functions and methods

 最后发现了个开源项目 python-slip

其中 slip.dbus.polkit 部分,封装了policykit, 能使你在项目中使用policykit 更容易些.(我觉得python已经够简单的了Orz)

 

注:转载注明出处

转载于:https://www.cnblogs.com/joe2k8/archive/2009/05/24/1488074.html

原文链接:https://blog.csdn.net/weixin_30342827/article/details/99834024

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

本博客所有文章如无特别注明均为原创。
复制或转载请以超链接形式注明转自起风了,原文地址《dbus 和 policykit 实例篇(python)
   

还没有人抢沙发呢~