Transforming JSON to Blue Prism Queue Item

I am still playing with Blue Prism and Kafka integration. I decided to build a Blue Prism process which connects to Kafka, fetches messages and saves them in a work queue. I used the techniques for Blue Prism and Kafka communication described in my earlier post. Kafka messages in JSON format need to be transformed into a Blue Prism collection before they are saved in the work queue. The Utility – JSON object, with its JSON to Collection action, provides this functionality out of the box.

Another built-in action, Add To Queue from the Work Queues object, saves collection elements as work queue items.

If we check the BPAWorkQueueItem table in the Blue Prism database, it turns out that Blue Prism stores non-encrypted queue items in XML format in the data column.

You may find more information about queue item encryption and decryption in my previous post.

Therefore, for example, the following JSON

{
  "topic": "sample topic",
  "key": null,
  "value": {
    "firstname": "Anna",
    "lastname": "Smith",
    "pets": [
      "dog",
      "cat",
      "hamster"
    ]
  },
  "partition": 0,
  "offset": 103423
}

is converted to the following single-line XML

<collection><row><field name="topic" type="text" value="sample topic" /><field name="key" type="text" value="" /><field name="value" type="collection"><row><field name="firstname" type="text" value="Anna" /><field name="lastname" type="text" value="Smith" /><field name="pets" type="collection"><row><field name="JSON:Array" type="text" value="dog" /></row><row><field name="JSON:Array" type="text" value="cat" /></row><row><field name="JSON:Array" type="text" value="hamster" /></row></field></row></field><field name="partition" type="number" value="0" /><field name="offset" type="number" value="103423" /></row></collection>

which after formatting is

<collection>
    <row>
        <field name="topic" type="text" value="sample topic"/>
        <field name="key" type="text" value=""/>
        <field name="value" type="collection">
            <row>
                <field name="firstname" type="text" value="Anna"/>
                <field name="lastname" type="text" value="Smith"/>
                <field name="pets" type="collection">
                    <row>
                        <field name="JSON:Array" type="text" value="dog"/>
                    </row>
                    <row>
                        <field name="JSON:Array" type="text" value="cat"/>
                    </row>
                    <row>
                        <field name="JSON:Array" type="text" value="hamster"/>
                    </row>
                </field>
            </row>
        </field>
        <field name="partition" type="number" value="0"/>
        <field name="offset" type="number" value="103423"/>
    </row>
</collection>

I wrote Python code which transforms any JSON into queue item XML the same way as the Blue Prism objects do.

import logging
from lxml import etree


class JsonXmlConverter:
    """Convert parsed JSON into Blue Prism work queue item XML.

    The produced document has the shape
    ``<collection><row><field name=".." type=".." value=".."/>...</row></collection>``.
    JSON values are mapped as follows:

    * ``str``            -> ``type="text"``
    * ``bool``           -> ``type="flag"``
    * ``int`` / ``float`` -> ``type="number"``
    * ``None``           -> ``type="text"`` with an empty value
    * ``dict``           -> ``type="collection"`` holding a single ``<row>``
    * ``list``           -> ``type="collection"`` holding one ``<row>`` per
      item, each item stored in a field named ``JSON:Array``
    """
    logger = logging.getLogger(__name__)

    def convert_to_queue_item_xml(self, json) -> str:
        """Return the queue item XML for an already-parsed JSON object.

        Args:
            json: Mapping of top-level JSON keys to their values (for example
                the result of ``json.loads`` on a JSON object).

        Returns:
            The XML document serialised as a single-line unicode string.

        Raises:
            TypeError: If a value of an unsupported type is encountered.
        """
        root = etree.Element('collection')
        row = etree.SubElement(root, 'row')
        for key, value in json.items():
            self.__add_child(row, key, value)
        # Serialise once and reuse the string for both logging and the result.
        xml = etree.tostring(root, encoding='unicode')
        self.logger.info("Converted xml [%s]", xml)
        return xml

    def __new_field(self, parent, key, bp_type):
        """Append a ``<field>`` with ``name`` and ``type`` set and return it."""
        field = etree.SubElement(parent, 'field')
        field.set('name', key)
        field.set('type', bp_type)
        return field

    def __add_child(self, parent, key, json):
        """Append the ``<field>`` representing ``json`` under ``parent``.

        Args:
            parent: Element (a ``<row>``) that receives the new field.
            key: Value for the field's ``name`` attribute.
            json: Parsed JSON value to convert; dicts and lists recurse.

        Raises:
            TypeError: If ``json`` is not a supported JSON value type.
        """
        if json is None:
            # JSON null is written as an empty text field.
            self.__new_field(parent, key, 'text').set('value', '')
        elif isinstance(json, (str, bool, int, float)):
            field = self.__new_field(parent, key, self.__get_bp_type__(json))
            field.set('value', str(json))
        elif isinstance(json, list):
            field = self.__new_field(parent, key, 'collection')
            self.__add_list_children(field, json)
        elif isinstance(json, dict):
            field = self.__new_field(parent, key, 'collection')
            row = etree.SubElement(field, 'row')
            for child_key, child_value in json.items():
                self.__add_child(row, child_key, child_value)
        else:
            raise TypeError(f"Not supported type {type(json)}")

    def __add_list_children(self, field, collection):
        """Append one ``<row>`` per list item to a collection ``field``.

        Every item is stored in a field named ``JSON:Array``.
        """
        for item in collection:
            row = etree.SubElement(field, 'row')
            self.__add_child(row, 'JSON:Array', item)

    def __get_bp_type__(self, el):
        """Return the ``type`` attribute value used for ``el``.

        Raises:
            TypeError: If ``el`` has no corresponding type.
        """
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(el, bool):
            return 'flag'
        if isinstance(el, str):
            return 'text'
        if isinstance(el, (int, float)):
            return 'number'
        if isinstance(el, (list, dict)):
            return 'collection'
        raise TypeError(f"Not supported type {type(el)}")

I also created a unit test to check the conversion

class TestJsonXmlConverter(TestCase):
    """Checks ``JsonXmlConverter`` output against a stored XML fixture."""
    logging.basicConfig(level=logging.DEBUG)
    jsonXmlConverter = JsonXmlConverter()

    def assertXmlEqual(self, got, want):
        """Raise ``AssertionError`` unless ``LXMLOutputChecker`` accepts ``got`` for ``want``.

        The failure message is the difference report produced by the checker.
        """
        checker = LXMLOutputChecker()
        if checker.check_output(want, got, 0):
            return
        raise AssertionError(checker.output_difference(Example("", want), got, 0))

    def test_simple_scenario(self):
        """Convert ``simple.json`` and compare the result with ``simple.xml``."""
        # given
        with open('test/resources/converter/simple.json') as json_file:
            json_object = json.load(json_file)

        with open('test/resources/converter/simple.xml') as xml_file:
            expected_xml = xml_file.read()

        # when
        actual_xml = self.jsonXmlConverter.convert_to_queue_item_xml(json_object)

        # then
        self.assertXmlEqual(actual_xml, expected_xml)

Is such Python code useful? Probably not, but I was wondering whether it would be possible to create queue items outside of Blue Prism by directly generating an SQL insert statement for the BPAWorkQueueItem table. I was able to write such code, and Blue Prism works perfectly with “hand-made” queue items.

    def create(self, queue_item: QueueItem):
        """Insert ``queue_item`` as a new row into the ``BPAWorkQueueItem`` table.

        Values are sent as bound parameters instead of being interpolated
        into the SQL text, so quotes or other special characters in the item
        (for example inside its XML ``data``) can neither break the statement
        nor inject SQL.

        Args:
            queue_item: Item whose ``id``, ``queueid``, ``queueident``,
                ``keyvalue``, ``status``, ``sessionid``, ``loaded``,
                ``encryptid`` and ``data`` attributes are stored. ``attempt``
                is always written as 1, ``deferred`` as NULL and ``priority``
                as 0.
        """
        self.logger.info('Saving [%s] message to database', queue_item)

        # NOTE(review): "?" is the DB-API "qmark" placeholder style (used by
        # pyodbc); confirm it matches the driver behind __get_connection().
        insert_statement = (
            "INSERT INTO BPAWorkQueueItem  (id, queueid, queueident, keyvalue, status, "
            "sessionid, attempt, loaded, "
            "encryptid, data, deferred, priority) values"
            "(CONVERT(uniqueidentifier, ?), CONVERT(uniqueidentifier, ?), ?, ?, ?, "
            "CONVERT(uniqueidentifier, ?), 1, ?, "
            "?, ?, NULL, 0)"
        )

        def as_guid_text(value):
            # Identifiers are passed as text for CONVERT(uniqueidentifier, ?);
            # None is kept so that it is bound as NULL.
            return None if value is None else str(value)

        parameters = (
            as_guid_text(queue_item.id),
            as_guid_text(queue_item.queueid),
            queue_item.queueident,
            queue_item.keyvalue,
            queue_item.status,
            as_guid_text(queue_item.sessionid),
            queue_item.loaded,
            queue_item.encryptid,
            queue_item.data,
        )

        conn = self.__get_connection()
        with conn:
            cursor = conn.cursor()
            self.logger.debug('Insert statement [%s] with parameters [%s]',
                              insert_statement, parameters)
            cursor.execute(insert_statement, parameters)
Of course, I discourage anyone from doing such things, as we should use the official APIs exposed by Blue Prism.

You may find the JSON to XML conversion in Python in my GitHub repository. I also exported the Blue Prism process for generating queue items from JSON, which I used for testing.

Leave a Comment