1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
# Copyright (C) 2013 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import json
from isc.datasrc import ConfigurableClientList
from isc.memmgr.datasrc_info import SegmentInfo
class MemorySegmentBuilder:
"""The builder runs in a different thread in the memory manager. It
waits for commands from the memory manager, and then executes them
in the given order sequentially.
"""
def __init__(self, sock, cv, command_queue, response_queue):
""" The constructor takes the following arguments:
sock: A socket using which this builder object notifies the
main thread that it has a response waiting for it.
cv: A condition variable object that is used by the main
thread to tell this builder object that new commands are
available to it. Note that this is also used for
synchronizing access to the queues, so code that uses
MemorySegmentBuilder must use this condition variable's
lock object to synchronize its access to the queues.
command_queue: A list of commands sent by the main thread to
this object. Commands should be executed
sequentially in the given order by this
object.
response_queue: A list of responses sent by this object to
the main thread. The format of this is
currently not strictly defined. Future
tickets will be able to define it based on
how it's used.
"""
self._sock = sock
self._cv = cv
self._command_queue = command_queue
self._response_queue = response_queue
self._shutdown = False
def __handle_shutdown(self):
# This method is called when handling the 'shutdown' command. The
# following tuple is passed:
#
# ('shutdown',)
self._shutdown = True
def __handle_bad_command(self):
# A bad command was received. Raising an exception is not useful
# in this case as we are likely running in a different thread
# from the main thread which would need to be notified. Instead
# return this in the response queue.
self._response_queue.append(('bad_command',))
self._shutdown = True
def __handle_load(self, zone_name, dsrc_info, rrclass, dsrc_name):
# This method is called when handling the 'load' command. The
# following tuple is passed:
#
# ('load', zone_name, dsrc_info, rrclass, dsrc_name)
#
# where:
#
# * zone_name is None or isc.dns.Name, specifying the zone name
# to load. If it's None, it means all zones to be cached in
# the specified data source (used for initialization).
#
# * dsrc_info is a DataSrcInfo object corresponding to the
# generation ID of the set of data sources for this loading.
#
# * rrclass is an isc.dns.RRClass object, the RR class of the
# data source.
#
# * dsrc_name is a string, specifying a data source name.
clist = dsrc_info.clients_map[rrclass]
sgmt_info = dsrc_info.segment_info_map[(rrclass, dsrc_name)]
params = json.dumps(sgmt_info.get_reset_param(SegmentInfo.WRITER))
clist.reset_memory_segment(dsrc_name,
ConfigurableClientList.READ_WRITE,
params)
if zone_name is not None:
zones = [(None, zone_name)]
else:
zones = clist.get_zone_table_accessor(dsrc_name, True)
for _, zone_name in zones:
catch_load_error = (zone_name is None) # install empty zone initially
result, writer = clist.get_cached_zone_writer(zone_name, catch_load_error,
dsrc_name)
if result != ConfigurableClientList.CACHE_STATUS_ZONE_SUCCESS:
# FIXME: log the error
continue
try:
error = writer.load()
if error is not None:
# FIXME: log the error
continue
except Exception:
# FIXME: log it
continue
writer.install()
writer.cleanup()
# need to reset the segment so readers can read it (note: memmgr
# itself doesn't have to keep it open, but there's currently no
# public API to just clear the segment)
clist.reset_memory_segment(dsrc_name,
ConfigurableClientList.READ_ONLY,
params)
self._response_queue.append(('load-completed', dsrc_info, rrclass,
dsrc_name))
def run(self):
""" This is the method invoked when the builder thread is
started. In this thread, be careful when modifying
variables passed-by-reference in the constructor. If they
are reassigned, they will not refer to the main thread's
objects any longer. Any use of command_queue and
response_queue must be synchronized by acquiring the lock in
the condition variable. This method must normally terminate
only when the 'shutdown' command is sent to it.
"""
# Acquire the condition variable while running the loop.
with self._cv:
while not self._shutdown:
while len(self._command_queue) == 0:
self._cv.wait()
# Move the queue content to a local queue. Be careful of
# not making assignments to reference variables.
local_command_queue = self._command_queue[:]
del self._command_queue[:]
# Run commands passed in the command queue sequentially
# in the given order. For now, it only supports the
# "shutdown" command, which just exits the thread.
for command_tuple in local_command_queue:
command = command_tuple[0]
if command == 'load':
# See the comments for __handle_load() for
# details of the tuple passed to the "load"
# command.
_, zone_name, dsrc_info, rrclass, dsrc_name = command_tuple
self.__handle_load(zone_name, dsrc_info, rrclass, dsrc_name)
elif command == 'shutdown':
self.__handle_shutdown()
# When the shutdown command is received, we do
# not process any further commands.
break
else:
self.__handle_bad_command()
# When a bad command is received, we do not
# process any further commands.
break
# Notify (any main thread) on the socket about a
# response. Otherwise, the main thread may wait in its
# loop without knowing there was a problem.
if len(self._response_queue) > 0:
while self._sock.send(b'x') != 1:
continue
|