summaryrefslogtreecommitdiffstats
path: root/src/lib/python/isc/memmgr/builder.py
blob: 5aa23dc6d020e69192202656d46f81b436522788 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# Copyright (C) 2013  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import json
from isc.datasrc import ConfigurableClientList
from isc.memmgr.datasrc_info import SegmentInfo

class MemorySegmentBuilder:
    """The builder runs in a different thread in the memory manager. It
    waits for commands from the memory manager, and then executes them
    in the given order sequentially.

    Commands are tuples whose first element is the command name:

        ('load', zone_name, dsrc_info, rrclass, dsrc_name)
        ('shutdown',)

    Any other command (including a malformed 'load' tuple) results in a
    ('bad_command',) response and stops the builder.
    """

    def __init__(self, sock, cv, command_queue, response_queue):
        """ The constructor takes the following arguments:

            sock: A socket using which this builder object notifies the
                  main thread that it has a response waiting for it.

            cv: A condition variable object that is used by the main
                thread to tell this builder object that new commands are
                available to it. Note that this is also used for
                synchronizing access to the queues, so code that uses
                MemorySegmentBuilder must use this condition variable's
                lock object to synchronize its access to the queues.

            command_queue: A list of commands sent by the main thread to
                           this object. Commands should be executed
                           sequentially in the given order by this
                           object.

            response_queue: A list of responses sent by this object to
                            the main thread. The format of this is
                            currently not strictly defined. Future
                            tickets will be able to define it based on
                            how it's used.
        """

        self._sock = sock
        self._cv = cv
        self._command_queue = command_queue
        self._response_queue = response_queue
        # Set to True by the 'shutdown' and bad-command handlers; run()
        # leaves its main loop once this is set.
        self._shutdown = False

    def __handle_shutdown(self):
        # This method is called when handling the 'shutdown' command. The
        # following tuple is passed:
        #
        # ('shutdown',)
        self._shutdown = True

    def __handle_bad_command(self):
        # A bad command was received. Raising an exception is not useful
        # in this case as we are likely running in a different thread
        # from the main thread which would need to be notified. Instead
        # return this in the response queue.
        self._response_queue.append(('bad_command',))
        self._shutdown = True

    def __handle_load(self, zone_name, dsrc_info, rrclass, dsrc_name):
        # This method is called when handling the 'load' command. The
        # following tuple is passed:
        #
        # ('load', zone_name, dsrc_info, rrclass, dsrc_name)
        #
        # where:
        #
        #  * zone_name is None or isc.dns.Name, specifying the zone name
        #    to load. If it's None, it means all zones to be cached in
        #    the specified data source (used for initialization).
        #
        #  * dsrc_info is a DataSrcInfo object corresponding to the
        #    generation ID of the set of data sources for this loading.
        #
        #  * rrclass is an isc.dns.RRClass object, the RR class of the
        #    data source.
        #
        #  * dsrc_name is a string, specifying a data source name.
        #
        # On completion a ('load-completed', dsrc_info, rrclass,
        # dsrc_name) tuple is appended to the response queue, even if
        # some (or all) zones failed to load.

        clist = dsrc_info.clients_map[rrclass]
        sgmt_info = dsrc_info.segment_info_map[(rrclass, dsrc_name)]
        params = json.dumps(sgmt_info.get_reset_param(SegmentInfo.WRITER))
        clist.reset_memory_segment(dsrc_name,
                                   ConfigurableClientList.READ_WRITE,
                                   params)

        # When all zones are being loaded (initialization), ask the zone
        # writer to catch load errors so an empty zone is installed
        # initially.  This must be decided from the *requested*
        # zone_name, before iterating: the per-zone names in the loop
        # below are never None.
        catch_load_error = (zone_name is None)

        if zone_name is not None:
            zones = [(None, zone_name)]
        else:
            zones = clist.get_zone_table_accessor(dsrc_name, True)

        # Use a dedicated loop variable so the zone_name argument is not
        # clobbered while iterating.
        for _, name in zones:
            result, writer = clist.get_cached_zone_writer(name,
                                                          catch_load_error,
                                                          dsrc_name)
            if result != ConfigurableClientList.CACHE_STATUS_ZONE_SUCCESS:
                # FIXME: log the error
                continue

            # NOTE(review): writer.cleanup() is not called when load()
            # fails; confirm against the zone writer API whether it
            # should be.
            try:
                error = writer.load()
                if error is not None:
                    # FIXME: log the error
                    continue
            except Exception:
                # FIXME: log it
                continue
            writer.install()
            writer.cleanup()

        # need to reset the segment so readers can read it (note: memmgr
        # itself doesn't have to keep it open, but there's currently no
        # public API to just clear the segment)
        clist.reset_memory_segment(dsrc_name,
                                   ConfigurableClientList.READ_ONLY,
                                   params)

        self._response_queue.append(('load-completed', dsrc_info, rrclass,
                                     dsrc_name))

    def run(self):
        """ This is the method invoked when the builder thread is
            started.  In this thread, be careful when modifying
            variables passed-by-reference in the constructor. If they
            are reassigned, they will not refer to the main thread's
            objects any longer. Any use of command_queue and
            response_queue must be synchronized by acquiring the lock in
            the condition variable. This method must normally terminate
            only when the 'shutdown' command is sent to it.

            A bad command (an unknown command name, or a 'load' tuple
            that does not have exactly 5 elements) also terminates the
            loop, after a ('bad_command',) response has been queued.
        """

        # Acquire the condition variable while running the loop.
        with self._cv:
            while not self._shutdown:
                while not self._command_queue:
                    self._cv.wait()
                # Move the queue content to a local queue. Be careful of
                # not making assignments to reference variables.
                local_command_queue = self._command_queue[:]
                del self._command_queue[:]

                # Run commands passed in the command queue sequentially
                # in the given order.  The "load" and "shutdown"
                # commands are supported; anything else is reported as
                # a bad command.
                for command_tuple in local_command_queue:
                    command = command_tuple[0]
                    if command == 'load' and len(command_tuple) == 5:
                        # See the comments for __handle_load() for
                        # details of the tuple passed to the "load"
                        # command.  The length check above guarantees
                        # the unpacking cannot raise in this thread; a
                        # malformed "load" falls through to the bad
                        # command branch instead.
                        _, zone_name, dsrc_info, rrclass, dsrc_name = \
                            command_tuple
                        self.__handle_load(zone_name, dsrc_info, rrclass,
                                           dsrc_name)
                    elif command == 'shutdown':
                        self.__handle_shutdown()
                        # When the shutdown command is received, we do
                        # not process any further commands.
                        break
                    else:
                        self.__handle_bad_command()
                        # When a bad command is received, we do not
                        # process any further commands.
                        break

                # Notify (any main thread) on the socket about a
                # response. Otherwise, the main thread may wait in its
                # loop without knowing there was a problem.
                if self._response_queue:
                    # Retry until exactly the one notification byte has
                    # been sent.
                    while self._sock.send(b'x') != 1:
                        continue