//
//  server_date_time.c - Receives and answers current date/time requests
//
//  Copyright (c) 1996-2007 iMatix Corporation
//  All rights reserved.
//  
//  This file is licensed under the BSD license as follows:
//  
//  Redistribution and use in source and binary forms, with or without
//  modification, are permitted provided that the following conditions
//  are met:
//  
//  * Redistributions of source code must retain the above copyright
//    notice, this list of conditions and the following disclaimer.
//  * Redistributions in binary form must reproduce the above copyright
//    notice, this list of conditions and the following disclaimer in
//    the documentation and/or other materials provided with the
//    distribution.
//  * Neither the name of iMatix Corporation nor the names of its
//    contributors may be used to endorse or promote products derived
//    from this software without specific prior written permission.
//  
//  THIS SOFTWARE IS PROVIDED BY IMATIX CORPORATION "AS IS" AND ANY
//  EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
//  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
//  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL IMATIX CORPORATION BE
//  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
//  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
//  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
//  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
//  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
//  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
//  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//  Name:     server_date_time
//  Usage:    server_date_time  <broker IP address> <date/time server ID>
//  Example:  server_date_time 127.0.0.1 datetimeserver
//  Function: Receives and answers current date/time requests

#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int
main (int argc, char *argv [])
{
    if (argc != 3) {
        printf ("Usage: server_date_time "
            "<OpenAMQ broker IP address> <server ID>\n");
        return 0;
    }
    amq_client_connection_t *connection = NULL;
    amq_client_session_t    *session = NULL;
    icl_longstr_t           *auth_data;
    amq_content_basic_t     *content;

    char                    message_text [1024];
    char                    message_from [1024];
    size_t                  message_size;


    icl_system_initialise (2, argv);
    auth_data  = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new (argv [1], "/", auth_data, "test",
        0, 30000);
    icl_longstr_destroy (&auth_data);
    assert (connection);
    
    //  Print server info to stdout
    printf ("%s: ", argv [1]);
    printf ("%s/%s - %s - %s\n", connection->server_product, 
        connection->server_version, connection->server_platform, 
        connection->server_information);

    //  Taken as a second argument from command line
    printf ("My ID: %s\n", argv [2]);

    session = amq_client_session_new (connection);
    assert (session);
   

    //  Make sure that 'services' exchange exists, if it does not, create it
    amq_client_session_exchange_declare (
        session,                        //  session
        0,                              //  ticket
        "services",                     //  exchange name
        "direct",                       //  exchange type
        FALSE,                          //  shoudn't the exchange be created?
        FALSE,                          //  durable
        FALSE,                          //  auto-delete when unused
        FALSE,                          //  create internal exchange
        NULL);                          //  arguments for declaration

    //  Create a 'date_time_pool' queue (if it does not exist yet)
    amq_client_session_queue_declare (
        session,                        //  session
        0,                              //  ticket
        "date_time_pool",               //  queue name
        FALSE,                          //  passive
        FALSE,                          //  durable
        FALSE,                          //  exclusive
        TRUE,                           //  auto-delete
        NULL);                          //  arguments

    //  Bind the queue to the exchange
    amq_client_session_queue_bind (
        session,                        //  session
        0,                              //  ticket
        "date_time_pool",               //  queue
        "services",                     //  exchange
        "cur_date_time",                //  routing-key
        NULL);                          //  arguments
    
    //  Make sure that 'responses' exchange exists, if it does not, create it
    amq_client_session_exchange_declare (
        session,                        //  session
        0,                              //  ticket
        "responses",                    //  exchange name
        "direct",                       //  exchange type
        FALSE,                          //  shoudn't the exchange be created?
        FALSE,                          //  durable
        FALSE,                          //  auto-delete when unused
        FALSE,                          //  create internal exchange
        NULL);                          //  arguments for declaration

    //  Create a response queue with queue_name = ""
    //  (name will be asigned by broker)
    amq_client_session_queue_declare (
        session,                        //  session
        0,                              //  ticket
        "",                             //  queue name
        FALSE,                          //  passive
        FALSE,                          //  durable
        TRUE,                           //  exclusive
        TRUE,                           //  auto-delete
        NULL);                          //  arguments

    //  Bind the response queue to the exchange
    amq_client_session_queue_bind (
        session,                        //  session
        0,                              //  ticket
        session->queue,                 //  queue
        "responses",                    //  exchange
        session->queue,                 //  routing-key
        NULL);                          //  arguments
 
    //  Consume from the' date_time_pool' queue
    amq_client_session_basic_consume (
        session,                        //  session
        0,                              //  ticket
        "date_time_pool",               //  queue
        NULL,                           //  consumer-tag
        TRUE,                           //  no-local
        TRUE,                           //  no-ack
        FALSE,                          //  exclusive
        NULL);                          //  arguments

    //  Consume from the response queue
    amq_client_session_basic_consume (
        session,                        //  session
        0,                              //  ticket
        session->queue,                 //  queue
        NULL,                           //  consumer-tag
        TRUE,                           //  no-local
        TRUE,                           //  no-ack
        TRUE,                           //  exclusive
        NULL);                          //  arguments

    while (1) {
        if (!connection->alive)
            break;
        
        while (1) {
            content = amq_client_session_basic_arrived (session);
            if (!content)
                break;
            
            //  Get the request message body
            message_size = amq_content_basic_get_body (content,
                (byte*) message_text, sizeof (message_text));

            if (message_size) {
                message_text [message_size] = 0;
                printf ("Message from \'%s\': \'%s\'\n", message_from,
                    message_text);
            }

            // Copy reply_to request message preperty to message_from variable
            assert (strlen (content->reply_to) < sizeof (message_from));
            strcpy (message_from, content->reply_to);

            amq_content_basic_unlink (&content);

            //  Parse request message text
            if (strlen (message_text) > 3 && message_text [0] == 'G' 
                && message_text [1] == 'E' && message_text [2] == 'T') {
                
                //  Prepare and send GET command to curent date service
                sprintf (message_text, "GET: It is me %s.", argv [2]);
                printf ("Send : \'%s\'\n", message_text);
                content = amq_content_basic_new ();
                amq_content_basic_set_body (content, message_text,
                    strlen (message_text), NULL);
                amq_content_basic_set_message_id (content, "ID001");
    
                //  Set reply_to message property
                //  Service provider will send reply to to my response queue
                amq_content_basic_set_reply_to (content, session->queue);

                amq_client_session_basic_publish (
                    session,                    //  session
                    content,                    //  content to send
                    0,                          //  ticket
                    "services",                 //  exchange to send message to
                    "cur_date",                 //  routing-key
                    TRUE,                       //  mandatory
                    TRUE);                      //  immediate

                amq_content_basic_unlink (&content);

                //  Prepare and send GET command to curent time service
                sprintf (message_text, "GET: It is me %s.", argv [2]);
                printf ("Send : \'%s\'\n", message_text);
                content = amq_content_basic_new ();
                amq_content_basic_set_body (content, message_text,
                    strlen (message_text), NULL);
                amq_content_basic_set_message_id (content, "ID001");

                //  Service provider will send reply to to my response queue
                amq_content_basic_set_reply_to (content, session->queue);

                amq_client_session_basic_publish (
                    session,                    //  session
                    content,                    //  content to send
                    0,                          //  ticket
                    "services",                 //  exchange to send message to
                    "cur_time",                 //  routing-key
                    TRUE,                       //  mandatory //true
                    TRUE);                      //  immediate //true

                amq_content_basic_unlink (&content);

                //  Wait for 2 messages
                int i;
                int have_arrived = 0;
                int text_off = 0;

                for (i = 0 ; i < 2; i++ ) {

                    amq_client_session_wait (session, 0);

                    //  Check if it is "returned" message
                    content = amq_client_session_basic_returned (session);
                    if (content) {
                       printf ("No service provider connected. MSG: %s\n",
                           session->reply_text);
                       break;
                    }

                    content = amq_client_session_basic_arrived (session);
                    if (!content)
                        break;

                    //  Get the message body
                    message_size = amq_content_basic_get_body (content,
                        (byte*) (message_text + text_off),
                        sizeof (message_text));

                    if (message_size) {
                        message_text [message_size + text_off] = 0;
                    } else {
                        break;
                    }
                   
                    printf ("Recv : \'%s\'\n", message_text + text_off);

                    text_off = message_size;
                    have_arrived++;
                    amq_content_basic_unlink (&content);
                }

                //  Do not have two arrived messages - sending ERROR to client
                if (have_arrived != 2) {
                    strcpy (message_text,
                        "ERROR: service provider not available.");
                    printf ("\nSending: %s\n", message_text);
                } else {
                // Have 2 arrived messages
                    printf ("\nSending: %s\n", message_text);
                }

                //  Send message back to the clients response queue
                content = amq_content_basic_new ();
                amq_content_basic_set_body (content, message_text,
                    strlen (message_text), NULL);
                amq_content_basic_set_message_id (content, "ID001");
                amq_client_session_basic_publish (
                    session,                    //  session
                    content,                    //  content to send
                    0,                          //  ticket
                    "responses",                //  exchange to send message to
                    message_from,               //  routing-key
                    TRUE,                       //  mandatory //true
                    TRUE);                      //  immediate //true

                amq_content_basic_unlink (&content);


            } else {
                printf ("Unknown message type.\n");
            }
        }
        printf ("Waiting: ");
        fflush (stdout);

        //  Wait for incomming message
        amq_client_session_wait (session, 0);
    }

    //  Close the session and connection
    amq_client_session_destroy (&session); 
    amq_client_connection_destroy (&connection);
    icl_system_terminate ();
    return 0;
}

